Skip to content

Commit 12627f1

Browse files
committed
initial impl of multipart S3 uploads + more
1 parent 616386f commit 12627f1

File tree

3 files changed

+120
-34
lines changed

3 files changed

+120
-34
lines changed

dev-scripts/check_export_tracker.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
# Usage:
1+
# Basic usage:
22
# docker compose run tools python dev/check_export_tracker.py
3-
# To dump contents of export_tracker to stdout, run:
3+
# To dump contents of export_tracker to stdout:
44
# docker compose run tools python dev/check_export_tracker.py dump
5+
# Choose which export_status (1, 2, or 3) to dump (default is 1):
6+
# docker compose run tools python dev/check_export_tracker.py dump 2
57

68
import os
79
import sys
@@ -44,6 +46,12 @@
4446

4547
print('==================================================')
4648

if len(sys.argv) > 1 and sys.argv[1] == 'dump':
    print(rslt)
    print(f'Total: {len(rslt)}')
    # Optional third CLI arg selects which export_status to dump.
    # Coerce to int so both branches produce the same type for the
    # %s comparison below (default 1 == EXPORT_STATUS_TODO).
    export_status = int(sys.argv[2]) if len(sys.argv) > 2 else 1
    # DB-API 2.0 requires query parameters to be passed as a sequence,
    # not a bare value.
    curs.execute(
        'select distinct(entity_id) from export_tracker where export_status = %s',
        [export_status])
    out = [row[0] for row in curs.fetchall()]
    print(type(out))
    print(out)

middleware/db.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,31 @@ def add_entity_id(entity_id):
2929
_conn.commit()
3030
except Exception as e:
3131
_conn.rollback()
32-
log.error(f'Failure to insert for entity ID: {entity_id}')
32+
log.error(fmterr(e))
3333

3434
def tag_todo_as_in_progress_and_retrieve():
    '''This function does two things:
    1. For all rows with status of EXPORT_STATUS_TODO, updates them
       to be EXPORT_STATUS_IN_PROGRESS.
    2. Returns a *distinct* (no duplicates) list of those entity IDs.

    On any database error the transaction is rolled back, the error is
    logged, and an empty list is returned.'''
    out = []
    log.debug('tag_todo_as_in_progress_and_retrieve called.')
    try:
        # Parameter order matters: the first %s is the NEW status being
        # assigned, the second is the status being matched. (The original
        # list was reversed, which would have flipped IN_PROGRESS rows
        # back to TODO instead of the documented TODO -> IN_PROGRESS.)
        _curs.execute(
            'update export_tracker set export_status = %s where export_status = %s',
            [EXPORT_STATUS_IN_PROGRESS, EXPORT_STATUS_TODO])
        log.debug('db update ran ok.')
        _curs.execute(
            'select distinct(entity_id) from export_tracker where export_status = %s',
            [EXPORT_STATUS_IN_PROGRESS])
        out = [row[0] for row in _curs.fetchall()]
        log.debug('db select distinct ran ok.')
        _conn.commit()
        log.debug('db commit ran ok.')
    except Exception as e:
        _conn.rollback()
        log.error(fmterr(e))
    return out
4057

