Skip to content

Commit eb5da34

Browse files
authored
Merge pull request #2283 from broadinstitute/jb-dot-plot-ingest-job
Rails-based integration for DotPlotGene processing (SCP-6029)
2 parents 6a1fe17 + 0b32fe2 commit eb5da34

12 files changed

+378
-94
lines changed

app/lib/differential_expression_service.rb

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ def self.run_differential_expression_job(cluster_group, study, user, annotation_
131131
de_type: 'rest', group1: nil, group2: nil, machine_type: nil, dry_run: nil)
132132
validate_study(study)
133133
validate_annotation(cluster_group, study, annotation_name, annotation_scope, group1:, group2:)
134-
cluster_url = cluster_file_url(cluster_group)
134+
cluster_url = RequestUtils.cluster_file_url(cluster_group)
135135
study_file = cluster_group.study_file
136136
metadata_url = study_file.is_viz_anndata? ?
137137
RequestUtils.data_fragment_url(study_file, 'metadata') :
@@ -480,23 +480,6 @@ def self.encode_filename(values)
480480
values.map { |val| val.gsub(/\+/, 'pos').gsub(/\W/, '_') }.join('--')
481481
end
482482

483-
# return a GS URL for a requested ClusterGroup, depending on file type
484-
#
485-
# * *params*
486-
# - +cluster_group+ (ClusterGroup) => Clustering object to source name/file from
487-
#
488-
# * *returns*
489-
# - (String)
490-
def self.cluster_file_url(cluster_group)
491-
study_file = cluster_group.study_file
492-
if study_file.is_viz_anndata?
493-
data_frag = study_file.ann_data_file_info.find_fragment(data_type: :cluster, name: cluster_group.name)
494-
RequestUtils.data_fragment_url(study_file, 'cluster', file_type_detail: data_frag[:obsm_key_name])
495-
else
496-
study_file.gs_url
497-
end
498-
end
499-
500483
# retrieve the weekly user quota value
501484
#
502485
# * *returns*

app/lib/dot_plot_service.rb

Lines changed: 94 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,107 +1,135 @@
1-
# frozen_string_literal: true
2-
31
# service that handles preprocessing expression/annotation data to speed up dot plot rendering
42
class DotPlotService
5-
# main handler for launching ingest job to process expression data
3+
# main handler for launching ingest job to process expression data into DotPlotGene objects
# since the study can have only one processed matrix/metadata file, this will only run if the study is eligible
#
# * *params*
#   - +study+ (Study) => the study that owns the data
#   - +cluster_group+ (ClusterGroup) => the cluster to set associations for
#   - +user+ (User) => the user that will run the job
#
# * *returns*
#   - (Boolean) => true once +push_remote_and_launch_ingest+ has been handed off via +job.delay+
#
# * *raises*
#   - (ArgumentError) => if the study or source files fail validation, or the job parameters are invalid
def self.run_process_dot_plot_genes(study, cluster_group, user)
  validate_study(study, cluster_group)
  expression_file = study_processed_matrices(study).first
  metadata_file = study.metadata_file
  validate_source_data(expression_file, metadata_file)
  params_object = create_params_object(cluster_group, expression_file, metadata_file)
  # fail fast with every validation message rather than launching a job that cannot succeed
  unless params_object.valid?
    raise ArgumentError, "job parameters failed to validate: #{params_object.errors.full_messages.join(', ')}"
  end

  job = IngestJob.new(
    study:, study_file: expression_file, user:, action: :ingest_dot_plot_genes, params_object:
  )
  job.delay.push_remote_and_launch_ingest
  true
