Add more scientific skills

This commit is contained in:
Timothy Kassis
2025-10-19 14:12:02 -07:00
parent 78d5ac2b56
commit 660c8574d0
210 changed files with 88957 additions and 1 deletions

View File

@@ -0,0 +1,370 @@
#!/usr/bin/env python3
"""
Sequence alignment and phylogenetic analysis using BioPython.
This script demonstrates:
- Pairwise sequence alignment
- Multiple sequence alignment I/O
- Distance matrix calculation
- Phylogenetic tree construction
- Tree manipulation and visualization
"""
from Bio import Align, AlignIO, Phylo
from Bio.Phylo.TreeConstruction import DistanceCalculator, DistanceTreeConstructor
from Bio.Phylo.TreeConstruction import ParsimonyScorer, NNITreeSearcher
from Bio.Seq import Seq
import matplotlib.pyplot as plt
def pairwise_alignment_example():
    """Run a global pairwise alignment demo and print the best result."""
    print("Pairwise Sequence Alignment")
    print("=" * 60)
    # Configure a global aligner with simple match/mismatch/gap scoring.
    pw = Align.PairwiseAligner()
    pw.mode = "global"  # or 'local' for local alignment
    pw.match_score = 2
    pw.mismatch_score = -1
    pw.open_gap_score = -2
    pw.extend_gap_score = -0.5
    # Demo input sequences.
    first, second = "ACGTACGTACGT", "ACGTTACGTGT"
    print(f"Sequence 1: {first}")
    print(f"Sequence 2: {second}")
    print()
    # Compute all optimal alignments at once.
    results = pw.align(first, second)
    print(f"Number of optimal alignments: {len(results)}")
    print(f"Best alignment score: {results.score:.1f}")
    print()
    print("Best alignment:")
    print(results[0])
    print()
def local_alignment_example():
    """Run a local (Smith-Waterman) pairwise alignment demo."""
    print("Local Sequence Alignment")
    print("=" * 60)
    # Same scoring scheme as the global demo, but in local mode.
    sw = Align.PairwiseAligner()
    sw.mode = "local"
    sw.match_score = 2
    sw.mismatch_score = -1
    sw.open_gap_score = -2
    sw.extend_gap_score = -0.5
    query, subject = "AAAAACGTACGTACGTAAAAA", "TTTTTTACGTACGTTTTTTT"
    print(f"Sequence 1: {query}")
    print(f"Sequence 2: {subject}")
    print()
    hits = sw.align(query, subject)
    print(f"Best local alignment score: {hits.score:.1f}")
    print()
    print("Best local alignment:")
    print(hits[0])
    print()
def read_and_analyze_alignment(alignment_file, format="fasta"):
    """Load a multiple sequence alignment, preview it, and print statistics."""
    print(f"Reading alignment from: {alignment_file}")
    print("-" * 60)
    msa = AlignIO.read(alignment_file, format)
    print(f"Number of sequences: {len(msa)}")
    print(f"Alignment length: {msa.get_alignment_length()}")
    print()
    # Preview only the first five rows to keep console output short.
    print("Alignment preview:")
    for rec in msa[:5]:
        print(f"{rec.id[:15]:15s} {rec.seq[:50]}...")
    print()
    # Delegate the numeric summary to the shared statistics helper.
    analyze_alignment_statistics(msa)
    return msa
def analyze_alignment_statistics(alignment):
    """Print basic statistics (gap fraction, fully conserved columns) for an MSA.

    Parameters:
    - alignment: a Bio.Align.MultipleSeqAlignment-like object supporting
      len(), get_alignment_length(), row iteration, and column slicing.
    """
    print("Alignment Statistics:")
    print("-" * 60)
    length = alignment.get_alignment_length()
    n_seqs = len(alignment)
    # Fixed: the original divided by length * n_seqs unconditionally and
    # crashed with ZeroDivisionError on an empty alignment.
    if length == 0 or n_seqs == 0:
        print("Alignment is empty; no statistics to report")
        print()
        return
    # Gap fraction over the whole alignment matrix.
    total_gaps = sum(str(record.seq).count("-") for record in alignment)
    gap_percentage = (total_gaps / (length * n_seqs)) * 100
    print(f"Total positions: {length}")
    print(f"Number of sequences: {n_seqs}")
    print(f"Total gaps: {total_gaps} ({gap_percentage:.1f}%)")
    print()
    # A column is fully conserved exactly when every row holds the same
    # character — equivalent to (but simpler than) the original test that
    # the most common residue's count equals the number of sequences.
    conserved_positions = sum(
        1 for i in range(length) if len(set(alignment[:, i])) == 1
    )
    conservation = (conserved_positions / length) * 100
    print(f"Fully conserved positions: {conserved_positions} ({conservation:.1f}%)")
    print()
def calculate_distance_matrix(alignment):
    """Compute, print, and return an identity-based distance matrix."""
    print("Calculating Distance Matrix")
    print("-" * 60)
    # "identity" scores distance as the fraction of mismatched columns.
    dist_matrix = DistanceCalculator("identity").get_distance(alignment)
    print("Distance matrix:")
    print(dist_matrix)
    print()
    return dist_matrix
