diff --git a/.eslintrc.json b/.eslintrc.json index 3271d4e..14aaeb8 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -24,6 +24,7 @@ "semi": "off", "@typescript-eslint/semi": "off", "@typescript-eslint/no-unsafe-assignment": "off", + "@typescript-eslint/no-non-null-assertion": "off", "simple-import-sort/imports": "error", "simple-import-sort/exports": "error", "sort-imports": "off", @@ -31,6 +32,7 @@ "import/no-cycle": "error", "import/newline-after-import": "error", "import/no-duplicates": "error", + "@typescript-eslint/no-misused-promises": "off", // Does not correctly resolve @types when trying to import // type definitions only. TypeScript already performs this check. "import/no-unresolved": "off", diff --git a/.gitignore b/.gitignore index 28dd1e5..fd352ae 100644 --- a/.gitignore +++ b/.gitignore @@ -42,7 +42,6 @@ htmlcov/ .nox/ .coverage .coverage.* -coverage/ .cache nosetests.xml coverage.xml @@ -141,6 +140,9 @@ cython_debug/ # ts-server related node_modules/ dist/ -.DS_Store/ -.eslintcache/ -uploads/ \ No newline at end of file +.DS_Store +.eslintcache +uploads/ +.env +dump.rdb +coverage/ \ No newline at end of file diff --git a/package.json b/package.json index f0e1c86..7bbfabb 100644 --- a/package.json +++ b/package.json @@ -54,6 +54,7 @@ "eslint-plugin-import": "2.22.1", "eslint-plugin-security": "1.4.0", "eslint-plugin-simple-import-sort": "7.0.0", + "ioredis": "4.26.0", "jest": "26.6.3", "lint-staged": "10.2.2", "npm-run-all": "4.1.5", diff --git a/server/config/index.ts b/server/config/index.ts index a09b805..062900b 100644 --- a/server/config/index.ts +++ b/server/config/index.ts @@ -16,6 +16,18 @@ const redisConfig = { namespace: env .get('REDIS_KEY_PREFIX') .default('ax') + .asString(), + streamName: env + .get('REDIS_STREAM_NAME') + .required() + .asString(), + consumerGroupName: env + .get('REDIS_CONSUMER_GROUP_NAME') + .required() + .asString(), + consumerName: env + .get('REDIS_CONSUMER_NAME') + .required() .asString() } diff --git 
a/server/consumer.test.ts b/server/consumer.test.ts new file mode 100644 index 0000000..68baea7 --- /dev/null +++ b/server/consumer.test.ts @@ -0,0 +1,99 @@ +import Redis from 'ioredis' + +import { redisConfig } from './config' +import { Consumer } from './consumer' +import { runSafely } from './runSafely' + +describe('consumer', () => { + const redis = new Redis() + + beforeAll(async () => { + await redis.flushall() + }) + + afterAll(async () => { + await redis.flushall() + }) + + // describe('faulty behaviour', () => { + // const consumer = new Consumer() + + // it('stopConsumer should not throw even if there is no stream and no group', async () => { + // const consumer = new Consumer() + + // await runSafely(async () => { + // await consumer.stopConsumer() + // }) + // }) + + // it('destry should not throw even if there is no stream and no group', async () => { + // await runSafely(async () => { + // await expect(consumer.destroyConsumerGroup()).resolves.toEqual(0) + // }) + // }) + + // it('should destry group while gruop being created', async () => { + // await runSafely(async () => { + // await consumer.startConsumer() + // await expect(consumer.destroyConsumerGroup()).resolves.toEqual(0) + // }) + // }) + // }) + + // describe('destroyConsumerGroup', () => { + // beforeAll(async() => { + // await redis.flushall() + // await redis.xadd(redisConfig.streamName, '*', 'pageNumber', '0', 'content', '') + // }) + + // it('should not throw when there is no group', async () => { + // await runSafely(async () => { + // const consumer = new Consumer() + + // await expect(consumer.destroyConsumerGroup()).resolves.toEqual(0) + // }) + // }) + + // it('should destroy group', async () => { + // await runSafely(async () => { + // const consumer = new Consumer() + + // await consumer.startConsumer() + // await expect(consumer.destroyConsumerGroup()).resolves.toEqual(1) + // }) + // }) + // }) + + describe('startConsumer', () => { + beforeAll(async() => { + await 
/**
 * Consumer for a Redis stream, reading through a consumer group.
 *
 * The stream, group and consumer names come from `redisConfig`. Once started,
 * the consumer reads one entry at a time with `XREADGROUP`, logs it,
 * acknowledges it with `XACK` and then reschedules itself with `setTimeout`.
 * Stopping is cooperative: `stopConsumer` registers a resolver, and the read
 * loop calls that resolver (instead of rescheduling) the next time it reaches
 * `manageTimeouts`.
 */
export class Consumer {
  private redis = new Redis()
  // Resolver registered by `destroyConsumerGroup` while it waits for group creation to finish.
  private destroyResolver: ((value?: unknown) => void) | null
  // Resolver registered by `stopConsumer`; called by the read loop to confirm it has stopped.
  private consumerStoppedPromiseResolver: ((value?: unknown) => void) | null
  // In-flight stop request, shared by concurrent `stopConsumer` callers.
  private stopPromise: Promise<void> | null
  private consumingStarted = false
  // NOTE(review): nothing in this class ever sets this flag to true, so the
  // waiting branch in `destroyConsumerGroup` is currently never taken.
  private beingCreated = false
  // Number of entries processed since the consumer was (re)started.
  private counter = 0

  /**
   * Either signals a pending waiter or schedules the next iteration.
   *
   * @param callBack - function to run again after `timeOutDuration` ms; the
   *   promise it returns is not awaited.
   * @param timeOutDuration - delay in milliseconds before `callBack` runs.
   * @param resolver - when non-null it is called and `callBack` is NOT
   *   scheduled, which ends the polling chain.
   * @returns an already-resolved promise.
   */
  private manageTimeouts = (
    callBack: () => Promise<unknown>,
    timeOutDuration: number,
    resolver: ((value?: unknown) => void) | null
  ) => {
    if (resolver) {
      resolver()
    } else {
      setTimeout(callBack, timeOutDuration)
    }

    return Promise.resolve()
  }

  /** Returns true when the configured stream holds at least one entry. */
  private streamExists = async () => {
    return (await this.redis.xlen(redisConfig.streamName)) !== 0
  }

  /**
   * One iteration of the read loop: read a single new entry for this consumer,
   * log and acknowledge it, then reschedule (or stop if a stop was requested).
   */
  private start = async () => {
    await runSafely(async () => {
      const result = await this.redis.xreadgroup(
        'GROUP',
        redisConfig.consumerGroupName,
        redisConfig.consumerName,
        'BLOCK',
        '1000',
        'COUNT',
        '1',
        'STREAMS',
        redisConfig.streamName,
        '>'
      )

      // No reply (e.g. the BLOCK timeout elapsed) or no entries: poll again shortly.
      if (!result || result[0]![1].length === 0) {
        return this.manageTimeouts(this.start, 10, this.consumerStoppedPromiseResolver)
      }

      const lastID = result[0]![1]![0]![0]

      // The entry's field list is [name, value, name, value, ...]; index 3 is
      // the value of the second field.
      const content = result[0]![1]![0]![1]![3]!

      console.log(lastID)
      console.log(content)

      if (this.counter === 5) {
        this.counter = 0
        // Request the stop without awaiting it: the stop promise is only
        // resolved by the `manageTimeouts` call at the end of this very
        // iteration, so awaiting here would deadlock and skip the XACK below.
        void this.stopConsumer()
      }

      console.log(this.counter++)

      await this.redis.xack(redisConfig.streamName, redisConfig.consumerGroupName, lastID as string)

      return this.manageTimeouts(this.start, 0, this.consumerStoppedPromiseResolver)
    })
  }

  constructor() {
    this.consumerStoppedPromiseResolver = null
    this.destroyResolver = null
    this.stopPromise = null
  }

  /**
   * Stops the consumer and destroys the configured consumer group.
   *
   * @returns the reply of `XGROUP DESTROY`, or 0 when the stream is empty or
   *   missing.
   */
  destroyConsumerGroup = async () => {
    await this.stopConsumer()

    if (this.beingCreated) {
      // Wait until `startConsumer` notices the resolver and backs off.
      await new Promise((resolve) => {
        this.destroyResolver = resolve
      })

      this.destroyResolver = null
    }

    if (await this.streamExists()) {
      return await this.redis.xgroup('DESTROY', redisConfig.streamName, redisConfig.consumerGroupName)
    }

    return 0
  }

  /**
   * Recreates the consumer group and starts the read loop.
   *
   * Does nothing if the consumer is already running. While the stream is empty
   * or missing, it retries itself every 10 ms via `manageTimeouts`.
   */
  startConsumer = async () => {
    if (this.consumingStarted) {
      return
    }

    if (await this.streamExists()) {
      // Destroy first so that CREATE does not fail on an already existing group.
      await this.redis.xgroup('DESTROY', redisConfig.streamName, redisConfig.consumerGroupName)
      await this.redis.xgroup('CREATE', redisConfig.streamName, redisConfig.consumerGroupName, '0-0')
      // makes sure group was not destroyed meanwhile being created
      if (this.destroyResolver) {
        return this.manageTimeouts(this.startConsumer, 10, this.destroyResolver)
      }
    } else {
      return this.manageTimeouts(this.startConsumer, 10, this.destroyResolver)
    }

    this.counter = 0
    this.consumingStarted = true

    return this.start()
  }

  /**
   * Requests the read loop to stop.
   *
   * @returns `undefined` when the consumer is not running; otherwise a promise
   *   that resolves once the loop has acknowledged the stop. Concurrent callers
   *   share the same promise.
   */
  stopConsumer = () => {
    if (!this.consumingStarted) {
      return
    }

    const stopConsumerInternal = async () => {
      await new Promise((resolve) => {
        this.consumerStoppedPromiseResolver = resolve
      })

      this.consumingStarted = false
      this.consumerStoppedPromiseResolver = null
      // Clear the shared promise so that a later start/stop cycle creates a
      // fresh stop request instead of reusing this already-resolved one.
      this.stopPromise = null
    }

    this.stopPromise = this.stopPromise || stopConsumerInternal()

    return this.stopPromise
  }
}
// The test must be async and await the file read. With a callback-based
// `fs.readFile`, Jest finishes the test before the callback runs, so none of
// the assertions would be evaluated as part of the test.
it('should extract text from PDF', async () => {
  // A failed read rejects here and fails the test.
  const pdfContent = await fs.promises.readFile('./test-files/EffectiveAggregateDesign.pdf')

  for await (const obj of getText(pdfContent)) {
    expect(obj).toHaveProperty('page')
    expect(obj).toHaveProperty('content')
  }
})
/**
 * Runs an async callback and guarantees that any failure surfaces as an `Error`.
 *
 * @param callback - the async operation to run.
 * @returns whatever `callback` resolves to.
 * @throws the original error when `callback` throws an `Error` (class, message,
 *   stack and custom properties are preserved); otherwise a new `Error` whose
 *   message is the string form of the thrown value.
 */
export const runSafely = async <T>(callback: () => Promise<T>): Promise<T> => {
  try {
    return await callback()
  } catch (err) {
    // Re-wrapping an existing Error would discard its type and stack and
    // produce an "Error: Error: ..." message, so rethrow it unchanged.
    if (err instanceof Error) {
      throw err
    }

    throw new Error(String(err))
  }
}