end
29+
30+
# create DotPlotGeneIngestParameters object based on the provided files
#
# * *params*
#   - +cluster_group+ (ClusterGroup) => the cluster group to associate with
#   - +expression_file+ (StudyFile) => the expression matrix file to process
#   - +metadata_file+ (StudyFile) => the metadata file to source annotations
#
# * *returns*
#   - (DotPlotGeneIngestParameters) => a parameters object with the necessary file paths and metadata;
#     for an unrecognized file type no matrix parameters are set, so the returned object will not validate
def self.create_params_object(cluster_group, expression_file, metadata_file)
  params = {
    cluster_group_id: cluster_group.id,
    cluster_file: RequestUtils.cluster_file_url(cluster_group)
  }
  case expression_file.file_type
  when 'Expression Matrix'
    params.merge!(
      matrix_file_type: 'dense',
      matrix_file_path: expression_file.gs_url,
      cell_metadata_file: metadata_file.gs_url
    )
  when 'MM Coordinate Matrix'
    bundle = expression_file.bundled_files
    # &. leaves the value nil when a bundled file is absent, so the parameter object's
    # presence validations report the problem instead of a NoMethodError being raised here
    params.merge!(
      matrix_file_type: 'mtx',
      matrix_file_path: expression_file.gs_url,
      cell_metadata_file: metadata_file.gs_url,
      gene_file: bundle.detect { |f| f.file_type == '10X Genes File' }&.gs_url,
      barcode_file: bundle.detect { |f| f.file_type == '10X Barcodes File' }&.gs_url
    )
  when 'AnnData'
    params.merge!(
      matrix_file_type: 'mtx', # extracted expression for AnnData is in MTX format
      cell_metadata_file: RequestUtils.data_fragment_url(metadata_file, 'metadata'),
      matrix_file_path: RequestUtils.data_fragment_url(expression_file, 'matrix', file_type_detail: 'processed'),
      gene_file: RequestUtils.data_fragment_url(expression_file, 'features', file_type_detail: 'processed'),
      barcode_file: RequestUtils.data_fragment_url(expression_file, 'barcodes', file_type_detail: 'processed')
    )
  end
  DotPlotGeneIngestParameters.new(**params)
end
1872

1973
# decide whether a study can use preprocessed dot plots: it must be able to visualize clusters,
# have expression data, and have exactly one processed expression matrix
#
# * *params*
#   - +study+ (Study) => the study that owns the data
#
# * *returns*
#   - (Boolean) => true if the study is eligible for dot plot visualization
def self.study_eligible?(study)
  matrix_count = study_processed_matrices(study).size
  study.can_visualize_clusters? && study.has_expression_data? && matrix_count == 1
end
2983

3084
# report whether any DotPlotGene records already exist for the given study/cluster pair
#
# * *params*
#   - +study+ (Study) => the study that owns the data
#   - +cluster_group+ (ClusterGroup) => the cluster to check for processed data
#
# * *returns*
#   - (Boolean) => true if the study/cluster has already been processed
def self.cluster_processed?(study, cluster_group)
  matching_genes = DotPlotGene.where(study: study, cluster_group: cluster_group)
  matching_genes.exists?
end
4094

4195
# collect the processed expression matrices for a study: every matrix is kept unless it is a
# raw counts file that is not a visualization-enabled AnnData file
#
# * *params*
#   - +study+ (Study) => the study to get matrices for
#
# * *returns*
#   - (Array<StudyFile>) => an array of processed expression matrices for the study
def self.study_processed_matrices(study)
  study.expression_matrices.reject do |matrix|
    !matrix.is_viz_anndata? && matrix.is_raw_counts_file?
  end
end
53107

54-
# seeding method for testing purposes, will be removed once pipeline is in place
55-
# data is random and not representative of actual expression data
56-
def self.seed_dot_plot_genes(study)
57-
return false unless study_eligible?(study)
58-
59-
DotPlotGene.where(study_id: study.id).delete_all
60-
puts "Seeding dot plot genes for #{study.accession}"
61-
expression_matrix = study.expression_matrices.first
62-
print 'assembling genes and annotations...'
63-
genes = Gene.where(study:, study_file: expression_matrix).pluck(:name)
64-
annotations = AnnotationVizService.available_metadata_annotations(
65-
study, annotation_type: 'group'
66-
).reject { |a| a[:scope] == 'invalid' }
67-
puts " done. Found #{genes.size} genes and #{annotations.size} study-wide annotations."
68-
study.cluster_groups.each do |cluster_group|
69-
next if cluster_processed?(study, cluster_group)
108+
# validate the study and cluster for dot plot preprocessing
#
# * *params*
#   - +study+ (Study) => the study to validate
#   - +cluster_group+ (ClusterGroup) => the cluster that would be processed
#
# * *raises*
#   - (ArgumentError) => if the study is invalid, the cluster is missing, the study does not qualify for
#     dot plot visualization, or DotPlotGene data already exists for this study/cluster
def self.validate_study(study, cluster_group)
  # is_a? is already false for nil, so no separate presence check is needed
  raise ArgumentError, 'Invalid study' unless study.is_a?(Study)
  # reject a missing cluster here so it is reported as a validation error, not a later nil dereference
  raise ArgumentError, 'Invalid cluster' if cluster_group.nil?
  raise ArgumentError, 'Study does not qualify for dot plot visualization' unless study_eligible?(study)
  raise ArgumentError, 'Study has already been processed' if cluster_processed?(study, cluster_group)