def build_upgma_tree(alignment):
    """Construct and return a UPGMA tree from identity distances."""
    print("Building UPGMA Tree")
    print("=" * 60)
    # Distances first, then clustering.
    calc = DistanceCalculator("identity")
    matrix = calc.get_distance(alignment)
    tree = DistanceTreeConstructor(calc).upgma(matrix)
    print("UPGMA tree constructed")
    print(f"Number of terminals: {tree.count_terminals()}")
    print()
    return tree
def build_nj_tree(alignment):
    """Construct and return a Neighbor-Joining tree from identity distances."""
    print("Building Neighbor-Joining Tree")
    print("=" * 60)
    # Distances first, then NJ clustering.
    calc = DistanceCalculator("identity")
    matrix = calc.get_distance(alignment)
    tree = DistanceTreeConstructor(calc).nj(matrix)
    print("Neighbor-Joining tree constructed")
    print(f"Number of terminals: {tree.count_terminals()}")
    print()
    return tree
def visualize_tree(tree, title="Phylogenetic Tree"):
    """Render a tree as ASCII art and save a matplotlib figure to PNG.

    Parameters:
    - tree: a Bio.Phylo tree object.
    - title: title shown above the matplotlib rendering.

    Side effect: writes tree_visualization.png to the current directory.
    """
    print("Visualizing tree...")
    print()
    # Quick text rendering for terminals and logs.
    print("ASCII tree:")
    Phylo.draw_ascii(tree)
    print()
    # Matplotlib rendering, saved to disk rather than shown interactively.
    fig, ax = plt.subplots(figsize=(10, 8))
    Phylo.draw(tree, axes=ax, do_show=False)
    ax.set_title(title)
    plt.tight_layout()
    plt.savefig("tree_visualization.png", dpi=300, bbox_inches="tight")
    # Fixed: the figure was never closed, so repeated calls accumulated
    # open figures and leaked memory.
    plt.close(fig)
    print("Tree saved to tree_visualization.png")
    print()
def manipulate_tree(tree):
    """Demonstrate common in-place tree operations; return the modified tree."""
    print("Tree Manipulation")
    print("=" * 60)
    leaves = tree.get_terminals()
    print(f"Terminal nodes: {[leaf.name for leaf in leaves]}")
    print()
    internal = tree.get_nonterminals()
    print(f"Number of internal nodes: {len(internal)}")
    print()
    print(f"Total branch length: {tree.total_branch_length():.4f}")
    print()
    # Look up a clade by the name of the first leaf, if any leaves exist.
    if leaves:
        clade = tree.find_any(name=leaves[0].name)
        print(f"Found clade: {clade.name}")
        print()
    # Sort branches for a tidier layout.
    tree.ladderize()
    print("Tree ladderized (branches sorted)")
    print()
    # Re-root halfway along the longest leaf-to-leaf path.
    tree.root_at_midpoint()
    print("Tree rooted at midpoint")
    print()
    return tree
def read_and_analyze_tree(tree_file, format="newick"):
    """Load a tree from disk, print summary properties, and draw it as ASCII."""
    print(f"Reading tree from: {tree_file}")
    print("-" * 60)
    loaded = Phylo.read(tree_file, format)
    # Summary properties, one per line.
    print(f"Tree format: {format}")
    print(f"Number of terminals: {loaded.count_terminals()}")
    print(f"Is bifurcating: {loaded.is_bifurcating()}")
    print(f"Total branch length: {loaded.total_branch_length():.4f}")
    print()
    print("Tree structure:")
    Phylo.draw_ascii(loaded)
    print()
    return loaded
def compare_trees(tree1, tree2):
    """Report shared and unique terminal names between two trees."""
    print("Comparing Trees")
    print("=" * 60)
    # Compare leaf name sets with plain set algebra.
    names_a = {leaf.name for leaf in tree1.get_terminals()}
    names_b = {leaf.name for leaf in tree2.get_terminals()}
    print(f"Tree 1 terminals: {len(names_a)}")
    print(f"Tree 2 terminals: {len(names_b)}")
    print(f"Shared terminals: {len(names_a & names_b)}")
    print(f"Unique to tree 1: {len(names_a - names_b)}")
    print(f"Unique to tree 2: {len(names_b - names_a)}")
    print()
def create_example_alignment():
    """Create and save a small example alignment; return the MSA object.

    Side effect: writes example_alignment.fasta to the current directory.
    """
    # Fixed: Seq was re-imported here although it is already imported at
    # module level; only these two names are local to this function.
    from Bio.SeqRecord import SeqRecord
    from Bio.Align import MultipleSeqAlignment
    sequences = [
        SeqRecord(Seq("ACTGCTAGCTAGCTAG"), id="seq1"),
        SeqRecord(Seq("ACTGCTAGCT-GCTAG"), id="seq2"),
        SeqRecord(Seq("ACTGCTAGCTAGCTGG"), id="seq3"),
        SeqRecord(Seq("ACTGCT-GCTAGCTAG"), id="seq4"),
    ]
    alignment = MultipleSeqAlignment(sequences)
    # Persist for the downstream demos.
    AlignIO.write(alignment, "example_alignment.fasta", "fasta")
    print("Created example alignment: example_alignment.fasta")
    print()
    return alignment
