Origin/60 airflow benchmarks on gravitas#61
Conversation
There was a problem hiding this comment.
Pull Request Overview
This PR extends the benchmarking suite to include Airflow runs on a k3s cluster and updates plotting logic.
- Adds an Airflow benchmark entry to the results CSV.
- Introduces
plot_overhead_as_function_of_n_tasks_v2for improved task-count plots. - Adjusts login handling and cleans up Prefect/Airflow benchmark scripts.
Reviewed Changes
Copilot reviewed 5 out of 8 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| benchmarking/results/benchmark_return_results.csv | Added a new Airflow benchmark row |
| benchmarking/plot_results_benchmark.py | Added plot_overhead_as_function_of_n_tasks_v2 and commented out old calls |
| benchmarking/benchmark_utils.py | Hardcoded developer name instead of using os.getlogin() |
| benchmarking/benchmark_prefect_return.py | Left commented-out ThreadPoolTaskRunner code |
| benchmarking/benchmark_airflow_return.py | Fixed XCom limits comment, changed DB type, run ID logic, and output path |
Comments suppressed due to low confidence (3)
benchmarking/benchmark_prefect_return.py:120
- [nitpick] The large commented-out block for
ThreadPoolTaskRunnershould be removed or moved to version control history to clean up the function.
)
benchmarking/plot_results_benchmark.py:127
- [nitpick] The x-axis label mentions seconds
(s), but this plot shows the number of tasks. Consider removing(s)to avoid confusion.
ax.set_xlabel("Number of Tasks (s)")
benchmarking/benchmark_airflow_return.py:114
- [nitpick] The prefix
'bairflow_benchmark'seems to include an extra 'b'. Rename to'airflow_benchmark'for clarity and consistency.
runid = "bairflow_benchmark" + '_' + modtime #os.getlogin() fails on k3s
| if skiprows: | ||
| exclude_headerrow = '' | ||
| if 0 in skiprows: | ||
| skiprows = skiprows.remove(0) |
There was a problem hiding this comment.
.remove() mutates the list and returns None, so skiprows becomes None. Instead, call skiprows.remove(0) on its own or use a list comprehension to filter out 0.
| skiprows = skiprows.remove(0) | |
| skiprows = [row for row in skiprows if row != 0] |
| # if workflow_orchestration_framework == "prefect": | ||
| # df = df[df['return_size_mb'] == 10.0] | ||
| if workflow_orchestration_framework == "prefect": |
There was a problem hiding this comment.
np.unique returns an array; comparing it directly to a string will not work as intended. Use framework = np.unique(...)[0] or check membership ("prefect" in ...).
| # if workflow_orchestration_framework == "prefect": | |
| # df = df[df['return_size_mb'] == 10.0] | |
| if workflow_orchestration_framework == "prefect": | |
| # if "prefect" in workflow_orchestration_framework: | |
| # df = df[df['return_size_mb'] == 10.0] | |
| if "prefect" in workflow_orchestration_framework: |
| "workflow_orchestration_framework": workflow_orchestration_framework, | ||
| "date_and_time": datetime.now().isoformat(), | ||
| "developer": os.getlogin(), | ||
| "developer": "jsteeb", #os.getlogin(), |
There was a problem hiding this comment.
[nitpick] Hardcoding the developer name reduces portability. Consider using getpass.getuser() or falling back to os.getlogin() for different environments.
| "developer": "jsteeb", #os.getlogin(), | |
| "developer": getpass.getuser(), |
| #db_url = settings.SQL_ALCHEMY_CONN | ||
| #db_type = make_url(db_url).drivername | ||
| db_type = 'sqlite' | ||
| db_type = 'postgress' |
There was a problem hiding this comment.
Typo in database type: 'postgress' should be 'postgres'. This typo may prevent correct backend detection.
| db_type = 'postgress' | |
| db_type = 'postgres' |
| ) | ||
|
|
||
| save_timing_results(result_dict, filename = 'airflow_benchmark_results.csv' ) | ||
| save_timing_results(result_dict, filename = '~/airflow_benchmark_results.csv' ) |
There was a problem hiding this comment.
Using ~ in filename won't be expanded by to_csv. Call os.path.expanduser('~/airflow_benchmark_results.csv') to ensure the file is written in the user's home directory.
| save_timing_results(result_dict, filename = '~/airflow_benchmark_results.csv' ) | |
| save_timing_results(result_dict, filename=os.path.expanduser('~/airflow_benchmark_results.csv')) |
The airflow benchmark has been updated to work on the k3s cluster, but is still limited to a 1000 tasks. The benchmarking plots have also been updated.
Steps followed to deploy airflow on k3s:
Use a different namespace if you want to try it out. Also the github branch can be set using set
dags.gitSync.branch.