4158
def tag_in_progress_as_done(export_id):
4259
'''For all rows with status of EXPORT_STATUS_IN_PROGRESS, updates them

middleware/exporter.py

Lines changed: 90 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,14 @@
3232
FOLDER_NAME = os.environ.get('FOLDER_NAME', 'exporter-outputs')
3333
RUNTIME_ENV = os.environ.get('RUNTIME_ENV', 'unknown') # For OTel
3434

35-
EXPORT_FLAGS = sz.SzEngineFlags.SZ_EXPORT_DEFAULT_FLAGS
35+
EXPORT_FLAGS = sz.SzEngineFlags.SZ_EXPORT_DEFAULT_FLAGS
36+
37+
FULL_EXPORT_MODE = 'TODO' # grab from env var
38+
39+
# The output file is accumulated chunk by chunk from Senzing; this is how
40+
# many bytes we put together before sending those combined chunks as a 'part'
41+
# to S3 via multipart S3 upload.
42+
BYTES_PER_PART = (1024 ** 2) * 10
3643

3744
#-------------------------------------------------------------------------------
3845

@@ -93,9 +100,6 @@ def go():
93100
log.info('Finished OTel setup.')
94101
# end OTel setup #
95102

96-
# init buffer
97-
buff = io.BytesIO()
98-
99103
# Retrieve output from sz into buff
100104
# sz will export JSONL lines; we add the chars necessary to make
101105
# the output as a whole be a single JSON blob.
@@ -104,41 +108,98 @@ def go():
104108
start = time.perf_counter()
105109
success_status = otel.FAILURE # initial default state
106110

111+
# For multipart S3 upload, S3 will hand back to us an etag for each
112+
# part we upload to it. We need to accumulate the part IDs (which we
113+
# set) and the etags (which S3 gives) and provide it all to S3 at the very
114+
# end when wrapping up the upload.
115+
# This is a list of maps:
116+
part_ids_and_tags = []
117+
118+
FETCH_COMPLETE = False
119+
120+
key = FOLDER_NAME + '/' + build_output_filename()
121+
upload_id = None
107122
try:
108123
export_handle = sz_eng.export_json_entity_report(EXPORT_FLAGS)
109124
log.info(SZ_TAG + 'Obtained export_json_entity_report handle.')
110-
buff.write('['.encode('utf-8'))
125+
126+
mup_resp = s3.create_multipart_upload(
127+
Bucket=S3_BUCKET_NAME,
128+
ContentType='application/jsonl',
129+
Key=key)
130+
131+
upload_id = mup_resp['UploadId']
132+
log.debug(f'Initialized a multipart S3 upload. UploadId: {upload_id}')
133+
print(mup_resp)
134+
135+
part_id = 0
136+
137+
# Each loop we put together a "part" of the file and send it to S3.
111138
while 1:
112-
log.debug(SZ_TAG + 'Fetching chunk...')
113-
chunk = sz_eng.fetch_next(export_handle)
114-
if not chunk:
139+
# init buffer
140+
buff = io.BytesIO()
141+
# Set up the ID for this particular part.
142+
part_id += 1
143+
# Inner loop lets us accumulate up to BYTES_PER_PART before
144+
# sending off to S3.
145+
while 1:
146+
log.debug(SZ_TAG + 'Fetching chunk...')
147+
chunk = sz_eng.fetch_next(export_handle)
148+
if not chunk:
149+
FETCH_COMPLETE = True
150+
log.debug('Fetch from Senzing complete.')
151+
else:
152+
buff.write(chunk.encode('utf-8'))
153+
log.debug('Wrote chunk to buffer.')
154+
# Send this part to S3, and save the etag it gives us back.
155+
if buff.getbuffer().nbytes >= BYTES_PER_PART or FETCH_COMPLETE:
156+
log.debug(f'Preparing and uploading part {part_id} to S3.')
157+
# rewind to start of buff
158+
buff.seek(0)
159+
buff.flush()
160+
resp = s3.upload_part(
161+
Bucket=S3_BUCKET_NAME,
162+
Key=key,
163+
UploadId=upload_id,
164+
PartNumber=part_id,
165+
Body=buff.read())
166+
log.debug(f'Sent part {part_id} to S3. ETag: {resp["ETag"]}')
167+
part_ids_and_tags.append({
168+
'PartNumber': part_id,
169+
'ETag': resp['ETag']})
170+
# We start with a new buff obj at next iteration.
171+
buff.close()
172+
break
173+
# end inner while
174+
if FETCH_COMPLETE:
115175
break
116-
buff.write(chunk.encode('utf-8'))
117-
log.debug('Wrote chunk to buffer.')
118-
buff.write(','.encode('utf-8'))
176+
# end outer while
177+
119178
sz_eng.close_export_report(export_handle)
120-
log.info(SZ_TAG + 'Closed export handle.')
121-
buff.seek(-1, os.SEEK_CUR) # toss out last comma
122-
buff.write(']'.encode('utf-8'))
123-
log.info('Total bytes exported/buffered: ' + str(buff.getbuffer().nbytes))
179+
log.info(SZ_TAG + 'Closed Senzing export handle.')
180+
# Wrap up the S3 upload via complete_multipart_upload
181+
rslt = s3.complete_multipart_upload(
182+
Bucket=S3_BUCKET_NAME,
183+
Key=key,
184+
MultipartUpload={'Parts':part_ids_and_tags},
185+
UploadId=upload_id)
186+
log.info('Finished uploading all parts to S3. All done.')
187+
log.info(f'Full path in S3: {key}')
188+
124189
except sz.SzError as err:
125190
log.error(SZ_TAG + fmterr(err))
191+
if upload_id:
192+
s3.abort_multipart_upload(
193+
Bucket=S3_BUCKET_NAME,
194+
Key=key,
195+
UploadId=upload_id)
126196
except Exception as e:
127197
log.error(fmterr(e))
128-
129-
# rewind buffer
130-
buff.seek(0)
131-
buff.flush()
132-
133-
# write buff to S3 using upload_fileobj
134-
full_path = FOLDER_NAME + '/' + build_output_filename()
135-
log.info(AWS_TAG + 'About to upload JSON file ' + full_path + ' to S3 ...')
136-
try:
137-
s3.upload_fileobj(buff, S3_BUCKET_NAME, full_path)
138-
log.info(AWS_TAG + 'Successfully uploaded file.')
139-
success_status = otel.SUCCESS
140-
except Exception as e:
141-
log.error(AWS_TAG + fmterr(e))
198+
if upload_id:
199+
s3.abort_multipart_upload(
200+
Bucket=S3_BUCKET_NAME,
201+
Key=key,
202+
UploadId=upload_id)
142203

143204
finish = time.perf_counter()
144205
otel_exp_counter.add(1,

0 commit comments

Comments
 (0)