Skip to content

Commit 83c694f

Browse files
authored
Merge pull request #5026 from broadinstitute/airflow-deprecation
Airflow deprecation
2 parents a41cdf2 + d395a5f commit 83c694f

File tree

8 files changed

+240
-546
lines changed

8 files changed

+240
-546
lines changed

seqr/utils/search/add_data_utils.py

Lines changed: 58 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
from collections import defaultdict, OrderedDict
22
from django.contrib.auth.models import User
33
from django.db.models import F
4+
import json
5+
import requests
46
from typing import Callable
57

68
from reference_data.models import GeneInfo, GENOME_VERSION_LOOKUP
79
from seqr.models import Sample, Individual, Project
8-
from seqr.utils.communication_utils import send_project_notification
10+
from seqr.utils.communication_utils import send_project_notification, safe_post_to_slack
911
from seqr.utils.file_utils import does_file_exist
1012
from seqr.utils.logging_utils import SeqrLogger
13+
from seqr.utils.middleware import ErrorsWarningsException
1114
from seqr.views.utils.airtable_utils import AirtableSession, ANVIL_REQUEST_TRACKING_TABLE
1215
from seqr.views.utils.export_utils import write_multiple_files
1316
from seqr.views.utils.pedigree_info_utils import JsonConstants
14-
from settings import SEQR_SLACK_DATA_ALERTS_NOTIFICATION_CHANNEL, BASE_URL, ANVIL_UI_URL, \
15-
SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL
17+
from settings import SEQR_SLACK_DATA_ALERTS_NOTIFICATION_CHANNEL, BASE_URL, ANVIL_UI_URL, PIPELINE_RUNNER_SERVER, \
18+
SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL, SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL, LOADING_DATASETS_DIR
1619

1720
logger = SeqrLogger(__name__)
1821

@@ -62,40 +65,58 @@ def update_airtable_loading_tracking_status(project, status, additional_update=N
6265
update={'Status': status, **(additional_update or {})},
6366
)
6467

65-
def _format_loading_pipeline_variables(
66-
projects: list[Project], genome_version: str, dataset_type: str, sample_type: str = None, **kwargs
67-
):
68+
def trigger_data_loading(projects: list[Project], individual_ids: list[int], sample_type: str, dataset_type: str,
69+
genome_version: str, data_path: str, user: User, raise_error: bool = False, skip_expect_tdr_metrics: bool = True,
70+
skip_validation: bool = False, skip_check_sex_and_relatedness: bool = True, vcf_sample_id_map=None,
71+
success_message: str = None, error_message: str = None, success_slack_channel: str = SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL):
6872
variables = {
6973
'projects_to_run': sorted([p.guid for p in projects]) if projects else None,
70-
'dataset_type': _dag_dataset_type(sample_type, dataset_type),
74+
'dataset_type': _loading_dataset_type(sample_type, dataset_type),
7175
'reference_genome': GENOME_VERSION_LOOKUP[genome_version],
72-
**kwargs
76+
'callset_path': data_path,
77+
'sample_type': sample_type,
7378
}
74-
if sample_type:
75-
variables['sample_type'] = sample_type
76-
return variables
77-
78-
def prepare_data_loading_request(projects: list[Project], individual_ids: list[int], sample_type: str, dataset_type: str, genome_version: str,
79-
data_path: str, user: User, load_data_dir: str, raise_pedigree_error: bool = False,
80-
skip_validation: bool = False, skip_check_sex_and_relatedness: bool = False, vcf_sample_id_map=None):
81-
variables = _format_loading_pipeline_variables(
82-
projects,
83-
genome_version,
84-
dataset_type,
85-
sample_type,
86-
callset_path=data_path,
87-
)
88-
if skip_validation:
89-
variables['skip_validation'] = True
90-
if skip_check_sex_and_relatedness:
91-
variables['skip_check_sex_and_relatedness'] = True
92-
file_path = _get_pedigree_path(load_data_dir, genome_version, sample_type, dataset_type)
93-
_upload_data_loading_files(individual_ids, vcf_sample_id_map or {}, user, file_path, raise_pedigree_error)
94-
_write_gene_id_file(load_data_dir, user)
95-
return variables, file_path
79+
bool_variables = {
80+
'skip_validation': skip_validation,
81+
'skip_check_sex_and_relatedness': skip_check_sex_and_relatedness,
82+
'skip_expect_tdr_metrics': skip_expect_tdr_metrics,
83+
}
84+
variables.update({k: v for k, v in bool_variables.items() if v})
85+
file_path = _get_pedigree_path(genome_version, sample_type, dataset_type)
86+
_upload_data_loading_files(individual_ids, vcf_sample_id_map or {}, user, file_path, raise_error)
87+
_write_gene_id_file(user)
88+
89+
response = requests.post(f'{PIPELINE_RUNNER_SERVER}/loading_pipeline_enqueue', json=variables, timeout=60)
90+
success = True
91+
try:
92+
response.raise_for_status()
93+
logger.info('Triggered loading pipeline', user, detail=variables)
94+
except requests.HTTPError as e:
95+
success = False
96+
error = str(e)
97+
if response.status_code == 409:
98+
error = 'Loading pipeline is already running. Wait for it to complete and resubmit'
99+
e = ErrorsWarningsException([error])
100+
if raise_error:
101+
raise e
102+
else:
103+
logger.warning(f'Error triggering loading pipeline: {error}', user, detail=variables)
104+
safe_post_to_slack(
105+
SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL,
106+
f'{error_message}: {error}\nLoading pipeline should be triggered with:\n```{json.dumps(variables, indent=4)}```',
107+
)
108+
109+
if success_message and (success or success_slack_channel != SEQR_SLACK_LOADING_NOTIFICATION_CHANNEL):
110+
safe_post_to_slack(success_slack_channel, '\n\n'.join([
111+
success_message,
112+
f'Pedigree files have been uploaded to {file_path}',
113+
f'Loading pipeline is triggered with:\n```{json.dumps(variables, indent=4)}```',
114+
]))
115+
116+
return success
96117

