HeartMAP Documentation — Advanced Analysis Workflows

Batch Processing Multiple Samples

Sequential Processing

from pathlib import Path
from heartmap import Config
from heartmap.pipelines import ComprehensivePipeline
import pandas as pd

config = Config.default()
pipeline = ComprehensivePipeline(config)

# Process multiple samples in a deterministic order (glob order is
# filesystem-dependent, so sort for reproducible summaries).
data_dir = Path('data/samples/')
results_summary = []

for sample_file in sorted(data_dir.glob('sample_*.h5ad')):
    sample_name = sample_file.stem
    print(f"\n{'='*60}")
    print(f"Processing: {sample_name}")
    print('='*60)

    output_dir = f'results/{sample_name}/'

    try:
        results = pipeline.run(str(sample_file), output_dir)

        # Extract summary statistics from the processed AnnData object
        adata = results['adata']
        summary = {
            'sample': sample_name,
            'n_cells': adata.n_obs,
            'n_genes': adata.n_vars,
            'n_clusters': adata.obs['leiden'].nunique(),
            'status': 'success'
        }

        # Per-chamber cell counts, when chamber annotation is present
        if 'chamber' in adata.obs:
            chamber_dist = adata.obs['chamber'].value_counts().to_dict()
            summary.update(chamber_dist)

        results_summary.append(summary)
        print(f"✓ Completed: {sample_name}")

    except Exception as e:
        # Record the failure but keep processing the remaining samples
        print(f"✗ Failed: {sample_name}")
        print(f"   Error: {e}")
        results_summary.append({
            'sample': sample_name,
            'status': 'failed',
            'error': str(e)
        })

# Save batch summary (create the output directory first so to_csv
# cannot fail on a fresh checkout)
Path('results').mkdir(parents=True, exist_ok=True)
summary_df = pd.DataFrame(results_summary)
summary_df.to_csv('results/batch_summary.csv', index=False)
print("\n" + "="*60)
print("Batch processing complete!")
print(summary_df.to_string())

Parallel Processing

from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
from heartmap import Config
from heartmap.pipelines import ComprehensivePipeline
import multiprocessing as mp

def process_single_sample(sample_path, config_dict):
    """Process a single sample (must be picklable for multiprocessing).

    Parameters
    ----------
    sample_path : str
        Path to the sample .h5ad file.
    config_dict : dict
        Plain-dict serialization of a heartmap Config; dicts pickle
        cleanly across process boundaries, unlike the Config object.

    Returns
    -------
    dict
        Summary with 'sample' and 'status' keys; on success also
        'n_cells'/'n_genes', on failure an 'error' message.
    """
    # All imports are done inside the function so spawned worker
    # processes have every name they need.
    from pathlib import Path  # fix: Path was previously undefined in workers
    from heartmap import Config
    from heartmap.pipelines import ComprehensivePipeline

    # Reconstruct config from dict
    config = Config.from_dict(config_dict)
    pipeline = ComprehensivePipeline(config)

    sample_name = Path(sample_path).stem
    output_dir = f'results/{sample_name}/'

    try:
        results = pipeline.run(sample_path, output_dir)
        return {
            'sample': sample_name,
            'status': 'success',
            'n_cells': results['adata'].n_obs,
            'n_genes': results['adata'].n_vars
        }
    except Exception as e:
        # Return (not raise) the failure so the parent's collection
        # loop keeps running for the remaining samples.
        return {
            'sample': sample_name,
            'status': 'failed',
            'error': str(e)
        }

# Parallel execution
from pathlib import Path  # fix: Path is used below but was never imported

config = Config.default()
config_dict = config.to_dict()

data_files = list(Path('data/samples/').glob('*.h5ad'))
# Leave one CPU free, but never request fewer than one worker:
# min(4, cpu_count() - 1) is 0 on a single-core machine, and
# ProcessPoolExecutor rejects max_workers=0 with a ValueError.
n_workers = max(1, min(4, mp.cpu_count() - 1))

print(f"Processing {len(data_files)} samples with {n_workers} workers...")

