This commit is contained in:
Timothy Kassis
2025-10-20 17:47:37 -07:00
parent c7296b2661
commit fac5860912
8 changed files with 3107 additions and 1 deletions


@@ -7,7 +7,7 @@
},
"metadata": {
"description": "Claude scientific skills from K-Dense Inc",
"version": "1.19.0"
"version": "1.20.0"
},
"plugins": [
{
@@ -24,6 +24,7 @@
"./scientific-packages/bioservices",
"./scientific-packages/cellxgene-census",
"./scientific-packages/cobrapy",
"./scientific-packages/dask",
"./scientific-packages/datamol",
"./scientific-packages/deepchem",
"./scientific-packages/deeptools",


@@ -0,0 +1,450 @@
---
name: dask
description: Toolkit for parallel and distributed computing in Python enabling larger-than-memory operations, parallel processing, and distributed computation. Use this skill when: (1) datasets exceed available RAM and need chunked processing, (2) pandas/NumPy operations are slow and need parallelization across cores, (3) processing multiple files (CSV, Parquet, JSON, logs) that collectively exceed memory, (4) building custom parallel workflows with task dependencies, (5) scaling from prototype pandas/NumPy code to production on larger data, (6) need distributed computing across multiple machines, (7) working with scientific datasets (HDF5, Zarr, NetCDF) larger than memory, (8) ETL pipelines processing terabytes of unstructured data, (9) parameter sweeps or embarrassingly parallel computations, (10) when simpler solutions (better algorithms, efficient formats, sampling) aren't sufficient. Supports DataFrames (parallel pandas), Arrays (parallel NumPy), Bags (parallel Python lists), Futures (task-based parallelization), and various schedulers. NOT suitable for: small datasets fitting in memory, single-file processing, simple computations without parallelism needs, when NumPy/Pandas already perform adequately, or when task overhead exceeds computation time.
---
# Dask
## Overview
Dask is a Python library for parallel and distributed computing that enables three critical capabilities:
- **Larger-than-memory execution** on single machines for data exceeding available RAM
- **Parallel processing** for improved computational speed across multiple cores
- **Distributed computation** supporting terabyte-scale datasets across multiple machines
Dask scales from laptops (processing ~100 GiB) to clusters (processing ~100 TiB) while maintaining familiar Python APIs.
## When to Use Dask
Apply this skill when users need to:
- Process datasets that exceed available RAM
- Scale pandas or NumPy operations to larger datasets
- Parallelize computations for performance improvements
- Process multiple files efficiently (CSVs, Parquet, JSON, text logs)
- Build custom parallel workflows with task dependencies
- Distribute workloads across multiple cores or machines
## Core Capabilities
Dask provides five main components, each suited to different use cases:
### 1. DataFrames - Parallel Pandas Operations
**Purpose**: Scale pandas operations to larger datasets through parallel processing.
**When to Use**:
- Tabular data exceeds available RAM
- Need to process multiple CSV/Parquet files together
- Pandas operations are slow and need parallelization
- Scaling from pandas prototype to production
**Reference Documentation**: For comprehensive guidance on Dask DataFrames, refer to `references/dataframes.md` which includes:
- Reading data (single files, multiple files, glob patterns)
- Common operations (filtering, groupby, joins, aggregations)
- Custom operations with `map_partitions`
- Performance optimization tips
- Common patterns (ETL, time series, multi-file processing)
**Quick Example**:
```python
import dask.dataframe as dd
# Read multiple files as single DataFrame
ddf = dd.read_csv('data/2024-*.csv')
# Operations are lazy until compute()
filtered = ddf[ddf['value'] > 100]
result = filtered.groupby('category').mean().compute()
```
**Key Points**:
- Operations are lazy (build task graph) until `.compute()` called
- Use `map_partitions` for efficient custom operations
- Convert to DataFrame early when working with structured data from other sources
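As a small illustration of the `map_partitions` point above, here is a minimal sketch (the column names are hypothetical):
```python
import dask.dataframe as dd

ddf = dd.read_csv('data/2024-*.csv')

# Each partition is a regular pandas DataFrame, so ordinary pandas code
# runs once per partition instead of once per row
def add_ratio(pdf):
    return pdf.assign(ratio=pdf['value'] / pdf['total'])

ddf = ddf.map_partitions(add_ratio)
result = ddf['ratio'].mean().compute()
```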
### 2. Arrays - Parallel NumPy Operations
**Purpose**: Extend NumPy capabilities to datasets larger than memory using blocked algorithms.
**When to Use**:
- Arrays exceed available RAM
- NumPy operations need parallelization
- Working with scientific datasets (HDF5, Zarr, NetCDF)
- Need parallel linear algebra or array operations
**Reference Documentation**: For comprehensive guidance on Dask Arrays, refer to `references/arrays.md` which includes:
- Creating arrays (from NumPy, random, from disk)
- Chunking strategies and optimization
- Common operations (arithmetic, reductions, linear algebra)
- Custom operations with `map_blocks`
- Integration with HDF5, Zarr, and XArray
**Quick Example**:
```python
import dask.array as da
# Create large array with chunks
x = da.random.random((100000, 100000), chunks=(10000, 10000))
# Operations are lazy
y = x + 100
z = y.mean(axis=0)
# Compute result
result = z.compute()
```
**Key Points**:
- Chunk size is critical (aim for ~100 MB per chunk)
- Operations work on chunks in parallel
- Rechunk data when needed for efficient operations
- Use `map_blocks` for operations not available in Dask
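To make the chunking and `map_blocks` points concrete, a brief sketch (the clipping operation is illustrative):
```python
import dask.array as da
import numpy as np

x = da.random.random((20000, 20000), chunks=(1000, 1000))

# Rechunk toward ~100 MB blocks before block-wise work
x = x.rechunk((4000, 4000))

# map_blocks applies plain NumPy code to each chunk independently
def clip_outliers(block):
    return np.clip(block, 0.05, 0.95)

result = da.map_blocks(clip_outliers, x, dtype=x.dtype).mean().compute()
```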
### 3. Bags - Parallel Processing of Unstructured Data
**Purpose**: Process unstructured or semi-structured data (text, JSON, logs) with functional operations.
**When to Use**:
- Processing text files, logs, or JSON records
- Data cleaning and ETL before structured analysis
- Working with Python objects that don't fit array/dataframe formats
- Need memory-efficient streaming processing
**Reference Documentation**: For comprehensive guidance on Dask Bags, refer to `references/bags.md` which includes:
- Reading text and JSON files
- Functional operations (map, filter, fold, groupby)
- Converting to DataFrames
- Common patterns (log analysis, JSON processing, text processing)
- Performance considerations
**Quick Example**:
```python
import dask.bag as db
import json
# Read and parse JSON files
bag = db.read_text('logs/*.json').map(json.loads)
# Filter and transform
valid = bag.filter(lambda x: x['status'] == 'valid')
processed = valid.map(lambda x: {'id': x['id'], 'value': x['value']})
# Convert to DataFrame for analysis
ddf = processed.to_dataframe()
```
**Key Points**:
- Use for initial data cleaning, then convert to DataFrame/Array
- Use `foldby` instead of `groupby` for better performance
- Operations are streaming and memory-efficient
- Convert to structured formats (DataFrame) for complex operations
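A minimal sketch of the `foldby` recommendation above (the field names are hypothetical):
```python
import json
import dask.bag as db

records = db.read_text('logs/*.json').map(json.loads)

# foldby reduces within each partition first, avoiding a full shuffle
counts = records.foldby(
    key=lambda r: r['category'],
    binop=lambda acc, r: acc + 1,
    initial=0,
    combine=lambda a, b: a + b
).compute()
```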
### 4. Futures - Task-Based Parallelization
**Purpose**: Build custom parallel workflows with fine-grained control over task execution and dependencies.
**When to Use**:
- Building dynamic, evolving workflows
- Need immediate task execution (not lazy)
- Computations depend on runtime conditions
- Implementing custom parallel algorithms
- Need stateful computations
**Reference Documentation**: For comprehensive guidance on Dask Futures, refer to `references/futures.md` which includes:
- Setting up distributed client
- Submitting tasks and working with futures
- Task dependencies and data movement
- Advanced coordination (queues, locks, events, actors)
- Common patterns (parameter sweeps, dynamic tasks, iterative algorithms)
**Quick Example**:
```python
from dask.distributed import Client
client = Client() # Create local cluster
# Submit tasks (executes immediately)
def process(x):
    return x ** 2
futures = client.map(process, range(100))
# Gather results
results = client.gather(futures)
client.close()
```
**Key Points**:
- Requires distributed client (even for single machine)
- Tasks execute immediately when submitted
- Pre-scatter large data to avoid repeated transfers
- ~1ms overhead per task (not suitable for millions of tiny tasks)
- Use actors for stateful workflows
### 5. Schedulers - Execution Backends
**Purpose**: Control how and where Dask tasks execute (threads, processes, distributed).
**When to Choose Scheduler**:
- **Threads** (default): NumPy/Pandas operations, GIL-releasing libraries, shared memory benefit
- **Processes**: Pure Python code, text processing, GIL-bound operations
- **Synchronous**: Debugging with pdb, profiling, understanding errors
- **Distributed**: Need dashboard, multi-machine clusters, advanced features
**Reference Documentation**: For comprehensive guidance on Dask Schedulers, refer to `references/schedulers.md` which includes:
- Detailed scheduler descriptions and characteristics
- Configuration methods (global, context manager, per-compute)
- Performance considerations and overhead
- Common patterns and troubleshooting
- Thread configuration for optimal performance
**Quick Example**:
```python
import dask
import dask.dataframe as dd
# Use threads for DataFrame (default, good for numeric)
ddf = dd.read_csv('data.csv')
result1 = ddf.mean().compute() # Uses threads
# Use processes for Python-heavy work
import dask.bag as db
bag = db.read_text('logs/*.txt')
result2 = bag.map(python_function).compute(scheduler='processes')
# Use synchronous for debugging
dask.config.set(scheduler='synchronous')
result3 = problematic_computation.compute() # Can use pdb
# Use distributed for monitoring and scaling
from dask.distributed import Client
client = Client()
result4 = computation.compute() # Uses distributed with dashboard
```
**Key Points**:
- Threads: Lowest overhead (~10 µs/task), best for numeric work
- Processes: Avoids GIL (~10 ms/task), best for Python work
- Distributed: Monitoring dashboard (~1 ms/task), scales to clusters
- Can switch schedulers per computation or globally
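As a sketch, schedulers can also be selected with a configuration context manager instead of per `compute()` call:
```python
import dask
import dask.bag as db

bag = db.read_text('logs/*.txt')

# Everything computed inside this block uses the multiprocessing scheduler
with dask.config.set(scheduler='processes'):
    line_count = bag.count().compute()

# Or change the default for the rest of the session
dask.config.set(scheduler='threads')
```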
## Best Practices
For comprehensive performance optimization guidance, memory management strategies, and common pitfalls to avoid, refer to `references/best-practices.md`. Key principles include:
### Start with Simpler Solutions
Before using Dask, explore:
- Better algorithms
- Efficient file formats (Parquet instead of CSV)
- Compiled code (Numba, Cython)
- Data sampling
### Critical Performance Rules
**1. Don't Load Data Locally Then Hand to Dask**
```python
# Wrong: Loads all data in memory first
import pandas as pd
df = pd.read_csv('large.csv')
ddf = dd.from_pandas(df, npartitions=10)
# Correct: Let Dask handle loading
import dask.dataframe as dd
ddf = dd.read_csv('large.csv')
```
**2. Avoid Repeated compute() Calls**
```python
# Wrong: Each compute is separate
for item in items:
    result = dask_computation(item).compute()
# Correct: Single compute for all
computations = [dask_computation(item) for item in items]
results = dask.compute(*computations)
```
**3. Don't Build Excessively Large Task Graphs**
- Increase chunk sizes if millions of tasks
- Use `map_partitions`/`map_blocks` to fuse operations
- Check task graph size: `len(ddf.__dask_graph__())`
**4. Choose Appropriate Chunk Sizes**
- Target: ~100 MB per chunk (or 10 chunks per core in worker memory)
- Too large: Memory overflow
- Too small: Scheduling overhead
**5. Use the Dashboard**
```python
from dask.distributed import Client
client = Client()
print(client.dashboard_link) # Monitor performance, identify bottlenecks
```
## Common Workflow Patterns
### ETL Pipeline
```python
import dask.dataframe as dd
# Extract: Read data
ddf = dd.read_csv('raw_data/*.csv')
# Transform: Clean and process
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].astype('float64')
ddf = ddf.dropna(subset=['important_col'])
# Load: Aggregate and save
summary = ddf.groupby('category').agg({'amount': ['sum', 'mean']})
summary.to_parquet('output/summary.parquet')
```
### Unstructured to Structured Pipeline
```python
import dask.bag as db
import json
# Start with Bag for unstructured data
bag = db.read_text('logs/*.json').map(json.loads)
bag = bag.filter(lambda x: x['status'] == 'valid')
# Convert to DataFrame for structured analysis
ddf = bag.to_dataframe()
result = ddf.groupby('category').mean().compute()
```
### Large-Scale Array Computation
```python
import dask.array as da
# Load or create large array
x = da.from_zarr('large_dataset.zarr')
# Process in chunks
normalized = (x - x.mean()) / x.std()
# Save result
da.to_zarr(normalized, 'normalized.zarr')
```
### Custom Parallel Workflow
```python
from dask.distributed import Client
client = Client()
# Scatter large dataset once
data = client.scatter(large_dataset)
# Process in parallel with dependencies
futures = []
for param in parameters:
    future = client.submit(process, data, param)
    futures.append(future)
# Gather results
results = client.gather(futures)
```
## Selecting the Right Component
Use this decision guide to choose the appropriate Dask component:
**Data Type**:
- Tabular data → **DataFrames**
- Numeric arrays → **Arrays**
- Text/JSON/logs → **Bags** (then convert to DataFrame)
- Custom Python objects → **Bags** or **Futures**
**Operation Type**:
- Standard pandas operations → **DataFrames**
- Standard NumPy operations → **Arrays**
- Custom parallel tasks → **Futures**
- Text processing/ETL → **Bags**
**Control Level**:
- High-level, automatic → **DataFrames/Arrays**
- Low-level, manual → **Futures**
**Workflow Type**:
- Static computation graph → **DataFrames/Arrays/Bags**
- Dynamic, evolving → **Futures**
## Integration Considerations
### File Formats
- **Efficient**: Parquet, HDF5, Zarr (columnar, compressed, parallel-friendly)
- **Compatible but slower**: CSV (use for initial ingestion only)
- **For Arrays**: HDF5, Zarr, NetCDF
### Conversion Between Collections
```python
# Bag → DataFrame
ddf = bag.to_dataframe()
# DataFrame → Array (for numeric data)
arr = ddf.to_dask_array(lengths=True)
# Array → DataFrame
ddf = dd.from_dask_array(arr, columns=['col1', 'col2'])
```
### With Other Libraries
- **XArray**: Wraps Dask arrays with labeled dimensions (geospatial, imaging)
- **Dask-ML**: Machine learning with scikit-learn compatible APIs
- **Distributed**: Advanced cluster management and monitoring
## Debugging and Development
### Iterative Development Workflow
1. **Test on small data with synchronous scheduler**:
```python
dask.config.set(scheduler='synchronous')
result = computation.compute() # Can use pdb, easy debugging
```
2. **Validate with threads on sample**:
```python
sample = ddf.head(1000) # Small sample
# Test logic, then scale to full dataset
```
3. **Scale with distributed for monitoring**:
```python
from dask.distributed import Client
client = Client()
print(client.dashboard_link) # Monitor performance
result = computation.compute()
```
### Common Issues
**Memory Errors**:
- Decrease chunk sizes
- Use `persist()` strategically and delete when done
- Check for memory leaks in custom functions
**Slow Start**:
- Task graph too large (increase chunk sizes)
- Use `map_partitions` or `map_blocks` to reduce tasks
**Poor Parallelization**:
- Chunks too large (increase number of partitions)
- Using threads with Python code (switch to processes)
- Data dependencies preventing parallelism
## Reference Files
All reference documentation files can be read as needed for detailed information:
- `references/dataframes.md` - Complete Dask DataFrame guide
- `references/arrays.md` - Complete Dask Array guide
- `references/bags.md` - Complete Dask Bag guide
- `references/futures.md` - Complete Dask Futures and distributed computing guide
- `references/schedulers.md` - Complete scheduler selection and configuration guide
- `references/best-practices.md` - Comprehensive performance optimization and troubleshooting
Load these files when users need detailed information about specific Dask components, operations, or patterns beyond the quick guidance provided here.


@@ -0,0 +1,497 @@
# Dask Arrays
## Overview
Dask Array implements NumPy's ndarray interface using blocked algorithms. It coordinates many NumPy arrays arranged into a grid to enable computation on datasets larger than available memory, utilizing parallelism across multiple cores.
## Core Concept
A Dask Array is divided into chunks (blocks):
- Each chunk is a regular NumPy array
- Operations are applied to each chunk in parallel
- Results are combined automatically
- Enables out-of-core computation (data larger than RAM)
## Key Capabilities
### What Dask Arrays Support
**Mathematical Operations**:
- Arithmetic operations (+, -, *, /)
- Scalar functions (exponentials, logarithms, trigonometric)
- Element-wise operations
**Reductions**:
- `sum()`, `mean()`, `std()`, `var()`
- Reductions along specified axes
- `min()`, `max()`, `argmin()`, `argmax()`
**Linear Algebra**:
- Tensor contractions
- Dot products and matrix multiplication
- Some decompositions (SVD, QR)
**Data Manipulation**:
- Transposition
- Slicing (standard and fancy indexing)
- Reshaping
- Concatenation and stacking
**Array Protocols**:
- Universal functions (ufuncs)
- NumPy protocols for interoperability
## When to Use Dask Arrays
**Use Dask Arrays When**:
- Arrays exceed available RAM
- Computation can be parallelized across chunks
- Working with NumPy-style numerical operations
- Need to scale NumPy code to larger datasets
**Stick with NumPy When**:
- Arrays fit comfortably in memory
- Operations require global views of data
- Using specialized functions not available in Dask
- Performance is adequate with NumPy alone
## Important Limitations
Dask Arrays intentionally don't implement certain NumPy features:
**Not Implemented**:
- Most `np.linalg` functions (only basic operations available)
- Operations difficult to parallelize (like full sorting)
- Memory-inefficient operations (converting to lists, iterating via loops)
- Many specialized functions (driven by community needs)
**Workarounds**: For unsupported operations, consider using `map_blocks` with custom NumPy code.
## Creating Dask Arrays
### From NumPy Arrays
```python
import dask.array as da
import numpy as np
# Create from NumPy array with specified chunks
x = np.arange(10000)
dx = da.from_array(x, chunks=1000) # Creates 10 chunks of 1000 elements each
```
### Random Arrays
```python
# Create random array with specified chunks
x = da.random.random((10000, 10000), chunks=(1000, 1000))
# Other random functions
x = da.random.normal(10, 0.1, size=(10000, 10000), chunks=(1000, 1000))
```
### Zeros, Ones, and Empty
```python
# Create arrays filled with constants
zeros = da.zeros((10000, 10000), chunks=(1000, 1000))
ones = da.ones((10000, 10000), chunks=(1000, 1000))
empty = da.empty((10000, 10000), chunks=(1000, 1000))
```
### From Functions
```python
# Create array from a function, one delayed call per block
import dask

def create_block(block_id):
    return np.random.random((1000, 1000)) * block_id[0]

# Build a delayed NumPy block per grid position, wrap each with
# da.from_delayed, then assemble the full array with da.block
blocks = [
    [da.from_delayed(dask.delayed(create_block)((i, j)),
                     shape=(1000, 1000), dtype=float)
     for j in range(10)]
    for i in range(10)
]
x = da.block(blocks)  # shape (10000, 10000)
```
### From Disk
```python
# Load from HDF5
import h5py
f = h5py.File('myfile.hdf5', mode='r')
x = da.from_array(f['/data'], chunks=(1000, 1000))
# Load from Zarr
import zarr
z = zarr.open('myfile.zarr', mode='r')
x = da.from_array(z, chunks=(1000, 1000))
```
## Common Operations
### Arithmetic Operations
```python
import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))
y = da.random.random((10000, 10000), chunks=(1000, 1000))
# Element-wise operations (lazy)
z = x + y
z = x * y
z = da.exp(x)
z = da.log(y)
# Compute result
result = z.compute()
```
### Reductions
```python
# Reductions along axes
total = x.sum().compute()
mean = x.mean().compute()
std = x.std().compute()
# Reduction along specific axis
row_means = x.mean(axis=1).compute()
col_sums = x.sum(axis=0).compute()
```
### Slicing and Indexing
```python
# Standard slicing (returns Dask Array)
subset = x[1000:5000, 2000:8000]
# Fancy indexing
indices = [0, 5, 10, 15]
selected = x[indices, :]
# Boolean indexing
mask = x > 0.5
filtered = x[mask]
```
### Matrix Operations
```python
# Matrix multiplication
A = da.random.random((10000, 5000), chunks=(1000, 1000))
B = da.random.random((5000, 8000), chunks=(1000, 1000))
C = da.matmul(A, B)
result = C.compute()
# Dot product
dot_product = da.dot(A, B)
# Transpose
AT = A.T
```
### Linear Algebra
```python
# SVD (Singular Value Decomposition)
U, s, Vt = da.linalg.svd(A)
U_computed, s_computed, Vt_computed = dask.compute(U, s, Vt)
# QR decomposition
Q, R = da.linalg.qr(A)
Q_computed, R_computed = dask.compute(Q, R)
# Note: Only some linalg operations are available
```
### Reshaping and Manipulation
```python
# Reshape
x = da.random.random((10000, 10000), chunks=(1000, 1000))
reshaped = x.reshape(5000, 20000)
# Transpose
transposed = x.T
# Concatenate
x1 = da.random.random((5000, 10000), chunks=(1000, 1000))
x2 = da.random.random((5000, 10000), chunks=(1000, 1000))
combined = da.concatenate([x1, x2], axis=0)
# Stack
stacked = da.stack([x1, x2], axis=0)
```
## Chunking Strategy
Chunking is critical for Dask Array performance.
### Chunk Size Guidelines
**Good Chunk Sizes**:
- Each chunk: ~10-100 MB (compressed)
- ~1 million elements per chunk for numeric data
- Balance between parallelism and overhead
**Example Calculation**:
```python
# For float64 data (8 bytes per element)
# Target 100 MB chunks: 100 MB / 8 bytes = 12.5M elements
# For 2D array (10000, 10000):
x = da.random.random((10000, 10000), chunks=(1000, 1000)) # ~8 MB per chunk
```
### Viewing Chunk Structure
```python
# Check chunks
print(x.chunks) # ((1000, 1000, ...), (1000, 1000, ...))
# Number of chunks
print(x.npartitions)
# Chunk sizes in bytes
print(x.nbytes / x.npartitions)
```
### Rechunking
```python
# Change chunk sizes
x = da.random.random((10000, 10000), chunks=(500, 500))
x_rechunked = x.rechunk((2000, 2000))
# Rechunk specific dimension
x_rechunked = x.rechunk({0: 2000, 1: 'auto'})
```
## Custom Operations with map_blocks
For operations not available in Dask, use `map_blocks`:
```python
import dask.array as da
import numpy as np
def custom_function(block):
    # Apply custom NumPy operation
    return np.fft.fft2(block)
x = da.random.random((10000, 10000), chunks=(1000, 1000))
result = da.map_blocks(custom_function, x, dtype=x.dtype)
# Compute
output = result.compute()
```
### map_blocks with Different Output Shape
```python
def reduction_function(block):
    # Returns a one-element array per block
    return np.array([block.mean()])
result = da.map_blocks(
    reduction_function,
    x,
    dtype='float64',
    drop_axis=[0, 1],  # Output drops the input axes
    new_axis=0,        # Output gains a new axis
    chunks=(1,)        # One element per block
)
```
## Lazy Evaluation and Computation
### Lazy Operations
```python
# All operations are lazy (instant, no computation)
x = da.random.random((10000, 10000), chunks=(1000, 1000))
y = x + 100
z = y.mean(axis=0)
result = z * 2
# Nothing computed yet, just task graph built
```
### Triggering Computation
```python
# Compute single result
final = result.compute()
# Compute multiple results efficiently
result1, result2 = dask.compute(operation1, operation2)
```
### Persist in Memory
```python
# Keep intermediate results in memory
x_cached = x.persist()
# Reuse cached results
y1 = (x_cached + 10).compute()
y2 = (x_cached * 2).compute()
```
## Saving Results
### To NumPy
```python
# Convert to NumPy (loads all in memory)
numpy_array = dask_array.compute()
```
### To Disk
```python
# Save to HDF5
import h5py
with h5py.File('output.hdf5', mode='w') as f:
    dset = f.create_dataset('/data', shape=x.shape, dtype=x.dtype)
    da.store(x, dset)
# Save to Zarr
import zarr
z = zarr.open('output.zarr', mode='w', shape=x.shape, dtype=x.dtype, chunks=x.chunksize)
da.store(x, z)
```
## Performance Considerations
### Efficient Operations
- Element-wise operations: Very efficient
- Reductions with parallelizable operations: Efficient
- Slicing along chunk boundaries: Efficient
- Matrix operations with good chunk alignment: Efficient
### Expensive Operations
- Slicing across many chunks: Requires data movement
- Operations requiring global sorting: Not well supported
- Extremely irregular access patterns: Poor performance
- Operations with poor chunk alignment: Requires rechunking
### Optimization Tips
**1. Choose Good Chunk Sizes**
```python
# Aim for balanced chunks
# Good: ~100 MB per chunk (5000 * 2500 * 8 bytes = 100 MB for float64)
x = da.random.random((100000, 10000), chunks=(5000, 2500))
```
**2. Align Chunks for Operations**
```python
# Make sure chunks align for operations
x = da.random.random((10000, 10000), chunks=(1000, 1000))
y = da.random.random((10000, 10000), chunks=(1000, 1000)) # Aligned
z = x + y # Efficient
```
**3. Use Appropriate Scheduler**
```python
# Arrays work well with threaded scheduler (default)
# Shared memory access is efficient
result = x.compute() # Uses threads by default
```
**4. Minimize Data Transfer**
```python
# Better: Compute on each chunk, then transfer results
means = x.mean(axis=1).compute() # Transfers less data
# Worse: Transfer all data then compute
x_numpy = x.compute()
means = x_numpy.mean(axis=1) # Transfers more data
```
## Common Patterns
### Image Processing
```python
import dask.array as da
# Load large image stack
images = da.from_zarr('images.zarr')
# Apply filtering
def apply_gaussian(block):
    from scipy.ndimage import gaussian_filter
    return gaussian_filter(block, sigma=2)
filtered = da.map_blocks(apply_gaussian, images, dtype=images.dtype)
# Compute statistics
mean_intensity = filtered.mean().compute()
```
### Scientific Computing
```python
# Large-scale numerical simulation
x = da.random.random((100000, 100000), chunks=(10000, 10000))
# Apply iterative computation
num_iterations = 5  # illustrative iteration count
for i in range(num_iterations):
    x = da.exp(-x) * da.sin(x)
    x = x.persist()  # Keep in memory for next iteration
# Final result
result = x.compute()
```
### Data Analysis
```python
# Load large dataset
data = da.from_zarr('measurements.zarr')
# Compute statistics
mean = data.mean(axis=0)
std = data.std(axis=0)
normalized = (data - mean) / std
# Save normalized data
da.to_zarr(normalized, 'normalized.zarr')
```
## Integration with Other Tools
### XArray
```python
import xarray as xr
import dask.array as da
# XArray wraps Dask arrays with labeled dimensions
data = da.random.random((1000, 2000, 3000), chunks=(100, 200, 300))
dataset = xr.DataArray(
    data,
    dims=['time', 'y', 'x'],
    coords={'time': range(1000), 'y': range(2000), 'x': range(3000)}
)
```
### Scikit-learn (via Dask-ML)
```python
# Some scikit-learn compatible operations
from dask_ml.preprocessing import StandardScaler
X = da.random.random((10000, 100), chunks=(1000, 100))
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
```
## Debugging Tips
### Visualize Task Graph
```python
# Visualize computation graph (for small arrays)
x = da.random.random((100, 100), chunks=(10, 10))
y = x + 1
y.visualize(filename='graph.png')
```
### Check Array Properties
```python
# Inspect before computing
print(f"Shape: {x.shape}")
print(f"Dtype: {x.dtype}")
print(f"Chunks: {x.chunks}")
print(f"Number of tasks: {len(x.__dask_graph__())}")
```
### Test on Small Arrays First
```python
# Test logic on small array
small_x = da.random.random((100, 100), chunks=(50, 50))
result_small = computation(small_x).compute()
# Validate, then scale
large_x = da.random.random((100000, 100000), chunks=(10000, 10000))
result_large = computation(large_x).compute()
```


@@ -0,0 +1,468 @@
# Dask Bags
## Overview
Dask Bag implements functional operations including `map`, `filter`, `fold`, and `groupby` on generic Python objects. It processes data in parallel while maintaining a small memory footprint through Python iterators. Bags function as "a parallel version of PyToolz or a Pythonic version of the PySpark RDD."
## Core Concept
A Dask Bag is a collection of Python objects distributed across partitions:
- Each partition contains generic Python objects
- Operations use functional programming patterns
- Processing uses streaming/iterators for memory efficiency
- Ideal for unstructured or semi-structured data
## Key Capabilities
### Functional Operations
- `map`: Transform each element
- `filter`: Select elements based on condition
- `fold`: Reduce elements with combining function
- `groupby`: Group elements by key
- `pluck`: Extract fields from records
- `flatten`: Flatten nested structures
### Use Cases
- Text processing and log analysis
- JSON record processing
- ETL on unstructured data
- Data cleaning before structured analysis
## When to Use Dask Bags
**Use Bags When**:
- Working with general Python objects requiring flexible computation
- Data doesn't fit structured array or tabular formats
- Processing text, JSON, or custom Python objects
- Initial data cleaning and ETL is needed
- Memory-efficient streaming is important
**Use Other Collections When**:
- Data is structured (use DataFrames instead)
- Numeric computing (use Arrays instead)
- Operations require complex groupby or shuffles (use DataFrames)
**Key Recommendation**: Use Bag to clean and process data, then transform it into an array or DataFrame before embarking on more complex operations that require shuffle steps.
## Important Limitations
Bags sacrifice performance for generality:
- Rely on multiprocessing scheduling (not threads)
- Remain immutable (create new bags for changes)
- Operate slower than array/DataFrame equivalents
- Handle `groupby` inefficiently (use `foldby` when possible)
- Operations requiring substantial inter-worker communication are slow
## Creating Bags
### From Sequences
```python
import dask.bag as db
# From Python list
bag = db.from_sequence([1, 2, 3, 4, 5], partition_size=2)
# From range
bag = db.from_sequence(range(10000), partition_size=1000)
```
### From Text Files
```python
# Single file
bag = db.read_text('data.txt')
# Multiple files with glob
bag = db.read_text('data/*.txt')
# With encoding
bag = db.read_text('data/*.txt', encoding='utf-8')
# Control partition size
bag = db.read_text('logs/*.log', blocksize='64MB')
```
### From Delayed Objects
```python
import dask
import dask.bag as db
@dask.delayed
def load_data(filename):
    with open(filename) as f:
        return [line.strip() for line in f]
files = ['file1.txt', 'file2.txt', 'file3.txt']
partitions = [load_data(f) for f in files]
bag = db.from_delayed(partitions)
```
### From Custom Sources
```python
# From any iterable-producing function
def read_json_files():
    import glob
    import json
    for filename in glob.glob('data/*.json'):
        with open(filename) as f:
            yield json.load(f)
# Create bag from generator
bag = db.from_sequence(read_json_files(), partition_size=10)
```
## Common Operations
### Map (Transform)
```python
import dask.bag as db
bag = db.read_text('data/*.json')
# Parse JSON
import json
parsed = bag.map(json.loads)
# Extract field
values = parsed.map(lambda x: x['value'])
# Complex transformation
def process_record(record):
    return {
        'id': record['id'],
        'value': record['value'] * 2,
        'category': record.get('category', 'unknown')
    }
processed = parsed.map(process_record)
```
### Filter
```python
# Filter by condition
valid = parsed.filter(lambda x: x['status'] == 'valid')
# Multiple conditions
filtered = parsed.filter(lambda x: x['value'] > 100 and x['year'] == 2024)
# Filter with custom function
def is_valid_record(record):
    return record.get('status') == 'valid' and record.get('value') is not None
valid_records = parsed.filter(is_valid_record)
```
### Pluck (Extract Fields)
```python
# Extract single field
ids = parsed.pluck('id')
# Extract multiple fields (creates tuples)
key_pairs = parsed.pluck(['id', 'value'])
```
### Flatten
```python
# Flatten nested lists
nested = db.from_sequence([[1, 2], [3, 4], [5, 6]])
flat = nested.flatten() # [1, 2, 3, 4, 5, 6]
# Flatten after map
bag = db.read_text('data/*.txt')
words = bag.map(str.split).flatten() # All words from all files
```
### GroupBy (Expensive)
```python
# Group by key (requires shuffle)
grouped = parsed.groupby(lambda x: x['category'])
# Aggregate after grouping
counts = grouped.map(lambda key_items: (key_items[0], len(list(key_items[1]))))
result = counts.compute()
```
### FoldBy (Preferred for Aggregations)
```python
# FoldBy is more efficient than groupby for aggregations
def add(acc, item):
    return acc + item['value']
def combine(acc1, acc2):
    return acc1 + acc2
# Sum values by category
sums = parsed.foldby(
    key='category',
    binop=add,
    initial=0,
    combine=combine
)
result = sums.compute()
```
### Reductions
```python
# Count elements
count = bag.count().compute()
# Get all distinct values (requires memory)
distinct = bag.distinct().compute()
# Take first n elements
first_ten = bag.take(10)
# Fold/reduce
total = bag.fold(
    lambda acc, x: acc + x['value'],
    initial=0,
    combine=lambda a, b: a + b
).compute()
```
## Converting to Other Collections
### To DataFrame
```python
import dask.bag as db
import dask.dataframe as dd
# Bag of dictionaries
bag = db.read_text('data/*.json').map(json.loads)
# Convert to DataFrame
ddf = bag.to_dataframe()
# With explicit columns
ddf = bag.to_dataframe(meta={'id': int, 'value': float, 'category': str})
```
### To List/Compute
```python
# Compute to Python list (loads all in memory)
result = bag.compute()
# Take sample
sample = bag.take(100)
```
## Common Patterns
### JSON Processing
```python
import dask.bag as db
import json
# Read and parse JSON files
bag = db.read_text('logs/*.json')
parsed = bag.map(json.loads)
# Filter valid records
valid = parsed.filter(lambda x: x.get('status') == 'success')
# Extract relevant fields
processed = valid.map(lambda x: {
    'user_id': x['user']['id'],
    'timestamp': x['timestamp'],
    'value': x['metrics']['value']
})
# Convert to DataFrame for analysis
ddf = processed.to_dataframe()
# Analyze
summary = ddf.groupby('user_id')['value'].mean().compute()
```
### Log Analysis
```python
# Read log files
logs = db.read_text('logs/*.log')
# Parse log lines
def parse_log_line(line):
    parts = line.split(' ')
    return {
        'timestamp': parts[0],
        'level': parts[1],
        'message': ' '.join(parts[2:])
    }
parsed_logs = logs.map(parse_log_line)
# Filter errors
errors = parsed_logs.filter(lambda x: x['level'] == 'ERROR')
# Count by message pattern
error_counts = errors.foldby(
    key='message',
    binop=lambda acc, x: acc + 1,
    initial=0,
    combine=lambda a, b: a + b
)
result = error_counts.compute()
```
### Text Processing
```python
# Read text files
text = db.read_text('documents/*.txt')
# Split into words
words = text.map(str.lower).map(str.split).flatten()
# Count word frequencies
def increment(acc, word):
    return acc + 1
def combine_counts(a, b):
    return a + b
word_counts = words.foldby(
    key=lambda word: word,
    binop=increment,
    initial=0,
    combine=combine_counts
)
# Get top words
top_words = word_counts.compute()
sorted_words = sorted(top_words, key=lambda x: x[1], reverse=True)[:100]
```
### Data Cleaning Pipeline
```python
import dask.bag as db
import json
# Read raw data
raw = db.read_text('raw_data/*.json').map(json.loads)
# Validation function
def is_valid(record):
    required_fields = ['id', 'timestamp', 'value']
    return all(field in record for field in required_fields)
# Cleaning function
def clean_record(record):
    return {
        'id': int(record['id']),
        'timestamp': record['timestamp'],
        'value': float(record['value']),
        'category': record.get('category', 'unknown'),
        'tags': record.get('tags', [])
    }
# Pipeline
cleaned = (raw
    .filter(is_valid)
    .map(clean_record)
    .filter(lambda x: x['value'] > 0)
)
# Convert to DataFrame
ddf = cleaned.to_dataframe()
# Save cleaned data
ddf.to_parquet('cleaned_data/')
```
## Performance Considerations
### Efficient Operations
- Map, filter, pluck: Very efficient (streaming)
- Flatten: Efficient
- FoldBy with good key distribution: Reasonable
- Take and head: Efficient (only processes needed partitions)
### Expensive Operations
- GroupBy: Requires shuffle, can be slow
- Distinct: Requires collecting all unique values
- Operations requiring full data materialization
### Optimization Tips
**1. Use FoldBy Instead of GroupBy**
```python
# Better: Use foldby for aggregations
result = bag.foldby(key='category', binop=add, initial=0, combine=lambda a, b: a + b)
# Worse: GroupBy then reduce
result = bag.groupby('category').map(lambda x: (x[0], sum(x[1])))
```
**2. Convert to DataFrame Early**
```python
# For structured operations, convert to DataFrame
bag = db.read_text('data/*.json').map(json.loads)
bag = bag.filter(lambda x: x['status'] == 'valid')
ddf = bag.to_dataframe() # Now use efficient DataFrame operations
```
**3. Control Partition Size**
```python
# Balance between too many and too few partitions
bag = db.read_text('data/*.txt', blocksize='64MB') # Reasonable partition size
```
**4. Use Lazy Evaluation**
```python
# Chain operations before computing
result = (bag
    .map(process1)
    .filter(condition)
    .map(process2)
    .compute()  # Single compute at the end
)
```
## Debugging Tips
### Inspect Partitions
```python
# Get number of partitions
print(bag.npartitions)
# Take sample
sample = bag.take(10)
print(sample)
```
### Validate on Small Data
```python
# Test logic on small subset
small_bag = db.from_sequence(sample_data, partition_size=10)
result = process_pipeline(small_bag).compute()
# Validate results, then scale
```
### Check Intermediate Results
```python
# Compute intermediate steps to debug
step1 = bag.map(parse).take(5)
print("After parsing:", step1)
step2 = bag.map(parse).filter(validate).take(5)
print("After filtering:", step2)
```
## Memory Management
Bags are designed for memory-efficient processing:
```python
# Streaming processing - doesn't load all in memory
bag = db.read_text('huge_file.txt') # Lazy
processed = bag.map(process_line) # Still lazy
result = processed.compute() # Processes in chunks
```
For very large results, avoid computing to memory:
```python
# Don't compute huge results to memory
# result = bag.compute() # Could overflow memory
# Instead, convert and save to disk
ddf = bag.to_dataframe()
ddf.to_parquet('output/')
```


@@ -0,0 +1,277 @@
# Dask Best Practices
## Performance Optimization Principles
### Start with Simpler Solutions First
Before implementing parallel computing with Dask, explore these alternatives:
- Better algorithms for the specific problem
- Efficient file formats (Parquet, HDF5, Zarr instead of CSV)
- Compiled code via Numba or Cython
- Data sampling for development and testing
These alternatives often provide better returns than distributed systems and should be exhausted before scaling to parallel computing.
### Chunk Size Strategy
**Critical Rule**: Chunks should be small enough that many fit in a worker's available memory at once.
**Recommended Target**: Size chunks so workers can hold 10 chunks per core without exceeding available memory.
**Why It Matters**:
- Too large chunks: Memory overflow and inefficient parallelization
- Too small chunks: Excessive scheduling overhead
**Example Calculation**:
- 8 cores with 32 GB RAM
- Target: ~400 MB per chunk (32 GB / 8 cores / 10 chunks)
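A rough sketch of that calculation in code (the memory and core counts are assumptions to adjust per machine):
```python
# Assumed worker resources
worker_memory_bytes = 32e9   # 32 GB
cores = 8
chunks_per_core = 10

target_chunk_bytes = worker_memory_bytes / (cores * chunks_per_core)
print(f"Target chunk size: {target_chunk_bytes / 1e6:.0f} MB")  # ~400 MB
```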
### Monitor with the Dashboard
The Dask dashboard provides essential visibility into:
- Worker states and resource utilization
- Task progress and bottlenecks
- Memory usage patterns
- Performance characteristics
Access the dashboard to understand what's actually slow in parallel workloads rather than guessing at optimizations.
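A minimal sketch for opening the dashboard on a local cluster:
```python
from dask.distributed import Client

client = Client()             # local cluster on this machine
print(client.dashboard_link)  # typically http://127.0.0.1:8787/status
```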
## Critical Pitfalls to Avoid
### 1. Don't Create Large Objects Locally Before Dask
**Wrong Approach**:
```python
import pandas as pd
import dask.dataframe as dd
# Loads entire dataset into memory first
df = pd.read_csv('large_file.csv')
ddf = dd.from_pandas(df, npartitions=10)
```
**Correct Approach**:
```python
import dask.dataframe as dd
# Let Dask handle the loading
ddf = dd.read_csv('large_file.csv')
```
**Why**: Loading data with pandas or NumPy first forces the scheduler to serialize and embed those objects in task graphs, defeating the purpose of parallel computing.
**Key Principle**: Use Dask methods to load data and use Dask to control the results.
### 2. Avoid Repeated compute() Calls
**Wrong Approach**:
```python
results = []
for item in items:
    result = dask_computation(item).compute()  # Each compute is separate
    results.append(result)
```
**Correct Approach**:
```python
computations = [dask_computation(item) for item in items]
results = dask.compute(*computations) # Single compute for all
```
**Why**: Calling compute in loops prevents Dask from:
- Parallelizing different computations
- Sharing intermediate results
- Optimizing the overall task graph
### 3. Don't Build Excessively Large Task Graphs
**Symptoms**:
- Millions of tasks in a single computation
- Severe scheduling overhead
- Long delays before computation starts
**Solutions**:
- Increase chunk sizes to reduce number of tasks
- Use `map_partitions` or `map_blocks` to fuse operations
- Break computations into smaller pieces with intermediate persists
- Consider whether the problem truly requires distributed computing
**Example Using map_partitions**:
```python
# Instead of applying function to each row
ddf['result'] = ddf.apply(complex_function, axis=1) # Many tasks
# Apply to entire partitions at once
ddf = ddf.map_partitions(lambda df: df.assign(result=complex_function(df)))
```
## Infrastructure Considerations
### Scheduler Selection
**Use Threads For**:
- Numeric work with GIL-releasing libraries (NumPy, Pandas, scikit-learn)
- Operations that benefit from shared memory
- Single-machine workloads with array/dataframe operations
**Use Processes For**:
- Text processing and Python collection operations
- Pure Python code that's GIL-bound
- Operations that need process isolation
**Use Distributed Scheduler For**:
- Multi-machine clusters
- Need for diagnostic dashboard
- Asynchronous APIs
- Better data locality handling
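A short sketch of choosing a scheduler per computation, following the guidance above (file paths are illustrative):
```python
import dask.bag as db
import dask.dataframe as dd

# Numeric, GIL-releasing work: the default threaded scheduler is fine
ddf = dd.read_parquet('data.parquet')
numeric_result = ddf['value'].mean().compute()

# GIL-bound pure-Python work: explicitly request processes for this computation
bag = db.read_text('logs/*.txt')
text_result = bag.map(str.strip).count().compute(scheduler='processes')
```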
### Thread Configuration
**Recommendation**: Aim for roughly 4 threads per process on numeric workloads.
**Rationale**:
- Balance between parallelism and overhead
- Allows efficient use of CPU cores
- Reduces context switching costs
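As a sketch, this layout can be requested explicitly when starting a local cluster (the worker counts are assumptions for an 8-core machine):
```python
from dask.distributed import Client

# 2 worker processes x 4 threads each on an 8-core machine
client = Client(n_workers=2, threads_per_worker=4)
print(client)
```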
### Memory Management
**Persist Strategically**:
```python
# Persist intermediate results that are reused
intermediate = expensive_computation(data).persist()
result1 = intermediate.operation1().compute()
result2 = intermediate.operation2().compute()
```
**Clear Memory When Done**:
```python
# Explicitly delete large objects
del intermediate
```
## Data Loading Best Practices
### Use Appropriate File Formats
**For Tabular Data**:
- Parquet: Columnar, compressed, fast filtering
- CSV: Only for small data or initial ingestion
**For Array Data**:
- HDF5: Good for numeric arrays
- Zarr: Cloud-native, parallel-friendly
- NetCDF: Scientific data with metadata
### Optimize Data Ingestion
**Read Multiple Files Efficiently**:
```python
# Use glob patterns to read multiple files in parallel
ddf = dd.read_parquet('data/year=2024/month=*/day=*.parquet')
```
**Specify Useful Columns Early**:
```python
# Only read needed columns
ddf = dd.read_parquet('data.parquet', columns=['col1', 'col2', 'col3'])
```
## Common Patterns and Solutions
### Pattern: Embarrassingly Parallel Problems
For independent computations, use Futures:
```python
from dask.distributed import Client
client = Client()
futures = [client.submit(func, arg) for arg in args]
results = client.gather(futures)
```
### Pattern: Data Preprocessing Pipeline
Use Bags for initial ETL, then convert to structured formats:
```python
import dask.bag as db
# Process raw JSON
bag = db.read_text('logs/*.json').map(json.loads)
bag = bag.filter(lambda x: x['status'] == 'success')
# Convert to DataFrame for analysis
ddf = bag.to_dataframe()
```
### Pattern: Iterative Algorithms
Persist data between iterations:
```python
data = dd.read_parquet('data.parquet')
data = data.persist() # Keep in memory across iterations
for iteration in range(num_iterations):
    data = update_function(data)
    data = data.persist()  # Persist updated version
```
## Debugging Tips
### Use Single-Threaded Scheduler
For debugging with pdb or detailed error inspection:
```python
import dask
dask.config.set(scheduler='synchronous')
result = computation.compute() # Runs in single thread for debugging
```
### Check Task Graph Size
Before computing, check the number of tasks:
```python
print(len(ddf.__dask_graph__())) # Should be reasonable, not millions
```
### Validate on Small Data First
Test logic on small subset before scaling:
```python
# Test on first partition
sample = ddf.head(1000)
# Validate results
# Then scale to full dataset
```
## Performance Troubleshooting
### Symptom: Slow Computation Start
**Likely Cause**: Task graph is too large
**Solution**: Increase chunk sizes or use map_partitions
### Symptom: Memory Errors
**Likely Causes**:
- Chunks too large
- Too many intermediate results
- Memory leaks in user functions
**Solutions**:
- Decrease chunk sizes
- Use persist() strategically and delete when done
- Profile user functions for memory issues
### Symptom: Poor Parallelization
**Likely Causes**:
- Data dependencies preventing parallelism
- Chunks too large (not enough tasks)
- GIL contention with threads on Python code
**Solutions**:
- Restructure computation to reduce dependencies
- Increase number of partitions
- Switch to multiprocessing scheduler for Python code


@@ -0,0 +1,368 @@
# Dask DataFrames
## Overview
Dask DataFrames enable parallel processing of large tabular data by distributing work across multiple pandas DataFrames. As described in the documentation, "Dask DataFrames are a collection of many pandas DataFrames" with identical APIs, making the transition from pandas straightforward.
## Core Concept
A Dask DataFrame is divided into multiple pandas DataFrames (partitions) along the index:
- Each partition is a regular pandas DataFrame
- Operations are applied to each partition in parallel
- Results are combined automatically
## Key Capabilities
### Scale
- Process 100 GiB on a laptop
- Process 100 TiB on a cluster
- Handle datasets exceeding available RAM
### Compatibility
- Implements most of the pandas API
- Easy transition from pandas code
- Works with familiar operations
## When to Use Dask DataFrames
**Use Dask When**:
- Dataset exceeds available RAM
- Computations require significant time and pandas optimization hasn't helped
- Need to scale from prototype (pandas) to production (larger data)
- Working with multiple files that should be processed together
**Stick with Pandas When**:
- Data fits comfortably in memory
- Computations complete in under a second
- Simple operations without custom `.apply()` functions
- Iterative development and exploration
## Reading Data
Dask mirrors pandas reading syntax with added support for multiple files:
### Single File
```python
import dask.dataframe as dd
# Read single file
ddf = dd.read_csv('data.csv')
ddf = dd.read_parquet('data.parquet')
```
### Multiple Files
```python
# Read multiple files using glob patterns
ddf = dd.read_csv('data/*.csv')
ddf = dd.read_parquet('s3://mybucket/data/*.parquet')
# Read with path structure
ddf = dd.read_parquet('data/year=*/month=*/day=*.parquet')
```
### Optimizations
```python
# Specify columns to read (reduces memory)
ddf = dd.read_parquet('data.parquet', columns=['col1', 'col2'])
# Control partitioning
ddf = dd.read_csv('data.csv', blocksize='64MB') # Creates 64MB partitions
```
## Common Operations
All operations are lazy until `.compute()` is called.
### Filtering
```python
# Same as pandas
filtered = ddf[ddf['column'] > 100]
filtered = ddf.query('column > 100')
```
### Column Operations
```python
# Add columns
ddf['new_column'] = ddf['col1'] + ddf['col2']
# Select columns
subset = ddf[['col1', 'col2', 'col3']]
# Drop columns
ddf = ddf.drop(columns=['unnecessary_col'])
```
### Aggregations
```python
# Standard aggregations work as expected
mean = ddf['column'].mean().compute()
sum_total = ddf['column'].sum().compute()
counts = ddf['category'].value_counts().compute()
```
### GroupBy
```python
# GroupBy operations (may require shuffle)
grouped = ddf.groupby('category')['value'].mean().compute()
# Multiple aggregations
agg_result = ddf.groupby('category').agg({
    'value': ['mean', 'sum', 'count'],
    'amount': 'sum'
}).compute()
```
### Joins and Merges
```python
# Merge DataFrames
merged = dd.merge(ddf1, ddf2, on='key', how='left')
# Join on index
joined = ddf1.join(ddf2, on='key')
```
### Sorting
```python
# Sorting (expensive operation, requires data movement)
sorted_ddf = ddf.sort_values('column')
result = sorted_ddf.compute()
```
## Custom Operations
### Apply Functions
**To Partitions (Efficient)**:
```python
# Apply function to entire partitions
def custom_partition_function(partition_df):
    # partition_df is a pandas DataFrame
    return partition_df.assign(new_col=partition_df['col1'] * 2)
ddf = ddf.map_partitions(custom_partition_function)
```
**To Rows (Less Efficient)**:
```python
# Apply to each row (creates many tasks)
ddf['result'] = ddf.apply(lambda row: custom_function(row), axis=1, meta=('result', 'float'))
```
**Note**: Always prefer `map_partitions` over row-wise `apply` for better performance.
### Meta Parameter
When Dask can't infer output structure, specify the `meta` parameter:
```python
# For apply operations
ddf['new'] = ddf.apply(func, axis=1, meta=('new', 'float64'))
# For map_partitions
import pandas as pd
ddf = ddf.map_partitions(func, meta=pd.DataFrame({
    'col1': pd.Series(dtype='float64'),
    'col2': pd.Series(dtype='int64')
}))
```
## Lazy Evaluation and Computation
### Lazy Operations
```python
# These operations are lazy (instant, no computation)
filtered = ddf[ddf['value'] > 100]
aggregated = filtered.groupby('category').mean()
final = aggregated[aggregated['value'] < 500]
# Nothing has computed yet
```
### Triggering Computation
```python
# Compute single result
result = final.compute()
# Compute multiple results efficiently
result1, result2, result3 = dask.compute(
    operation1,
    operation2,
    operation3
)
```
### Persist in Memory
```python
# Keep results in distributed memory for reuse
ddf_cached = ddf.persist()
# Now multiple operations on ddf_cached won't recompute
result1 = ddf_cached.mean().compute()
result2 = ddf_cached.sum().compute()
```
## Index Management
### Setting Index
```python
# Set index (required for efficient joins and certain operations)
ddf = ddf.set_index('timestamp', sorted=True)
```
### Index Properties
- Sorted index enables efficient filtering and joins
- Index determines partitioning
- Some operations perform better with appropriate index
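A brief sketch of why a sorted index helps (the column name and dates are hypothetical, and assume a datetime index):
```python
import dask.dataframe as dd

ddf = dd.read_parquet('timeseries/*.parquet')
ddf = ddf.set_index('timestamp', sorted=True)

# With sorted, known divisions, .loc can skip irrelevant partitions entirely
january = ddf.loc['2024-01-01':'2024-01-31']
monthly_mean = january['value'].mean().compute()
```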
## Writing Results
### To Files
```python
# Write to multiple files (one per partition)
ddf.to_parquet('output/data.parquet')
ddf.to_csv('output/data-*.csv')
# Write to single file (forces computation and concatenation)
ddf.compute().to_csv('output/single_file.csv')
```
### To Memory (Pandas)
```python
# Convert to pandas (loads all data in memory)
pdf = ddf.compute()
```
## Performance Considerations
### Efficient Operations
- Column selection and filtering: Very efficient
- Simple aggregations (sum, mean, count): Efficient
- Row-wise operations on partitions: Efficient with `map_partitions`
### Expensive Operations
- Sorting: Requires data shuffle across workers
- GroupBy with many groups: May require shuffle
- Complex joins: Depends on data distribution
- Row-wise apply: Creates many tasks
### Optimization Tips
**1. Select Columns Early**
```python
# Better: Read only needed columns
ddf = dd.read_parquet('data.parquet', columns=['col1', 'col2'])
```
**2. Filter Before GroupBy**
```python
# Better: Reduce data before expensive operations
result = ddf[ddf['year'] == 2024].groupby('category').sum().compute()
```
**3. Use Efficient File Formats**
```python
# Use Parquet instead of CSV for better performance
ddf.to_parquet('data.parquet') # Faster, smaller, columnar
```
**4. Repartition Appropriately**
```python
# If partitions are too small
ddf = ddf.repartition(npartitions=10)
# If partitions are too large
ddf = ddf.repartition(partition_size='100MB')
```
## Common Patterns
### ETL Pipeline
```python
import dask.dataframe as dd
# Read data
ddf = dd.read_csv('raw_data/*.csv')
# Transform
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].astype('float64')
ddf = ddf.dropna(subset=['important_col'])
# Aggregate
summary = ddf.groupby('category').agg({
    'amount': ['sum', 'mean'],
    'quantity': 'count'
})
# Write results
summary.to_parquet('output/summary.parquet')
```
### Time Series Analysis
```python
# Read time series data
ddf = dd.read_parquet('timeseries/*.parquet')
# Set timestamp index
ddf = ddf.set_index('timestamp', sorted=True)
# Resample (if available in Dask version)
hourly = ddf.resample('1H').mean()
# Compute statistics
result = hourly.compute()
```
### Combining Multiple Files
```python
# Read multiple files as single DataFrame
ddf = dd.read_csv('data/2024-*.csv')
# Process combined data
result = ddf.groupby('category')['value'].sum().compute()
```
## Limitations and Differences from Pandas
### Not All Pandas Features Available
Some pandas operations are not implemented in Dask:
- Some string methods
- Certain window functions
- Some specialized statistical functions
### Partitioning Matters
- Operations within partitions are efficient
- Cross-partition operations may be expensive
- Index-based operations benefit from sorted index
### Lazy Evaluation
- Operations don't execute until `.compute()`
- Need to be aware of computation triggers
- Can't inspect intermediate results without computing
## Debugging Tips
### Inspect Partitions
```python
# Get number of partitions
print(ddf.npartitions)
# Compute single partition
first_partition = ddf.get_partition(0).compute()
# View first few rows (computes first partition)
print(ddf.head())
```
### Validate Operations on Small Data
```python
# Test on small sample first
sample = ddf.head(1000)
# Validate logic works
# Then scale to full dataset
result = ddf.compute()
```
### Check Dtypes
```python
# Verify data types are correct
print(ddf.dtypes)
```


@@ -0,0 +1,541 @@
# Dask Futures
## Overview
Dask futures extend Python's `concurrent.futures` interface, enabling immediate (non-lazy) task execution. Unlike delayed computations (used in DataFrames, Arrays, and Bags), futures provide more flexibility in situations where computations may evolve over time or require dynamic workflow construction.
## Core Concept
Futures represent real-time task execution:
- Tasks execute immediately when submitted (not lazy)
- Each future represents a remote computation result
- Automatic dependency tracking between futures
- Enables dynamic, evolving workflows
- Direct control over task scheduling and data placement
## Key Capabilities
### Real-Time Execution
- Tasks run immediately when submitted
- No need for explicit `.compute()` call
- Get results with `.result()` method
### Automatic Dependency Management
When tasks are submitted with futures as inputs, Dask automatically tracks the dependencies. Once all input futures have completed, their results are moved to the worker that runs the dependent task.
### Dynamic Workflows
Build computations that evolve based on intermediate results:
- Submit new tasks based on previous results
- Conditional execution paths
- Iterative algorithms with varying structure
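A small sketch of such a dynamic workflow using `as_completed`, submitting follow-up tasks as results arrive (the task functions are placeholders):
```python
from dask.distributed import Client, as_completed

client = Client()

def simulate(seed):
    import random
    return random.Random(seed).random()

def refine(value):
    return value * 2

futures = client.map(simulate, range(10))
refined = []

# React to results as they finish and submit new work conditionally
for future in as_completed(futures):
    if future.result() > 0.5:
        refined.append(client.submit(refine, future.result()))

results = client.gather(refined)
client.close()
```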
## When to Use Futures
**Use Futures When**:
- Building dynamic, evolving workflows
- Need immediate task execution (not lazy)
- Computations depend on runtime conditions
- Require fine control over task placement
- Implementing custom parallel algorithms
- Need stateful computations (with actors)
**Use Other Collections When**:
- Static, predefined computation graphs (use delayed, DataFrames, Arrays)
- Simple data parallelism on large collections (use Bags, DataFrames)
- Standard array/dataframe operations suffice
## Setting Up Client
Futures require a distributed client:
```python
from dask.distributed import Client
# Local cluster (on single machine)
client = Client()
# Or specify resources
client = Client(n_workers=4, threads_per_worker=2)
# Or connect to existing cluster
client = Client('scheduler-address:8786')
```
## Submitting Tasks
### Basic Submit
```python
from dask.distributed import Client
client = Client()
# Submit single task
def add(x, y):
    return x + y
future = client.submit(add, 1, 2)
# Get result
result = future.result() # Blocks until complete
print(result) # 3
```
### Multiple Tasks
```python
# Submit multiple independent tasks
futures = []
for i in range(10):
    future = client.submit(add, i, i)
    futures.append(future)
# Gather results
results = client.gather(futures) # Efficient parallel gathering
```
### Map Over Inputs
```python
# Apply function to multiple inputs
def square(x):
    return x ** 2
# Submit batch of tasks
futures = client.map(square, range(100))
# Gather results
results = client.gather(futures)
```
**Note**: Each task carries ~1ms overhead, making `map` less suitable for millions of tiny tasks. For massive datasets, use Bags or DataFrames instead.
## Working with Futures
### Check Status
```python
future = client.submit(expensive_function, arg)
# Check if complete
print(future.done()) # False or True
# Check status
print(future.status) # 'pending', 'running', 'finished', or 'error'
```
### Non-Blocking Result Retrieval
```python
# Non-blocking check
if future.done():
    result = future.result()
else:
    print("Still computing...")
# Or use callbacks
def handle_result(future):
print(f"Result: {future.result()}")
future.add_done_callback(handle_result)
```
### Error Handling
```python
def might_fail(x):
if x < 0:
raise ValueError("Negative value")
return x ** 2
future = client.submit(might_fail, -5)
try:
result = future.result()
except ValueError as e:
print(f"Task failed: {e}")
```
## Task Dependencies
### Automatic Dependency Tracking
```python
# Submit task
future1 = client.submit(add, 1, 2)
# Use future as input (creates dependency)
future2 = client.submit(add, future1, 10) # Depends on future1
# Chain dependencies
future3 = client.submit(add, future2, 100) # Depends on future2
# Get final result
result = future3.result() # 113
```
### Complex Dependencies
```python
# Multiple dependencies
a = client.submit(func1, x)
b = client.submit(func2, y)
c = client.submit(func3, a, b) # Depends on both a and b
result = c.result()
```
## Data Movement Optimization
### Scatter Data
Pre-scatter important data to avoid repeated transfers:
```python
# Upload data to cluster once
large_dataset = client.scatter(big_data) # Returns future
# Use scattered data in multiple tasks
futures = [client.submit(process, large_dataset, i) for i in range(100)]
# Each task uses the same scattered data without re-transfer
results = client.gather(futures)
```
### Efficient Gathering
Use `client.gather()` for concurrent result collection:
```python
# Better: Gather all at once (parallel)
results = client.gather(futures)
# Worse: Sequential result retrieval
results = [f.result() for f in futures]
```
## Fire-and-Forget
For tasks run purely for their side effects, where the result is never needed:
```python
from dask.distributed import fire_and_forget
def log_to_database(data):
# Write to database, no return value needed
database.write(data)
# Submit without keeping reference
future = client.submit(log_to_database, data)
fire_and_forget(future)
# Dask will still run this task to completion even though no future reference is kept
```
## Performance Characteristics
### Task Overhead
- ~1ms overhead per task
- Good for: Thousands of tasks
- Not suitable for: Millions of tiny tasks
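When there are far more items than that, a common workaround is to batch many small items into each task so the per-task overhead is amortized. A sketch, with `process_item` standing in for your own function:
```python
from dask.distributed import Client

client = Client()

def process_item(x):
    return x ** 2

def process_batch(batch):
    # One task handles many items, amortizing the ~1ms per-task cost
    return [process_item(x) for x in batch]

items = list(range(1_000_000))
batch_size = 10_000
batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

futures = client.map(process_batch, batches)  # 100 tasks instead of 1,000,000
results = [r for batch in client.gather(futures) for r in batch]
```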
### Worker-to-Worker Communication
- Direct worker-to-worker data transfer
- Roundtrip latency: ~1ms
- Efficient for task dependencies
### Memory Management
Dask keeps results alive only while your local session holds a future that references them. Once a future is garbage collected, the scheduler is free to release the corresponding data on the cluster.
**Keep References**:
```python
# Keep reference to prevent deletion
important_result = client.submit(expensive_calc, data)
# Use result multiple times
future1 = client.submit(process1, important_result)
future2 = client.submit(process2, important_result)
```
## Advanced Coordination
### Distributed Primitives
**Queues**:
```python
from dask.distributed import Queue
queue = Queue()
def producer():
for i in range(10):
queue.put(i)
def consumer():
results = []
for _ in range(10):
results.append(queue.get())
return results
# Submit tasks
client.submit(producer)
result_future = client.submit(consumer)
results = result_future.result()
```
**Locks**:
```python
from dask.distributed import Lock
lock = Lock()
def critical_section():
with lock:
# Only one task executes this at a time
shared_resource.update()
```
**Events**:
```python
import time
from dask.distributed import Event
event = Event()
def waiter():
event.wait() # Blocks until event is set
return "Event occurred"
def setter():
time.sleep(5)
event.set()
# Start both tasks
wait_future = client.submit(waiter)
set_future = client.submit(setter)
result = wait_future.result() # Waits for setter to complete
```
**Variables**:
```python
from dask.distributed import Variable
var = Variable('my-var')
# Set value
var.set(42)
# Get value from tasks
def reader():
return var.get()
future = client.submit(reader)
print(future.result()) # 42
```
## Actors
For stateful, rapidly changing workflows, actors place an object on a worker and let you call its methods remotely. Method calls travel directly between workers, bypassing scheduler coordination, with roundtrip latency of roughly 1 ms.
### Creating Actors
```python
from dask.distributed import Client
client = Client()
class Counter:
def __init__(self):
self.count = 0
def increment(self):
self.count += 1
return self.count
def get_count(self):
return self.count
# Create actor on worker
counter = client.submit(Counter, actor=True).result()
# Call methods
future1 = counter.increment()
future2 = counter.increment()
result = counter.get_count().result()
print(result) # 2
```
### Actor Use Cases
- Stateful services (databases, caches)
- Rapidly changing state
- Complex coordination patterns
- Real-time streaming applications
## Common Patterns
### Embarrassingly Parallel Tasks
```python
from dask.distributed import Client
client = Client()
def process_item(item):
# Independent computation
return expensive_computation(item)
# Process many items in parallel
items = range(1000)
futures = client.map(process_item, items)
# Gather all results
results = client.gather(futures)
```
### Dynamic Task Submission
```python
from dask.distributed import get_client

def recursive_compute(data, depth):
    if depth == 0:
        return process(data)
    # Tasks that submit more tasks obtain the client from the worker they run on
    client = get_client()
    # Split and recurse
    left, right = split(data)
    left_future = client.submit(recursive_compute, left, depth - 1)
    right_future = client.submit(recursive_compute, right, depth - 1)
    # Combine results
    left_result, right_result = client.gather([left_future, right_future])
    return combine(left_result, right_result)
# Start computation
result_future = client.submit(recursive_compute, initial_data, 5)
result = result_future.result()
```
### Parameter Sweep
```python
from itertools import product
def run_simulation(param1, param2, param3):
# Run simulation with parameters
return simulate(param1, param2, param3)
# Generate parameter combinations
params = product(range(10), range(10), range(10))
# Submit all combinations
futures = [client.submit(run_simulation, p1, p2, p3) for p1, p2, p3 in params]
# Gather results as they complete
from dask.distributed import as_completed
for future in as_completed(futures):
result = future.result()
process_result(result)
```
### Pipeline with Dependencies
```python
# Stage 1: Load data
load_futures = [client.submit(load_data, file) for file in files]
# Stage 2: Process (depends on stage 1)
process_futures = [client.submit(process, f) for f in load_futures]
# Stage 3: Aggregate (depends on stage 2)
agg_future = client.submit(aggregate, process_futures)
# Get final result
result = agg_future.result()
```
### Iterative Algorithm
```python
# Initialize
state = client.scatter(initial_state)
# Iterate
for iteration in range(num_iterations):
# Compute update based on current state
state = client.submit(update_function, state)
# Check convergence
converged = client.submit(check_convergence, state)
if converged.result():
break
# Get final state
final_state = state.result()
```
## Best Practices
### 1. Pre-scatter Large Data
```python
# Upload once, use many times
large_data = client.scatter(big_dataset)
futures = [client.submit(process, large_data, i) for i in range(100)]
```
### 2. Use Gather for Bulk Retrieval
```python
# Efficient: Parallel gathering
results = client.gather(futures)
# Inefficient: Sequential
results = [f.result() for f in futures]
```
### 3. Manage Memory with References
```python
# Keep important futures
important = client.submit(expensive_calc, data)
# Use multiple times
f1 = client.submit(use_result, important)
f2 = client.submit(use_result, important)
# Clean up when done
del important
```
### 4. Handle Errors Appropriately
```python
futures = client.map(might_fail, inputs)
# Check for errors
results = []
errors = []
for future in as_completed(futures):
try:
results.append(future.result())
except Exception as e:
errors.append(e)
```
### 5. Use as_completed for Progressive Processing
```python
from dask.distributed import as_completed
futures = client.map(process, items)
# Process results as they arrive
for future in as_completed(futures):
result = future.result()
handle_result(result)
```
## Debugging Tips
### Monitor Dashboard
View the Dask dashboard to see:
- Task progress
- Worker utilization
- Memory usage
- Task dependencies
### Check Task Status
```python
# Inspect future
print(future.status)
print(future.done())
# Get traceback on error
try:
future.result()
except Exception:
print(future.traceback())
```
### Profile Tasks
```python
# Save an HTML profile of where task time was spent
client.profile(filename='profile.html')
```

View File

@@ -0,0 +1,504 @@
# Dask Schedulers
## Overview
Dask provides multiple task schedulers, each suited to different workloads. The scheduler determines how tasks are executed: sequentially, in parallel threads, in parallel processes, or distributed across a cluster.
## Scheduler Types
### Single-Machine Schedulers
#### 1. Local Threads (Default)
**Description**: The threaded scheduler executes computations with a local `concurrent.futures.ThreadPoolExecutor`.
**When to Use**:
- Numeric computations in NumPy, Pandas, scikit-learn
- Libraries that release the GIL (Global Interpreter Lock)
- Operations benefit from shared memory access
- Default for Dask Arrays and DataFrames
**Characteristics**:
- Low overhead
- Shared memory between threads
- Best for GIL-releasing operations
- Poor for pure Python code (GIL contention)
**Example**:
```python
import dask.array as da
# Uses threads by default
x = da.random.random((10000, 10000), chunks=(1000, 1000))
result = x.mean().compute() # Computed with threads
```
**Explicit Configuration**:
```python
import dask
# Set globally
dask.config.set(scheduler='threads')
# Or per-compute
result = x.mean().compute(scheduler='threads')
```
#### 2. Local Processes
**Description**: Multiprocessing scheduler that uses `concurrent.futures.ProcessPoolExecutor`.
**When to Use**:
- Pure Python code with GIL contention
- Text processing and Python collections
- Operations that benefit from process isolation
- CPU-bound Python code
**Characteristics**:
- Bypasses GIL limitations
- Incurs data transfer costs between processes
- Higher overhead than threads
- Ideal for linear workflows with small inputs/outputs
**Example**:
```python
import dask.bag as db
# Good for Python object processing
bag = db.read_text('data/*.txt')
result = bag.map(complex_python_function).compute(scheduler='processes')
```
**Explicit Configuration**:
```python
import dask
# Set globally
dask.config.set(scheduler='processes')
# Or per-compute
result = computation.compute(scheduler='processes')
```
**Limitations**:
- Data must be serializable (pickle)
- Overhead from process creation
- Memory overhead from data copying
#### 3. Single Thread (Synchronous)
**Description**: The single-threaded synchronous scheduler executes all computations in the local thread with no parallelism at all.
**When to Use**:
- Debugging with pdb
- Profiling with standard Python tools
- Understanding errors in detail
- Development and testing
**Characteristics**:
- No parallelism
- Easy debugging
- No overhead
- Deterministic execution
**Example**:
```python
import dask
# Enable for debugging
dask.config.set(scheduler='synchronous')
# Now can use pdb
result = computation.compute() # Runs in single thread
```
**Debugging with IPython**:
```python
# In IPython/Jupyter
%pdb on
dask.config.set(scheduler='synchronous')
result = problematic_computation.compute() # Drops into debugger on error
```
### Distributed Schedulers
#### 4. Local Distributed
**Description**: Despite the name, the distributed scheduler also runs well on a single machine, giving access to its dashboard, memory management, and asynchronous APIs locally.
**When to Use**:
- Need diagnostic dashboard
- Asynchronous APIs
- Better data locality handling than multiprocessing
- Development before scaling to cluster
- Want distributed features on single machine
**Characteristics**:
- Provides dashboard for monitoring
- Better memory management
- More overhead than threads/processes
- Can scale to cluster later
**Example**:
```python
from dask.distributed import Client
import dask.dataframe as dd
# Create local cluster
client = Client() # Automatically uses all cores
# Use distributed scheduler
ddf = dd.read_csv('data.csv')
result = ddf.groupby('category').mean().compute()
# View dashboard
print(client.dashboard_link)
# Clean up
client.close()
```
**Configuration Options**:
```python
# Control resources
client = Client(
n_workers=4,
threads_per_worker=2,
memory_limit='4GB'
)
```
#### 5. Cluster Distributed
**Description**: For scaling across multiple machines using the distributed scheduler.
**When to Use**:
- Data exceeds single machine capacity
- Need computational power beyond one machine
- Production deployments
- Cluster computing environments (HPC, cloud)
**Characteristics**:
- Scales to hundreds of machines
- Requires cluster setup
- Network communication overhead
- Advanced features (adaptive scaling, task prioritization)
**Example with Dask-Jobqueue (HPC)**:
```python
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
# Create cluster on HPC with SLURM
cluster = SLURMCluster(
cores=24,
memory='100GB',
walltime='02:00:00',
queue='regular'
)
# Scale to 10 jobs
cluster.scale(jobs=10)
# Connect client
client = Client(cluster)
# Run computation
result = computation.compute()
client.close()
```
**Example with Dask on Kubernetes**:
```python
from dask_kubernetes import KubeCluster
from dask.distributed import Client
cluster = KubeCluster()
cluster.scale(20) # 20 workers
client = Client(cluster)
result = computation.compute()
client.close()
```
## Scheduler Configuration
### Global Configuration
```python
import dask
# Set scheduler globally for session
dask.config.set(scheduler='threads')
dask.config.set(scheduler='processes')
dask.config.set(scheduler='synchronous')
```
### Context Manager
```python
import dask
# Temporarily use different scheduler
with dask.config.set(scheduler='processes'):
result = computation.compute()
# Back to default scheduler
result2 = computation2.compute()
```
### Per-Compute
```python
# Specify scheduler per compute call
result = computation.compute(scheduler='threads')
result = computation.compute(scheduler='processes')
result = computation.compute(scheduler='synchronous')
```
### Distributed Client
```python
from dask.distributed import Client
# Using client automatically sets distributed scheduler
client = Client()
# All computations use distributed scheduler
result = computation.compute()
client.close()
```
## Choosing the Right Scheduler
### Decision Matrix
| Workload Type | Recommended Scheduler | Rationale |
|--------------|----------------------|-----------|
| NumPy/Pandas operations | Threads (default) | GIL-releasing, shared memory |
| Pure Python objects | Processes | Avoids GIL contention |
| Text/log processing | Processes | Python-heavy operations |
| Debugging | Synchronous | Easy debugging, deterministic |
| Need dashboard | Local Distributed | Monitoring and diagnostics |
| Multi-machine | Cluster Distributed | Exceeds single machine capacity |
| Small data, quick tasks | Threads | Lowest overhead |
| Large data, single machine | Local Distributed | Better memory management |
### Performance Considerations
**Threads**:
- Overhead: ~10 µs per task
- Best for: Numeric operations
- Memory: Shared
- GIL: Affected by GIL
**Processes**:
- Overhead: ~10 ms per task
- Best for: Python operations
- Memory: Copied between processes
- GIL: Not affected
**Synchronous**:
- Overhead: ~1 µs per task
- Best for: Debugging
- Memory: No parallelism
- GIL: Not relevant
**Distributed**:
- Overhead: ~1 ms per task
- Best for: Complex workflows, monitoring
- Memory: Managed by scheduler
- GIL: Workers can use threads or processes
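To see these trade-offs on a concrete workload, a quick timing comparison is easy to run; the absolute numbers depend entirely on the machine and the mapped function, so treat this as a sketch:
```python
import time
import dask.bag as db

bag = db.from_sequence(range(100_000), npartitions=16).map(lambda x: x ** 2)

for scheduler in ["synchronous", "threads", "processes"]:
    start = time.perf_counter()
    bag.compute(scheduler=scheduler)
    print(f"{scheduler:12s} {time.perf_counter() - start:.3f}s")
```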
## Thread Configuration for Distributed Scheduler
### Setting Thread Count
```python
from dask.distributed import Client
# Control thread/worker configuration
client = Client(
n_workers=4, # Number of worker processes
threads_per_worker=2 # Threads per worker process
)
```
### Recommended Configuration
**For Numeric Workloads**:
- Aim for roughly 4 threads per process
- Balance between parallelism and overhead
- Example: 8 cores → 2 workers with 4 threads each
**For Python Workloads**:
- Use more workers with fewer threads
- Example: 8 cores → 8 workers with 1 thread each
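On an 8-core machine, those two layouts look like this (a sketch; adjust the counts to your own core count and workload):
```python
from dask.distributed import Client

# Numeric workloads (NumPy/Pandas): fewer worker processes, more threads
client = Client(n_workers=2, threads_per_worker=4)
client.close()

# Pure-Python workloads: more worker processes, one thread each to sidestep the GIL
client = Client(n_workers=8, threads_per_worker=1)
client.close()
```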
### Command-Line Deployment
When starting a cluster from the command line, the worker process and thread counts are set on the worker command (option names have varied across versions; older releases use `--nprocs` in place of `--nworkers`):
```bash
# Start a scheduler, then workers with 4 processes x 2 threads each
dask scheduler &
dask worker tcp://localhost:8786 --nworkers 4 --nthreads 2
```
## Common Patterns
### Development to Production
```python
# Development: Use local distributed for testing
from dask.distributed import Client
client = Client(processes=False) # In-process for debugging
# Production: Scale to cluster
from dask.distributed import Client
client = Client('scheduler-address:8786')
```
### Mixed Workloads
```python
import dask
import dask.dataframe as dd
# Use threads for DataFrame operations
ddf = dd.read_parquet('data.parquet')
result1 = ddf.mean().compute(scheduler='threads')
# Use processes for Python code
import dask.bag as db
bag = db.read_text('logs/*.txt')
result2 = bag.map(parse_log).compute(scheduler='processes')
```
### Debugging Workflow
```python
import dask
# Step 1: Debug with synchronous scheduler
dask.config.set(scheduler='synchronous')
result = problematic_computation.compute()
# Step 2: Test with threads
dask.config.set(scheduler='threads')
result = computation.compute()
# Step 3: Scale with distributed
from dask.distributed import Client
client = Client()
result = computation.compute()
```
## Monitoring and Diagnostics
### Dashboard Access (Distributed Only)
```python
from dask.distributed import Client
client = Client()
# Get dashboard URL
print(client.dashboard_link)
# Opens dashboard in browser showing:
# - Task progress
# - Worker status
# - Memory usage
# - Task stream
# - Resource utilization
```
### Performance Profiling
```python
# Profile computation
from dask.distributed import Client
client = Client()
result = computation.compute()
# Save an HTML profile of where task time was spent
client.profile(filename='profile.html')
```
### Resource Monitoring
```python
# Check scheduler and worker info
client.scheduler_info()
# See which workers hold which pieces of data
client.who_has()
# Memory usage per worker (psutil must be installed on the workers)
def worker_memory_percent():
    import psutil
    return psutil.virtual_memory().percent
client.run(worker_memory_percent)
```
## Advanced Configuration
### Custom Executors
```python
from concurrent.futures import ThreadPoolExecutor
import dask
# Use custom thread pool
with ThreadPoolExecutor(max_workers=4) as executor:
dask.config.set(pool=executor)
result = computation.compute(scheduler='threads')
```
### Adaptive Scaling (Distributed)
```python
from dask.distributed import Client
client = Client()
# Enable adaptive scaling
client.cluster.adapt(minimum=2, maximum=10)
# Cluster scales based on workload
result = computation.compute()
```
### Worker Plugins
```python
from dask.distributed import Client, WorkerPlugin
class CustomPlugin(WorkerPlugin):
def setup(self, worker):
# Initialize worker-specific resources
worker.custom_resource = initialize_resource()
client = Client()
client.register_worker_plugin(CustomPlugin())
```
## Troubleshooting
### Slow Performance with Threads
**Problem**: Pure Python code slow with threaded scheduler
**Solution**: Switch to processes or distributed scheduler
### Memory Errors with Processes
**Problem**: Data too large to pickle/copy between processes
**Solution**: Use threaded or distributed scheduler
### Debugging Difficult
**Problem**: Can't use pdb with parallel schedulers
**Solution**: Use synchronous scheduler for debugging
### Task Overhead High
**Problem**: Many tiny tasks causing overhead
**Solution**: Use threaded scheduler (lowest overhead) or increase chunk sizes
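For the chunk-size remedy, a quick Dask Array sketch shows how chunk choice controls the task count:
```python
import dask.array as da

# Tiny chunks: 10,000 blocks means 10,000+ tasks of scheduling overhead
x_small = da.random.random((20_000, 20_000), chunks=(200, 200))
print(x_small.numblocks)  # (100, 100)

# Larger chunks: only 16 blocks for the same data
x_large = da.random.random((20_000, 20_000), chunks=(5_000, 5_000))
print(x_large.numblocks)  # (4, 4)
```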