def example_workflow():
    """Run every demo in sequence: alignments, statistics, trees, visualization."""
    banner = "=" * 60
    print(banner)
    print("BioPython Alignment & Phylogeny Workflow")
    print(banner)
    print()
    # Pairwise demos first.
    pairwise_alignment_example()
    print()
    local_alignment_example()
    print()
    # Build the example data, then analyze it.
    msa = create_example_alignment()
    analyze_alignment_statistics(msa)
    dm = calculate_distance_matrix(msa)
    # Two clustering methods over the same distances.
    upgma_tree = build_upgma_tree(msa)
    nj_tree = build_nj_tree(msa)
    # Manipulate and render the UPGMA tree.
    manipulate_tree(upgma_tree)
    visualize_tree(upgma_tree, "UPGMA Tree")
    print("Workflow completed!")
    print()
if __name__ == "__main__":
    # Run the full demonstration, then print usage notes for real data.
    example_workflow()
    print("Note: For real analyses, use actual alignment files.")
    print("Supported alignment formats: clustal, phylip, stockholm, nexus, fasta")
    print("Supported tree formats: newick, nexus, phyloxml, nexml")

View File

@@ -0,0 +1,272 @@
#!/usr/bin/env python3
"""
BLAST searches and result parsing using BioPython.
This script demonstrates:
- Running BLAST searches via NCBI (qblast)
- Parsing BLAST XML output
- Filtering and analyzing results
- Working with alignments and HSPs
"""
from Bio.Blast import NCBIWWW, NCBIXML
from Bio import SeqIO
def run_blast_online(sequence, program="blastn", database="nt", expect=0.001):
    """Submit a remote BLAST job via NCBI qblast and save the XML output.

    Parameters:
    - sequence: Sequence string or Seq object
    - program: blastn, blastp, blastx, tblastn, tblastx
    - database: nt (nucleotide), nr (protein), refseq_rna, etc.
    - expect: E-value threshold

    Returns the name of the XML file the results were written to.
    """
    print(f"Running {program} search against {database} database...")
    print(f"E-value threshold: {expect}")
    print("-" * 60)
    # qblast blocks until NCBI returns the complete result set.
    result_handle = NCBIWWW.qblast(
        program=program,
        database=database,
        sequence=sequence,
        expect=expect,
        hitlist_size=50,  # Number of sequences to show alignments for
    )
    output_file = "blast_results.xml"
    with open(output_file, "w") as out:
        out.write(result_handle.read())
    result_handle.close()
    print(f"BLAST search complete. Results saved to {output_file}")
    print()
    return output_file
def parse_blast_results(xml_file, max_hits=10, evalue_threshold=0.001):
    """Parse BLAST XML results.

    Prints a summary of the record, then details for up to ``max_hits``
    HSPs whose E-value passes ``evalue_threshold``, and returns the parsed
    record. Assumes the file holds a single query (uses NCBIXML.read).
    """
    print(f"Parsing BLAST results from: {xml_file}")
    print(f"E-value threshold: {evalue_threshold}")
    print("=" * 60)
    with open(xml_file) as result_handle:
        blast_record = NCBIXML.read(result_handle)
    print(f"Query: {blast_record.query}")
    print(f"Query length: {blast_record.query_length} residues")
    print(f"Database: {blast_record.database}")
    print(f"Number of alignments: {len(blast_record.alignments)}")
    print()
    hit_count = 0
    for alignment in blast_record.alignments:
        for hsp in alignment.hsps:
            if hsp.expect <= evalue_threshold:
                # Count every significant HSP, but only print the first max_hits.
                hit_count += 1
                if hit_count <= max_hits:
                    print(f"Hit {hit_count}:")
                    print(f"  Sequence: {alignment.title}")
                    print(f"  Length: {alignment.length}")
                    print(f"  E-value: {hsp.expect:.2e}")
                    print(f"  Score: {hsp.score}")
                    print(f"  Identities: {hsp.identities}/{hsp.align_length} ({hsp.identities / hsp.align_length * 100:.1f}%)")
                    print(f"  Positives: {hsp.positives}/{hsp.align_length} ({hsp.positives / hsp.align_length * 100:.1f}%)")
                    print(f"  Gaps: {hsp.gaps}/{hsp.align_length}")
                    print(f"  Query range: {hsp.query_start} - {hsp.query_end}")
                    print(f"  Subject range: {hsp.sbjct_start} - {hsp.sbjct_end}")
                    print()
                    # Show alignment (first 100 characters)
                    print("  Alignment preview:")
                    print(f"  Query: {hsp.query[:100]}")
                    print(f"  Match: {hsp.match[:100]}")
                    print(f"  Sbjct: {hsp.sbjct[:100]}")
                    print()
    print(f"Total significant hits (E-value <= {evalue_threshold}): {hit_count}")
    print()
    return blast_record
def parse_multiple_queries(xml_file):
    """Summarize each query's best hit in a multi-query BLAST XML file."""
    print(f"Parsing multiple queries from: {xml_file}")
    print("=" * 60)
    with open(xml_file) as result_handle:
        # NCBIXML.parse yields one record per query in the file.
        for query_number, record in enumerate(NCBIXML.parse(result_handle), 1):
            print(f"\nQuery {query_number}: {record.query}")
            print(f"  Number of hits: {len(record.alignments)}")
            if record.alignments:
                top_alignment = record.alignments[0]
                top_hsp = top_alignment.hsps[0]
                print(f"  Best hit: {top_alignment.title[:80]}...")
                print(f"  Best E-value: {top_hsp.expect:.2e}")