with ProcessPoolExecutor(max_workers=n_workers) as executor:
    # Submit all jobs, mapping each future back to its file name
    futures = {
        executor.submit(process_single_sample, str(f), config_dict): f.name
        for f in data_files
    }

    # Collect results as they complete
    results = []
    for future in as_completed(futures):
        sample_name = futures[future]
        try:
            result = future.result()
            results.append(result)
            print(f"✓ Completed: {result['sample']}")
        except Exception as e:
            # Worker crashed before it could return its failure dict
            print(f"✗ Failed: {sample_name} - {e}")

# Save summary (ensure the output directory exists first)
import pandas as pd
Path('results').mkdir(parents=True, exist_ok=True)
df = pd.DataFrame(results)
df.to_csv('results/parallel_batch_summary.csv', index=False)
print("\nParallel processing complete!")
print(df)

Multi-Sample Integration

Combining Multiple Datasets

import scanpy as sc
import pandas as pd
from pathlib import Path

# Load all processed samples (sorted for a reproducible batch order)
samples = []
for sample_dir in sorted(Path('results/').glob('sample_*/')):
    adata_path = sample_dir / 'heartmap_complete.h5ad'
    if adata_path.exists():
        adata = sc.read_h5ad(adata_path)
        # Tag every cell with its sample of origin before concatenation
        adata.obs['sample_id'] = sample_dir.name
        samples.append(adata)

# Fail early with a clear message: sc.concat raises a cryptic error
# when handed an empty list.
if not samples:
    raise FileNotFoundError("No processed samples found under results/sample_*/")

print(f"Loaded {len(samples)} samples")

# Concatenate samples; 'batch' records which sample each cell came from
adata_combined = sc.concat(samples, label='batch', keys=[s.obs['sample_id'].iloc[0] for s in samples])
print(f"Combined: {adata_combined.n_obs} cells × {adata_combined.n_vars} genes")

# Batch correction
print("Performing batch correction...")
sc.pp.combat(adata_combined, key='batch')

# Re-analyze integrated data on the batch-corrected matrix
print("Re-clustering integrated data...")
sc.pp.highly_variable_genes(adata_combined, n_top_genes=3000)
sc.tl.pca(adata_combined)
sc.pp.neighbors(adata_combined)
sc.tl.leiden(adata_combined, resolution=0.8)
sc.tl.umap(adata_combined)

# Visualize batch effects side by side
import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 2, figsize=(16, 6))
sc.pl.umap(adata_combined, color='batch', ax=axes[0], show=False, title='By Sample')
sc.pl.umap(adata_combined, color='leiden', ax=axes[1], show=False, title='By Cluster')
plt.tight_layout()
plt.savefig('results/integrated_analysis.png', dpi=300)

# Save integrated data
adata_combined.write('results/integrated_all_samples.h5ad')
print("Integration complete!")

Cross-Sample Comparison

import scanpy as sc
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Load integrated data
# Load integrated data
adata = sc.read_h5ad('results/integrated_all_samples.h5ad')

# Compare cell type proportions across samples
# normalize='index' gives, per sample, the fraction of cells per cluster
cell_type_props = pd.crosstab(
    adata.obs['sample_id'],
    adata.obs['leiden'],
    normalize='index'
)

plt.figure(figsize=(14, 6))
sns.heatmap(cell_type_props, annot=True, fmt='.2%', cmap='YlOrRd')
plt.title('Cell Type Proportions by Sample')
plt.xlabel('Cluster')
plt.ylabel('Sample')
plt.tight_layout()
plt.savefig('results/cell_type_comparison.png', dpi=300)
plt.close()

# Statistical comparison: does cluster composition depend on the sample?
from scipy.stats import chi2_contingency

contingency = pd.crosstab(adata.obs['sample_id'], adata.obs['leiden'])
chi2, pval, dof, expected = chi2_contingency(contingency)

print("Chi-square test for cell type distribution:")
print(f"  χ² = {chi2:.2f}, p-value = {pval:.2e}")
if pval < 0.05:
    print("  → Significant difference in cell type distribution across samples")
else:
    print("  → No significant difference in cell type distribution")

