Enhanced scan report upload speed #1227
base: master
Conversation
prquinlan left a comment
Comments added to ensure we have a mechanism to manage large inputs and a batching strategy.
conn = pg_hook.get_conn()
cursor = conn.cursor()
try:
    execute_values(
It would be good to have a sanity check and batching on this input; at the moment it could be any size, and we probably want batches of a fixed size.
Previously pg_hook.insert_rows did this automatically, so we would want something similar to ensure it stays robust.
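As a rough illustration of what that could look like, here is a minimal sketch of chunking the input before handing it to execute_values. The helper name chunked, the table and column names, and the batch size of 3000 are illustrative assumptions, not code from this PR.

from itertools import islice

from psycopg2.extras import execute_values

BATCH_SIZE = 3000  # illustrative; tune per environment


def chunked(iterable, size):
    # Yield successive lists of at most `size` items.
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch


def insert_in_batches(pg_hook, rows):
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    try:
        for batch in chunked(rows, BATCH_SIZE):
            execute_values(
                cursor,
                "INSERT INTO temp_field_values (field, value) VALUES %s",  # assumed SQL
                batch,
                page_size=BATCH_SIZE,
            )
            conn.commit()  # commit per batch to bound transaction size
    finally:
        cursor.close()
        conn.close()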
conn = pg_hook.get_conn()
cursor = conn.cursor()
try:
    execute_values(
Same comment applies here as well.
conn = pg_hook.get_conn()
cursor = conn.cursor()
try:
    execute_values(
I see this replaces insert_rows with a direct psycopg2 call. Could you clarify why we're bypassing the Airflow hook rather than tuning its batching on the insert_rows parameters?
I thought that's where we left it on Tuesday - so I'm curious why you have moved on to reimplementing Airflow's logic?
Using psycopg2 directly adds a second database access pattern, and we should weigh the trade-off we are accepting by going down to the driver level.
After trying fast_executemany yesterday, I found it wasn't working. I did some research and found that you cannot use fast_executemany with Airflow's PostgresHook because it is a pyodbc feature intended for SQL Server, while the PostgresHook uses psycopg2, which does not support it.
Ah, that's interesting - what do you mean by "it's not working"?
I'm surprised as this is from the Postgres provider documentation: https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/_api/airflow/providers/postgres/hooks/postgres/index.html
And is very much in the Postgres code: https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/_modules/airflow/providers/postgres/hooks/postgres.html#PostgresHook.insert_rows
And in psycopg2: https://www.psycopg.org/docs/extras.html#fast-execution-helpers
Do you have an error message?
There is also more than fast_executemany to tune, so it would be good to understand what is not working before we drop down to the driver level.
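For context, tuning the existing hook path might look something like the sketch below. The connection id, table, column names, and rows are assumptions for illustration, and the available keyword arguments depend on the installed provider versions.

from airflow.providers.postgres.hooks.postgres import PostgresHook

pg_hook = PostgresHook(postgres_conn_id="scan_report_db")  # assumed connection id
rows = [("person_id", "123", "example value")]  # illustrative rows prepared by the task

pg_hook.insert_rows(
    table="temp_field_values",  # assumed table name
    rows=rows,
    target_fields=["field", "value", "value_description"],
    commit_every=3000,  # larger batches mean fewer commits and round trips
)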
@AndyRae @prquinlan I have decided to simplify the logic by keeping the initial implementation. I would suggest we test it on dev and see the outcome.
Yeah, I think this is the best way. I expect you to see more improvement when you move to dev, as the latency of each round trip is higher. It may also be worth first trying different commit_every values locally - if you increase it, do you get incremental improvements?
Yes, that is possible. The upload is taking place on dev as we speak. I am monitoring how long it takes to complete, then we can see whether we should increase commit_every.
Perfect. I would try different batch sizes locally to find a good setting, then test on dev.
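A rough local benchmark along these lines could help pick a value; the hook, table name, and rows are assumed as in the earlier sketch, and the target table would need to be truncated between runs for a fair comparison.

import time

# pg_hook and rows are assumed to be set up as in the earlier sketch.
for commit_every in (1000, 3000, 10000, 0):  # 0 inserts all rows in one transaction
    start = time.monotonic()
    pg_hook.insert_rows(
        table="temp_field_values",  # assumed table name
        rows=rows,
        target_fields=["field", "value", "value_description"],
        commit_every=commit_every,
    )
    print(f"commit_every={commit_every}: {time.monotonic() - start:.1f}s")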
That is interesting - what do the Airflow logs say is being run on the database?
The logs say everything was successful, and the data was written in batches of 3000 as instructed.
| "value_description", | ||
| ], | ||
| fast_executemany=True, | ||
| commit_every=0, # commit every row to avoid transaction overhead |
It would be helpful to have this as an environment variable in config, so we can tune it through configuration rather than a redeployment.
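For example, something along these lines; the variable name SCAN_REPORT_COMMIT_EVERY, the default of 1000, and the table and rows are illustrative assumptions.

import os

# Read the batch size from the environment, falling back to a default.
COMMIT_EVERY = int(os.environ.get("SCAN_REPORT_COMMIT_EVERY", "1000"))

# pg_hook and rows are assumed to be set up as in the earlier sketch.
pg_hook.insert_rows(
    table="temp_field_values",  # assumed table name
    rows=rows,
    target_fields=["field", "value", "value_description"],
    commit_every=COMMIT_EVERY,
)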


⚡️ Optimization
PR Description
The bulk inserts in update_temp_data_dictionary_table() and create_temp_field_values_table() were using pg_hook.run() in loops, which meant one INSERT per row. For large scan reports with thousands of records, this was slow and could cause DAG timeouts.
This PR switches to psycopg2.extras.execute_values() to batch all rows into a single INSERT, which is much faster and avoids the timeout issues.
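As a hedged illustration of the change described above (the actual SQL, table, column names, and connection id in the PR may differ):

from airflow.providers.postgres.hooks.postgres import PostgresHook
from psycopg2.extras import execute_values

pg_hook = PostgresHook(postgres_conn_id="scan_report_db")  # assumed connection id
rows = [("person_id", "123")]  # illustrative data dictionary rows

# Before: one INSERT statement per row through the hook.
for row in rows:
    pg_hook.run(
        "INSERT INTO temp_data_dictionary (field, value) VALUES (%s, %s)",  # assumed SQL
        parameters=row,
    )

# After: a single batched INSERT through psycopg2.
conn = pg_hook.get_conn()
cursor = conn.cursor()
try:
    execute_values(
        cursor,
        "INSERT INTO temp_data_dictionary (field, value) VALUES %s",  # assumed SQL
        rows,
    )
    conn.commit()
finally:
    cursor.close()
    conn.close()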
Related Issues or other material
Related #1226
Closes #1226
Screenshots, example outputs/behaviour etc.
Below is a screenshot of the new record time it takes to upload the scan report and data dictionary.