3232FOLDER_NAME = os .environ .get ('FOLDER_NAME' , 'exporter-outputs' )
3333RUNTIME_ENV = os .environ .get ('RUNTIME_ENV' , 'unknown' ) # For OTel
3434
35- EXPORT_FLAGS = sz .SzEngineFlags .SZ_EXPORT_DEFAULT_FLAGS
35+ EXPORT_FLAGS = sz .SzEngineFlags .SZ_EXPORT_DEFAULT_FLAGS
36+
37+ FULL_EXPORT_MODE = 'TODO' # grab from env var
38+
39+ # The output file is accumulated chunk by chunk from Senzing; this is how
40+ # many bytes we put together before sending those combined chunks as a 'part'
41+ # to S3 via multipart S3 upload.
42+ BYTES_PER_PART = (1024 ** 2 ) * 10
3643
3744#-------------------------------------------------------------------------------
3845
@@ -93,9 +100,6 @@ def go():
93100 log .info ('Finished OTel setup.' )
94101 # end OTel setup #
95102
96- # init buffer
97- buff = io .BytesIO ()
98-
99103 # Retrieve output from sz into buff
100104 # sz will export JSONL lines; we add the chars necessary to make
101105 # the output as a whole be a single JSON blob.
@@ -104,41 +108,98 @@ def go():
104108 start = time .perf_counter ()
105109 success_status = otel .FAILURE # initial default state
106110
111+ # For multipart S3 upload, S3 will hand back to us an etag for each
112+ # part we upload to it. We need to accumulate the part IDs (which we
113+ # set) and the etags (which S3 gives) and provide it all to S3 at the very
114+ # end when wrapping up the upload.
115+ # This is a list of dicts, e.g. {'PartNumber': ..., 'ETag': ...}:
116+ part_ids_and_tags = []
117+
118+ FETCH_COMPLETE = False
119+
120+ key = FOLDER_NAME + '/' + build_output_filename ()
121+ upload_id = None
107122 try :
108123 export_handle = sz_eng .export_json_entity_report (EXPORT_FLAGS )
109124 log .info (SZ_TAG + 'Obtained export_json_entity_report handle.' )
110- buff .write ('[' .encode ('utf-8' ))
125+
126+ mup_resp = s3 .create_multipart_upload (
127+ Bucket = S3_BUCKET_NAME ,
128+ ContentType = 'application/jsonl' ,
129+ Key = key )
130+
131+ upload_id = mup_resp ['UploadId' ]
132+ log .debug (f'Initialized a multipart S3 upload. UploadId: { upload_id } ' )
133+ print (mup_resp )
134+
135+ part_id = 0
136+
137+ # Each loop we put together a "part" of the file and send it to S3.
111138 while 1 :
112- log .debug (SZ_TAG + 'Fetching chunk...' )
113- chunk = sz_eng .fetch_next (export_handle )
114- if not chunk :
139+ # init buffer
140+ buff = io .BytesIO ()
141+ # Set up the ID for this particular part.
142+ part_id += 1
143+ # Inner loop lets us accumulate up to BYTES_PER_PART before
144+ # sending off to S3.
145+ while 1 :
146+ log .debug (SZ_TAG + 'Fetching chunk...' )
147+ chunk = sz_eng .fetch_next (export_handle )
148+ if not chunk :
149+ FETCH_COMPLETE = True
150+ log .debug ('Fetch from Senzing complete.' )
151+ else :
152+ buff .write (chunk .encode ('utf-8' ))
153+ log .debug ('Wrote chunk to buffer.' )
154+ # Send this part to S3, and save the etag it gives us back.
155+ if buff .getbuffer ().nbytes >= BYTES_PER_PART or FETCH_COMPLETE :
156+ log .debug (f'Preparing and uploading part { part_id } to S3.' )
157+ # rewind to start of buff
158+ buff .seek (0 )
159+ buff .flush ()
160+ resp = s3 .upload_part (
161+ Bucket = S3_BUCKET_NAME ,
162+ Key = key ,
163+ UploadId = upload_id ,
164+ PartNumber = part_id ,
165+ Body = buff .read ())
166+ log .debug (f'Sent part { part_id } to S3. ETag: { resp ["ETag" ]} ' )
167+ part_ids_and_tags .append ({
168+ 'PartNumber' : part_id ,
169+ 'ETag' : resp ['ETag' ]})
170+ # We start with a new buff obj at next iteration.
171+ buff .close ()
172+ break
173+ # end inner while
174+ if FETCH_COMPLETE :
115175 break
116- buff .write (chunk .encode ('utf-8' ))
117- log .debug ('Wrote chunk to buffer.' )
118- buff .write (',' .encode ('utf-8' ))
176+ # end outer while
177+
119178 sz_eng .close_export_report (export_handle )
120- log .info (SZ_TAG + 'Closed export handle.' )
121- buff .seek (- 1 , os .SEEK_CUR ) # toss out last comma
122- buff .write (']' .encode ('utf-8' ))
123- log .info ('Total bytes exported/buffered: ' + str (buff .getbuffer ().nbytes ))
179+ log .info (SZ_TAG + 'Closed Senzing export handle.' )
180+ # Wrap up the S3 upload via complete_multipart_upload
181+ rslt = s3 .complete_multipart_upload (
182+ Bucket = S3_BUCKET_NAME ,
183+ Key = key ,
184+ MultipartUpload = {'Parts' :part_ids_and_tags },
185+ UploadId = upload_id )
186+ log .info ('Finished uploading all parts to S3. All done.' )
187+ log .info (f'Full path in S3: { key } ' )
188+
124189 except sz .SzError as err :
125190 log .error (SZ_TAG + fmterr (err ))
191+ if upload_id :
192+ s3 .abort_multipart_upload (
193+ Bucket = S3_BUCKET_NAME ,
194+ Key = key ,
195+ UploadId = upload_id )
126196 except Exception as e :
127197 log .error (fmterr (e ))
128-
129- # rewind buffer
130- buff .seek (0 )
131- buff .flush ()
132-
133- # write buff to S3 using upload_fileobj
134- full_path = FOLDER_NAME + '/' + build_output_filename ()
135- log .info (AWS_TAG + 'About to upload JSON file ' + full_path + ' to S3 ...' )
136- try :
137- s3 .upload_fileobj (buff , S3_BUCKET_NAME , full_path )
138- log .info (AWS_TAG + 'Successfully uploaded file.' )
139- success_status = otel .SUCCESS
140- except Exception as e :
141- log .error (AWS_TAG + fmterr (e ))
198+ if upload_id :
199+ s3 .abort_multipart_upload (
200+ Bucket = S3_BUCKET_NAME ,
201+ Key = key ,
202+ UploadId = upload_id )
142203
143204 finish = time .perf_counter ()
144205 otel_exp_counter .add (1 ,
0 commit comments