def filter_blast_results(blast_record, min_identity=0.7, min_coverage=0.5):
    """Filter BLAST HSPs by identity and query coverage.

    Parameters:
    - blast_record: parsed NCBIXML record (has .alignments and .query_length).
    - min_identity: minimum identical fraction of the aligned region (0-1).
    - min_coverage: minimum aligned fraction of the query length (0-1).

    Returns a list of dicts describing passing hits, sorted by E-value.
    """
    # Fixed: this was an f-string with no placeholders (lint F541).
    print("Filtering results:")
    print(f"  Minimum identity: {min_identity * 100}%")
    print(f"  Minimum coverage: {min_coverage * 100}%")
    print("-" * 60)
    filtered_hits = []
    for alignment in blast_record.alignments:
        for hsp in alignment.hsps:
            identity_fraction = hsp.identities / hsp.align_length
            # Coverage is approximated as aligned length over query length;
            # gaps in the query slightly inflate this value.
            coverage = hsp.align_length / blast_record.query_length
            if identity_fraction >= min_identity and coverage >= min_coverage:
                filtered_hits.append(
                    {
                        "title": alignment.title,
                        "length": alignment.length,
                        "evalue": hsp.expect,
                        "identity": identity_fraction,
                        "coverage": coverage,
                        "alignment": alignment,
                        "hsp": hsp,
                    }
                )
    print(f"Found {len(filtered_hits)} hits matching criteria")
    print()
    # Best (smallest) E-values first.
    filtered_hits.sort(key=lambda hit: hit["evalue"])
    # Display top hits
    for i, hit in enumerate(filtered_hits[:5], 1):
        print(f"{i}. {hit['title'][:80]}")
        print(f"   Identity: {hit['identity']*100:.1f}%, Coverage: {hit['coverage']*100:.1f}%, E-value: {hit['evalue']:.2e}")
        print()
    return filtered_hits
def extract_hit_sequences(blast_record, output_file="blast_hits.fasta"):
    """Write the best-HSP subject sequences of the top 10 hits to a FASTA file."""
    print(f"Extracting hit sequences to {output_file}...")
    from Bio.Seq import Seq
    from Bio.SeqRecord import SeqRecord
    records = []
    # Take the best (first) HSP from each of the first ten alignments.
    for alignment in blast_record.alignments[:10]:
        best_hsp = alignment.hsps[0]
        # The first whitespace-delimited token of the title is the accession.
        accession = alignment.title.split()[0]
        records.append(
            SeqRecord(
                Seq(best_hsp.sbjct.replace("-", "")),  # Remove gaps
                id=accession,
                description=f"E-value: {best_hsp.expect:.2e}, Identity: {best_hsp.identities}/{best_hsp.align_length}",
            )
        )
    # Write to FASTA
    SeqIO.write(records, output_file, "fasta")
    print(f"Extracted {len(records)} sequences")
    print()
def analyze_blast_statistics(blast_record):
    """Compute summary statistics (E-values, identity %, bit scores) for a record.

    Parameters:
    - blast_record: parsed NCBIXML record whose alignments hold HSPs.
    """
    import statistics

    print("BLAST Result Statistics:")
    print("-" * 60)
    if not blast_record.alignments:
        print("No hits found")
        return
    evalues = []
    identities = []
    scores = []
    for alignment in blast_record.alignments:
        for hsp in alignment.hsps:
            evalues.append(hsp.expect)
            identities.append(hsp.identities / hsp.align_length)
            # Fixed: the report below is labeled "Bit scores", but the
            # original collected hsp.score (the raw alignment score).
            # NCBIXML stores the bit score in hsp.bits.
            scores.append(hsp.bits)
    print(f"Total HSPs: {len(evalues)}")
    print("\nE-values:")
    print(f"  Min: {min(evalues):.2e}")
    print(f"  Max: {max(evalues):.2e}")
    print(f"  Median: {statistics.median(evalues):.2e}")
    print("\nIdentity percentages:")
    print(f"  Min: {min(identities)*100:.1f}%")
    print(f"  Max: {max(identities)*100:.1f}%")
    print(f"  Mean: {statistics.mean(identities)*100:.1f}%")
    print("\nBit scores:")
    print(f"  Min: {min(scores):.1f}")
    print(f"  Max: {max(scores):.1f}")
    print(f"  Mean: {statistics.mean(scores):.1f}")
    print()