# Differential expression between samples, one cluster at a time
# (sorted so output files are produced in a stable order)
print("\nPerforming differential expression between samples...")
for cluster in sorted(adata.obs['leiden'].unique()):
    cluster_data = adata[adata.obs['leiden'] == cluster].copy()

    # The Wilcoxon test needs at least two samples present in the cluster
    if cluster_data.obs['sample_id'].nunique() > 1:
        sc.tl.rank_genes_groups(
            cluster_data,
            groupby='sample_id',
            method='wilcoxon',
            key_added=f'de_sample_cluster{cluster}'
        )

        # Save top DE genes (group=None -> results for every sample group)
        de_results = sc.get.rank_genes_groups_df(
            cluster_data,
            group=None,
            key=f'de_sample_cluster{cluster}'
        )
        de_results.to_csv(f'results/de_cluster{cluster}_samples.csv', index=False)

print("Cross-sample comparison complete!")

Comparative Analysis Workflows

Healthy vs Disease Comparison

import scanpy as sc
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import mannwhitneyu

# Load datasets
# Load datasets
print("Loading healthy and disease datasets...")
healthy = sc.read_h5ad('data/healthy_heart.h5ad')
disease = sc.read_h5ad('data/diseased_heart.h5ad')

# Add condition labels
healthy.obs['condition'] = 'healthy'
disease.obs['condition'] = 'disease'

# Combine for integrated analysis.
# NOTE: do not pass label='condition' here — sc.concat would add a
# 'condition' column from its default keys ('0'/'1'), clobbering the
# labels set above and breaking the disease-vs-healthy DE call below.
adata_combined = sc.concat([healthy, disease])

# Process combined data
print("Processing combined data...")
from heartmap import Config
from heartmap.pipelines import ComprehensivePipeline

config = Config.default()
pipeline = ComprehensivePipeline(config)

# Save temporarily, process, then always remove the scratch file
adata_combined.write('temp_combined.h5ad')
try:
    results = pipeline.run('temp_combined.h5ad', 'results/comparison/')
finally:
    import os
    os.remove('temp_combined.h5ad')
adata = results['adata']

# Compare cell type proportions (normalized within each condition)
print("\nComparing cell type proportions...")
cell_type_comparison = pd.crosstab(
    adata.obs['leiden'],
    adata.obs['condition'],
    normalize='columns'
)

fig, ax = plt.subplots(figsize=(10, 6))
cell_type_comparison.plot(kind='bar', ax=ax)
ax.set_ylabel('Proportion')
ax.set_xlabel('Cell Type (Cluster)')
ax.set_title('Cell Type Distribution: Healthy vs Disease')
ax.legend(title='Condition')
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('results/comparison/cell_type_props.png', dpi=300)

# Differential expression analysis (disease relative to healthy)
print("\nPerforming differential expression analysis...")
sc.tl.rank_genes_groups(
    adata,
    groupby='condition',
    groups=['disease'],
    reference='healthy',
    method='wilcoxon',
    key_added='disease_vs_healthy'
)

# Extract and save DE genes that remain significant after FDR correction
de_genes = sc.get.rank_genes_groups_df(adata, group='disease', key='disease_vs_healthy')
de_genes_sig = de_genes[de_genes['pvals_adj'] < 0.05]

print(f"Found {len(de_genes_sig)} significantly DE genes")
de_genes_sig.to_csv('results/comparison/de_genes_disease_vs_healthy.csv', index=False)

# Plot top DE genes
top_de = de_genes_sig.head(20)['names'].tolist()
sc.pl.heatmap(
    adata,
    var_names=top_de,
    groupby='condition',
    cmap='RdBu_r',
    save='_top_de_genes_comparison.png'
)

# Pathway analysis (simplified): split gene lists by direction of change
print("\nAnalyzing disease-enriched pathways...")
upregulated = de_genes_sig[de_genes_sig['scores'] > 0]['names'].tolist()
downregulated = de_genes_sig[de_genes_sig['scores'] < 0]['names'].tolist()

print(f"Upregulated in disease: {len(upregulated)} genes")
print(f"Downregulated in disease: {len(downregulated)} genes")

# Save gene lists for external pathway analysis
with open('results/comparison/upregulated_disease.txt', 'w') as f:
    f.write('\n'.join(upregulated))

with open('results/comparison/downregulated_disease.txt', 'w') as f:
    f.write('\n'.join(downregulated))

print("\nComparative analysis complete!")

Chamber-Specific Disease Effects

import scanpy as sc
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Load data with chamber and condition info
# Load data with chamber and condition info
adata = sc.read_h5ad('results/comparison/heartmap_complete.h5ad')

if 'chamber' not in adata.obs or 'condition' not in adata.obs:
    raise ValueError("Data must have both 'chamber' and 'condition' columns")