end
70120

71-
cluster_annotations = ClusterVizService.available_annotations_by_cluster(
72-
cluster_group, 'group'
73-
).reject { |a| a[:scope] == 'invalid' }
74-
all_annotations = annotations + cluster_annotations
75-
puts "Processing #{cluster_group.name} with #{all_annotations.size} annotations."
76-
documents = []
77-
genes.each do |gene|
78-
exp_scores = all_annotations.map do |annotation|
79-
{
80-
"#{annotation[:name]}--#{annotation[:type]}--#{annotation[:scope]}" => annotation[:values].map do |value|
81-
{ value => [rand.round(3), rand.round(3)] }
82-
end.reduce({}, :merge)
83-
}
84-
end.reduce({}, :merge)
85-
documents << DotPlotGene.new(
86-
study:, study_file: expression_matrix, cluster_group:, gene_symbol: gene, searchable_gene: gene.downcase,
87-
exp_scores:
88-
).attributes
89-
if documents.size == 1000
90-
DotPlotGene.collection.insert_many(documents)
91-
count = DotPlotGene.where(study_id: study.id, cluster_group_id: cluster_group.id).size
92-
puts "Inserted #{count}/#{genes.size} DotPlotGenes for #{cluster_group.name}."
93-
documents.clear
94-
end
95-
end
96-
DotPlotGene.collection.insert_many(documents)
97-
count = DotPlotGene.where(study_id: study.id, cluster_group_id: cluster_group.id).size
98-
puts "Inserted #{count}/#{genes.size} DotPlotGenes for #{cluster_group.name}."
99-
puts "Finished processing #{cluster_group.name}"
100-
end
101-
puts "Seeding complete for #{study.accession}, #{DotPlotGene.where(study_id: study.id).size} DotPlotGenes created."
102-
true
103-
rescue StandardError => e
104-
puts "Error seeding DotPlotGenes in #{study.accession}: #{e.message}"
105-
false
121+
# confirm that the files needed for processing exist and are ready to use
#
# * *params*
#   - +expression_file+ (StudyFile) => the expression matrix file to process
#   - +metadata_file+ (StudyFile) => the metadata file to source annotations
#
# * *raises*
#   - (ArgumentError) => if either file is missing, either file is not parsed, or the expression file
#     should be bundled but its bundle is not complete
def self.validate_source_data(expression_file, metadata_file)
  raise ArgumentError, 'Missing required files' if expression_file.blank? || metadata_file.blank?
  raise ArgumentError, 'Source data not fully parsed' if !expression_file.parsed? || !metadata_file.parsed?

  bundle_pending = expression_file.should_bundle? && !expression_file.has_completed_bundle?
  raise ArgumentError, 'MTX bundled not completed' if bundle_pending
end
107135
end

