mirror of
https://github.com/K-Dense-AI/claude-scientific-skills.git
synced 2026-03-27 07:09:27 +08:00
Remove Streaming Export for Large Datasets section
- Remove chunked export functionality
- Remove parallel export functionality
- Simplify export guide to focus on basic export operations
This commit is contained in:
@@ -130,112 +130,6 @@ processed_df = export_processed_variants(
|
||||
)
|
||||
```
|
||||
|
||||
## Streaming Export for Large Datasets
|
||||
|
||||
### Chunked Export
|
||||
```python
|
||||
def streaming_export(ds, regions, samples, output_file, chunk_size=100000):
    """Export a large dataset to CSV region-by-region, in chunks, to bound memory.

    Parameters
    ----------
    ds : dataset handle
        Object exposing ``read(attrs=..., regions=..., samples=...)`` that
        returns a pandas DataFrame (e.g. a ``tiledbvcf.Dataset``).
    regions : iterable of str
        Genomic regions to query, e.g. ``["chr1", "chr2"]``.
    samples : iterable of str
        Sample names to include in the export.
    output_file : str
        Path of the CSV file to create (overwritten if it already exists).
    chunk_size : int, optional
        Maximum number of rows written per chunk (default 100000).

    Returns
    -------
    None. Progress and the final variant count are printed to stdout.
    """
    import csv

    total_variants = 0

    with open(output_file, 'w', newline='') as f:
        # Created lazily so the header row matches the first non-empty result.
        writer = None

        for region in regions:
            print(f"Processing region: {region}")

            # Query one region at a time so only that region's rows are in memory.
            df = ds.read(
                attrs=["sample_name", "contig", "pos_start", "alleles", "fmt_GT"],
                regions=[region],
                samples=samples
            )

            if df.empty:
                continue

            # Write the region out in fixed-size chunks.
            for i in range(0, len(df), chunk_size):
                chunk = df.iloc[i:i + chunk_size]

                # Write header once, on the first chunk of the first region.
                if writer is None:
                    writer = csv.writer(f)
                    writer.writerow(chunk.columns)

                # writerows + itertuples is far faster than the per-row
                # iterrows()/writerow() pattern and produces identical output.
                writer.writerows(chunk.itertuples(index=False, name=None))

                total_variants += len(chunk)

                # Progress message only when more chunks remain for this region.
                if i + chunk_size < len(df):
                    print(f" Processed {i + chunk_size:,} variants...")

    print(f"Exported {total_variants:,} variants to {output_file}")
|
||||
|
||||
# Usage: export every autosome into one genome-wide CSV.
regions = []
for i in range(1, 23):  # All autosomes
    regions.append(f"chr{i}")

streaming_export(ds, regions, ds.sample_names(), "genome_wide_variants.csv")
|
||||
```
|
||||
|
||||
### Parallel Export
|
||||
```python
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
|
||||
def export_region_chunk(args):
    """Export one region to a TSV file; intended as a multiprocessing worker.

    ``args`` is a 4-tuple ``(dataset_uri, region, samples, output_dir)``.
    Returns ``(region, path_of_written_tsv)``.
    """
    uri, target_region, sample_list, out_dir = args

    # Each worker opens its own read-only dataset handle rather than
    # sharing one across processes.
    dataset = tiledbvcf.Dataset(uri=uri, mode="r")

    # Make the region string filesystem-safe before using it in a filename.
    safe_name = target_region.translate(str.maketrans(":-", "__"))
    dest = os.path.join(out_dir, f"variants_{safe_name}.tsv")

    dataset.export_tsv(
        uri=dest,
        regions=[target_region],
        samples=sample_list,
        tsv_fields=["CHR", "POS", "REF", "ALT", "S:GT", "S:DP"]
    )

    return target_region, dest
|
||||
|
||||
def parallel_export(dataset_uri, regions, samples, output_dir, n_processes=4):
    """Fan out TSV export of multiple regions across worker processes.

    Builds one task per region (dataset_uri and samples are passed through
    unchanged to every worker) and runs them on an ``n_processes``-wide pool.
    Returns the list of output file paths, in region order.
    """
    os.makedirs(output_dir, exist_ok=True)

    # One argument tuple per worker task.
    tasks = []
    for region in regions:
        tasks.append((dataset_uri, region, samples, output_dir))

    with mp.Pool(n_processes) as worker_pool:
        region_results = worker_pool.map(export_region_chunk, tasks)

    # Each worker returns (region, output_file); keep just the paths.
    exported = [path for _region, path in region_results]
    print(f"Exported {len(exported)} region files to {output_dir}")

    return exported
|
||||
|
||||
# Usage: export the first 50 Mb of each autosome with 8 worker processes.
regions = []
for i in range(1, 23):  # First half of each chromosome
    regions.append(f"chr{i}:1-50000000")

output_files = parallel_export(
    dataset_uri="my_dataset",
    regions=regions,
    samples=ds.sample_names()[:100],
    output_dir="parallel_export",
    n_processes=8
)
|
||||
```
|
||||
|
||||
## Integration with Analysis Tools
|
||||
|
||||
|
||||
Reference in New Issue
Block a user