chambers = ['RA', 'RV', 'LA', 'LV']

# Chamber-specific DE analysis
print("Analyzing chamber-specific disease effects...")

chamber_de_results = {}
for chamber in chambers:
    print(f"\nAnalyzing {chamber}...")

    # Subset to this chamber
    chamber_data = adata[adata.obs['chamber'] == chamber].copy()

    # Both conditions must be present to run a disease-vs-healthy test
    if chamber_data.obs['condition'].nunique() < 2:
        print(f"  Skipping {chamber} - only one condition present")
        continue

    # DE analysis
    sc.tl.rank_genes_groups(
        chamber_data,
        groupby='condition',
        groups=['disease'],
        reference='healthy',
        method='wilcoxon',
        key_added='de'
    )

    # Extract results, keeping genes significant after FDR correction
    de_df = sc.get.rank_genes_groups_df(chamber_data, group='disease', key='de')
    de_df_sig = de_df[de_df['pvals_adj'] < 0.05]

    chamber_de_results[chamber] = de_df_sig

    print(f"  Found {len(de_df_sig)} DE genes in {chamber}")

    # Save chamber-specific results
    de_df_sig.to_csv(f'results/comparison/de_{chamber}_disease_vs_healthy.csv', index=False)

# Compare DE genes across chambers
print("\nComparing DE genes across chambers...")

# Venn diagram data
de_gene_sets = {chamber: set(df['names'].tolist()) for chamber, df in chamber_de_results.items()}

# Shared vs unique genes
all_de_genes = set.union(*de_gene_sets.values()) if de_gene_sets else set()
shared_genes = set.intersection(*de_gene_sets.values()) if len(de_gene_sets) > 1 else set()

print(f"Total unique DE genes: {len(all_de_genes)}")
print(f"Shared across all chambers: {len(shared_genes)}")

for chamber, genes in de_gene_sets.items():
    # set().union(*others) is safe when 'others' is empty (only one
    # chamber had results); the previous set.union(*generator) form
    # raised TypeError in that case because set.union needs >= 1 arg.
    others = [s for c, s in de_gene_sets.items() if c != chamber]
    unique = genes - set().union(*others)
    print(f"  {chamber} unique: {len(unique)}")

# Create comparison matrix (pairwise Jaccard similarity of DE gene sets)
comparison_matrix = pd.DataFrame(index=chambers, columns=chambers)
for c1 in chambers:
    for c2 in chambers:
        if c1 in de_gene_sets and c2 in de_gene_sets:
            overlap = len(de_gene_sets[c1] & de_gene_sets[c2])
            union = len(de_gene_sets[c1] | de_gene_sets[c2])
            jaccard = overlap / union if union > 0 else 0
            comparison_matrix.loc[c1, c2] = jaccard

plt.figure(figsize=(10, 8))
sns.heatmap(comparison_matrix.astype(float), annot=True, fmt='.2f', cmap='YlOrRd')
plt.title('DE Gene Overlap Between Chambers (Jaccard Index)')
plt.tight_layout()
plt.savefig('results/comparison/chamber_de_overlap.png', dpi=300)

print("\nChamber-specific analysis complete!")

Integration with External Tools

CellPhoneDB Integration

import scanpy as sc
import pandas as pd
from pathlib import Path

# Load HeartMAP results
# Load HeartMAP results
adata = sc.read_h5ad('results/heartmap_complete.h5ad')

# Prepare CellPhoneDB input files
print("Preparing CellPhoneDB input files...")

# Create the output directory up front so to_csv cannot fail
Path('cellphonedb_input').mkdir(parents=True, exist_ok=True)

# 1. Metadata file: one row per cell, mapping barcodes to cluster labels
meta = pd.DataFrame({
    'Cell': adata.obs_names,
    'cell_type': adata.obs['leiden'].astype(str)
})
meta.to_csv('cellphonedb_input/meta.txt', sep='\t', index=False)

# 2. Counts matrix (normalized), transposed so genes are rows as
#    CellPhoneDB expects; prefer .raw when present since it holds the
#    full (unfiltered) gene set
if adata.raw is not None:
    counts = pd.DataFrame(
        adata.raw.X.toarray() if hasattr(adata.raw.X, 'toarray') else adata.raw.X,
        index=adata.obs_names,
        columns=adata.raw.var_names
    ).T