def example_workflow():
    """Walk through the BLAST demo: show the example sequence and usage notes."""
    divider = "=" * 60
    print(divider)
    print("BioPython BLAST Example Workflow")
    print(divider)
    print()
    # Example sequence (human beta-globin); all whitespace is stripped below.
    example_sequence = """
ATGGTGCATCTGACTCCTGAGGAGAAGTCTGCCGTTACTGCCCTGTGGGGCAAGGTGAACGTGGATGAAGTTGGTGGTGAGGCCCTGGGCAGGCTGCTGGTGGTCTACCCTTGGACCCAGAGGTTCTTTGAGTCCTTTGGGGATCTGTCCACTCCTGATGCTGTTATGGGCAACCCTAAGGTGAAGGCTCATGGCAAGAAAGTGCTCGGTGCCTTTAGTGATGGCCTGGCTCACCTGGACAACCTCAAGGGCACCTTTGCCACACTGAGTGAGCTGCACTGTGACAAGCTGCACGTGGATCCTGAGAACTTCAGGCTCCTGGGCAACGTGCTGGTCTGTGTGCTGGCCCATCACTTTGGCAAAGAATTCACCCCACCAGTGCAGGCTGCCTATCAGAAAGTGGTGGCTGGTGTGGCTAATGCCCTGGCCCACAAGTATCACTAAGCTCGCTTTCTTGCTGTCCAATTTCTATTAAAGGTTCCTTTGTTCCCTAAGTCCAACTACTAAACTGGGGGATATTATGAAGGGCCTTGAGCATCTGGATTCTGCCTAATAAAAAACATTTATTTTCATTGC
""".replace("\n", "").replace(" ", "")
    print("Example: Human beta-globin sequence")
    print(f"Length: {len(example_sequence)} bp")
    print()
    # Note: Uncomment to run actual BLAST search (takes time)
    # xml_file = run_blast_online(example_sequence, program="blastn", database="nt", expect=0.001)
    print("To run a real BLAST search, uncomment the run_blast_online() line")
    print("For now, demonstrating parsing with example results file")
    print()
    # If you have results, parse them:
    # blast_record = parse_blast_results("blast_results.xml", max_hits=5)
    # filtered = filter_blast_results(blast_record, min_identity=0.9)
    # analyze_blast_statistics(blast_record)
    # extract_hit_sequences(blast_record)
if __name__ == "__main__":
    # Run the demo, then remind users about remote-search latency.
    example_workflow()
    print()
    print("Note: BLAST searches can take several minutes.")
    print("For production use, consider running local BLAST instead.")

View File

@@ -0,0 +1,215 @@
#!/usr/bin/env python3
"""
File I/O operations using BioPython SeqIO.
This script demonstrates:
- Reading sequences from various formats
- Writing sequences to files
- Converting between formats
- Filtering and processing sequences
- Working with large files efficiently
"""
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
def read_sequences(filename, format_type):
    """Preview up to three records from a sequence file and report the total count.

    Parameters:
    - filename: path to the sequence file.
    - format_type: SeqIO format name ("fasta", "genbank", ...).
    """
    # Fixed: the header previously printed a literal "(unknown)" placeholder
    # instead of interpolating the filename argument.
    print(f"Reading {format_type} file: {filename}")
    print("-" * 60)
    count = 0
    for record in SeqIO.parse(filename, format_type):
        count += 1
        print(f"ID: {record.id}")
        print(f"Name: {record.name}")
        print(f"Description: {record.description}")
        print(f"Sequence length: {len(record.seq)}")
        print(f"Sequence: {record.seq[:50]}...")
        print()
        # Only show first 3 sequences
        if count >= 3:
            break
    # Count the total with a streaming generator instead of materializing
    # every record in a list (the preview loop may have stopped early,
    # so a second pass over the file is still needed).
    total = sum(1 for _ in SeqIO.parse(filename, format_type))
    print(f"Total sequences in file: {total}")
    print()
def read_single_sequence(filename, format_type):
    """Read exactly one record from a file and print its ID and sequence."""
    # SeqIO.read raises if the file holds zero or multiple records.
    rec = SeqIO.read(filename, format_type)
    print("Single sequence record:")
    print(f"ID: {rec.id}")
    print(f"Sequence: {rec.seq}")
    print()
def write_sequences(records, output_filename, format_type):
    """Write an iterable of SeqRecords to disk and report how many were written."""
    written = SeqIO.write(records, output_filename, format_type)
    print(f"Wrote {written} sequences to {output_filename} in {format_type} format")
    print()
def convert_format(input_file, input_format, output_file, output_format):
    """Convert a sequence file between formats via SeqIO.convert."""
    converted = SeqIO.convert(input_file, input_format, output_file, output_format)
    print(f"Converted {converted} sequences from {input_format} to {output_format}")
    print()
def filter_sequences(input_file, format_type, min_length=100, max_length=1000):
    """Return the records from a file whose sequence length is within bounds.

    Parameters:
    - input_file: path to the sequence file.
    - format_type: SeqIO format name.
    - min_length / max_length: inclusive length bounds in residues.
    """
    # Comprehension instead of the original append loop (same result).
    filtered = [
        record
        for record in SeqIO.parse(input_file, format_type)
        if min_length <= len(record.seq) <= max_length
    ]
    print(f"Found {len(filtered)} sequences between {min_length} and {max_length} bp")
    return filtered
def extract_subsequence(input_file, format_type, seq_id, start, end):
    """Extract record.seq[start:end] for one record without loading the whole file.

    Returns the subsequence (a Seq), or None when ``seq_id`` is not present.
    """
    # Index for efficient random access; the index keeps the file open,
    # so it must be closed when we are done.
    record_dict = SeqIO.index(input_file, format_type)
    try:
        if seq_id in record_dict:
            subseq = record_dict[seq_id].seq[start:end]
            print(f"Extracted subsequence from {seq_id} ({start}:{end}):")
            print(subseq)
            return subseq
        print(f"Sequence {seq_id} not found")
        return None
    finally:
        # Fixed: the original never closed the index, leaking the file handle.
        record_dict.close()
