Skip to content

Redis #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
"semi": "off",
"@typescript-eslint/semi": "off",
"@typescript-eslint/no-unsafe-assignment": "off",
"@typescript-eslint/no-non-null-assertion": "off",
"simple-import-sort/imports": "error",
"simple-import-sort/exports": "error",
"sort-imports": "off",
"import/first": "error",
"import/no-cycle": "error",
"import/newline-after-import": "error",
"import/no-duplicates": "error",
"@typescript-eslint/no-misused-promises": "off",
// Does not correctly resolve @types when trying to import
// type definitions only. TypeScript already performs this check.
"import/no-unresolved": "off",
Expand Down
10 changes: 6 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ htmlcov/
.nox/
.coverage
.coverage.*
coverage/
.cache
nosetests.xml
coverage.xml
Expand Down Expand Up @@ -141,6 +140,9 @@ cython_debug/
# ts-server related
node_modules/
dist/
.DS_Store/
.eslintcache/
uploads/
.DS_Store
.eslintcache
uploads/
.env
dump.rdb
coverage/
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"eslint-plugin-import": "2.22.1",
"eslint-plugin-security": "1.4.0",
"eslint-plugin-simple-import-sort": "7.0.0",
"ioredis": "4.26.0",
"jest": "26.6.3",
"lint-staged": "10.2.2",
"npm-run-all": "4.1.5",
Expand Down
12 changes: 12 additions & 0 deletions server/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ const redisConfig = {
namespace: env
.get('REDIS_KEY_PREFIX')
.default('ax')
.asString(),
streamName: env
.get('REDIS_STREAM_NAME')
.required()
.asString(),
consumerGroupName: env
.get('REDIS_CONSUMER_GROUP_NAME')
.required()
.asString(),
consumerName: env
.get('REDIS_CONSUMER_NAME')
.required()
.asString()
}

Expand Down
99 changes: 99 additions & 0 deletions server/consumer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import Redis from 'ioredis'

import { redisConfig } from './config'
import { Consumer } from './consumer'
import { runSafely } from './runSafely'

describe('consumer', () => {
const redis = new Redis()

beforeAll(async () => {
await redis.flushall()
})

afterAll(async () => {
await redis.flushall()
})

// describe('faulty behaviour', () => {
// const consumer = new Consumer()

// it('stopConsumer should not throw even if there is no stream and no group', async () => {
// const consumer = new Consumer()

// await runSafely(async () => {
// await consumer.stopConsumer()
// })
// })

// it('destry should not throw even if there is no stream and no group', async () => {
// await runSafely(async () => {
// await expect(consumer.destroyConsumerGroup()).resolves.toEqual(0)
// })
// })

// it('should destry group while gruop being created', async () => {
// await runSafely(async () => {
// await consumer.startConsumer()
// await expect(consumer.destroyConsumerGroup()).resolves.toEqual(0)
// })
// })
// })

// describe('destroyConsumerGroup', () => {
// beforeAll(async() => {
// await redis.flushall()
// await redis.xadd(redisConfig.streamName, '*', 'pageNumber', '0', 'content', '')
// })

// it('should not throw when there is no group', async () => {
// await runSafely(async () => {
// const consumer = new Consumer()

// await expect(consumer.destroyConsumerGroup()).resolves.toEqual(0)
// })
// })

// it('should destroy group', async () => {
// await runSafely(async () => {
// const consumer = new Consumer()

// await consumer.startConsumer()
// await expect(consumer.destroyConsumerGroup()).resolves.toEqual(1)
// })
// })
// })

describe('startConsumer', () => {
beforeAll(async() => {
await redis.flushall()
await redis.xadd(redisConfig.streamName, '*', 'pageNumber', '0', 'content', '')
})

it('should create cnosumer', async () => {
await runSafely(async () => {
const consumer = new Consumer()

await consumer.startConsumer()
const info = await redis.xinfo('CONSUMERS', redisConfig.streamName, redisConfig.consumerGroupName)

expect(info[0]![1]).toEqual('alexisConsumer')
await expect(consumer.destroyConsumerGroup()).resolves.toEqual(1)
})
})

it('should not throw when cnosumer was already created', async () => {
await runSafely(async () => {
const consumer = new Consumer()

await consumer.startConsumer()
await consumer.startConsumer()
await consumer.startConsumer()
const info = await redis.xinfo('CONSUMERS', redisConfig.streamName, redisConfig.consumerGroupName)

expect(info[0]![1]).toEqual('alexisConsumer')
await expect(consumer.destroyConsumerGroup()).resolves.toEqual(1)
})
})
})
})
140 changes: 140 additions & 0 deletions server/consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import Redis from 'ioredis'

import { redisConfig } from './config'
import { runSafely } from './runSafely'