else:
    counts = pd.DataFrame(
        adata.X.toarray() if hasattr(adata.X, 'toarray') else adata.X,
        index=adata.obs_names,
        columns=adata.var_names
    ).T

counts.index.name = 'Gene'
counts.to_csv('cellphonedb_input/counts.txt', sep='\t')

print("Files prepared for CellPhoneDB:")
print("  - cellphonedb_input/meta.txt")
print("  - cellphonedb_input/counts.txt")
print("\nRun CellPhoneDB with:")
print("  cellphonedb method statistical_analysis \\")
print("    cellphonedb_input/meta.txt \\")
print("    cellphonedb_input/counts.txt \\")
print("    --output-path cellphonedb_results/")

Seurat Integration

import scanpy as sc
import anndata as ad

# Load HeartMAP processed data
# Load HeartMAP processed data
adata = sc.read_h5ad('results/heartmap_complete.h5ad')

# Convert to Seurat-compatible format
print("Converting to Seurat format...")

# Save as h5ad with specific structure for SeuratDisk
adata_seurat = adata.copy()

# Ensure all required layers (expose raw counts as a 'counts' layer)
if adata_seurat.raw is not None:
    adata_seurat.layers['counts'] = adata_seurat.raw.X

# Create the output directory before writing (write() fails otherwise)
from pathlib import Path
Path('seurat_input').mkdir(parents=True, exist_ok=True)

adata_seurat.write('seurat_input/heartmap_for_seurat.h5ad')

print("\nConversion complete! Use in R with:")
print("""library(Seurat)
library(SeuratDisk)

# Load h5ad file
Convert('heartmap_for_seurat.h5ad', dest='h5seurat', overwrite=TRUE)
seurat_obj <- LoadH5Seurat('heartmap_for_seurat.h5seurat')

# Continue with Seurat analysis
DimPlot(seurat_obj, reduction='umap', group.by='leiden')
""")

SCENIC Integration

import scanpy as sc
import pandas as pd
import numpy as np
from pathlib import Path

# Load HeartMAP results
# Load HeartMAP results
adata = sc.read_h5ad('results/heartmap_complete.h5ad')

# Prepare loom file for SCENIC
print("Preparing data for SCENIC...")

# SCENIC operates on raw counts; fall back to .X when no .raw is stored
counts_adata = adata.raw.to_adata() if adata.raw is not None else adata.copy()

# Carry the embedding and cluster/chamber annotations over to the
# counts object so they travel with the loom file
counts_adata.obsm['X_umap'] = adata.obsm['X_umap']
counts_adata.obs['leiden'] = adata.obs['leiden']
if 'chamber' in adata.obs:
    counts_adata.obs['chamber'] = adata.obs['chamber']

# Export to loom for SCENIC
output_dir = Path('scenic_input/')
output_dir.mkdir(exist_ok=True)

counts_adata.write_loom(output_dir / 'heartmap_for_scenic.loom')

print("\nLoom file created for SCENIC!")
print("\nRun SCENIC with:")
print("""# Python/pySCENIC
from pyscenic.cli.pyscenic import run_scenic

run_scenic(
    expr_matrix='scenic_input/heartmap_for_scenic.loom',
    grn_output='scenic_output/grn.tsv',
    ctx_output='scenic_output/regulons.csv',
    auc_output='scenic_output/auc.csv'
)
""")

Automated Reporting

Generate Comprehensive Report

import scanpy as sc
import pandas as pd
import matplotlib.pyplot as plt
from jinja2 import Template
from pathlib import Path
import base64
from io import BytesIO

def fig_to_base64(fig):
    """Render a matplotlib figure to a base64 PNG data URI for HTML embedding."""
    buffer = BytesIO()
    fig.savefig(buffer, format='png', dpi=150, bbox_inches='tight')
    buffer.seek(0)
    encoded = base64.b64encode(buffer.getvalue()).decode()
    # Close the figure so repeated calls don't accumulate open figures
    plt.close(fig)
    return "data:image/png;base64," + encoded

