@@ -5,7 +5,7 @@ version 1.0
 # 1- A list of all pairs of samples w/ a kinship score greater than a certain threshold (default: 0.1)
 # 2- A list of samples that would need to be removed to eliminate relatedness confounds from the full cohort
 
-## Copyright Broad Institute, 2023
+## Copyright Broad Institute, 2025
 ##
 ## This WDL pipeline processes a set of files:
 ## - HQ_variants.vcf: Full VCF with genotypes of the full cohort. This is produced by AoU ancestry workflow (https://github.com/broadinstitute/aou-ancestry)
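+## Output sketch (assumed layout, inferred from the column names used downstream in this diff):
+##   <task_identifier>_relatedness.tsv                  one row per flagged pair: i.s, j.s, plus kinship statistic columns
+##   <task_identifier>_relatedness_flagged_samples.tsv  a single sample_id column listing samples to drop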
@@ -35,7 +35,6 @@ struct RuntimeAttr {
     Int? max_retries
 }
 
-
 workflow run_relatedness {
     input {
         # Analysis Parameters
@@ -53,6 +52,8 @@ workflow run_relatedness {
         String executor_memory   # Memory assigned to each Spark executor
         String driver_memory     # Memory assigned to the Spark driver
         String reference_genome  # Reference genome identifier (e.g., "hg38")
+        Int max_idle = 60        # in minutes
+        Int max_age = 1440       # in minutes
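+        # max_idle/max_age feed the cluster's --max-idle/--max-age flags below, so an
+        # orphaned Dataproc cluster shuts itself down instead of accruing cost indefinitely.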
 
 
         # Cluster Parameters
@@ -66,8 +67,10 @@ workflow run_relatedness {
         # VM Parameters
         String hail_docker = "us.gcr.io/broad-dsde-methods/lichtens/hail_dataproc_wdl:1.1"  # Docker image with Hail and Google Cloud SDK
         #String hail_docker = "gcr.io/broad-dsde-methods/aou-auxiliary/hail_dataproc_wdl:0.2.125"  # Docker image with Hail and Google Cloud SDK
+
+        String hail_docker_maximal_independent_set = "hailgenetics/hail:0.2.67"  # Docker image to use for the maximal independent set task
     }
-    String pipeline_version = "aou_9.0.0"
+    String pipeline_version = "aou_9.1.0"
 
     call run_relatedness_task {
         # Task inputs mirror workflow inputs
@@ -92,11 +95,20 @@ workflow run_relatedness {
             submission_script = submission_script,
             hail_docker = hail_docker,
             region = region,
+            max_idle = max_idle,
+            max_age = max_age
+    }
+
+    call run_maximal_independent_set {
+        input:
+            relatedness_tsv = run_relatedness_task.relatedness,
+            task_identifier = task_identifier,
+            hail_docker_maximal_independent_set = hail_docker_maximal_independent_set
     }
 
     output {
         File relatedness = run_relatedness_task.relatedness
-        File relatedness_flagged_samples = run_relatedness_task.relatedness_flagged_samples
+        File relatedness_flagged_samples = run_maximal_independent_set.relatedness_flagged_samples
     }
 }
 
@@ -124,6 +136,8 @@ task run_relatedness_task {
         RuntimeAttr? runtime_attr_override
         String gcs_subnetwork_name
         String hail_docker
+        Int max_idle
+        Int max_age
     }
 
     RuntimeAttr runtime_default = object {
@@ -160,9 +174,19 @@ task run_relatedness_task {
160174 print("Running python code...")
161175 import hail as hl
162176 import os
177+ import sys
163178 import uuid
164179 import re
165180 from google.cloud import dataproc_v1 as dataproc
181+ from datetime import datetime
182+
183+ def popen_read_checked(cmd):
184+ with os.popen(cmd) as stream:
185+ output = stream.read()
186+ status = stream.close() # returns None on success, exit code << 8 on failure
187+ if status is not None: # means command failed
188+ raise RuntimeError(f"Command failed with exit code {status >> 8}: {cmd}\n{output}")
189+ return output
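+
+# Usage sketch (hypothetical command): popen_read_checked("gsutil ls gs://some-bucket/")
+# returns the command's stdout and raises RuntimeError on a non-zero exit, unlike a bare
+# os.popen(cmd).read(), which silently discards failures.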
 
 # Function to replace unacceptable characters with '-'
 def sanitize_label(label):
@@ -205,7 +229,7 @@ task run_relatedness_task {
                 --region ~{region} --project ~{gcs_project} --service-account {account}
                 --worker-machine-type n1-standard-4
                 --master-machine-type n1-highmem-32
-                --max-idle=60m --max-age=1440m
+                --max-idle=~{max_idle}m --max-age=~{max_age}m
                 --subnet=projects/~{gcs_project}/regions/~{region}/subnetworks/~{gcs_subnetwork_name}
                 --enable-component-gateway
                 {cluster_name}
@@ -249,21 +273,20 @@ task run_relatedness_task {
 
         # Replace newline characters with spaces and remove extra spaces
         submit_cmd = ' '.join(submit_cmd.split())
-        os.popen(submit_cmd).read()
+        result = popen_read_checked(submit_cmd)
+        print(result)
 
-        print("Copying results out of staging bucket...")
+        print("Copying relatedness results out of staging bucket...")
         staging_cmd = f'gsutil cp -r gs://{cluster_staging_bucket}/{cluster_name}/~{task_identifier}_relatedness.tsv ./~{task_identifier}_relatedness.tsv'
         print(staging_cmd)
-        os.popen(staging_cmd).read()
-
-        staging_cmd = f'gsutil cp -r gs://{cluster_staging_bucket}/{cluster_name}/~{task_identifier}_relatedness_flagged_samples.tsv ./~{task_identifier}_relatedness_flagged_samples.tsv'
-        print(staging_cmd)
-        os.popen(staging_cmd).read()
-
+        result = popen_read_checked(staging_cmd)
+        print(result)
 
         break
 
     except Exception as e:
+        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        print(timestamp + " Exception raised!")
         print(e)
         raise
     finally:
@@ -276,10 +299,8 @@ task run_relatedness_task {
         echo "Complete"
     >>>
 
-
     output {
         File relatedness = "~{task_identifier}_relatedness.tsv"
-        File relatedness_flagged_samples = "~{task_identifier}_relatedness_flagged_samples.tsv"
     }
 
     runtime {
@@ -293,3 +314,73 @@ task run_relatedness_task {
         bootDiskSizeGb: select_first([runtime_override.boot_disk_gb, runtime_default.boot_disk_gb])
     }
 }
+task run_maximal_independent_set {
+    input {
+        File relatedness_tsv
+        String task_identifier
+        String hail_docker_maximal_independent_set
+        RuntimeAttr? runtime_attr_override
+    }
+
+    RuntimeAttr runtime_default = object {
+        # Default runtime attributes
+        mem_gb: 127, disk_gb: 100, cpu_cores: 8, preemptible_tries: 0, max_retries: 0, boot_disk_gb: 10
+    }
+    RuntimeAttr runtime_override = select_first([runtime_attr_override, runtime_default])
+
+    command <<<
+        set -euxo pipefail
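+        # set -e aborts on the first failing command, -u treats unset variables as errors,
+        # -x traces each command into the task log, and -o pipefail fails a pipeline if any stage fails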
+
+        python3 <<EOF
+
+# Imports (Hail initializes itself implicitly on first use)
+import pandas as pd
+import numpy as np
+import hail as hl
+from pathlib import Path
+
+# Read the relatedness pairs table and key it by the two sample-ID columns
+rel = hl.import_table("~{relatedness_tsv}")
+rel = rel.key_by("i.s", "j.s")
+
+# Run maximal independent set and tag the samples we would drop
+# https://hail.is/docs/0.2/methods/misc.html?highlight=maximal_independent_set#hail.methods.maximal_independent_set
+related_samples_to_remove = hl.maximal_independent_set(rel["i.s"], rel["j.s"], False)
+related_samples_to_remove = related_samples_to_remove.rename({"node": "sample_id"})
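+# The third positional argument is keep=False: Hail returns the complement of the maximal
+# independent set, i.e. the samples to remove so that no flagged related pair remains.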
+
+# Output a list of samples that could be removed
+related_samples_to_remove = related_samples_to_remove.flatten()
+related_samples_to_remove.export("~{task_identifier}.relatedness_flagged_samples.tsv")
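+# flatten() lifts any nested struct fields into top-level columns so export() writes a plain
+# TSV; with plain string sample IDs it is effectively a no-op but keeps the export robust.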
+
+# Print where the output file landed, to help debugging from the execution manager.
+cwd = Path.cwd()
+print("Current working directory:", cwd)
+
+# List files and directories in cwd
+print("Directory contents:")
+for entry in cwd.iterdir():
+    print(" -", entry.name)
+
+print("Done")
+
+EOF
+        echo "bash PWD:"
+        echo "$PWD"
+    >>>
+
+    output {
+        File relatedness_flagged_samples = "~{task_identifier}.relatedness_flagged_samples.tsv"
+    }
+
+    runtime {
+        # Runtime settings for the task
+        memory: select_first([runtime_override.mem_gb, runtime_default.mem_gb]) + " GB"
+        disks: "local-disk " + select_first([runtime_override.disk_gb, runtime_default.disk_gb]) + " SSD"
+        cpu: select_first([runtime_override.cpu_cores, runtime_default.cpu_cores])
+        preemptible: select_first([runtime_override.preemptible_tries, runtime_default.preemptible_tries])
+        maxRetries: select_first([runtime_override.max_retries, runtime_default.max_retries])
+        docker: hail_docker_maximal_independent_set
+        bootDiskSizeGb: select_first([runtime_override.boot_disk_gb, runtime_default.boot_disk_gb])
+    }
+}