Skip to content

Commit c65aadf

Browse files
Merge pull request #3 from PHILIPP111007/dev
Dev
2 parents 8f9923c + 3382758 commit c65aadf

File tree

9 files changed

+235
-190
lines changed

9 files changed

+235
-190
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# MatrixTableConsumer v1.2.3
1+
# MatrixTableConsumer v1.2.5
22

33
To install this package run (you need to have Go):
44

@@ -12,7 +12,7 @@ To compile Go modules with C types to work with Python run:
1212
```bash
1313
export CGO_ENABLED=1
1414

15-
go build -o main.so -buildmode=c-shared functions_go/main.go
15+
go build -o main.so -buildmode=c-shared main.go
1616
```
1717

1818
We have a class `MatrixTableConsumer`, which performs operations on Hail matrix table:
@@ -39,13 +39,13 @@ You can look at the `main.ipynb` file, which contains examples of using `MatrixT
3939

4040
You can look at the `benchmarks.md` file, which contains benchmark of my program and bcftools
4141

42-
You can filter `.vcf` files (currently only the `&&` operator is available):
42+
You can filter `.vcf` and `.vcf.gz` files (`&&` and `||` operators is available):
4343

4444
```bash
4545
python matrix_table_consumer/vcf_tools.py -filter \
4646
-o ./data/test_1.vcf \
4747
-vcf ./data/ALL.chr1.phase3_shapeit2_mvncall_integrated_v5b.20130502.genotypes.vcf.gz \
48-
-i "QUAL>=90 && AF>=0.00001" \
48+
-i "(QUAL>=90 && AF>=0.00001) || AF>=0.001" \
4949
-gzip \
5050
-num_cpu 7
5151
```

matrix_table_consumer/functions_go/collect.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func Collect(num_rows int, start_row int, vcf_path string, is_gzip bool, num_cpu
6161
}
6262

6363
scanner := bufio.NewScanner(reader)
64-
const maxTokenSize = 1 << 20
64+
const maxTokenSize = 1 << 21
6565
buf := make([]byte, maxTokenSize)
6666
scanner.Buffer(buf, maxTokenSize)
6767

@@ -174,7 +174,7 @@ func CollectAll(vcf_path string, is_gzip bool, num_cpu int) string {
174174
}
175175

176176
scanner := bufio.NewScanner(reader)
177-
const maxTokenSize = 1 << 20
177+
const maxTokenSize = 1 << 21
178178
buf := make([]byte, maxTokenSize)
179179
scanner.Buffer(buf, maxTokenSize)
180180

@@ -265,7 +265,7 @@ func Count(vcf_path string, is_gzip bool) int {
265265
rows_count := 0
266266

267267
scanner := bufio.NewScanner(reader)
268-
const maxTokenSize = 1 << 20
268+
const maxTokenSize = 1 << 21
269269
buf := make([]byte, maxTokenSize)
270270
scanner.Buffer(buf, maxTokenSize)
271271

matrix_table_consumer/functions_go/filter.go

Lines changed: 208 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,201 @@ import (
55
"compress/gzip"
66
"fmt"
77
"os"
8-
"regexp"
8+
"strconv"
99
"strings"
1010
"sync"
11+
12+
"github.com/PHILIPP111007/govaluate"
1113
)
1214

13-
func Filter(include string, input_vcf_path string, output_vcf_path string, is_gzip bool, num_cpu int) {
14-
if num_cpu <= 0 {
15-
num_cpu = 1
15+
// ParseVCFRow парсит строку VCF
16+
func ParseVCFRow(line string) *VCFRow {
17+
parts := strings.Split(line, "\t")
18+
if len(parts) < 8 {
19+
return nil
20+
}
21+
22+
row := &VCFRow{
23+
Chrom: parts[0],
24+
Pos: parts[1],
25+
ID: parts[2],
26+
Ref: parts[3],
27+
Alt: parts[4],
28+
Qual: parts[5],
29+
Filter: parts[6],
30+
Info: parts[7],
31+
InfoFields: make(map[string]string),
32+
}
33+
34+
if len(parts) > 8 {
35+
row.Format = parts[8]
36+
}
37+
if len(parts) > 9 {
38+
row.Samples = parts[9:]
39+
}
40+
41+
// Парсим INFO поле
42+
infoParts := strings.Split(row.Info, ";")
43+
for _, part := range infoParts {
44+
if strings.Contains(part, "=") {
45+
kv := strings.SplitN(part, "=", 2)
46+
if len(kv) == 2 {
47+
row.InfoFields[kv[0]] = kv[1]
48+
}
49+
} else {
50+
row.InfoFields[part] = "true"
51+
}
52+
}
53+
54+
return row
55+
}
56+
57+
// GetValue возвращает значение поля по имени
58+
func (r *VCFRow) GetValue(fieldName string) (interface{}, error) {
59+
switch fieldName {
60+
case "QUAL":
61+
if r.Qual == "." {
62+
return nil, fmt.Errorf("QUAL is missing")
63+
}
64+
return strconv.ParseFloat(r.Qual, 64)
65+
66+
case "CHROM", "POS", "ID", "REF", "ALT", "FILTER":
67+
// Эти поля доступны напрямую
68+
fieldValue := ""
69+
switch fieldName {
70+
case "CHROM":
71+
fieldValue = r.Chrom
72+
case "POS":
73+
fieldValue = r.Pos
74+
case "ID":
75+
fieldValue = r.ID
76+
case "REF":
77+
fieldValue = r.Ref
78+
case "ALT":
79+
fieldValue = r.Alt
80+
case "FILTER":
81+
fieldValue = r.Filter
82+
}
83+
return fieldValue, nil
84+
85+
default:
86+
// INFO поля
87+
if value, exists := r.InfoFields[fieldName]; exists {
88+
if value == "true" {
89+
return true, nil
90+
}
91+
// Пробуем парсить как число
92+
93+
valueParts := strings.Split(value, ",")
94+
value = valueParts[0]
95+
96+
if num, err := strconv.ParseFloat(value, 64); err == nil {
97+
return num, nil
98+
}
99+
return value, nil
100+
}
101+
return nil, fmt.Errorf("field %s not found", fieldName)
16102
}
103+
}
104+
105+
// FilterFunctions содержит функции для фильтрации
106+
var FilterFunctions = map[string]govaluate.ExpressionFunction{
107+
"has": func(args ...interface{}) (interface{}, error) {
108+
if len(args) != 2 {
109+
return nil, fmt.Errorf("has() expects 2 arguments")
110+
}
111+
_, ok := args[0].(string)
112+
if !ok {
113+
return nil, fmt.Errorf("first argument must be string")
114+
}
115+
_, ok = args[1].(string)
116+
if !ok {
117+
return nil, fmt.Errorf("second argument must be string")
118+
}
119+
120+
// Для простоты всегда возвращаем true, реальная реализация будет в EvaluateRow
121+
return true, nil
122+
},
123+
}
17124

18-
var expressions []map[string]string
125+
// EvaluateRow оценивает строку VCF по выражению
126+
func EvaluateRow(row *VCFRow, expression *govaluate.EvaluableExpression) (bool, error) {
127+
parameters := make(map[string]interface{})
19128

20-
parts := strings.SplitSeq(include, "&&")
21-
for part := range parts {
22-
matchExpr := regexp.MustCompile(`(\w+)\s*([=<>!]+)\s*(["']?[\d\.]+["']?)`)
23-
matches := matchExpr.FindStringSubmatch(part)
129+
// Добавляем все поля VCF как параметры
130+
if qual, err := row.GetValue("QUAL"); err == nil {
131+
parameters["QUAL"] = qual
132+
}
133+
parameters["CHROM"] = row.Chrom
134+
parameters["POS"] = row.Pos
135+
parameters["ID"] = row.ID
136+
parameters["REF"] = row.Ref
137+
parameters["ALT"] = row.Alt
138+
parameters["FILTER"] = row.Filter
24139

25-
if len(matches) != 4 {
26-
s := fmt.Sprintf("Invalid expression format: %s\n", part)
140+
// Добавляем INFO поля
141+
for key, value := range row.InfoFields {
142+
if num, err := strconv.ParseFloat(value, 64); err == nil {
143+
parameters[key] = num
144+
} else if value == "true" {
145+
parameters[key] = true
146+
} else if value == "false" {
147+
parameters[key] = false
148+
} else {
149+
parameters[key] = value
150+
}
151+
}
152+
153+
result, err := expression.Evaluate(parameters)
154+
if err != nil {
155+
return false, err
156+
}
157+
158+
if boolResult, ok := result.(bool); ok {
159+
return boolResult, nil
160+
}
161+
162+
return false, fmt.Errorf("expression did not return boolean")
163+
}
164+
165+
// ParallelFilterRows параллельно фильтрует строки
166+
func ParallelFilterRows(lines <-chan string, wg *sync.WaitGroup, output chan<- string, expression *govaluate.EvaluableExpression) {
167+
defer wg.Done()
168+
169+
for line := range lines {
170+
if strings.HasPrefix(line, "#") {
171+
continue // Пропускаем заголовки
172+
}
173+
174+
row := ParseVCFRow(line)
175+
if row == nil {
176+
continue
177+
}
178+
179+
matches, err := EvaluateRow(row, expression)
180+
if err != nil {
181+
s := fmt.Sprintf("Error evaluating row: %v\n", err)
27182
LoggerError(s)
183+
continue
184+
}
185+
186+
if matches {
187+
output <- line
28188
}
189+
}
190+
}
29191

30-
key := matches[1]
31-
operator := matches[2]
32-
value := matches[3]
192+
func Filter(include string, input_vcf_path string, output_vcf_path string, is_gzip bool, num_cpu int) {
193+
if num_cpu <= 0 {
194+
num_cpu = 1
195+
}
33196

34-
expressions = append(expressions, map[string]string{"key": key, "operator": operator, "value": value})
197+
// Создаем выражение с помощью govaluate
198+
expression, err := govaluate.NewEvaluableExpressionWithFunctions(include, FilterFunctions)
199+
if err != nil {
200+
s := fmt.Sprintf("Failed to parse expression '%s': %v\n", include, err)
201+
LoggerError(s)
202+
return
35203
}
36204

37205
var reader *bufio.Reader
@@ -40,13 +208,15 @@ func Filter(include string, input_vcf_path string, output_vcf_path string, is_gz
40208
if err != nil {
41209
s := fmt.Sprintf("Failed to open the file: %v\n", err)
42210
LoggerError(s)
211+
return
43212
}
44213
defer f.Close()
45214

46215
gr, err := gzip.NewReader(f)
47216
if err != nil {
48217
s := fmt.Sprintf("Error creating gzip reader: %v\n", err)
49218
LoggerError(s)
219+
return
50220
}
51221
defer gr.Close()
52222

@@ -56,6 +226,7 @@ func Filter(include string, input_vcf_path string, output_vcf_path string, is_gz
56226
if err != nil {
57227
s := fmt.Sprintf("Failed to open the file: %v\n", err)
58228
LoggerError(s)
229+
return
59230
}
60231
defer f.Close()
61232

@@ -66,53 +237,61 @@ func Filter(include string, input_vcf_path string, output_vcf_path string, is_gz
66237
if err != nil {
67238
s := fmt.Sprintf("Error creating file: %v\n", err)
68239
LoggerError(s)
240+
return
69241
}
70242
defer outputFile.Close()
71243

72244
scanner := bufio.NewScanner(reader)
245+
const maxTokenSize = 1 << 21
246+
buf := make([]byte, maxTokenSize)
247+
scanner.Buffer(buf, maxTokenSize)
73248
writer := bufio.NewWriter(outputFile)
74249
defer writer.Flush()
75250

76251
wg := sync.WaitGroup{}
77252
wg.Add(num_cpu)
78-
linesChan := make(chan string, 100_000)
79-
resultsChan := make(chan string, 500_000)
80-
num := 0
253+
linesChan := make(chan string, 100000)
254+
resultsChan := make(chan string, 500000)
81255

82-
for range num_cpu {
83-
go ParallelFilterRows(linesChan, &wg, resultsChan, expressions)
256+
// Запускаем worker'ов
257+
for i := 0; i < num_cpu; i++ {
258+
go ParallelFilterRows(linesChan, &wg, resultsChan, expression)
84259
}
85260

261+
num := 0
86262
for scanner.Scan() {
87263
line := scanner.Text()
264+
265+
// Заголовки пишем сразу
88266
if strings.HasPrefix(line, "#") {
89267
fmt.Fprintln(writer, line)
90268
continue
91269
}
92270

93-
if num == 500_000 {
271+
// Периодически сбрасываем результаты
272+
if num >= 500000 {
94273
num = 0
95-
len_chan := len(resultsChan)
96-
if len_chan != 0 {
97-
for range len_chan {
98-
row := <-resultsChan
99-
fmt.Fprintln(writer, row)
100-
}
274+
for len(resultsChan) > 0 {
275+
row := <-resultsChan
276+
fmt.Fprintln(writer, row)
101277
}
278+
writer.Flush()
102279
}
280+
103281
linesChan <- line
104-
num += 1
282+
num++
105283
}
106284

107285
close(linesChan)
108286
wg.Wait()
109287
close(resultsChan)
110288

111289
if err := scanner.Err(); err != nil {
112-
s := fmt.Sprintf("Reading standard input: %v\n", err)
290+
s := fmt.Sprintf("Reading input: %v\n", err)
113291
LoggerError(s)
114292
}
115293

294+
// Записываем оставшиеся результаты
116295
for row := range resultsChan {
117296
fmt.Fprintln(writer, row)
118297
}

0 commit comments

Comments
 (0)