def generate_html_report(adata, output_path) -> None:
    """Generate a self-contained HTML analysis report.

    Parameters
    ----------
    adata : AnnData
        Processed dataset; must have 'leiden' in .obs and a UMAP
        embedding. An optional 'chamber' column adds a second UMAP panel
        and a chamber list to the summary.
    output_path : str
        Destination path for the rendered HTML file.

    All plots are embedded as base64 data URIs (via fig_to_base64), so
    the report is a single file with no external image dependencies.
    """
    
    # Generate plots
    plots = {}
    
    # UMAP by cluster
    fig, ax = plt.subplots(figsize=(10, 8))
    sc.pl.umap(adata, color='leiden', ax=ax, show=False, legend_loc='on data')
    plots['umap_cluster'] = fig_to_base64(fig)
    
    # UMAP by chamber (if available); the template shows this panel only
    # when the 'umap_chamber' key exists in plots
    if 'chamber' in adata.obs:
        fig, ax = plt.subplots(figsize=(10, 8))
        sc.pl.umap(adata, color='chamber', ax=ax, show=False, legend_loc='on data')
        plots['umap_chamber'] = fig_to_base64(fig)
    
    # Cell type proportions as a bar chart of cells per cluster
    fig, ax = plt.subplots(figsize=(10, 6))
    adata.obs['leiden'].value_counts().plot(kind='bar', ax=ax)
    ax.set_xlabel('Cluster')
    ax.set_ylabel('Number of Cells')
    ax.set_title('Cell Type Distribution')
    plt.xticks(rotation=45)
    plots['cell_dist'] = fig_to_base64(fig)
    
    # Calculate summary statistics (counts pre-formatted with thousands
    # separators for display)
    summary = {
        'n_cells': f"{adata.n_obs:,}",
        'n_genes': f"{adata.n_vars:,}",
        'n_clusters': adata.obs['leiden'].nunique(),
        'has_chamber': 'chamber' in adata.obs,
        'chambers': list(adata.obs['chamber'].unique()) if 'chamber' in adata.obs else []
    }
    
    # HTML template (Jinja2); rendered with the summary and plots above
    html_template = """
<!DOCTYPE html>
<html>
<head>
    <title>HeartMAP Analysis Report</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 40px; }
        h1 { color: #2c3e50; }
        h2 { color: #34495e; margin-top: 30px; }
        .summary { background: #ecf0f1; padding: 20px; border-radius: 5px; }
        .stat { display: inline-block; margin: 10px 20px; }
        .stat-label { font-weight: bold; }
        img { max-width: 100%; height: auto; margin: 20px 0; }
    </style>
</head>
<body>
    <h1>🫀 HeartMAP Analysis Report</h1>
    
    <div class="summary">
        <h2>Dataset Overview</h2>
        <div class="stat">
            <span class="stat-label">Cells:</span> {{ summary.n_cells }}
        </div>
        <div class="stat">
            <span class="stat-label">Genes:</span> {{ summary.n_genes }}
        </div>
        <div class="stat">
            <span class="stat-label">Clusters:</span> {{ summary.n_clusters }}
        </div>
        {% if summary.has_chamber %}
        <div class="stat">
            <span class="stat-label">Chambers:</span> {{ summary.chambers|join(', ') }}
        </div>
        {% endif %}
    </div>
    
    <h2>Cell Type Distribution</h2>
    <img src="{{ plots.cell_dist }}" alt="Cell Distribution">
    
    <h2>UMAP Visualization</h2>
    <h3>By Cluster</h3>
    <img src="{{ plots.umap_cluster }}" alt="UMAP by Cluster">
    
    {% if plots.umap_chamber %}
    <h3>By Chamber</h3>
    <img src="{{ plots.umap_chamber }}" alt="UMAP by Chamber">
    {% endif %}
    
    <hr>
    <p><em>Generated by HeartMAP</em></p>
</body>
</html>
    """
    
    # Render template
    template = Template(html_template)
    html = template.render(summary=summary, plots=plots)
    
    # Save report
    with open(output_path, 'w') as f:
        f.write(html)
    
    print(f"Report generated: {output_path}")

# Usage: load a completed analysis and write the standalone HTML report
adata = sc.read_h5ad('results/heartmap_complete.h5ad')
generate_html_report(adata, 'results/analysis_report.html')

Next Steps

Quick Analysis

Start with simple 30-second examples

Python API

Comprehensive API examples

Best Practices

Learn analysis best practices

Troubleshooting

Common issues and solutions

End of the advanced workflows guide.