Skip to content

Commit 0a68a59

Browse files
authored
feat: Redoer middleware (#25)
1 parent e89d030 commit 0a68a59

File tree

6 files changed

+182
-2
lines changed

6 files changed

+182
-2
lines changed

.trivyignore.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ misconfigurations:
33
- id: AVD-DS-0026
44
paths:
55
- Dockerfile.consumer # Jira -67
6+
- Dockerfile.redoer # Jira -67
67
- Dockerfile.exporter # ephemeral container, healthcheck not necessary
78
- Dockerfile.tools

Dockerfile.redoer

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
FROM senzing/senzingsdk-runtime:4.0.0
2+
3+
USER root
4+
5+
RUN apt-get update \
6+
&& apt-get -y install --no-install-recommends python3 python3-pip python3-boto3 \
7+
&& apt-get -y autoremove \
8+
&& apt-get -y clean
9+
10+
WORKDIR /app
11+
COPY middleware/* .
12+
13+
# Add a new user and switch to it.
14+
RUN useradd -m -u 1001 senzing
15+
USER senzing
16+
17+
ENV PYTHONPATH=/opt/senzing/er/sdk/python:/app
18+
19+
# Flush buffer - helps with print statements.
20+
ENV PYTHONUNBUFFERED=1
21+
22+
CMD ["python3", "redoer.py"]

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,16 @@ simultaneously as needed):
186186

187187
`LOG_LEVEL` is optional; defaults to `INFO`.
188188

189+
### Redoer
190+
191+
Similar to the consumer, the redoer is also a continually-running process.
192+
193+
```bash
194+
docker compose run --env AWS_PROFILE=localstack --env LOG_LEVEL=debug redoer
195+
```
196+
197+
`LOG_LEVEL` is optional; defaults to `INFO`.
198+
189199
### Exporter
190200

191201
Spinning up the exporter middleware (this is intended to be an ephemeral

docker-compose.yaml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,30 @@ services:
108108
# Note: `.aws` mount might not be needed later.
109109
- ~/.aws:/home/senzing/.aws
110110

111+
redoer:
112+
build:
113+
context: .
114+
dockerfile: Dockerfile.redoer
115+
depends_on:
116+
- db
117+
environment:
118+
AWS_ENDPOINT_URL: http://localstack:4566
119+
SENZING_ENGINE_CONFIGURATION_JSON: >-
120+
{
121+
"PIPELINE": {
122+
"CONFIGPATH": "/etc/opt/senzing",
123+
"LICENSESTRINGBASE64": "${SENZING_LICENSE_BASE64_ENCODED}",
124+
"RESOURCEPATH": "/opt/senzing/er/resources",
125+
"SUPPORTPATH": "/opt/senzing/data"
126+
},
127+
"SQL": {
128+
"BACKEND": "SQL",
129+
"CONNECTION": "postgresql://${POSTGRES_USERNAME:-senzing}:${POSTGRES_PASSWORD:-senzing}@db:5432:${POSTGRES_DB:-G2}/?sslmode=disable"
130+
}
131+
}
132+
volumes:
133+
- ~/.aws:/home/senzing/.aws
134+
111135
exporter:
112136
build:
113137
context: .

middleware/consumer.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
log.error(e)
1818
sys.exit(1)
1919

20-
# TODO add DLQ logic (see DLG_TAG logging)
21-
2220
Q_URL = os.environ['Q_URL']
2321
SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON'])
2422

middleware/redoer.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import json
2+
import os
3+
import time
4+
import sys
5+
import boto3
6+
import senzing as sz
7+
8+
from loglib import *
9+
log = retrieve_logger()
10+
11+
try:
12+
log.info('Importing senzing_core library . . .')
13+
import senzing_core as sz_core
14+
log.info('Imported senzing_core successfully.')
15+
except Exception as e:
16+
log.error('Importing senzing_core library failed.')
17+
log.error(e)
18+
sys.exit(1)
19+
20+
SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON'])
21+
22+
# How long to wait before attempting next Senzing op.
23+
WAIT_SECONDS = int(os.environ.get('WAIT_SECONDS', 10))
24+
25+
#-------------------------------------------------------------------------------
26+
27+
def go():
28+
'''Starts the Redoer process; runs indefinitely.'''
29+
30+
sz_eng = None
31+
try:
32+
sz_factory = sz_core.SzAbstractFactoryCore("ERS", SZ_CONFIG)
33+
34+
# Init senzing engine object.
35+
# Senzing engine object cannot be passed around between functions,
36+
# else it will be eagerly cleaned up / destroyed and no longer usable.
37+
sz_eng = sz_factory.create_engine()
38+
log.info(SZ_TAG + 'Senzing engine object instantiated.')
39+
except sz.SzError as sz_err:
40+
log.error(SZ_TAG + str(sz_err))
41+
sys.exit(1)
42+
except Exception as e:
43+
log.error(str(e))
44+
sys.exit(1)
45+
46+
log.info('Starting primary loop.')
47+
48+
# Approach:
49+
# - We don't try to both 'get' and 'process' in a single loop; instead we
50+
# 'get', then 'continue' to the next loop; the have_rcd flag is used to
51+
# facilitate this.
52+
# - Each Senzing call (3 distinct calls) is couched in its own try-except block for
53+
# robustness.
54+
tally = None
55+
have_rcd = 0
56+
rcd = None
57+
while 1:
58+
try:
59+
60+
if have_rcd:
61+
try:
62+
sz_eng.process_redo_record(rcd)
63+
rcd = None # <-- TODO this op might not be necessary
64+
have_rcd = 0
65+
log.debug(SZ_TAG + 'Successfully redid one record.')
66+
continue
67+
except sz.SzRetryableError as sz_ret_err:
68+
# We'll try to process this record again.
69+
log.error(SZ_TAG + str(sz_ret_err))
70+
time.sleep(WAIT_SECONDS)
71+
continue
72+
except sz.SzError as sz_err:
73+
log.error(SZ_TAG + str(sz_err))
74+
sys.exit(1)
75+
76+
else:
77+
try:
78+
tally = sz_eng.count_redo_records()
79+
log.debug(SZ_TAG + 'Current redo count: ' + str(tally))
80+
except sz.SzRetryableError as sz_ret_err:
81+
log.error(SZ_TAG + str(sz_ret_err))
82+
time.sleep(WAIT_SECONDS)
83+
continue
84+
except sz.SzError as sz_err:
85+
log.error(SZ_TAG + str(sz_err))
86+
sys.exit(1)
87+
88+
if tally:
89+
90+
try:
91+
rcd = sz_eng.get_redo_record()
92+
if rcd:
93+
have_rcd = 1
94+
# At this point, rcd var holds a record, and have_rcd flag
95+
# raised. Will process in the next loop.
96+
log.debug(SZ_TAG + 'Retrieved 1 record via get_redo_record()')
97+
else:
98+
log.debug(SZ_TAG + 'Redo count was greater than 0, but got '
99+
+ 'nothing from get_redo_record')
100+
except sz.SzRetryableError as sz_ret_err:
101+
log.error(SZ_TAG + str(sz_ret_err))
102+
except sz.SzError as sz_err:
103+
log.error(SZ_TAG + str(sz_err))
104+
sys.exit(1)
105+
106+
else:
107+
log.debug('No redo records. Will wait ' + str(WAIT_SECONDS) + ' seconds.')
108+
time.sleep(WAIT_SECONDS)
109+
110+
except Exception as e:
111+
log.error(str(e))
112+
sys.exit(1)
113+
114+
#-------------------------------------------------------------------------------
115+
116+
def main():
117+
log.info('====================')
118+
log.info(' ------->')
119+
log.info(' REDOER ')
120+
log.info(' STARTED')
121+
log.info(' ------->')
122+
log.info('====================')
123+
go()
124+
125+
if __name__ == '__main__': main()

0 commit comments

Comments
 (0)