def create_sequence_records():
    """Build two example SeqRecords (one plain, one annotated) and return them.

    Both records carry a ``molecule_type`` annotation because the GenBank
    writer used by example_workflow() requires it.
    """
    # Simple record
    simple_record = SeqRecord(
        Seq("ATGCATGCATGC"),
        id="seq001",
        name="MySequence",
        description="Example sequence"
    )
    # Fixed: without molecule_type, SeqIO.write(..., "genbank") raises,
    # so the GenBank step of example_workflow() failed for this record.
    simple_record.annotations["molecule_type"] = "DNA"
    # Record with annotations
    annotated_record = SeqRecord(
        Seq("ATGGTGCATCTGACTCCTGAGGAG"),
        id="seq002",
        name="GeneX",
        description="Important gene"
    )
    annotated_record.annotations["molecule_type"] = "DNA"
    annotated_record.annotations["organism"] = "Homo sapiens"
    return [simple_record, annotated_record]
def index_large_file(filename, format_type):
    """Randomly access records in a large file via SeqIO.index (no full load)."""
    # Build an on-demand index keyed by record ID.
    idx = SeqIO.index(filename, format_type)
    print(f"Indexed {len(idx)} sequences")
    print(f"Available IDs: {list(idx.keys())[:10]}...")
    print()
    # Pull one record by ID to demonstrate random access.
    if len(idx) > 0:
        first_key = next(iter(idx.keys()))
        print(f"Accessed record: {idx[first_key].id}")
        print()
    # Release the underlying file handle.
    idx.close()
def parse_with_quality_scores(fastq_file):
    """Show ID, sequence preview, and Phred quality stats for the first FASTQ read."""
    print("Parsing FASTQ with quality scores:")
    print("-" * 60)
    for rec in SeqIO.parse(fastq_file, "fastq"):
        print(f"ID: {rec.id}")
        print(f"Sequence: {rec.seq[:50]}...")
        print(f"Quality scores (first 10): {rec.letter_annotations['phred_quality'][:10]}")
        # Mean Phred quality over the whole read.
        avg_quality = sum(rec.letter_annotations["phred_quality"]) / len(rec)
        print(f"Average quality: {avg_quality:.2f}")
        print()
        break  # Just show first record
def batch_process_large_file(input_file, format_type, batch_size=100):
    """Stream records and handle them in fixed-size batches to bound memory use."""
    pending = []
    total = 0
    for rec in SeqIO.parse(input_file, format_type):
        pending.append(rec)
        total += 1
        if len(pending) == batch_size:
            # A full batch is ready; handle it and start a fresh one.
            print(f"Processing batch of {len(pending)} sequences...")
            pending = []
    # Handle whatever remained after the last full batch.
    if pending:
        print(f"Processing final batch of {len(pending)} sequences...")
    print(f"Total sequences processed: {total}")
def example_workflow():
    """Create example records and write them out in FASTA and GenBank formats."""
    header = "=" * 60
    print(header)
    print("BioPython SeqIO Workflow Example")
    print(header)
    print()
    # Build in-memory records, then serialize them in two formats.
    records = create_sequence_records()
    write_sequences(records, "example_output.fasta", "fasta")
    write_sequences(records, "example_output.gb", "genbank")
    # Convert FASTA to GenBank (would work if file exists)
    # convert_format("input.fasta", "fasta", "output.gb", "genbank")
    print("Example workflow completed!")
if __name__ == "__main__":
    # Run the demo workflow; the other helpers are meant to be adapted by users.
    example_workflow()
    print()
    print("Note: This script demonstrates BioPython SeqIO operations.")
    print("Uncomment and adapt the functions for your specific files.")

View File

@@ -0,0 +1,293 @@
#!/usr/bin/env python3
"""
NCBI Entrez database access using BioPython.
This script demonstrates:
- Searching NCBI databases
- Downloading sequences by accession
- Retrieving PubMed articles
- Batch downloading with WebEnv
- Proper error handling and rate limiting
"""
import time
from Bio import Entrez, SeqIO
# IMPORTANT: Always set your email
Entrez.email = "your.email@example.com" # Change this!
def search_nucleotide(query, max_results=10):
    """Run an esearch against the nucleotide database and return matching IDs."""
    print(f"Searching nucleotide database for: {query}")
    print("-" * 60)
    handle = Entrez.esearch(db="nucleotide", term=query, retmax=max_results)
    results = Entrez.read(handle)
    handle.close()
    id_list = results["IdList"]
    # 'Count' is the total match count; IdList holds at most retmax IDs.
    print(f"Found {results['Count']} total matches")
    print(f"Returning top {len(id_list)} IDs:")
    print(id_list)
    print()
    return id_list
def fetch_sequence_by_accession(accession):
    """Fetch one GenBank record by accession; return it, or None on failure."""
    print(f"Fetching sequence: {accession}")
    try:
        handle = Entrez.efetch(
            db="nucleotide", id=accession, rettype="gb", retmode="text"
        )
        gb_record = SeqIO.read(handle, "genbank")
        handle.close()
        print(f"Successfully retrieved: {gb_record.id}")
        print(f"Description: {gb_record.description}")
        print(f"Length: {len(gb_record.seq)} bp")
        print(f"Organism: {gb_record.annotations.get('organism', 'Unknown')}")
        print()
        return gb_record
    except Exception as e:
        # Best-effort: report the failure and let the caller continue.
        print(f"Error fetching {accession}: {e}")
        return None
