From b60c5638cc4844d9c34dd8ebd8fea43abde5b793 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Tue, 19 Dec 2023 17:19:21 +0100 Subject: [PATCH 1/7] now msa runners are running in an asynchronised way --- alphafold/data/pipeline.py | 108 ++++++++++++++++++++++++------------- 1 file changed, 70 insertions(+), 38 deletions(-) diff --git a/alphafold/data/pipeline.py b/alphafold/data/pipeline.py index a90eb5776..6f80a1643 100644 --- a/alphafold/data/pipeline.py +++ b/alphafold/data/pipeline.py @@ -26,7 +26,7 @@ from alphafold.data.tools import hmmsearch from alphafold.data.tools import jackhmmer import numpy as np - +import asyncio # Internal import (7716). FeatureDict = MutableMapping[str, np.ndarray] @@ -147,19 +147,8 @@ def __init__(self, self.uniref_max_hits = uniref_max_hits self.use_precomputed_msas = use_precomputed_msas - def process(self, input_fasta_path: str, msa_output_dir: str) -> FeatureDict: - """Runs alignment tools on the input sequence and creates features.""" - with open(input_fasta_path) as f: - input_fasta_str = f.read() - input_seqs, input_descs = parsers.parse_fasta(input_fasta_str) - if len(input_seqs) != 1: - raise ValueError( - f'More than one input sequence found in {input_fasta_path}.') - input_sequence = input_seqs[0] - input_description = input_descs[0] - num_res = len(input_sequence) - - uniref90_out_path = os.path.join(msa_output_dir, 'uniref90_hits.sto') + async def run_jackhmmer_uniref90(self, input_fasta_path, uniref90_out_path): + """An async function that runs alignment against uniref90""" jackhmmer_uniref90_result = run_msa_tool( msa_runner=self.jackhmmer_uniref90_runner, input_fasta_path=input_fasta_path, @@ -167,7 +156,14 @@ def process(self, input_fasta_path: str, msa_output_dir: str) -> FeatureDict: msa_format='sto', use_precomputed_msas=self.use_precomputed_msas, max_sto_sequences=self.uniref_max_hits) - mgnify_out_path = os.path.join(msa_output_dir, 'mgnify_hits.sto') + msa_for_templates = jackhmmer_uniref90_result['sto'] + msa_for_templates = parsers.deduplicate_stockholm_msa(msa_for_templates) + msa_for_templates = parsers.remove_empty_columns_from_stockholm_msa( + msa_for_templates) + return jackhmmer_uniref90_result, msa_for_templates + + async def run_jackhmmer_mgnify(self, input_fasta_path, mgnify_out_path): + """An async function that runs msa alignment against mgnify database""" jackhmmer_mgnify_result = run_msa_tool( msa_runner=self.jackhmmer_mgnify_runner, input_fasta_path=input_fasta_path, @@ -175,12 +171,67 @@ def process(self, input_fasta_path: str, msa_output_dir: str) -> FeatureDict: msa_format='sto', use_precomputed_msas=self.use_precomputed_msas, max_sto_sequences=self.mgnify_max_hits) + return jackhmmer_mgnify_result - msa_for_templates = jackhmmer_uniref90_result['sto'] - msa_for_templates = parsers.deduplicate_stockholm_msa(msa_for_templates) - msa_for_templates = parsers.remove_empty_columns_from_stockholm_msa( - msa_for_templates) + async def run_bfd_alignments(self, msa_output_dir, input_fasta_path): + """An async function that runs msa alignment against bfd database""" + if self._use_small_bfd: + bfd_out_path = os.path.join(msa_output_dir, 'small_bfd_hits.sto') + jackhmmer_small_bfd_result = run_msa_tool( + msa_runner=self.jackhmmer_small_bfd_runner, + input_fasta_path=input_fasta_path, + msa_out_path=bfd_out_path, + msa_format='sto', + use_precomputed_msas=self.use_precomputed_msas) + bfd_msa = parsers.parse_stockholm(jackhmmer_small_bfd_result['sto']) + else: + bfd_out_path = os.path.join(msa_output_dir, 
'bfd_uniref_hits.a3m') + hhblits_bfd_uniref_result = run_msa_tool( + msa_runner=self.hhblits_bfd_uniref_runner, + input_fasta_path=input_fasta_path, + msa_out_path=bfd_out_path, + msa_format='a3m', + use_precomputed_msas=self.use_precomputed_msas) + bfd_msa = parsers.parse_a3m(hhblits_bfd_uniref_result['a3m']) + + return bfd_msa + + async def run_all_msa_runners(self, input_fasta_path:str, + uniref90_out_path:str, mgnify_out_path:str, + msa_output_dir:str): + """An async function that creates all async tasks and run them in parallel""" + task1 = asyncio.create_task(self.run_jackhmmer_uniref90(input_fasta_path, uniref90_out_path)) + task2 = asyncio.create_task(self.run_jackhmmer_mgnify(input_fasta_path, mgnify_out_path)) + task3 = asyncio.create_task(self.run_bfd_alignments(msa_output_dir, input_fasta_path)) + jackhmmer_uniref90_result, msa_for_templates = await task1 + jackhmmer_mgnify_result = await task2 + bfd_msa = await task3 + + return {"uniref90_results": (jackhmmer_uniref90_result, msa_for_templates), + "mgnify_results": jackhmmer_mgnify_result, "bfd_results": bfd_msa} + + def process(self, input_fasta_path: str, msa_output_dir: str) -> FeatureDict: + """Runs alignment tools on the input sequence and creates features.""" + with open(input_fasta_path) as f: + input_fasta_str = f.read() + input_seqs, input_descs = parsers.parse_fasta(input_fasta_str) + if len(input_seqs) != 1: + raise ValueError( + f'More than one input sequence found in {input_fasta_path}.') + input_sequence = input_seqs[0] + input_description = input_descs[0] + num_res = len(input_sequence) + + uniref90_out_path = os.path.join(msa_output_dir, 'uniref90_hits.sto') + + mgnify_out_path = os.path.join(msa_output_dir, 'mgnify_hits.sto') + msa_results = asyncio.run(self.run_all_msa_runners(input_fasta_path,uniref90_out_path, + mgnify_out_path,msa_output_dir)) + jackhmmer_uniref90_result, msa_for_templates = msa_results['uniref90_results'] + jackhmmer_mgnify_result = msa_results['mgnify_results'] + bfd_msa = msa_results['bfd_results'] + if self.template_searcher.input_format == 'sto': pdb_templates_result = self.template_searcher.query(msa_for_templates) elif self.template_searcher.input_format == 'a3m': @@ -201,25 +252,6 @@ def process(self, input_fasta_path: str, msa_output_dir: str) -> FeatureDict: pdb_template_hits = self.template_searcher.get_template_hits( output_string=pdb_templates_result, input_sequence=input_sequence) - if self._use_small_bfd: - bfd_out_path = os.path.join(msa_output_dir, 'small_bfd_hits.sto') - jackhmmer_small_bfd_result = run_msa_tool( - msa_runner=self.jackhmmer_small_bfd_runner, - input_fasta_path=input_fasta_path, - msa_out_path=bfd_out_path, - msa_format='sto', - use_precomputed_msas=self.use_precomputed_msas) - bfd_msa = parsers.parse_stockholm(jackhmmer_small_bfd_result['sto']) - else: - bfd_out_path = os.path.join(msa_output_dir, 'bfd_uniref_hits.a3m') - hhblits_bfd_uniref_result = run_msa_tool( - msa_runner=self.hhblits_bfd_uniref_runner, - input_fasta_path=input_fasta_path, - msa_out_path=bfd_out_path, - msa_format='a3m', - use_precomputed_msas=self.use_precomputed_msas) - bfd_msa = parsers.parse_a3m(hhblits_bfd_uniref_result['a3m']) - templates_result = self.template_featurizer.get_templates( query_sequence=input_sequence, hits=pdb_template_hits) From e23e25607bab24c6ad0b6344a19632484098961e Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Thu, 21 Dec 2023 11:34:41 +0100 Subject: [PATCH 2/7] updated asyncio part in pipeline.py --- alphafold/data/pipeline.py | 36 
++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/alphafold/data/pipeline.py b/alphafold/data/pipeline.py index 6f80a1643..684ecc73d 100644 --- a/alphafold/data/pipeline.py +++ b/alphafold/data/pipeline.py @@ -27,6 +27,7 @@ from alphafold.data.tools import jackhmmer import numpy as np import asyncio +from functools import partial # Internal import (7716). FeatureDict = MutableMapping[str, np.ndarray] @@ -84,16 +85,16 @@ def make_msa_features(msas: Sequence[parsers.Msa]) -> FeatureDict: return features -def run_msa_tool(msa_runner, input_fasta_path: str, msa_out_path: str, +async def run_msa_tool(msa_runner, input_fasta_path: str, msa_out_path: str, msa_format: str, use_precomputed_msas: bool, max_sto_sequences: Optional[int] = None ) -> Mapping[str, Any]: """Runs an MSA tool, checking if output already exists first.""" if not use_precomputed_msas or not os.path.exists(msa_out_path): if msa_format == 'sto' and max_sto_sequences is not None: - result = msa_runner.query(input_fasta_path, max_sto_sequences)[0] # pytype: disable=wrong-arg-count + result = await msa_runner.query(input_fasta_path, max_sto_sequences)[0] # pytype: disable=wrong-arg-count else: - result = msa_runner.query(input_fasta_path)[0] + result = await msa_runner.query(input_fasta_path)[0] with open(msa_out_path, 'w') as f: f.write(result[msa_format]) else: @@ -149,7 +150,9 @@ def __init__(self, async def run_jackhmmer_uniref90(self, input_fasta_path, uniref90_out_path): """An async function that runs alignment against uniref90""" - jackhmmer_uniref90_result = run_msa_tool( + print(f"############# line 152 now running in async way") + loop = asyncio.get_event_loop() + jackhmmer_uniref90_result = await run_msa_tool( msa_runner=self.jackhmmer_uniref90_runner, input_fasta_path=input_fasta_path, msa_out_path=uniref90_out_path, @@ -164,7 +167,9 @@ async def run_jackhmmer_uniref90(self, input_fasta_path, uniref90_out_path): async def run_jackhmmer_mgnify(self, input_fasta_path, mgnify_out_path): """An async function that runs msa alignment against mgnify database""" - jackhmmer_mgnify_result = run_msa_tool( + print(f"############# line 168 now running in async way") + loop = asyncio.get_event_loop() + jackhmmer_mgnify_result = await run_msa_tool( msa_runner=self.jackhmmer_mgnify_runner, input_fasta_path=input_fasta_path, msa_out_path=mgnify_out_path, @@ -175,9 +180,11 @@ async def run_jackhmmer_mgnify(self, input_fasta_path, mgnify_out_path): async def run_bfd_alignments(self, msa_output_dir, input_fasta_path): """An async function that runs msa alignment against bfd database""" + print(f"############# line 180 now running in async way") + loop = asyncio.get_event_loop() if self._use_small_bfd: bfd_out_path = os.path.join(msa_output_dir, 'small_bfd_hits.sto') - jackhmmer_small_bfd_result = run_msa_tool( + jackhmmer_small_bfd_result = await run_msa_tool( msa_runner=self.jackhmmer_small_bfd_runner, input_fasta_path=input_fasta_path, msa_out_path=bfd_out_path, @@ -186,7 +193,7 @@ async def run_bfd_alignments(self, msa_output_dir, input_fasta_path): bfd_msa = parsers.parse_stockholm(jackhmmer_small_bfd_result['sto']) else: bfd_out_path = os.path.join(msa_output_dir, 'bfd_uniref_hits.a3m') - hhblits_bfd_uniref_result = run_msa_tool( + hhblits_bfd_uniref_result = await run_msa_tool( msa_runner=self.hhblits_bfd_uniref_runner, input_fasta_path=input_fasta_path, msa_out_path=bfd_out_path, @@ -200,12 +207,13 @@ async def run_all_msa_runners(self, input_fasta_path:str, uniref90_out_path:str, 
mgnify_out_path:str, msa_output_dir:str): """An async function that creates all async tasks and run them in parallel""" - task1 = asyncio.create_task(self.run_jackhmmer_uniref90(input_fasta_path, uniref90_out_path)) - task2 = asyncio.create_task(self.run_jackhmmer_mgnify(input_fasta_path, mgnify_out_path)) - task3 = asyncio.create_task(self.run_bfd_alignments(msa_output_dir, input_fasta_path)) - jackhmmer_uniref90_result, msa_for_templates = await task1 - jackhmmer_mgnify_result = await task2 - bfd_msa = await task3 + task1 = self.run_jackhmmer_uniref90(input_fasta_path, uniref90_out_path) + task2 = self.run_jackhmmer_mgnify(input_fasta_path, mgnify_out_path) + task3 = self.run_bfd_alignments(msa_output_dir, input_fasta_path) + results = await asyncio.gather(task1, task2, task3) + jackhmmer_uniref90_result, msa_for_templates = results[0] + jackhmmer_mgnify_result = results[1] + bfd_msa = results[2] return {"uniref90_results": (jackhmmer_uniref90_result, msa_for_templates), "mgnify_results": jackhmmer_mgnify_result, "bfd_results": bfd_msa} @@ -231,7 +239,7 @@ def process(self, input_fasta_path: str, msa_output_dir: str) -> FeatureDict: jackhmmer_uniref90_result, msa_for_templates = msa_results['uniref90_results'] jackhmmer_mgnify_result = msa_results['mgnify_results'] bfd_msa = msa_results['bfd_results'] - + if self.template_searcher.input_format == 'sto': pdb_templates_result = self.template_searcher.query(msa_for_templates) elif self.template_searcher.input_format == 'a3m': From 48bb45301bc4b929071e511e5aacb7458d8ccd8f Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Fri, 22 Dec 2023 12:35:45 +0100 Subject: [PATCH 3/7] update asyncio part in pipeline.py --- alphafold/data/pipeline.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/alphafold/data/pipeline.py b/alphafold/data/pipeline.py index 684ecc73d..7e24721f3 100644 --- a/alphafold/data/pipeline.py +++ b/alphafold/data/pipeline.py @@ -85,16 +85,16 @@ def make_msa_features(msas: Sequence[parsers.Msa]) -> FeatureDict: return features -async def run_msa_tool(msa_runner, input_fasta_path: str, msa_out_path: str, +def run_msa_tool(msa_runner, input_fasta_path: str, msa_out_path: str, msa_format: str, use_precomputed_msas: bool, max_sto_sequences: Optional[int] = None ) -> Mapping[str, Any]: """Runs an MSA tool, checking if output already exists first.""" if not use_precomputed_msas or not os.path.exists(msa_out_path): if msa_format == 'sto' and max_sto_sequences is not None: - result = await msa_runner.query(input_fasta_path, max_sto_sequences)[0] # pytype: disable=wrong-arg-count + result = msa_runner.query(input_fasta_path, max_sto_sequences)[0] # pytype: disable=wrong-arg-count else: - result = await msa_runner.query(input_fasta_path)[0] + result = msa_runner.query(input_fasta_path)[0] with open(msa_out_path, 'w') as f: f.write(result[msa_format]) else: @@ -169,7 +169,7 @@ async def run_jackhmmer_mgnify(self, input_fasta_path, mgnify_out_path): """An async function that runs msa alignment against mgnify database""" print(f"############# line 168 now running in async way") loop = asyncio.get_event_loop() - jackhmmer_mgnify_result = await run_msa_tool( + jackhmmer_mgnify_result = await self.run_msa_tool_async( msa_runner=self.jackhmmer_mgnify_runner, input_fasta_path=input_fasta_path, msa_out_path=mgnify_out_path, @@ -184,7 +184,7 @@ async def run_bfd_alignments(self, msa_output_dir, input_fasta_path): loop = asyncio.get_event_loop() if self._use_small_bfd: bfd_out_path = os.path.join(msa_output_dir, 
'small_bfd_hits.sto') - jackhmmer_small_bfd_result = await run_msa_tool( + jackhmmer_small_bfd_result = await self.run_msa_tool_async( msa_runner=self.jackhmmer_small_bfd_runner, input_fasta_path=input_fasta_path, msa_out_path=bfd_out_path, @@ -193,7 +193,7 @@ async def run_bfd_alignments(self, msa_output_dir, input_fasta_path): bfd_msa = parsers.parse_stockholm(jackhmmer_small_bfd_result['sto']) else: bfd_out_path = os.path.join(msa_output_dir, 'bfd_uniref_hits.a3m') - hhblits_bfd_uniref_result = await run_msa_tool( + hhblits_bfd_uniref_result = await self.run_msa_tool_async( msa_runner=self.hhblits_bfd_uniref_runner, input_fasta_path=input_fasta_path, msa_out_path=bfd_out_path, @@ -203,6 +203,10 @@ async def run_bfd_alignments(self, msa_output_dir, input_fasta_path): return bfd_msa + async def run_msa_tool_async(self,**kwargs): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, run_msa_tool, **kwargs) + async def run_all_msa_runners(self, input_fasta_path:str, uniref90_out_path:str, mgnify_out_path:str, msa_output_dir:str): From e0286b6fa4fefda34c7f4db2384376652b35a085 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Fri, 22 Dec 2023 13:32:33 +0100 Subject: [PATCH 4/7] update pipeline.py with concurrent.futures --- alphafold/data/pipeline.py | 53 +++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 30 deletions(-) diff --git a/alphafold/data/pipeline.py b/alphafold/data/pipeline.py index 7e24721f3..c9d122787 100644 --- a/alphafold/data/pipeline.py +++ b/alphafold/data/pipeline.py @@ -26,8 +26,8 @@ from alphafold.data.tools import hmmsearch from alphafold.data.tools import jackhmmer import numpy as np -import asyncio -from functools import partial +from concurrent.futures import ProcessPoolExecutor, as_completed +import multiprocessing # Internal import (7716). 
FeatureDict = MutableMapping[str, np.ndarray] @@ -148,11 +148,10 @@ def __init__(self, self.uniref_max_hits = uniref_max_hits self.use_precomputed_msas = use_precomputed_msas - async def run_jackhmmer_uniref90(self, input_fasta_path, uniref90_out_path): + def run_jackhmmer_uniref90(self, input_fasta_path, uniref90_out_path): """An async function that runs alignment against uniref90""" - print(f"############# line 152 now running in async way") - loop = asyncio.get_event_loop() - jackhmmer_uniref90_result = await run_msa_tool( + logging.info(f"Now running uniref90 alignment concurrently") + jackhmmer_uniref90_result = run_msa_tool( msa_runner=self.jackhmmer_uniref90_runner, input_fasta_path=input_fasta_path, msa_out_path=uniref90_out_path, @@ -165,11 +164,10 @@ async def run_jackhmmer_uniref90(self, input_fasta_path, uniref90_out_path): msa_for_templates) return jackhmmer_uniref90_result, msa_for_templates - async def run_jackhmmer_mgnify(self, input_fasta_path, mgnify_out_path): + def run_jackhmmer_mgnify(self, input_fasta_path, mgnify_out_path): """An async function that runs msa alignment against mgnify database""" - print(f"############# line 168 now running in async way") - loop = asyncio.get_event_loop() - jackhmmer_mgnify_result = await self.run_msa_tool_async( + logging.info(f"Now running mgnify alignment concurrently") + jackhmmer_mgnify_result = run_msa_tool( msa_runner=self.jackhmmer_mgnify_runner, input_fasta_path=input_fasta_path, msa_out_path=mgnify_out_path, @@ -178,13 +176,12 @@ async def run_jackhmmer_mgnify(self, input_fasta_path, mgnify_out_path): max_sto_sequences=self.mgnify_max_hits) return jackhmmer_mgnify_result - async def run_bfd_alignments(self, msa_output_dir, input_fasta_path): + def run_bfd_alignments(self, msa_output_dir, input_fasta_path): """An async function that runs msa alignment against bfd database""" - print(f"############# line 180 now running in async way") - loop = asyncio.get_event_loop() + logging.info(f"Now running bfd alignment concurrently") if self._use_small_bfd: bfd_out_path = os.path.join(msa_output_dir, 'small_bfd_hits.sto') - jackhmmer_small_bfd_result = await self.run_msa_tool_async( + jackhmmer_small_bfd_result = run_msa_tool( msa_runner=self.jackhmmer_small_bfd_runner, input_fasta_path=input_fasta_path, msa_out_path=bfd_out_path, @@ -193,7 +190,7 @@ async def run_bfd_alignments(self, msa_output_dir, input_fasta_path): bfd_msa = parsers.parse_stockholm(jackhmmer_small_bfd_result['sto']) else: bfd_out_path = os.path.join(msa_output_dir, 'bfd_uniref_hits.a3m') - hhblits_bfd_uniref_result = await self.run_msa_tool_async( + hhblits_bfd_uniref_result = run_msa_tool( msa_runner=self.hhblits_bfd_uniref_runner, input_fasta_path=input_fasta_path, msa_out_path=bfd_out_path, @@ -202,23 +199,19 @@ async def run_bfd_alignments(self, msa_output_dir, input_fasta_path): bfd_msa = parsers.parse_a3m(hhblits_bfd_uniref_result['a3m']) return bfd_msa - - async def run_msa_tool_async(self,**kwargs): - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, run_msa_tool, **kwargs) - async def run_all_msa_runners(self, input_fasta_path:str, + def run_all_msa_runners(self, input_fasta_path:str, uniref90_out_path:str, mgnify_out_path:str, msa_output_dir:str): """An async function that creates all async tasks and run them in parallel""" - task1 = self.run_jackhmmer_uniref90(input_fasta_path, uniref90_out_path) - task2 = self.run_jackhmmer_mgnify(input_fasta_path, mgnify_out_path) - task3 = self.run_bfd_alignments(msa_output_dir, 
input_fasta_path) - results = await asyncio.gather(task1, task2, task3) - jackhmmer_uniref90_result, msa_for_templates = results[0] - jackhmmer_mgnify_result = results[1] - bfd_msa = results[2] - + with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor: + uniref_msa_process = [executor.submit(self.run_jackhmmer_uniref90,*(input_fasta_path, uniref90_out_path))] + mgnify_msa_process = [executor.submit(self.run_jackhmmer_mgnify,*(input_fasta_path, mgnify_out_path))] + bfd_msa_process = [executor.submit(self.run_bfd_alignments, *(msa_output_dir, input_fasta_path))] + jackhmmer_uniref90_result, msa_for_templates = [process.result() for process in as_completed(uniref_msa_process)][0] + jackhmmer_mgnify_result = [process.result() for process in as_completed(mgnify_msa_process)][0] + bfd_msa = [process.result() for process in as_completed(bfd_msa_process)][0] + return {"uniref90_results": (jackhmmer_uniref90_result, msa_for_templates), "mgnify_results": jackhmmer_mgnify_result, "bfd_results": bfd_msa} @@ -238,8 +231,8 @@ def process(self, input_fasta_path: str, msa_output_dir: str) -> FeatureDict: mgnify_out_path = os.path.join(msa_output_dir, 'mgnify_hits.sto') - msa_results = asyncio.run(self.run_all_msa_runners(input_fasta_path,uniref90_out_path, - mgnify_out_path,msa_output_dir)) + msa_results = self.run_all_msa_runners(input_fasta_path,uniref90_out_path, + mgnify_out_path,msa_output_dir) jackhmmer_uniref90_result, msa_for_templates = msa_results['uniref90_results'] jackhmmer_mgnify_result = msa_results['mgnify_results'] bfd_msa = msa_results['bfd_results'] From 535f74651124a918266fcc083c2fcc8798ff815c Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Wed, 17 Jan 2024 11:32:17 +0100 Subject: [PATCH 5/7] updated the maximum number of workers to be 5 instead of total number of cpus --- alphafold/data/pipeline.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/alphafold/data/pipeline.py b/alphafold/data/pipeline.py index c9d122787..e393ff08d 100644 --- a/alphafold/data/pipeline.py +++ b/alphafold/data/pipeline.py @@ -27,7 +27,6 @@ from alphafold.data.tools import jackhmmer import numpy as np from concurrent.futures import ProcessPoolExecutor, as_completed -import multiprocessing # Internal import (7716). 
FeatureDict = MutableMapping[str, np.ndarray] @@ -204,7 +203,7 @@ def run_all_msa_runners(self, input_fasta_path:str, uniref90_out_path:str, mgnify_out_path:str, msa_output_dir:str): """An async function that creates all async tasks and run them in parallel""" - with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor: + with ProcessPoolExecutor(max_workers=5) as executor: uniref_msa_process = [executor.submit(self.run_jackhmmer_uniref90,*(input_fasta_path, uniref90_out_path))] mgnify_msa_process = [executor.submit(self.run_jackhmmer_mgnify,*(input_fasta_path, mgnify_out_path))] bfd_msa_process = [executor.submit(self.run_bfd_alignments, *(msa_output_dir, input_fasta_path))] From ec4406c91fab99b39b56fda242405071a9ee8ce8 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Tue, 6 Feb 2024 17:10:15 +0100 Subject: [PATCH 6/7] update the parser so that it can also parse .pdb files --- alphafold/data/mmcif_parsing.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/alphafold/data/mmcif_parsing.py b/alphafold/data/mmcif_parsing.py index 61cf149c0..71ac402e5 100644 --- a/alphafold/data/mmcif_parsing.py +++ b/alphafold/data/mmcif_parsing.py @@ -165,6 +165,7 @@ def mmcif_loop_to_dict(prefix: str, def parse(*, file_id: str, mmcif_string: str, + is_pdb_file:bool=False, catch_all_errors: bool = True) -> ParsingResult: """Entry point, parses an mmcif_string. @@ -181,7 +182,10 @@ def parse(*, """ errors = {} try: - parser = PDB.MMCIFParser(QUIET=True) + if not is_pdb_file: + parser = PDB.MMCIFParser(QUIET=True) + else: + parser = PDB.PDBParser(QUIET=True) handle = io.StringIO(mmcif_string) full_structure = parser.get_structure('', handle) first_model_structure = _get_first_model(full_structure) From cc19876eac9aa028f167cbb3c05f231cd4c3eb0d Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Wed, 28 Feb 2024 16:41:47 +0100 Subject: [PATCH 7/7] added new processing features --- alphafold/data/mmcif_parsing.py | 2 ++ alphafold/data/pipeline.py | 1 + alphafold/data/templates.py | 1 + alphafold/data/tools/hhblits.py | 2 +- alphafold/data/tools/jackhmmer.py | 2 +- 5 files changed, 6 insertions(+), 2 deletions(-) diff --git a/alphafold/data/mmcif_parsing.py b/alphafold/data/mmcif_parsing.py index 71ac402e5..48d4ce07f 100644 --- a/alphafold/data/mmcif_parsing.py +++ b/alphafold/data/mmcif_parsing.py @@ -189,6 +189,7 @@ def parse(*, handle = io.StringIO(mmcif_string) full_structure = parser.get_structure('', handle) first_model_structure = _get_first_model(full_structure) + # Extract the _mmcif_dict from the parser, which contains useful fields not # reflected in the Biopython structure. 
parsed_info = parser._mmcif_dict # pylint:disable=protected-access @@ -277,6 +278,7 @@ def parse(*, return ParsingResult(mmcif_object=mmcif_object, errors=errors) except Exception as e: # pylint:disable=broad-except + print(f"##### line 281 mmcif_parsing failed to parse mmcif file") errors[(file_id, '')] = e if not catch_all_errors: raise diff --git a/alphafold/data/pipeline.py b/alphafold/data/pipeline.py index e393ff08d..4ba269b18 100644 --- a/alphafold/data/pipeline.py +++ b/alphafold/data/pipeline.py @@ -203,6 +203,7 @@ def run_all_msa_runners(self, input_fasta_path:str, uniref90_out_path:str, mgnify_out_path:str, msa_output_dir:str): """An async function that creates all async tasks and run them in parallel""" + logging.info(f"Now running MSA runners in parallel.") with ProcessPoolExecutor(max_workers=5) as executor: uniref_msa_process = [executor.submit(self.run_jackhmmer_uniref90,*(input_fasta_path, uniref90_out_path))] mgnify_msa_process = [executor.submit(self.run_jackhmmer_mgnify,*(input_fasta_path, mgnify_out_path))] diff --git a/alphafold/data/templates.py b/alphafold/data/templates.py index f2de65c6e..1a828c60c 100644 --- a/alphafold/data/templates.py +++ b/alphafold/data/templates.py @@ -601,6 +601,7 @@ def _extract_template_features( templates_aatype = residue_constants.sequence_to_onehot( output_templates_sequence, residue_constants.HHBLITS_AA_TO_ID) + return ( { 'template_all_atom_positions': np.array(templates_all_atom_positions), diff --git a/alphafold/data/tools/hhblits.py b/alphafold/data/tools/hhblits.py index 1d8c180d8..027834c16 100644 --- a/alphafold/data/tools/hhblits.py +++ b/alphafold/data/tools/hhblits.py @@ -35,7 +35,7 @@ def __init__(self, *, binary_path: str, databases: Sequence[str], - n_cpu: int = 4, + n_cpu: int = 2, n_iter: int = 3, e_value: float = 0.001, maxseq: int = 1_000_000, diff --git a/alphafold/data/tools/jackhmmer.py b/alphafold/data/tools/jackhmmer.py index 68997f857..16739cc51 100644 --- a/alphafold/data/tools/jackhmmer.py +++ b/alphafold/data/tools/jackhmmer.py @@ -35,7 +35,7 @@ def __init__(self, *, binary_path: str, database_path: str, - n_cpu: int = 8, + n_cpu: int = 2, n_iter: int = 1, e_value: float = 0.0001, z_value: Optional[int] = None,
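
Note on the parallelization pattern: patches 1-3 wrap the blocking jackhmmer/hhblits calls in asyncio coroutines, but awaiting a synchronous subprocess call still runs the three searches one after another, which is why patches 4-5 move to concurrent.futures. Below is a minimal, self-contained sketch of that final pattern; the run_* functions and 'query.fasta' are hypothetical stand-ins for the pipeline methods and input, not the pipeline code itself.

    import time
    from concurrent.futures import ProcessPoolExecutor

    def run_uniref90(fasta_path: str) -> str:
      time.sleep(1)  # stand-in for a long jackhmmer subprocess call
      return 'uniref90 hits'

    def run_mgnify(fasta_path: str) -> str:
      time.sleep(1)  # stand-in for a long jackhmmer subprocess call
      return 'mgnify hits'

    def run_bfd(fasta_path: str) -> str:
      time.sleep(1)  # stand-in for a long hhblits/jackhmmer subprocess call
      return 'bfd hits'

    def run_all(fasta_path: str) -> dict:
      # Submit all three searches up front, then block on each result;
      # wall time is roughly the slowest search rather than the sum.
      with ProcessPoolExecutor(max_workers=3) as executor:
        futures = {
            'uniref90_results': executor.submit(run_uniref90, fasta_path),
            'mgnify_results': executor.submit(run_mgnify, fasta_path),
            'bfd_results': executor.submit(run_bfd, fasta_path),
        }
        return {name: future.result() for name, future in futures.items()}

    if __name__ == '__main__':
      print(run_all('query.fasta'))

A process pool (rather than a thread pool) gives each alignment tool its own Python process; with only three submissions, a max_workers value above 3 (the patches use 5) does not add parallelism.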
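
Usage sketch for the is_pdb_file flag added to mmcif_parsing.parse in patch 6. The file name below is a placeholder; note that the surrounding parse() code still reads parser._mmcif_dict, which Bio.PDB.PDBParser does not provide, so with the default catch_all_errors=True a .pdb input may surface that failure in ParsingResult.errors rather than as an exception.

    from alphafold.data import mmcif_parsing

    with open('example.pdb') as f:  # placeholder path
      pdb_string = f.read()

    result = mmcif_parsing.parse(
        file_id='example', mmcif_string=pdb_string, is_pdb_file=True)
    if result.mmcif_object is None:
      # Parsing problems (including the _mmcif_dict access noted above)
      # are collected here instead of being raised.
      print(result.errors)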