app/lib/request_utils.rb

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,24 @@ def self.data_fragment_url(ann_data_file, fragment_type, gs_url: true, file_type
154154
"#{url}.#{ext}.gz"
155155
end
156156

157+
# return a GS URL for a requested ClusterGroup, depending on file type
# non-AnnData files use the study file's own GS URL; AnnData files use the URL of the extracted
# cluster fragment, looked up by cluster name
#
# * *params*
#   - +cluster_group+ (ClusterGroup) => Clustering object to source name/file from
#
# * *returns*
#   - (String)
#
# * *raises*
#   - (ArgumentError) => if the AnnData file has no cluster fragment matching the cluster name
def self.cluster_file_url(cluster_group)
  study_file = cluster_group.study_file
  return study_file.gs_url unless study_file.is_viz_anndata?

  data_frag = study_file.ann_data_file_info.find_fragment(data_type: :cluster, name: cluster_group.name)
  # find_fragment returns nil when nothing matches; report that instead of failing on nil below
  if data_frag.nil?
    raise ArgumentError, "no cluster fragment named '#{cluster_group.name}' found in AnnData file"
  end

  # fragment keys may be strings or symbols, so normalize before reading obsm_key_name
  safe_frag = data_frag.with_indifferent_access
  data_fragment_url(study_file, 'cluster', file_type_detail: safe_frag[:obsm_key_name])
end
174+
157175
# extracts an array of genes from a comma-delimited string list of gene names
158176
def self.get_genes_from_param(study, gene_param)
159177
terms = RequestUtils.sanitize_search_terms(gene_param).split(',')

app/models/ann_data_file_info.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,14 @@ def merge_form_fragments(form_data, fragments)
126126
# also supports finding values as both strings and symbols (for data_type values)
127127
def find_fragment(**attrs)
  # each requested attribute is tried under both its given key/value and the variant produced by
  # transform_for (presumably the String <=> Symbol counterpart -- confirm against transform_for);
  # these variants do not depend on the fragment, so build them once up front
  candidates = attrs.map do |k, v|
    [[k, k.send(transform_for(k))], [v, v.send(transform_for(v))]]
  end
  # a fragment matches only when every attribute matches under at least one key/value variant;
  # returns nil when no fragment matches
  data_fragments.detect do |fragment|
    candidates.all? do |keys, values|
      keys.any? { |key| values.include?(fragment[key]) }
    end
  end
end
132139

app/models/batch_api_client.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ class BatchApiClient
2121
ingest_differential_expression: ['Differential Expression'],
2222
render_expression_arrays: %w[Cluster],
2323
image_pipeline: %w[Cluster],
24-
ingest_anndata: %w[AnnData]
24+
ingest_anndata: %w[AnnData],
25+
ingest_dot_plot_genes: ['Expression Matrix', 'MM Coordinate Matrix', 'AnnData']
2526
}.freeze
2627

2728
# default GCE machine_type
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# class to hold parameters specific to ingest job for computing dot plot gene metrics
class DotPlotGeneIngestParameters
  include ActiveModel::Model
  include Parameterizable

  PARAMETER_NAME = '--ingest-dot-plot-genes'

  # cell_metadata_file: metadata file to source annotations
  # cluster_file: clustering file with cells to use as control list for filtering and optional annotations
  # cluster_group_id: BSON ID of ClusterGroup object for associations
  # matrix_file_path: expression matrix with source data
  # matrix_file_type: type of expression matrix (dense, mtx)
  # gene_file: genes/features file, required when matrix_file_type is 'mtx'
  # barcode_file: barcodes file, required when matrix_file_type is 'mtx'
  # machine_type (optional): override for default ingest machine type (uses 'n2d-highmem-8')
  PARAM_DEFAULTS = {
    cell_metadata_file: nil,
    cluster_file: nil,
    cluster_group_id: nil,
    matrix_file_path: nil,
    matrix_file_type: nil,
    gene_file: nil,
    barcode_file: nil,
    machine_type: 'n2d-highmem-8'
  }.freeze

  # values that are available as methods but not as attributes (and not passed to command line)
  NON_ATTRIBUTE_PARAMS = %i[machine_type].freeze

  attr_accessor(*PARAM_DEFAULTS.keys)

  # core parameters are always required, and all file locations must be GS URLs
  validates :cell_metadata_file, :cluster_file, :cluster_group_id, :matrix_file_path, :matrix_file_type, presence: true
  validates :cell_metadata_file, :cluster_file, :matrix_file_path,
            format: { with: Parameterizable::GS_URL_REGEXP, message: 'is not a valid GS url' }
  validates :matrix_file_type, inclusion: %w[dense mtx]
  validates :machine_type, inclusion: Parameterizable::GCE_MACHINE_TYPES
  # gene/barcode files are only checked for 'mtx' matrices
  validates :gene_file, :barcode_file,
            presence: true,
            format: {
              with: Parameterizable::GS_URL_REGEXP,
              message: 'is not a valid GS url'
            },
            if: -> { matrix_file_type == 'mtx' }

  # no custom setup: the attributes hash is handed straight to super
  def initialize(attributes = nil)
    super
  end

  # look up the ClusterGroup referenced by cluster_group_id
  def cluster_group
    ClusterGroup.find(cluster_group_id)
  end
end

0 commit comments

Comments
 (0)