def fetch_multiple_sequences(id_list, output_file="downloaded_sequences.fasta"):
    """Fetch FASTA records for a list of IDs and save them to one file."""
    print(f"Fetching {len(id_list)} sequences...")
    try:
        # For >200 IDs, efetch automatically uses POST
        handle = Entrez.efetch(
            db="nucleotide", id=id_list, rettype="fasta", retmode="text"
        )
        fetched = list(SeqIO.parse(handle, "fasta"))
        handle.close()
        SeqIO.write(fetched, output_file, "fasta")
        print(f"Successfully downloaded {len(fetched)} sequences to {output_file}")
        print()
        return fetched
    except Exception as e:
        # Best-effort: report the failure and return an empty list.
        print(f"Error fetching sequences: {e}")
        return []
def search_and_download(query, output_file, max_results=100):
    """Search the nucleotide database and download all hits to one FASTA file."""
    print(f"Searching and downloading: {query}")
    print("=" * 60)
    # Step 1: collect the matching IDs.
    search_handle = Entrez.esearch(db="nucleotide", term=query, retmax=max_results)
    search_result = Entrez.read(search_handle)
    search_handle.close()
    id_list = search_result["IdList"]
    print(f"Found {len(id_list)} sequences")
    if not id_list:
        print("No results found")
        return
    # Step 2: fetch in modest batches so we don't hammer NCBI.
    batch_size = 100
    all_records = []
    for batch_number, start in enumerate(range(0, len(id_list), batch_size), 1):
        batch_ids = id_list[start:start + batch_size]
        print(f"Downloading batch {batch_number} ({len(batch_ids)} sequences)...")
        fetch_handle = Entrez.efetch(
            db="nucleotide", id=batch_ids, rettype="fasta", retmode="text"
        )
        all_records.extend(SeqIO.parse(fetch_handle, "fasta"))
        fetch_handle.close()
        # Be polite - wait between requests
        time.sleep(0.5)
    # Step 3: write everything to a single FASTA file.
    SeqIO.write(all_records, output_file, "fasta")
    print(f"Downloaded {len(all_records)} sequences to {output_file}")
    print()
def use_history_for_large_queries(query, max_results=1000):
    """Use NCBI History server for large queries.

    Searches with usehistory="y" so results are cached server-side, then
    fetches them in batches via the returned WebEnv/QueryKey rather than
    resending thousands of IDs. Returns the list of downloaded records.
    """
    print("Using NCBI History server for large query")
    print("-" * 60)
    # Search with history
    search_handle = Entrez.esearch(
        db="nucleotide", term=query, retmax=max_results, usehistory="y"
    )
    search_results = Entrez.read(search_handle)
    search_handle.close()
    count = int(search_results["Count"])
    webenv = search_results["WebEnv"]        # server-side session token
    query_key = search_results["QueryKey"]   # identifies this query in the session
    print(f"Found {count} total sequences")
    print(f"WebEnv: {webenv[:20]}...")
    print(f"QueryKey: {query_key}")
    print()
    # Fetch in batches using history
    batch_size = 500
    all_records = []
    for start in range(0, min(count, max_results), batch_size):
        end = min(start + batch_size, max_results)
        print(f"Downloading records {start + 1} to {end}...")
        fetch_handle = Entrez.efetch(
            db="nucleotide",
            rettype="fasta",
            retmode="text",
            retstart=start,   # offset into the cached result set
            retmax=batch_size,
            webenv=webenv,
            query_key=query_key,
        )
        batch_records = list(SeqIO.parse(fetch_handle, "fasta"))
        fetch_handle.close()
        all_records.extend(batch_records)
        # Be polite
        time.sleep(0.5)
    print(f"Downloaded {len(all_records)} sequences total")
    return all_records
def search_pubmed(query, max_results=10):
    """Search PubMed and return a list of matching PMIDs."""
    print(f"Searching PubMed for: {query}")
    print("-" * 60)
    handle = Entrez.esearch(db="pubmed", term=query, retmax=max_results)
    results = Entrez.read(handle)
    handle.close()
    pmids = results["IdList"]
    # 'Count' is the total match count; pmids holds at most retmax IDs.
    print(f"Found {results['Count']} total articles")
    print(f"Returning {len(pmids)} PMIDs:")
    print(pmids)
    print()
    return pmids
def fetch_pubmed_abstracts(pmid_list):
    """Fetch abstract text for the given PMIDs and print a short preview."""
    print(f"Fetching summaries for {len(pmid_list)} articles...")
    handle = Entrez.efetch(db="pubmed", id=pmid_list, rettype="abstract", retmode="text")
    abstract_text = handle.read()
    handle.close()
    # Preview only the first 500 characters to keep the console readable.
    print(abstract_text[:500])
    print("...")
    print()
def get_database_info(database="nucleotide"):
    """Print name, description, size, and last-update time for an NCBI database."""
    print(f"Getting info for database: {database}")
    print("-" * 60)
    handle = Entrez.einfo(db=database)
    info = Entrez.read(handle)
    handle.close()
    db_info = info["DbInfo"]
    # Summary fields, one per line.
    print(f"Name: {db_info['DbName']}")
    print(f"Description: {db_info['Description']}")
    print(f"Record count: {db_info['Count']}")
    print(f"Last update: {db_info['LastUpdate']}")
    print()
