import gzip
+import os
import pathlib
import subprocess
-import sys
import textwrap
-
+import logging
+from types import SimpleNamespace
+
+# Needed by the new helpers below: cyvcf2 for VCF I/O, concurrent.futures for
+# the process pool. The relative `pcgr` import is an assumption about where
+# `run_somatic` lives in this package.
+import concurrent.futures
+import cyvcf2

from .common import constants
+from . import pcgr

+# Set up logging
+logger = logging.getLogger(__name__)

# TODO(SW): add a note that this assumes a `<root>/<package>/<file>` layout
def get_project_root():
@@ -16,46 +19,58 @@ def get_project_root():
    return project_root


-def execute_command(command):
-    command_prepared = command_prepare(command)
+def execute_command(command, log_file_path=None):
+    """
+    Run a shell command, streaming its combined stdout/stderr to the logger
+    (and, if given, appending it to log_file_path) while collecting the output
+    for the caller.
+    """
+    logger.info("Executing command: %s", command.strip())

-    print(command_prepared)
+    # Open the log file if provided
+    log_file = log_file_path.open('a', encoding='utf-8') if log_file_path else None

-    process = subprocess.run(
-        command_prepared,
+    # Launch the process with stdout and stderr combined into a single stream,
+    # line buffered so output can be relayed as it is produced.
+    process = subprocess.Popen(
+        command,
        shell=True,
        executable='/bin/bash',
-        capture_output=True,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.STDOUT,
+        text=True,
        encoding='utf-8',
+        bufsize=1,  # line buffered
    )

-    if process.returncode != 0:
-        print(process)
-        print(process.stderr)
-        sys.exit(1)
-
-    return process
+    output_lines = []
+    # Relay each line as soon as it becomes available
+    with process.stdout:
+        for line in iter(process.stdout.readline, ''):
+            if line:
+                logger.info(line.strip())
+                output_lines.append(line)
+                if log_file:
+                    log_file.write(line)
+                    log_file.flush()  # flush immediately for real-time logging
+    process.wait()  # wait for the process to complete
+
+    if log_file:
+        log_file.close()
+
+    # Mimic the subset of subprocess.CompletedProcess that callers rely on
+    result = SimpleNamespace(
+        stdout=''.join(output_lines),
+        returncode=process.returncode,
+        pid=process.pid,
+        command=command,
+    )

+    return result
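+
+# Illustrative usage sketch (hypothetical command and log path, not taken from
+# the pipeline):
+#
+#     result = execute_command('echo hello | tr a-z A-Z', log_file_path=pathlib.Path('run.log'))
+#     if result.returncode != 0:
+#         logger.error('command failed: %s', result.command)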

def command_prepare(command):
    return f'set -o pipefail; {textwrap.dedent(command)}'

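+# For example (illustrative input):
+#     command_prepare('bcftools view -H in.vcf | wc -l')
+# returns 'set -o pipefail; bcftools view -H in.vcf | wc -l', so a bcftools
+# failure is not masked by the exit status of wc.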
-
-#def count_vcf_records(fp, exclude_args=None):
-#    args = list()
-#    if exclude_args:
-#        args.append(f'-e \'{exclude_args}\'')
-#
-#    args_str = ' '.join(args)
-#    command = f'bcftools view -H {args_str} {fp} | wc -l'
-#
-#    result = execute_command(command)
-#    return int(result.stdout)
-
-
def count_vcf_records(fp):
-    result = execute_command(f'bcftools view -H {fp} | wc -l')
-    return int(result.stdout)
+    result = subprocess.run(
+        command_prepare(f'bcftools view -H {fp} | wc -l'),
+        shell=True,
+        executable='/bin/bash',
+        capture_output=True,
+        text=True,
+        check=True,  # with pipefail set by command_prepare, a bcftools failure raises here
+    )
+    return int(result.stdout.strip())
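+# For a VCF with three records, `bcftools view -H` emits the three body lines
+# and `wc -l` prints 3; for example (hypothetical path):
+#
+#     n = count_vcf_records('sample.vcf.gz')  # -> 3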


def add_vcf_header_entry(fh, anno_enum):
@@ -95,15 +110,63 @@ def get_qualified_vcf_annotation(anno_enum):
    assert anno_enum in constants.VcfInfo or anno_enum in constants.VcfFormat
    return f'{anno_enum.namespace}/{anno_enum.value}'

+def split_vcf(input_vcf, output_dir):
+    """
+    Splits a VCF file into multiple chunks, each containing up to
+    constants.MAX_SOMATIC_VARIANTS variants. Each chunk includes the VCF
+    header. Ensures no position is split across chunks.
+    """
+    output_dir = pathlib.Path(output_dir) / "vcf_chunks"
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    chunk_files = []
+    chunk_number = 1
+    variant_count = 0
+    base_filename = pathlib.Path(input_vcf).stem
+    chunk_filename = output_dir / f"{base_filename}_chunk{chunk_number}.vcf"
+    chunk_files.append(chunk_filename)
+
+    # Open the input VCF using cyvcf2
+    vcf_in = cyvcf2.VCF(input_vcf)
+    # Create a new VCF file for the first chunk
+    vcf_out = cyvcf2.Writer(str(chunk_filename), vcf_in)
+
+    last_position = None
+
+    for record in vcf_in:
+        current_position = record.POS
+        # Start a new chunk once the limit is reached, but only at a new
+        # position so records sharing a POS stay in the same chunk.
+        if variant_count >= constants.MAX_SOMATIC_VARIANTS and (last_position is None or current_position != last_position):
+            # Close the current chunk file and start a new one
+            vcf_out.close()
+            chunk_number += 1
+            chunk_filename = output_dir / f"{base_filename}_chunk{chunk_number}.vcf"
+            chunk_files.append(chunk_filename)
+            vcf_out = cyvcf2.Writer(str(chunk_filename), vcf_in)
+            variant_count = 0
+
+        # Write the record to the current chunk
+        vcf_out.write_record(record)
+        variant_count += 1
+        last_position = current_position
+
+    # Close the last chunk file
+    vcf_out.close()
+    vcf_in.close()
+
+    logger.info("VCF file split into %s chunks.", len(chunk_files))
+
+    return chunk_files
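+
+# Boundary behaviour, with a hypothetical MAX_SOMATIC_VARIANTS of 2 and input
+# positions [100, 101, 101, 105]: after two records the limit is reached, but
+# the second record at 101 shares its position with the previous one and so is
+# still written to chunk 1; chunk 2 then starts at 105. A multi-allelic
+# position is therefore never split across chunks.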

def merge_tsv_files(tsv_files, merged_tsv_fp):
    """
    Merges all TSV files into a single TSV.
    """
-    logger.info("Merging TSV files...")
-    with gzip.open(merged_tsv_fp, 'wt') as merged_tsv:
+    with open(merged_tsv_fp, 'w') as merged_tsv:
        for i, tsv_file in enumerate(tsv_files):
-            with gzip.open(tsv_file, 'rt') as infile:
+            with open(tsv_file, 'r') as infile:
                for line_number, line in enumerate(infile):
                    # Skip header except for the first file
                    if i > 0 and line_number == 0:
@@ -123,7 +186,6 @@ def merge_vcf_files(vcf_files, merged_vcf_fp):
    Returns:
    - Path to the sorted merged VCF file.
    """
-    logger.info("Merging VCF files...")
    merged_vcf_fp = pathlib.Path(merged_vcf_fp)
    merged_unsorted_vcf = merged_vcf_fp.with_suffix('.unsorted.vcf.gz')
    merged_vcf = merged_vcf_fp.with_suffix('.vcf.gz')
@@ -185,4 +247,48 @@ def merge_vcf_files(vcf_files, merged_vcf_fp):
    if merged_unsorted_vcf.exists():
        merged_unsorted_vcf.unlink()

-    return merged_vcf
+    return merged_vcf
+
+def merging_pcgr_files(output_dir, pcgr_vcf_files, pcgr_tsv_files):
+    # Merge the per-chunk PCGR outputs into single TSV and VCF files
+    pcgr_dir = output_dir / 'pcgr'
+    pcgr_dir.mkdir(exist_ok=True)
+    # Merge all TSV files into a single file in the pcgr directory
+    merged_tsv_fp = os.path.join(pcgr_dir, "nosampleset.pcgr_acmg.grch38.snvs_indels.tiers.tsv")
+    merge_tsv_files(pcgr_tsv_files, merged_tsv_fp)
+    # Merge all VCF files into a single file in the pcgr directory
+    merged_vcf_path = os.path.join(pcgr_dir, "nosampleset.pcgr_acmg.grch38")
+    merged_vcf = merge_vcf_files(pcgr_vcf_files, merged_vcf_path)
+    return merged_vcf, merged_tsv_fp
+
+def run_somatic_chunk(vcf_chunks, pcgr_data_dir, output_dir, pcgr_output_dir, max_threads, pcgr_conda, pcgrr_conda):
+    pcgr_tsv_files = []
+    pcgr_vcf_files = []
+
+    num_chunks = len(vcf_chunks)
+    # Use at most one worker per chunk, give each worker at least two threads,
+    # and never request fewer than one worker.
+    max_workers = max(1, min(num_chunks, max_threads // 2))
+    threads_quot, threads_rem = divmod(max_threads, num_chunks)
+    threads_per_chunk = max(2, threads_quot)
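+    # Worked example with hypothetical numbers: max_threads=8 and num_chunks=3
+    # give max_workers=min(3, 4)=3 and divmod(8, 3)=(2, 2), so chunks 1 and 2
+    # run with 3 threads each and chunk 3 with 2, using all 8 threads.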
+
+    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
+        futures = {}
+        for chunk_number, vcf_file in enumerate(vcf_chunks, start=1):
+            # Assign an extra thread to each of the first 'threads_rem' chunks
+            additional_thread = 1 if chunk_number <= threads_rem else 0
+            total_threads = threads_per_chunk + additional_thread
+            futures[executor.submit(pcgr.run_somatic, vcf_file, pcgr_data_dir, pcgr_output_dir, chunk_number, total_threads, pcgr_conda, pcgrr_conda)] = chunk_number
+
+        for future in concurrent.futures.as_completed(futures):
+            try:
+                pcgr_tsv_fp, pcgr_vcf_fp = future.result()
+                if pcgr_tsv_fp:
+                    pcgr_tsv_files.append(pcgr_tsv_fp)
+                if pcgr_vcf_fp:
+                    pcgr_vcf_files.append(pcgr_vcf_fp)
+            except Exception as e:
+                logger.error("PCGR failed on chunk %s: %s", futures[future], e)
+
+    merged_vcf_fp, merged_tsv_fp = merging_pcgr_files(output_dir, pcgr_vcf_files, pcgr_tsv_files)
+    return merged_tsv_fp, merged_vcf_fp
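+
+# End-to-end sketch of how these helpers compose (hypothetical paths and
+# thread count):
+#
+#     chunks = split_vcf('somatic.vcf.gz', pathlib.Path('out'))
+#     tsv_fp, vcf_fp = run_somatic_chunk(
+#         chunks, pcgr_data_dir, pathlib.Path('out'),
+#         pathlib.Path('out/pcgr_chunks'), max_threads=8,
+#         pcgr_conda=None, pcgrr_conda=None)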