97118

98-
def _dag_dataset_type(sample_type: str, dataset_type: str):
119+
def _loading_dataset_type(sample_type: str, dataset_type: str):
99120
return 'GCNV' if dataset_type == Sample.DATASET_TYPE_SV_CALLS and sample_type == Sample.SAMPLE_TYPE_WES \
100121
else dataset_type
101122

@@ -132,9 +153,9 @@ def _upload_data_loading_files(individual_ids: list[int], vcf_sample_id_map: dic
132153
raise e
133154

134155

135-
def _write_gene_id_file(load_data_dir, user):
156+
def _write_gene_id_file(user):
136157
file_name = 'db_id_to_gene_id'
137-
if does_file_exist(f'{load_data_dir}/{file_name}.csv.gz'):
158+
if does_file_exist(f'{LOADING_DATASETS_DIR}/{file_name}.csv.gz'):
138159
return
139160

140161
gene_data_loaded = (GeneInfo.objects.filter(gencode_release=int(GeneInfo.CURRENT_VERSION)).exists() and
@@ -146,12 +167,12 @@ def _write_gene_id_file(load_data_dir, user):
146167
)
147168
gene_data = GeneInfo.objects.all().values('gene_id', db_id=F('id')).order_by('id')
148169
file_config = (file_name, ['db_id', 'gene_id'], gene_data)
149-
write_multiple_files([file_config], load_data_dir, user, file_format='csv', gzip_file=True)
170+
write_multiple_files([file_config], LOADING_DATASETS_DIR, user, file_format='csv', gzip_file=True)
150171

151172

152-
def _get_pedigree_path(pedigree_dir: str, genome_version: str, sample_type: str, dataset_type: str):
153-
dag_dataset_type = _dag_dataset_type(sample_type, dataset_type)
154-
return f'{pedigree_dir}/{GENOME_VERSION_LOOKUP[genome_version]}/{dag_dataset_type}/pedigrees/{sample_type}'
173+
def _get_pedigree_path(genome_version: str, sample_type: str, dataset_type: str):
174+
loading_dataset_type = _loading_dataset_type(sample_type, dataset_type)
175+
return f'{LOADING_DATASETS_DIR}/{GENOME_VERSION_LOOKUP[genome_version]}/{loading_dataset_type}/pedigrees/{sample_type}'
155176

156177

157178
def get_loading_samples_validator(vcf_samples: list[str], loaded_individual_ids: list[int], sample_source: str,

seqr/views/apis/anvil_workspace_api.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
from seqr.models import Project, Family, CAN_EDIT, Sample, IgvSample
1414
from seqr.views.react_app import render_app_html
1515
from seqr.views.utils.airtable_utils import AirtableSession, ANVIL_REQUEST_TRACKING_TABLE
16-
from seqr.views.utils.airflow_utils import trigger_airflow_data_loading
1716
from seqr.views.utils.json_to_orm_utils import create_model_from_json
1817
from seqr.views.utils.json_utils import create_json_response
1918
from seqr.views.utils.file_utils import load_uploaded_file
@@ -23,7 +22,7 @@
2322
from seqr.views.utils.individual_utils import add_or_update_individuals_and_families
2423
from seqr.utils.communication_utils import send_html_email
2524
from seqr.utils.file_utils import list_files
26-
from seqr.utils.search.add_data_utils import get_loading_samples_validator
25+
from seqr.utils.search.add_data_utils import get_loading_samples_validator, trigger_data_loading
2726
from seqr.utils.vcf_utils import validate_vcf_and_get_samples, get_vcf_list
2827
from seqr.utils.logging_utils import SeqrLogger
2928
from seqr.utils.middleware import ErrorsWarningsException
@@ -275,7 +274,7 @@ def _trigger_add_workspace_data(project, pedigree_records, user, data_path, samp
275274
f"({GENOME_VERSION_LOOKUP.get(project.genome_version)}) from AnVIL workspace *{project.workspace_namespace}/{project.workspace_name}* at "
276275
f"{data_path} to seqr project <{_get_seqr_project_url(project)}|*{project.name}*> (guid: {project.guid})"
277276
)
278-
trigger_success = trigger_airflow_data_loading(
277+
trigger_success = trigger_data_loading(
279278
[project], individual_ids, sample_type, Sample.DATASET_TYPE_VARIANT_CALLS, project.genome_version, data_path, user=user, success_message=success_message,
280279
success_slack_channel=SEQR_SLACK_ANVIL_DATA_LOADING_CHANNEL, error_message=f'ERROR triggering AnVIL loading for project {project.guid}',
281280
)

0 commit comments

Comments (0)