Skip to content

Commit 76b6b06

Browse files
committed
db module, consumer update, etc
1 parent 82db350 commit 76b6b06

File tree

6 files changed

+73
-4
lines changed

6 files changed

+73
-4
lines changed

Dockerfile.consumer

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ RUN apt-get update \
77
&& apt-get -y autoremove \
88
&& apt-get -y clean
99

10-
RUN pip3 install --break-system-packages opentelemetry-distro opentelemetry-exporter-otlp-proto-http
10+
RUN pip3 install --break-system-packages opentelemetry-distro opentelemetry-exporter-otlp-proto-http psycopg2-binary
1111
# Above, `--break-system-packages` flag overrides the
1212
# "This environment is externally managed" error that calling pip
1313
# would otherwise incur here.

dev-scripts/check_export_tracker.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1+
# Usage:
2+
# docker compose run tools python dev/check_export_tracker.py
3+
# To dump contents of export_tracker to stdout, run:
4+
# docker compose run tools python dev/check_export_tracker.py dump
5+
16
import os
7+
import sys
28
import psycopg2
39
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
410

@@ -25,6 +31,7 @@
2531
except Exception as e:
2632
print(str(e))
2733

34+
rslt = None
2835
try:
2936
curs.execute('select * from export_tracker')
3037
rslt = curs.fetchall()
@@ -36,3 +43,6 @@
3643
print(str(e))
3744

3845
print('==================================================')
46+
47+
if len(sys.argv) == 2 and sys.argv[1] == 'dump':
48+
print(rslt)

docker-compose.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ services:
9696
environment:
9797
AWS_PROFILE: localstack
9898
AWS_ENDPOINT_URL: http://localstack:4566
99+
PGHOST: db
100+
PGUSER: ${POSTGRES_USERNAME:-senzing}
101+
PGPASSWORD: ${POSTGRES_PASSWORD:-senzing}
99102
Q_URL: http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/sqs-senzing-local-ingest
100103
SENZING_ENGINE_CONFIGURATION_JSON: >-
101104
{
@@ -122,6 +125,9 @@ services:
122125
db:
123126
condition: service_healthy
124127
environment:
128+
PGHOST: db
129+
PGUSER: ${POSTGRES_USERNAME:-senzing}
130+
PGPASSWORD: ${POSTGRES_PASSWORD:-senzing}
125131
SENZING_ENGINE_CONFIGURATION_JSON: >-
126132
{
127133
"PIPELINE": {
@@ -146,6 +152,9 @@ services:
146152
environment:
147153
AWS_PROFILE: localstack
148154
AWS_ENDPOINT_URL: http://localstack:4566
155+
PGHOST: db
156+
PGUSER: ${POSTGRES_USERNAME:-senzing}
157+
PGPASSWORD: ${POSTGRES_PASSWORD:-senzing}
149158
S3_BUCKET_NAME: sqs-senzing-local-export
150159
SENZING_ENGINE_CONFIGURATION_JSON: >-
151160
{

middleware/consumer.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import otel
1515
import util
16+
import db
1617

1718
try:
1819
log.info('Importing senzing_core library . . .')
@@ -190,8 +191,12 @@ def clean_up(signum, frm):
190191
success_status = otel.SUCCESS
191192
log.debug(SZ_TAG + 'Successful add_record having ReceiptHandle: '
192193
+ receipt_handle)
193-
# TODO send affected entity IDs to tracker
194+
195+
# Save affected entity IDs to tracker table for exporting later.
194196
affected = util.parse_affected_entities_resp(resp)
197+
log.debug(SZ_TAG + 'Affected entities: ' + str(affected))
198+
for entity_id in affected: db.add_entity_id(entity_id)
199+
195200
except KeyError as ke:
196201
log.error(fmterr(ke))
197202
make_msg_visible(sqs, Q_URL, receipt_handle)

middleware/db.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,46 @@
11
import psycopg2
2+
3+
from loglib import *
4+
log = retrieve_logger()
5+
6+
EXPORT_STATUS_TODO = 1
7+
EXPORT_STATUS_IN_PROGRESS = 2
8+
EXPORT_STATUS_DONE = 3
9+
10+
_params = {
11+
'dbname': 'sqs_entity_resolution',
12+
'user': os.environ['PGUSER'],
13+
'password': os.environ['PGPASSWORD'],
14+
'host': os.environ['PGHOST'],
15+
'port':'5432'}
16+
17+
_conn = psycopg2.connect(**_params)
18+
_curs = _conn.cursor()
19+
20+
def add_entity_id(entity_id):
21+
'''Inserts an entity_id into export_tracker with initial status of
22+
EXPORT_STATUS_TODO.'''
23+
if type(entity_id) is not int: raise TypeError
24+
log.debug(f'Entity ID: {entity_id}')
25+
try:
26+
_curs.execute(
27+
'insert into export_tracker (entity_id, export_status) values (%s, %s)',
28+
[entity_id, EXPORT_STATUS_TODO])
29+
_conn.commit()
30+
except Exception as e:
31+
_conn.rollback()
32+
log.error(f'Failure to insert for entity ID: {entity_id}')
33+
34+
def tag_todo_as_in_progress_and_retrieve():
35+
'''This function does two things:
36+
1. For all rows with status of EXPORT_STATUS_TODO, updates them
37+
to be EXPORT_STATUS_IN_PROGRESS.
38+
2. Returns a *distinct* (no duplicates) list of those entity IDs.'''
39+
...
40+
41+
def tag_in_progress_as_done(export_id):
42+
'''For all rows with status of EXPORT_STATUS_IN_PROGRESS, updates them
43+
to be EXPORT_STATUS_DONE and collectively updates their export_id value.
44+
In Practice, export_id will usually be the name of the output file that
45+
was exported.'''
46+
...

sql/create-export-tracker-table.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ CREATE DATABASE sqs_entity_resolution
77
CREATE TABLE public.export_tracker
88
(
99
ts timestamp without time zone NOT NULL default current_timestamp,
10-
entity_id character varying NOT NULL,
11-
export_status integer NOT NULL DEFAULT 0,
10+
entity_id bigint NOT NULL,
11+
export_status smallint NOT NULL DEFAULT 0,
1212
export_id character varying
1313
)

0 commit comments

Comments
 (0)