Funded by the EU.
This component is responsible for transferring the finished data files from the workflow to the downstream components in an appropriate form. For CF, the files are uploaded to an OpenSearch database hosted on the cluster; for AR_XR, a list of S3 paths to the relevant image files is provided.
The main function is `post_pro/app.py:run`, which spawns a KafkaConsumer and hands it a handler implemented in `post_pro/execution.py:PostProcessorExecution` (a sketch of this loop appears at the end of this section). Upon receiving a message, the handler does the following:
- Download all valid data files from the S3 bucket it is connected to; this includes `.csv`, `.json`, `.xml`, `.xlsx`, etc. (see `post_pro/pipeline.py:FileKind`), as well as `.tar` and `.zip` archives. The latter are extracted, and their contents are treated as if they were in the S3 bucket themselves. Archives may have a nested internal structure (a minimal extraction sketch appears after this list).
- Upload the downloaded and extracted files to OpenSearch. For this to happen, the files must be in some tabular form. Each file is assigned a (hopefully unique) index; indices must be strictly alphanumeric and lowercase, so they are generated by taking the file path, removing all non-alphanumeric characters, and converting to lowercase. For files originating in archives, the name of the archive file is prepended. For example (a code sketch of this rule also follows the list):
  - File `<s3_bucket_name>/store/b10/b10data-0.csv` will have the index `storeb10b10data0csv`
  - File `<s3_bucket_name>/Training.xml` will have the index `trainingxml`
  - File `<s3_bucket_name>/archive.tar.xz:Region2/pr0.dbf` will have the index `archivetarxzregion2pr0dbf`
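As a rough illustration of the archive handling in the first step, here is a minimal sketch using only the standard library; `extract_archive` and the destination layout are hypothetical, not the actual code in `post_pro`:

```python
import pathlib
import tarfile
import zipfile

def extract_archive(archive: pathlib.Path, dest: pathlib.Path) -> list[pathlib.Path]:
    """Extract a .tar(.gz/.xz) or .zip archive into dest and return the
    files found, walking any nested directory structure inside it."""
    dest.mkdir(parents=True, exist_ok=True)
    if tarfile.is_tarfile(archive):
        with tarfile.open(archive) as tf:  # compression is auto-detected
            tf.extractall(dest)
    elif zipfile.is_zipfile(archive):
        with zipfile.ZipFile(archive) as zf:
            zf.extractall(dest)
    else:
        raise ValueError(f"unsupported archive: {archive}")
    return [p for p in dest.rglob("*") if p.is_file()]
```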
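The index-derivation rule itself is small enough to sketch directly; `make_index` below is a hypothetical helper that reproduces the examples above, not the actual implementation in `post_pro/pipeline.py`:

```python
import re

def make_index(key: str, archive: str | None = None) -> str:
    """Derive an OpenSearch index name from an S3 key: drop every
    non-alphanumeric character and lowercase the rest; for files coming
    out of an archive, prepend the archive's own file name first."""
    if archive is not None:
        key = archive + key
    return re.sub(r"[^A-Za-z0-9]", "", key).lower()

# The examples from the list above:
assert make_index("store/b10/b10data-0.csv") == "storeb10b10data0csv"
assert make_index("Training.xml") == "trainingxml"
assert make_index("Region2/pr0.dbf", archive="archive.tar.xz") == "archivetarxzregion2pr0dbf"
```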
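Putting the pieces together, here is a minimal sketch of the consumer loop in `post_pro/app.py:run`, assuming the kafka-python client and a JSON message payload; the topic name, broker address, and handler method are placeholders, not the component's real configuration:

```python
import json

from kafka import KafkaConsumer  # kafka-python

from post_pro.execution import PostProcessorExecution

def run() -> None:
    # Topic and broker are assumed values; the real ones come from the
    # component's configuration.
    consumer = KafkaConsumer(
        "post-processing",
        bootstrap_servers="localhost:9092",
        value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
    )
    handler = PostProcessorExecution()
    for record in consumer:  # blocks, yielding one message at a time
        # Each message triggers the download -> extract -> upload cycle
        # described above; handle() is an assumed method name.
        handler.handle(record.value)
```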