Improve the scikit-learn skill

Timothy Kassis
2025-11-04 10:11:46 -08:00
parent 63a4293f1a
commit 4ad4f9970f
10 changed files with 3293 additions and 3606 deletions


@@ -1,291 +1,386 @@
#!/usr/bin/env python3
"""
Clustering analysis example with multiple algorithms, evaluation, and visualization.
"""
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.mixture import GaussianMixture
from sklearn.metrics import (
    silhouette_score, calinski_harabasz_score, davies_bouldin_score
)
import warnings

warnings.filterwarnings('ignore')

def preprocess_for_clustering(X, scale=True, pca_components=None):
    """
    Preprocess data for clustering.

    Parameters:
    -----------
    X : array-like
        Feature matrix
    scale : bool
        Whether to standardize features
    pca_components : int or None
        Number of PCA components (None to skip PCA)

    Returns:
    --------
    array
        Preprocessed data
    """
    X_processed = X.copy()

    if scale:
        scaler = StandardScaler()
        X_processed = scaler.fit_transform(X_processed)

    if pca_components is not None:
        pca = PCA(n_components=pca_components)
        X_processed = pca.fit_transform(X_processed)
        print(f"PCA: Explained variance ratio = {pca.explained_variance_ratio_.sum():.3f}")

    return X_processed

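
# A hedged variant (not part of this commit), drawing on the earlier scale_data
# helper which returned the fitted scaler: when new samples must later be mapped
# into the same feature space, keep the fitted transformers instead of
# discarding them. The name preprocess_for_clustering_with_models is an
# illustrative assumption.
def preprocess_for_clustering_with_models(X, scale=True, pca_components=None):
    """Like preprocess_for_clustering, but also return the fitted transformers."""
    scaler = StandardScaler() if scale else None
    pca = PCA(n_components=pca_components) if pca_components is not None else None
    X_processed = X.copy()
    if scaler is not None:
        X_processed = scaler.fit_transform(X_processed)
    if pca is not None:
        X_processed = pca.fit_transform(X_processed)
    return X_processed, scaler, pca
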
def find_optimal_k_kmeans(X, k_range=range(2, 11)):
    """
    Find optimal K for K-Means using elbow method and silhouette score.

    Parameters:
    -----------
    X : array-like
        Feature matrix (should be scaled)
    k_range : range
        Range of K values to test

    Returns:
    --------
    dict
        Dictionary with inertia and silhouette scores for each K,
        plus the recommended K ('best_k')
    """
    inertias = []
    silhouette_scores = []

    for k in k_range:
        kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = kmeans.fit_predict(X)
        inertias.append(kmeans.inertia_)
        silhouette_scores.append(silhouette_score(X, labels))

    # Plot results
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

    # Elbow plot
    ax1.plot(k_range, inertias, 'bo-')
    ax1.set_xlabel('Number of clusters (K)')
    ax1.set_ylabel('Inertia')
    ax1.set_title('Elbow Method')
    ax1.grid(True)

    # Silhouette plot
    ax2.plot(k_range, silhouette_scores, 'ro-')
    ax2.set_xlabel('Number of clusters (K)')
    ax2.set_ylabel('Silhouette Score')
    ax2.set_title('Silhouette Analysis')
    ax2.grid(True)

    plt.tight_layout()
    plt.savefig('clustering_optimization.png', dpi=300, bbox_inches='tight')
    print("Saved: clustering_optimization.png")
    plt.close()

    # Find best K based on silhouette score
    best_k = k_range[np.argmax(silhouette_scores)]
    print(f"\nRecommended K based on silhouette score: {best_k}")

    return {
        'k_values': list(k_range),
        'inertias': inertias,
        'silhouette_scores': silhouette_scores,
        'best_k': best_k
    }

def compare_clustering_algorithms(X, n_clusters=3):
    """
    Compare different clustering algorithms.

    Parameters:
    -----------
    X : array-like
        Feature matrix (should be scaled)
    n_clusters : int
        Number of clusters

    Returns:
    --------
    dict
        Dictionary with results for each algorithm
    """
    print("="*60)
    print(f"Comparing Clustering Algorithms (n_clusters={n_clusters})")
    print("="*60)

    algorithms = {
        'K-Means': KMeans(n_clusters=n_clusters, random_state=42, n_init=10),
        'Agglomerative': AgglomerativeClustering(n_clusters=n_clusters, linkage='ward'),
        'Gaussian Mixture': GaussianMixture(n_components=n_clusters, random_state=42)
    }

    # DBSCAN doesn't require n_clusters, so it is handled separately
    dbscan = DBSCAN(eps=0.5, min_samples=5)
    dbscan_labels = dbscan.fit_predict(X)

    results = {}

    for name, algorithm in algorithms.items():
        labels = algorithm.fit_predict(X)

        # Calculate metrics
        silhouette = silhouette_score(X, labels)
        calinski = calinski_harabasz_score(X, labels)
        davies = davies_bouldin_score(X, labels)

        results[name] = {
            'labels': labels,
            'n_clusters': n_clusters,
            'silhouette': silhouette,
            'calinski_harabasz': calinski,
            'davies_bouldin': davies
        }

        print(f"\n{name}:")
        print(f"  Silhouette Score: {silhouette:.4f} (higher is better)")
        print(f"  Calinski-Harabasz: {calinski:.4f} (higher is better)")
        print(f"  Davies-Bouldin: {davies:.4f} (lower is better)")

    # DBSCAN results
    n_clusters_dbscan = len(set(dbscan_labels)) - (1 if -1 in dbscan_labels else 0)
    n_noise = list(dbscan_labels).count(-1)

    if n_clusters_dbscan > 1:
        # Only calculate metrics if we have multiple clusters
        mask = dbscan_labels != -1  # Exclude noise
        if mask.sum() > 0:
            silhouette = silhouette_score(X[mask], dbscan_labels[mask])
            calinski = calinski_harabasz_score(X[mask], dbscan_labels[mask])
            davies = davies_bouldin_score(X[mask], dbscan_labels[mask])

            results['DBSCAN'] = {
                'labels': dbscan_labels,
                'n_clusters': n_clusters_dbscan,
                'n_noise': n_noise,
                'silhouette': silhouette,
                'calinski_harabasz': calinski,
                'davies_bouldin': davies
            }

            print("\nDBSCAN:")
            print(f"  Clusters found: {n_clusters_dbscan}")
            print(f"  Noise points: {n_noise}")
            print(f"  Silhouette Score: {silhouette:.4f} (higher is better)")
            print(f"  Calinski-Harabasz: {calinski:.4f} (higher is better)")
            print(f"  Davies-Bouldin: {davies:.4f} (lower is better)")
    else:
        print("\nDBSCAN:")
        print(f"  Clusters found: {n_clusters_dbscan}")
        print(f"  Noise points: {n_noise}")
        print("  Note: Insufficient clusters for metric calculation")

    return results

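
# A minimal, hedged sketch (not part of this commit) for choosing the DBSCAN
# eps used above, which is otherwise hard-coded to 0.5: sort each point's
# distance to its min_samples-th nearest neighbour and look for the "knee" in
# the resulting curve. The helper name suggest_dbscan_eps and the quantile
# fallback are illustrative assumptions; np and plt are the module imports.
def suggest_dbscan_eps(X, min_samples=5, quantile=0.90):
    """Estimate a starting eps for DBSCAN from the k-distance curve."""
    from sklearn.neighbors import NearestNeighbors

    nn = NearestNeighbors(n_neighbors=min_samples)
    nn.fit(X)
    distances, _ = nn.kneighbors(X)

    # Distance to the farthest of the min_samples neighbours, sorted ascending
    k_distances = np.sort(distances[:, -1])

    plt.figure(figsize=(6, 4))
    plt.plot(k_distances)
    plt.xlabel('Points sorted by k-distance')
    plt.ylabel(f'Distance to {min_samples}-th neighbour')
    plt.title('k-distance plot for DBSCAN eps selection')
    plt.savefig('dbscan_k_distance.png', dpi=300, bbox_inches='tight')
    plt.close()

    # Crude numeric fallback: take a high quantile of the k-distance curve
    return float(np.quantile(k_distances, quantile))
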
def visualize_clusters(X, results, true_labels=None):
    """
    Visualize clustering results using PCA for 2D projection.

    Parameters:
    -----------
    X : array-like
        Feature matrix
    results : dict
        Dictionary with clustering results
    true_labels : array-like or None
        True labels (if available) for comparison
    """
    # Reduce to 2D using PCA
    pca = PCA(n_components=2)
    X_2d = pca.fit_transform(X)

    # Determine number of subplots
    n_plots = len(results)
    if true_labels is not None:
        n_plots += 1

    n_cols = min(3, n_plots)
    n_rows = (n_plots + n_cols - 1) // n_cols

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(5*n_cols, 4*n_rows))
    if n_plots == 1:
        axes = np.array([axes])
    axes = axes.flatten()

    plot_idx = 0

    # Plot true labels if available
    if true_labels is not None:
        ax = axes[plot_idx]
        scatter = ax.scatter(X_2d[:, 0], X_2d[:, 1], c=true_labels, cmap='viridis', alpha=0.6)
        ax.set_title('True Labels')
        ax.set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
        ax.set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
        plt.colorbar(scatter, ax=ax)
        plot_idx += 1

    # Plot clustering results
    for name, result in results.items():
        ax = axes[plot_idx]
        labels = result['labels']

        scatter = ax.scatter(X_2d[:, 0], X_2d[:, 1], c=labels, cmap='viridis', alpha=0.6)

        # Highlight noise points for DBSCAN
        if name == 'DBSCAN' and -1 in labels:
            noise_mask = labels == -1
            ax.scatter(X_2d[noise_mask, 0], X_2d[noise_mask, 1],
                       c='red', marker='x', s=100, label='Noise', alpha=0.8)
            ax.legend()

        title = f"{name} (K={result['n_clusters']})"
        if 'silhouette' in result:
            title += f"\nSilhouette: {result['silhouette']:.3f}"
        ax.set_title(title)
        ax.set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
        ax.set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
        plt.colorbar(scatter, ax=ax)

        plot_idx += 1

    # Hide unused subplots
    for idx in range(plot_idx, len(axes)):
        axes[idx].axis('off')

    plt.tight_layout()
    plt.savefig('clustering_results.png', dpi=300, bbox_inches='tight')
    print("\nSaved: clustering_results.png")
    plt.close()

def complete_clustering_analysis(X, true_labels=None, scale=True,
                                 find_k=True, k_range=range(2, 11), n_clusters=3):
    """
    Complete clustering analysis workflow.

    Parameters:
    -----------
    X : array-like
        Feature matrix
    true_labels : array-like or None
        True labels (for comparison only, not used in clustering)
    scale : bool
        Whether to scale features
    find_k : bool
        Whether to search for optimal K
    k_range : range
        Range of K values to test
    n_clusters : int
        Number of clusters to use in the comparison (overridden by the
        recommended K when find_k=True)

    Returns:
    --------
    dict
        Dictionary with all analysis results
    """
    print("="*60)
    print("Clustering Analysis")
    print("="*60)
    print(f"Dataset shape: {X.shape}")

    # Preprocess data
    X_processed = preprocess_for_clustering(X, scale=scale)

    # Find optimal K if requested
    optimization_results = None
    if find_k:
        print("\n" + "="*60)
        print("Finding Optimal Number of Clusters")
        print("="*60)
        optimization_results = find_optimal_k_kmeans(X_processed, k_range=k_range)

    # Use recommended K
    if optimization_results:
        n_clusters = optimization_results['best_k']

    # Compare clustering algorithms
    comparison_results = compare_clustering_algorithms(X_processed, n_clusters=n_clusters)

    # Visualize results
    print("\n" + "="*60)
    print("Visualizing Results")
    print("="*60)
    visualize_clusters(X_processed, comparison_results, true_labels=true_labels)

    return {
        'X_processed': X_processed,
        'optimization': optimization_results,
        'comparison': comparison_results
    }

# Example usage
if __name__ == "__main__":
    from sklearn.datasets import load_iris, make_blobs

    print("="*60)
    print("Example 1: Iris Dataset")
    print("="*60)

    # Load Iris dataset
    iris = load_iris()
    X_iris = iris.data
    y_iris = iris.target

    results_iris = complete_clustering_analysis(
        X_iris,
        true_labels=y_iris,
        scale=True,
        find_k=True,
        k_range=range(2, 8),
        n_clusters=3
    )

    print("\n" + "="*60)
    print("Example 2: Synthetic Dataset with Noise")
    print("="*60)

    # Create synthetic dataset
    X_synth, y_synth = make_blobs(
        n_samples=500, n_features=2, centers=4,
        cluster_std=0.5, random_state=42
    )

    # Add noise points
    noise = np.random.randn(50, 2) * 3
    X_synth = np.vstack([X_synth, noise])
    y_synth_with_noise = np.concatenate([y_synth, np.full(50, -1)])

    results_synth = complete_clustering_analysis(
        X_synth,
        true_labels=y_synth_with_noise,
        scale=True,
        find_k=True,
        k_range=range(2, 8),
        n_clusters=4
    )

    print("\n" + "="*60)
    print("Analysis Complete!")
    print("="*60)