mirror of
https://github.com/K-Dense-AI/claude-scientific-skills.git
synced 2026-03-27 07:09:27 +08:00
Remove references/export.md and references/ingestion.md
- Delete detailed export and ingestion reference documentation
- Update main skill to remove references to deleted files
- Simplify skill to focus on core querying and population genomics
- Keep querying.md and population_genomics.md reference files
This commit is contained in:
@@ -339,12 +339,8 @@ config = tiledbvcf.ReadConfig(
|
|||||||
|
|
||||||
Detailed documentation for each major capability:
|
Detailed documentation for each major capability:
|
||||||
|
|
||||||
- **ingestion.md** - Complete guide to dataset creation and VCF/BCF ingestion, including parallel processing, memory optimization, and error handling
|
|
||||||
|
|
||||||
- **querying.md** - Complete guide to efficient variant queries, including region specification, attribute selection, filtering strategies, and performance optimization
|
- **querying.md** - Complete guide to efficient variant queries, including region specification, attribute selection, filtering strategies, and performance optimization
|
||||||
|
|
||||||
- **export.md** - Complete guide to data export in various formats, including VCF/BCF export, TSV generation, and integration with downstream analysis tools
|
|
||||||
|
|
||||||
- **population_genomics.md** - Practical examples of population genomics workflows, including GWAS preparation, quality control, allele frequency analysis, and integration with analysis tools
|
- **population_genomics.md** - Practical examples of population genomics workflows, including GWAS preparation, quality control, allele frequency analysis, and integration with analysis tools
|
||||||
|
|
||||||
## Getting Help
|
## Getting Help
|
||||||
@@ -353,9 +349,7 @@ Detailed documentation for each major capability:
|
|||||||
|
|
||||||
For detailed information on specific operations, refer to the appropriate reference document:
|
For detailed information on specific operations, refer to the appropriate reference document:
|
||||||
|
|
||||||
- Creating datasets or ingesting VCF files → `ingestion.md`
|
|
||||||
- Querying variant data efficiently → `querying.md`
|
- Querying variant data efficiently → `querying.md`
|
||||||
- Exporting data or integrating with other tools → `export.md`
|
|
||||||
- Population genomics workflows → `population_genomics.md`
|
- Population genomics workflows → `population_genomics.md`
|
||||||
|
|
||||||
**Open Source Documentation:**
|
**Open Source Documentation:**
|
||||||
|
|||||||
@@ -1,208 +0,0 @@
|
|||||||
# TileDB-VCF Export Guide
|
|
||||||
|
|
||||||
Complete guide to exporting data from TileDB-VCF datasets in various formats for downstream analysis and integration with other genomics tools.
|
|
||||||
|
|
||||||
## VCF/BCF Export
|
|
||||||
|
|
||||||
### Basic VCF Export
|
|
||||||
```python
|
|
||||||
import tiledbvcf
|
|
||||||
|
|
||||||
# Open dataset for reading
|
|
||||||
ds = tiledbvcf.Dataset(uri="my_dataset", mode="r")
|
|
||||||
|
|
||||||
# Export specific regions as VCF
|
|
||||||
ds.export_vcf(
|
|
||||||
uri="output.vcf.gz",
|
|
||||||
regions=["chr1:1000000-2000000"],
|
|
||||||
samples=["SAMPLE_001", "SAMPLE_002", "SAMPLE_003"]
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
### BCF Export (Binary VCF)
|
|
||||||
```python
|
|
||||||
# Export as compressed BCF for faster processing
|
|
||||||
ds.export_bcf(
|
|
||||||
uri="output.bcf",
|
|
||||||
regions=["chr1:1000000-2000000", "chr2:500000-1500000"],
|
|
||||||
samples=["SAMPLE_001", "SAMPLE_002"]
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
### Large-Scale Export
|
|
||||||
```python
|
|
||||||
# Export entire chromosomes efficiently
|
|
||||||
def export_chromosome(ds, chrom, output_dir, samples=None):
    """Export every variant on one chromosome to a BCF file.

    Writes ``{output_dir}/chr{chrom}.bcf`` from the given TileDB-VCF
    dataset. When *samples* is None the dataset's full sample set is
    exported.
    """
    destination = f"{output_dir}/chr{chrom}.bcf"

    print(f"Exporting chromosome {chrom}")
    # A bare contig name (no coordinate range) selects the whole chromosome.
    ds.export_bcf(uri=destination, regions=[f"chr{chrom}"], samples=samples)
    print(f"Exported to {destination}")
|
|
||||||
|
|
||||||
# Export all autosomes
|
|
||||||
for chrom in range(1, 23):
|
|
||||||
export_chromosome(ds, chrom, "exported_data")
|
|
||||||
```
|
|
||||||
|
|
||||||
## TSV Export
|
|
||||||
|
|
||||||
### Basic TSV Export
|
|
||||||
```python
|
|
||||||
# Export as tab-separated values
|
|
||||||
ds.export_tsv(
|
|
||||||
uri="variants.tsv",
|
|
||||||
regions=["chr1:1000000-2000000"],
|
|
||||||
samples=["SAMPLE_001", "SAMPLE_002"],
|
|
||||||
tsv_fields=["CHR", "POS", "REF", "ALT", "S:GT", "S:DP"]
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
|
|
||||||
## Pandas DataFrame Export
|
|
||||||
|
|
||||||
### Query to DataFrame
|
|
||||||
```python
|
|
||||||
# Export query results as pandas DataFrame for analysis
|
|
||||||
df = ds.read(
|
|
||||||
attrs=["sample_name", "contig", "pos_start", "alleles", "fmt_GT", "info_AF"],
|
|
||||||
regions=["chr1:1000000-2000000"],
|
|
||||||
samples=["SAMPLE_001", "SAMPLE_002", "SAMPLE_003"]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Save DataFrame to various formats
|
|
||||||
df.to_csv("variants.csv", index=False)
|
|
||||||
df.to_parquet("variants.parquet")
|
|
||||||
df.to_pickle("variants.pkl")
|
|
||||||
```
|
|
||||||
|
|
||||||
### Processed Data Export
|
|
||||||
```python
|
|
||||||
def export_processed_variants(ds, regions, samples, output_file):
    """Query variants, annotate them, filter to high quality, and save as CSV.

    Adds a ``variant_id`` (contig:position) column and a ``genotype_type``
    label ("missing" / "homozygous" / "heterozygous" / "unknown") to each
    row, keeps rows with depth >= 10, genotype quality >= 20 and a called
    genotype, writes them to *output_file*, and returns the filtered frame.
    """

    def _classify(gt):
        # Diploid genotypes arrive as two-element lists; -1 marks a no-call.
        if not (isinstance(gt, list) and len(gt) == 2):
            return "unknown"
        if -1 in gt:
            return "missing"
        return "homozygous" if gt[0] == gt[1] else "heterozygous"

    df = ds.read(
        attrs=["sample_name", "contig", "pos_start", "pos_end",
               "alleles", "fmt_GT", "fmt_DP", "fmt_GQ", "info_AF"],
        regions=regions,
        samples=samples,
    )

    # Derived columns: a stable variant key and a genotype category.
    df['variant_id'] = df['contig'] + ':' + df['pos_start'].astype(str)
    df['genotype_type'] = df['fmt_GT'].apply(_classify)

    # Quality gate: minimum read depth, minimum genotype quality, called GT.
    keep = (
        (df['fmt_DP'] >= 10) &
        (df['fmt_GQ'] >= 20) &
        (df['genotype_type'] != 'missing')
    )
    high_qual = df[keep]

    high_qual.to_csv(output_file, index=False)
    print(f"Exported {len(high_qual)} high-quality variants to {output_file}")

    return high_qual
|
|
||||||
|
|
||||||
# Usage
|
|
||||||
processed_df = export_processed_variants(
|
|
||||||
ds,
|
|
||||||
regions=["chr1:1000000-2000000"],
|
|
||||||
samples=ds.sample_names()[:50], # First 50 samples
|
|
||||||
output_file="high_quality_variants.csv"
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
|
|
||||||
## Integration with Analysis Tools
|
|
||||||
|
|
||||||
### PLINK Format Export
|
|
||||||
```python
|
|
||||||
def export_for_plink(ds, regions, samples, output_prefix):
    """Export data in format suitable for PLINK analysis.

    Queries genotypes for *regions*/*samples* and writes two files:

    - ``{output_prefix}.raw``: a sample-by-variant matrix of alternate-allele
      counts (0/1/2), with -9 marking missing entries.
    - ``{output_prefix}.map``: chromosome, variant id, genetic distance
      (placeholder 0) and position, one row per variant.

    Sites with missing (-1) or non-diploid genotypes are skipped.
    """
    # The original snippet used `pd` without importing pandas anywhere.
    import pandas as pd

    df = ds.read(
        attrs=["sample_name", "contig", "pos_start", "id", "alleles", "fmt_GT"],
        regions=regions,
        samples=samples
    )

    plink_data = []
    for _, row in df.iterrows():
        gt = row['fmt_GT']
        # Keep only fully-called diploid genotypes.
        if isinstance(gt, list) and len(gt) == 2 and -1 not in gt:
            alleles = row['alleles']
            if len(alleles) >= 2:
                ref_allele = alleles[0]
                alt_allele = alleles[1]

                # PLINK additive coding: count of alternate (==1) alleles.
                alt_count = sum(1 for allele in gt if allele == 1)

                plink_data.append({
                    'sample': row['sample_name'],
                    'chr': row['contig'],
                    'pos': row['pos_start'],
                    # Fall back to a positional id when the VCF ID field is empty.
                    'id': row['id'] if row['id'] else f"{row['contig']}_{row['pos_start']}",
                    'ref': ref_allele,
                    'alt': alt_allele,
                    'genotype': alt_count
                })

    if not plink_data:
        # pivot_table on an empty frame raises; report and bail out instead.
        print("No usable genotypes found; nothing exported")
        return

    plink_df = pd.DataFrame(plink_data)

    # Pivot to one row per sample for the PLINK .raw-style matrix.
    plink_matrix = plink_df.pivot_table(
        index='sample',
        columns=['chr', 'pos', 'id'],
        values='genotype',
        fill_value=-9  # PLINK missing-data code
    )

    plink_matrix.to_csv(f"{output_prefix}.raw", sep='\t')

    # Companion .map file: chr, id, genetic distance (unknown -> 0), position.
    # .copy() avoids pandas' SettingWithCopyWarning on the deduplicated slice.
    map_data = plink_df[['chr', 'id', 'pos']].drop_duplicates().copy()
    map_data['genetic_distance'] = 0  # Placeholder
    map_data = map_data[['chr', 'id', 'genetic_distance', 'pos']]
    map_data.to_csv(f"{output_prefix}.map", sep='\t', header=False, index=False)

    print(f"Exported PLINK files: {output_prefix}.raw, {output_prefix}.map")
|
|
||||||
|
|
||||||
# Usage
|
|
||||||
export_for_plink(
|
|
||||||
ds,
|
|
||||||
regions=["chr22"], # Start with smaller chromosome
|
|
||||||
samples=ds.sample_names()[:100],
|
|
||||||
output_prefix="plink_data"
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
## Best Practices
|
|
||||||
|
|
||||||
|
|
||||||
This comprehensive export guide covers all aspects of getting data out of TileDB-VCF in various formats optimized for different downstream analysis workflows.
|
|
||||||
@@ -1,431 +0,0 @@
|
|||||||
# TileDB-VCF Ingestion Guide
|
|
||||||
|
|
||||||
Complete guide to creating TileDB-VCF datasets and ingesting VCF/BCF files with optimal performance and reliability.
|
|
||||||
|
|
||||||
## Important Requirements
|
|
||||||
|
|
||||||
**Before ingesting VCF files, ensure they meet these requirements:**
|
|
||||||
|
|
||||||
- **Single-sample VCFs only**: Multi-sample VCFs are not supported by TileDB-VCF
|
|
||||||
- **Index files required**: All VCF/BCF files must have corresponding index files:
|
|
||||||
- `.csi` files (created with `bcftools index`)
|
|
||||||
- `.tbi` files (created with `tabix`)
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Create indexes if they don't exist
|
|
||||||
bcftools index sample.vcf.gz # Creates sample.vcf.gz.csi
|
|
||||||
# OR
|
|
||||||
tabix -p vcf sample.vcf.gz # Creates sample.vcf.gz.tbi
|
|
||||||
```
|
|
||||||
|
|
||||||
## Dataset Creation
|
|
||||||
|
|
||||||
### Basic Dataset Creation
|
|
||||||
```python
|
|
||||||
import tiledbvcf
|
|
||||||
|
|
||||||
# Create a new dataset
|
|
||||||
ds = tiledbvcf.Dataset(uri="my_dataset", mode="w")
|
|
||||||
```
|
|
||||||
|
|
||||||
### Advanced Configuration
|
|
||||||
```python
|
|
||||||
# Custom configuration for large datasets
|
|
||||||
config = tiledbvcf.ReadConfig(
|
|
||||||
memory_budget=4096, # MB
|
|
||||||
tiledb_config={
|
|
||||||
"sm.tile_cache_size": "2000000000", # 2GB tile cache
|
|
||||||
"sm.mem.total_budget": "4000000000", # 4GB total memory
|
|
||||||
"vfs.file.posix_file_permissions": "644"
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
ds = tiledbvcf.Dataset(
|
|
||||||
uri="large_dataset",
|
|
||||||
mode="w",
|
|
||||||
cfg=config
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
### Cloud Dataset Creation
|
|
||||||
```python
|
|
||||||
# S3 dataset with credentials
|
|
||||||
config = tiledbvcf.ReadConfig(
|
|
||||||
tiledb_config={
|
|
||||||
"vfs.s3.aws_access_key_id": "YOUR_KEY",
|
|
||||||
"vfs.s3.aws_secret_access_key": "YOUR_SECRET",
|
|
||||||
"vfs.s3.region": "us-east-1"
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
ds = tiledbvcf.Dataset(
|
|
||||||
uri="s3://my-bucket/vcf-dataset",
|
|
||||||
mode="w",
|
|
||||||
cfg=config
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
## Single Sample Ingestion
|
|
||||||
|
|
||||||
### Basic Ingestion
|
|
||||||
```python
|
|
||||||
# Ingest a single VCF file
|
|
||||||
ds.ingest_samples(["sample1.vcf.gz"])
|
|
||||||
|
|
||||||
# Multiple files at once
|
|
||||||
ds.ingest_samples([
|
|
||||||
"sample1.vcf.gz",
|
|
||||||
"sample2.vcf.gz",
|
|
||||||
"sample3.vcf.gz"
|
|
||||||
])
|
|
||||||
```
|
|
||||||
|
|
||||||
### Custom Sample Names
|
|
||||||
```python
|
|
||||||
# Override sample names from VCF headers
|
|
||||||
ds.ingest_samples(
|
|
||||||
["data/unknown_sample.vcf.gz"],
|
|
||||||
sample_names=["SAMPLE_001"]
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
### Ingestion with Validation
|
|
||||||
```python
|
|
||||||
# Enable additional validation during ingestion
|
|
||||||
try:
|
|
||||||
ds.ingest_samples(
|
|
||||||
["sample1.vcf.gz"],
|
|
||||||
contig_fragment_merging=True, # Merge fragments on same contig
|
|
||||||
resume=False # Start fresh (don't resume)
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Ingestion failed: {e}")
|
|
||||||
```
|
|
||||||
|
|
||||||
## Parallel Ingestion
|
|
||||||
|
|
||||||
### Multi-threaded Ingestion
|
|
||||||
```python
|
|
||||||
# Configure for parallel ingestion
|
|
||||||
config = tiledbvcf.ReadConfig(
|
|
||||||
tiledb_config={
|
|
||||||
"sm.num_async_threads": "8",
|
|
||||||
"sm.num_reader_threads": "4",
|
|
||||||
"sm.num_writer_threads": "4"
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
ds = tiledbvcf.Dataset(uri="dataset", mode="w", cfg=config)
|
|
||||||
|
|
||||||
# Ingest multiple files in parallel
|
|
||||||
file_list = [f"sample_{i}.vcf.gz" for i in range(1, 101)]
|
|
||||||
ds.ingest_samples(file_list)
|
|
||||||
```
|
|
||||||
|
|
||||||
### Batched Processing
|
|
||||||
```python
|
|
||||||
# Process files in batches to manage memory
|
|
||||||
import glob
|
|
||||||
|
|
||||||
vcf_files = glob.glob("*.vcf.gz")
|
|
||||||
batch_size = 10
|
|
||||||
|
|
||||||
for i in range(0, len(vcf_files), batch_size):
|
|
||||||
batch = vcf_files[i:i+batch_size]
|
|
||||||
print(f"Processing batch {i//batch_size + 1}: {len(batch)} files")
|
|
||||||
|
|
||||||
try:
|
|
||||||
ds.ingest_samples(batch)
|
|
||||||
print(f"Successfully ingested batch {i//batch_size + 1}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error in batch {i//batch_size + 1}: {e}")
|
|
||||||
# Continue with next batch
|
|
||||||
continue
|
|
||||||
```
|
|
||||||
|
|
||||||
## Incremental Addition
|
|
||||||
|
|
||||||
### Adding New Samples
|
|
||||||
```python
|
|
||||||
# Open existing dataset and add new samples
|
|
||||||
ds = tiledbvcf.Dataset(uri="existing_dataset", mode="a") # append mode
|
|
||||||
|
|
||||||
# Add new samples without affecting existing data
|
|
||||||
ds.ingest_samples(["new_sample1.vcf.gz", "new_sample2.vcf.gz"])
|
|
||||||
```
|
|
||||||
|
|
||||||
### Resuming Interrupted Ingestion
|
|
||||||
```python
|
|
||||||
# Resume a previously interrupted ingestion
|
|
||||||
ds.ingest_samples(
|
|
||||||
["large_sample.vcf.gz"],
|
|
||||||
resume=True # Continue from where it left off
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
## Memory Optimization
|
|
||||||
|
|
||||||
### Memory Budget Configuration
|
|
||||||
```python
|
|
||||||
# Configure memory usage based on system resources
|
|
||||||
import psutil
|
|
||||||
|
|
||||||
# Use 75% of available memory
|
|
||||||
available_memory = psutil.virtual_memory().available
|
|
||||||
memory_budget_mb = int((available_memory * 0.75) / (1024 * 1024))
|
|
||||||
|
|
||||||
config = tiledbvcf.ReadConfig(
|
|
||||||
memory_budget=memory_budget_mb,
|
|
||||||
tiledb_config={
|
|
||||||
"sm.mem.total_budget": str(int(available_memory * 0.75))
|
|
||||||
}
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
### Large File Handling
|
|
||||||
```python
|
|
||||||
# For very large VCF files (>10GB), use streaming ingestion
|
|
||||||
config = tiledbvcf.ReadConfig(
|
|
||||||
memory_budget=2048, # Conservative memory usage
|
|
||||||
tiledb_config={
|
|
||||||
"sm.tile_cache_size": "500000000", # 500MB cache
|
|
||||||
"sm.consolidation.buffer_size": "100000000" # 100MB buffer
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
# Process large files one at a time
|
|
||||||
large_files = ["huge_sample1.vcf.gz", "huge_sample2.vcf.gz"]
|
|
||||||
for vcf_file in large_files:
|
|
||||||
print(f"Processing {vcf_file}")
|
|
||||||
ds.ingest_samples([vcf_file])
|
|
||||||
print(f"Completed {vcf_file}")
|
|
||||||
```
|
|
||||||
|
|
||||||
## Error Handling and Validation
|
|
||||||
|
|
||||||
### Comprehensive Error Handling
|
|
||||||
```python
|
|
||||||
import logging
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
|
||||||
|
|
||||||
def robust_ingestion(dataset_uri, vcf_files):
    """Ingest *vcf_files* into the dataset at *dataset_uri*, one at a time.

    Files that are missing or fail to ingest are logged and skipped so a
    single bad file does not abort the whole batch.

    Returns:
        list: the files that could not be ingested (empty on success).
    """
    # The original snippet used os.path without importing os.
    import os

    config = tiledbvcf.ReadConfig(memory_budget=2048)

    with tiledbvcf.Dataset(uri=dataset_uri, mode="w", cfg=config) as ds:
        failed_files = []

        for vcf_file in vcf_files:
            try:
                # Validate file exists before handing it to the ingester.
                if not os.path.exists(vcf_file):
                    logging.error(f"File not found: {vcf_file}")
                    failed_files.append(vcf_file)
                    continue

                logging.info(f"Ingesting {vcf_file}")
                ds.ingest_samples([vcf_file])
                logging.info(f"Successfully ingested {vcf_file}")

            except Exception as e:
                # Record the failure and move on to the next file.
                logging.error(f"Failed to ingest {vcf_file}: {e}")
                failed_files.append(vcf_file)
                continue

        if failed_files:
            logging.warning(f"Failed to ingest {len(failed_files)} files: {failed_files}")

        return failed_files
|
|
||||||
```
|
|
||||||
|
|
||||||
### Pre-ingestion Validation
|
|
||||||
```python
|
|
||||||
import pysam
|
|
||||||
|
|
||||||
def validate_vcf_files(vcf_files):
    """Pre-flight check a list of VCF files before ingestion.

    Each file is opened with pysam; files that fail to open are reported as
    invalid. Files that open but contain zero variant records are flagged
    yet still counted as valid (empty VCFs ingest cleanly).

    Returns:
        tuple: (valid_files, invalid_files)
    """
    valid_files, invalid_files = [], []

    for path in vcf_files:
        try:
            # Opening with pysam performs basic header/format validation.
            reader = pysam.VariantFile(path)

            # Peek at the first record; None means the file holds no variants.
            if next(iter(reader), None) is None:
                print(f"⚠ {path}: No variants found")
            else:
                print(f"✓ {path}: Valid")
            valid_files.append(path)  # empty files are still valid

            reader.close()

        except Exception as e:
            print(f"✗ {path}: Invalid - {e}")
            invalid_files.append(path)

    return valid_files, invalid_files
|
|
||||||
|
|
||||||
# Use validation before ingestion
|
|
||||||
vcf_files = ["sample1.vcf.gz", "sample2.vcf.gz"]
|
|
||||||
valid_files, invalid_files = validate_vcf_files(vcf_files)
|
|
||||||
|
|
||||||
if valid_files:
|
|
||||||
ds.ingest_samples(valid_files)
|
|
||||||
```
|
|
||||||
|
|
||||||
## Performance Optimization
|
|
||||||
|
|
||||||
### I/O Optimization
|
|
||||||
```python
|
|
||||||
# Optimize for different storage types
|
|
||||||
def get_optimized_config(storage_type="local"):
    """Build a tiledbvcf.ReadConfig tuned for a storage backend.

    *storage_type* may be "local", "s3" or "azure"; any other value falls
    back to the shared base settings only.
    """
    # Settings shared by every backend.
    settings = {
        "sm.mem.total_budget": "4000000000",
        "sm.tile_cache_size": "1000000000"
    }

    # Backend-specific overlays merged on top of the base settings.
    overlays = {
        "local": {
            # Local SSD: more async threads, POSIX file locking on.
            "sm.num_async_threads": "8",
            "vfs.file.enable_filelocks": "true"
        },
        "s3": {
            # S3: multipart uploads with parallel operations.
            "vfs.s3.multipart_part_size": "50MB",
            "vfs.s3.max_parallel_ops": "8",
            "vfs.s3.use_multipart_upload": "true"
        },
        "azure": {
            # Azure Blob Storage: parallel block uploads.
            "vfs.azure.max_parallel_ops": "8",
            "vfs.azure.block_list_block_size": "50MB"
        }
    }
    settings.update(overlays.get(storage_type, {}))

    return tiledbvcf.ReadConfig(
        memory_budget=4096,
        tiledb_config=settings
    )
|
|
||||||
```
|
|
||||||
|
|
||||||
### Monitoring Ingestion Progress
|
|
||||||
```python
|
|
||||||
import time
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
def ingest_with_progress(dataset, vcf_files):
    """Ingest *vcf_files* one by one, printing per-file and overall timing."""
    overall_start = time.time()
    total = len(vcf_files)

    for index, path in enumerate(vcf_files, 1):
        started = time.time()
        size_mb = Path(path).stat().st_size / (1024 * 1024)

        print(f"[{index}/{total}] Processing {path} ({size_mb:.1f} MB)")

        try:
            dataset.ingest_samples([path])
            elapsed = time.time() - started

            # Report wall time and effective throughput for this file.
            print(f"  ✓ Completed in {elapsed:.1f}s "
                  f"({size_mb/elapsed:.1f} MB/s)")

        except Exception as e:
            print(f"  ✗ Failed: {e}")

    print(f"\nIngestion complete: {time.time() - overall_start:.1f}s total")
|
|
||||||
```
|
|
||||||
|
|
||||||
## Cloud Storage Patterns
|
|
||||||
|
|
||||||
### S3 Ingestion Pipeline
|
|
||||||
```python
|
|
||||||
import boto3
|
|
||||||
|
|
||||||
def ingest_from_s3_bucket(dataset_uri, bucket, prefix):
    """List VCF objects under an S3 prefix and ingest them into a dataset.

    NOTE(review): list_objects_v2 returns at most 1000 keys per call and no
    pagination is performed here, so larger prefixes are silently truncated
    — confirm this bound is acceptable for the target bucket.
    """
    s3 = boto3.client('s3')

    # Enumerate candidate objects under the prefix.
    response = s3.list_objects_v2(
        Bucket=bucket,
        Prefix=prefix,
        MaxKeys=1000
    )

    # Keep only VCF-looking keys, rewritten as full S3 URIs.
    vcf_files = []
    for obj in response.get('Contents', []):
        key = obj['Key']
        if key.endswith(('.vcf.gz', '.vcf')):
            vcf_files.append(f"s3://{bucket}/{key}")

    print(f"Found {len(vcf_files)} VCF files in s3://{bucket}/{prefix}")

    # S3-tuned TileDB settings (multipart upload, parallel operations).
    config = get_optimized_config("s3")

    with tiledbvcf.Dataset(uri=dataset_uri, mode="w", cfg=config) as ds:
        ds.ingest_samples(vcf_files)
|
|
||||||
|
|
||||||
# Usage
|
|
||||||
ingest_from_s3_bucket(
|
|
||||||
dataset_uri="s3://my-output-bucket/vcf-dataset",
|
|
||||||
bucket="my-input-bucket",
|
|
||||||
prefix="vcf_files/"
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
## Best Practices
|
|
||||||
|
|
||||||
### Dataset Organization
|
|
||||||
```python
|
|
||||||
# Organize datasets by study or cohort
|
|
||||||
# Map each study/cohort name to its canonical dataset location.
study_datasets = {
    "ukb": "s3://genomics-data/ukb-dataset",
    "1kgp": "s3://genomics-data/1kgp-dataset",
    "gnomad": "s3://genomics-data/gnomad-dataset"
}


def create_study_dataset(study_name, vcf_files):
    """Create the dataset registered for *study_name* and ingest *vcf_files*.

    Raises KeyError if the study is not listed in ``study_datasets``.
    """
    target_uri = study_datasets[study_name]

    # Fragment-consolidation settings keep many small ingests queryable.
    ingest_config = tiledbvcf.ReadConfig(
        memory_budget=4096,
        tiledb_config={
            "sm.consolidation.mode": "fragments",
            "sm.consolidation.buffer_size": "200000000"
        }
    )

    with tiledbvcf.Dataset(uri=target_uri, mode="w", cfg=ingest_config) as ds:
        ds.ingest_samples(vcf_files)
|
|
||||||
```
|
|
||||||
|
|
||||||
### Maintenance and Consolidation
|
|
||||||
```python
|
|
||||||
# Consolidate dataset after ingestion for optimal query performance
|
|
||||||
def consolidate_dataset(dataset_uri):
    """Conceptual example: consolidate fragments to speed up queries.

    NOTE: the concrete consolidation API differs between TileDB-VCF
    versions, so this only prepares a configuration and reports intent.
    """
    consolidation_config = tiledbvcf.ReadConfig(
        tiledb_config={
            "sm.consolidation.mode": "fragments",
            # 1 GB working buffer for the consolidation pass.
            "sm.consolidation.buffer_size": "1000000000"
        }
    )

    print(f"Consolidating dataset: {dataset_uri}")
    # The actual consolidation call depends on the installed TileDB-VCF version.
|
|
||||||
```
|
|
||||||
|
|
||||||
This comprehensive guide covers all aspects of TileDB-VCF ingestion from basic single-file ingestion to complex cloud-based parallel processing workflows. Use the patterns that best fit your data scale and infrastructure requirements.
|
|
||||||
Reference in New Issue
Block a user