export class Consumer {
private redis = new Redis()
private destroyResolver: ((value?: unknown) => void) | null
private consumerStoppedPromiseResolver: ((value?: unknown) => void) | null
private stopPromise: Promise<unknown> | null
private consumingStarted = false
private beingCreated = false
private counter = 0

private manageTimeouts = (
callBack: () => Promise<unknown>,
timeOutDuration: number,
resolver: ((value?: unknown) => void) | null
) => {
if (resolver) {
resolver()
} else {
setTimeout(callBack, timeOutDuration)
}

return Promise.resolve()
}

private streamExists = async () => {
return (await this.redis.xlen(redisConfig.streamName)) !== 0
}

private start = async () => {
await runSafely(async () => {
const result = await this.redis.xreadgroup(
'GROUP',
redisConfig.consumerGroupName,
redisConfig.consumerName,
'BLOCK',
'1000',
'COUNT',
'1',
'STREAMS',
redisConfig.streamName,
'>'
)

if (!result) {
return this.manageTimeouts(this.start, 10, this.consumerStoppedPromiseResolver)
}

if (result[0]![1].length === 0) {
return this.manageTimeouts(this.start, 10, this.consumerStoppedPromiseResolver)
}

const lastID = result[0]![1]![0]![0]

const content = result[0]![1]![0]![1]![3]!

console.log(lastID)
console.log(content)

if (this.counter == 5) {
this.counter = 0
await this.stopConsumer()
}

console.log(this.counter++)

await this.redis.xack(redisConfig.streamName, redisConfig.consumerGroupName, lastID as string)

return this.manageTimeouts(this.start, 0, this.consumerStoppedPromiseResolver)
})
}

constructor() {
this.consumerStoppedPromiseResolver = null
this.destroyResolver = null
this.stopPromise = null
}

destroyConsumerGroup = async () => {
await this.stopConsumer()

if (this.beingCreated) {
await new Promise((resolve) => {
(this.destroyResolver as (value?: unknown) => void) = resolve
})

this.destroyResolver = null
}

if (await this.streamExists()) {
return await this.redis.xgroup('DESTROY', redisConfig.streamName, redisConfig.consumerGroupName)
}

return 0
}

startConsumer = async () => {
if (this.consumingStarted) {
return
}

if (await this.streamExists()) {
await this.redis.xgroup('DESTROY', redisConfig.streamName, redisConfig.consumerGroupName)
await this.redis.xgroup('CREATE', redisConfig.streamName, redisConfig.consumerGroupName, '0-0')
// makes sure group was not destroyed meanwhile being created
if (this.destroyResolver) {
return this.manageTimeouts(this.startConsumer, 10, this.destroyResolver)
}
} else {
return this.manageTimeouts(this.startConsumer, 10, this.destroyResolver)
}

this.counter = 0
this.consumingStarted = true

return this.start()
}

stopConsumer = () => {
if (!this.consumingStarted) {
return
}

const stopConsumerInternal = async () => {
await new Promise((resolve) => {
this.consumerStoppedPromiseResolver = resolve
})

this.consumingStarted = false
this.consumerStoppedPromiseResolver = null
}

this.stopPromise = this.stopPromise || stopConsumerInternal()

return this.stopPromise
}
}
5 changes: 4 additions & 1 deletion server/dev.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@ REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=SystemAdminPassword12345
SERVER_LOG_LEVEL=DEBUG
SERVER_SESSION_SECRET=g03pQP0F8q3KMKh1VPpWMIhUJ9k1xBN2
NODE_ENV=development
SERVER_SESSION_SECRET=g03pQP0F8q3KMKh1VPpWMIhUJ9k1xBN2
REDIS_STREAM_NAME=alexisStream
REDIS_CONSUMER_GROUP_NAME=alexisGroup
REDIS_CONSUMER_NAME=alexisConsumer
5 changes: 5 additions & 0 deletions server/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import { loadModel } from './answering/inference'
import { app } from './app'
import { Consumer } from './consumer'
import logger from './lib/log'

const consumer = new Consumer()

void consumer.startConsumer()

void loadModel().then(() =>
app.listen(app.get('port'), () => {
logger.info('App is running on http://localhost:%d in %s mode', app.get('port'), app.get('env'))
Expand Down
1 change: 1 addition & 0 deletions server/keyFinder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const keyFinder: Map<string, string> = new Map<string, string>()
16 changes: 9 additions & 7 deletions server/lib/pdf.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import { getText } from './pdf'

describe('pdf', () => {
describe('getText', () => {
it('should extract text from PDF', async () => {
const pdfContent = fs.readFileSync('./test-files/EffectiveAggregateDesign.pdf')

for await (const obj of getText(pdfContent)) {
expect(obj).toHaveProperty('page')
expect(obj).toHaveProperty('content')
}
it('should extract text from PDF', () => {
fs.readFile(`./test-files/EffectiveAggregateDesign.pdf`, async (err, data) => {
expect(err).toBeNull()
for await (const obj of getText(data)) {
expect(obj).toHaveProperty('page')
expect(obj).toHaveProperty('content')
}
})
})

it('should not extract from invalid PDF', () => {
const pdfContent = Buffer.from('hello world')

Expand Down
28 changes: 28 additions & 0 deletions server/load.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import fs from 'fs'
import Redis from 'ioredis'

import { redisConfig } from './config'
import { keyFinder } from './keyFinder'
import logger from './lib/log'
import { getText } from './lib/pdf'

const redis = new Redis()

export const loadPdf = async (data: Buffer) => {
await redis.flushall()
for await (const page of getText(data)) {
const pageNumber = page.page.toString()
const content = page.content

if (keyFinder.get(pageNumber)) {
logger.error(`key ${pageNumber} already exists. Duplicate keys are not allowed`)
}
keyFinder.set(pageNumber, await redis.xadd(redisConfig.streamName, '*', 'pageNumber', pageNumber, 'content', content))
}
}

export const loadPdfFromUrl = (url: string) => {
fs.readFile(url, (_err, data) => {
void loadPdf(data)
})
}
7 changes: 7 additions & 0 deletions server/runSafely.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export const runSafely = async (callback: () => Promise<unknown>) => {
try {
return await callback();
} catch (err) {
throw new Error(err);
}
}
Loading