def link_databases(db_from, db_to, id_):
    """List records in db_to that are linked to one record in db_from via elink."""
    print(f"Finding links from {db_from} ID {id_} to {db_to}")
    print("-" * 60)
    handle = Entrez.elink(dbfrom=db_from, db=db_to, id=id_)
    link_result = Entrez.read(handle)
    handle.close()
    link_sets = link_result[0]["LinkSetDb"]
    if link_sets:
        linked_ids = [entry["Id"] for entry in link_sets[0]["Link"]]
        print(f"Found {len(linked_ids)} linked records")
        print(f"IDs: {linked_ids[:10]}")
    else:
        print("No linked records found")
    print()
def example_workflow():
    """Print the Entrez demo banner and usage notes (examples are commented out)."""
    rule = "=" * 60
    print(rule)
    print("BioPython Entrez Example Workflow")
    print(rule)
    print()
    # Note: These are examples - uncomment to run with your email set
    # # Example 1: Search and get IDs
    # ids = search_nucleotide("Homo sapiens[Organism] AND COX1[Gene]", max_results=5)
    #
    # # Example 2: Fetch a specific sequence
    # fetch_sequence_by_accession("NM_001301717")
    #
    # # Example 3: Complete search and download
    # search_and_download("Escherichia coli[Organism] AND 16S", "ecoli_16s.fasta", max_results=50)
    #
    # # Example 4: PubMed search
    # pmids = search_pubmed("CRISPR[Title] AND 2023[PDAT]", max_results=5)
    # fetch_pubmed_abstracts(pmids[:2])
    #
    # # Example 5: Get database info
    # get_database_info("nucleotide")
    print("Examples are commented out. Uncomment and set your email to run.")
if __name__ == "__main__":
    # Run the (mostly commented-out) demo, then stress the email requirement.
    example_workflow()
    print()
    print("IMPORTANT: Always set Entrez.email before using these functions!")
    print("NCBI requires an email address for their E-utilities.")

View File

@@ -0,0 +1,125 @@
#!/usr/bin/env python3
"""
Common sequence operations using BioPython.
This script demonstrates basic sequence manipulation tasks like:
- Creating and manipulating Seq objects
- Transcription and translation
- Complement and reverse complement
- Calculating GC content and melting temperature
"""
from Bio.Seq import Seq
from Bio.SeqUtils import gc_fraction, MeltingTemp as mt
def demonstrate_seq_operations():
    """Walk through core Seq operations: transcription, translation, complements, stats."""
    dna = Seq("ATGGTGCATCTGACTCCTGAGGAGAAGTCTGCCGTTACTGCCCTG")
    print("Original DNA sequence:")
    print(dna)
    print()
    # DNA -> RNA
    print("Transcribed to RNA:")
    print(dna.transcribe())
    print()
    # DNA -> protein, translating the full sequence
    print("Translated to protein:")
    print(dna.translate())
    print()
    # Translation that halts at the first stop codon, if any
    print("Translated to first stop codon:")
    print(dna.translate(to_stop=True))
    print()
    print("Complement:")
    print(dna.complement())
    print()
    print("Reverse complement:")
    print(dna.reverse_complement())
    print()
    # Composition statistic
    print(f"GC content: {gc_fraction(dna) * 100:.2f}%")
    print()
    # Nearest-neighbor thermodynamic melting temperature
    print(f"Melting temperature (nearest-neighbor): {mt.Tm_NN(dna):.2f}°C")
    print()
    # Substring search and counting
    print(f"Start codon (ATG) position: {dna.find('ATG')}")
    print(f"Number of G nucleotides: {dna.count('G')}")
    print()
def translate_with_genetic_code():
    """Translate the same DNA under the standard and mitochondrial codes."""
    dna = Seq("ATGGTGCATCTGACTCCTGAGGAGAAGTCT")
    # NCBI translation table 1 is the standard genetic code.
    print("Standard genetic code translation:")
    print(dna.translate(table=1))
    # NCBI translation table 2 is the vertebrate mitochondrial code.
    print("Vertebrate mitochondrial code translation:")
    print(dna.translate(table=2))
    print()
def working_with_codons():
    """Inspect the standard codon table: start/stop codons and example translations."""
    from Bio.Data import CodonTable
    # Table 1 is the standard genetic code.
    table = CodonTable.unambiguous_dna_by_id[1]
    print("Standard genetic code:")
    print(f"Start codons: {table.start_codons}")
    print(f"Stop codons: {table.stop_codons}")
    print()
    print("Example codons:")
    for codon in ["ATG", "TGG", "TAA", "TAG", "TGA"]:
        if codon in table.stop_codons:
            print(f"{codon} -> STOP")
        else:
            # forward_table maps codon -> one-letter amino acid code.
            aa = table.forward_table.get(codon, "Unknown")
            print(f"{codon} -> {aa}")
if __name__ == "__main__":
    # Run all three demos with simple separators between sections.
    print("=" * 60)
    print("BioPython Sequence Operations Demo")
    print("=" * 60)
    print()
    demonstrate_seq_operations()
    print("-" * 60)
    translate_with_genetic_code()
    print("-" * 60)
    working_with_codons()