Batch Processing Multiple Samples
Sequential Processing
from pathlib import Path

import pandas as pd

from heartmap import Config
from heartmap.pipelines import ComprehensivePipeline

# Batch-process every data/samples/sample_*.h5ad file through the
# comprehensive HeartMAP pipeline, collecting a per-sample summary row.
config = Config.default()
pipeline = ComprehensivePipeline(config)

data_dir = Path('data/samples/')
# Ensure the root results directory exists before anything writes into it
Path('results').mkdir(exist_ok=True)

results_summary = []
# sorted() makes processing order deterministic across filesystems/runs
for sample_file in sorted(data_dir.glob('sample_*.h5ad')):
    sample_name = sample_file.stem
    print(f"\n{'='*60}")
    print(f"Processing: {sample_name}")
    print('='*60)

    output_dir = f'results/{sample_name}/'
    try:
        results = pipeline.run(str(sample_file), output_dir)

        # Extract summary statistics from the processed AnnData
        adata = results['adata']
        summary = {
            'sample': sample_name,
            'n_cells': adata.n_obs,
            'n_genes': adata.n_vars,
            'n_clusters': adata.obs['leiden'].nunique(),
            'status': 'success'
        }
        # Add per-chamber cell counts when chamber annotation is present
        if 'chamber' in adata.obs:
            chamber_dist = adata.obs['chamber'].value_counts().to_dict()
            summary.update(chamber_dist)
        results_summary.append(summary)
        print(f"✓ Completed: {sample_name}")
    except Exception as e:
        # Record the failure but keep processing the remaining samples
        print(f"✗ Failed: {sample_name}")
        print(f" Error: {e}")
        results_summary.append({
            'sample': sample_name,
            'status': 'failed',
            'error': str(e)
        })

# Save the batch summary as one CSV row per sample
summary_df = pd.DataFrame(results_summary)
summary_df.to_csv('results/batch_summary.csv', index=False)
print("\n" + "="*60)
print("Batch processing complete!")
print(summary_df.to_string())
Parallel Processing
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
from heartmap import Config
from heartmap.pipelines import ComprehensivePipeline
import multiprocessing as mp
def process_single_sample(sample_path, config_dict):
    """Process a single sample (must be picklable for multiprocessing).

    Args:
        sample_path: Path (as str) to the sample .h5ad file.
        config_dict: Plain-dict serialization of a heartmap Config —
            picklable, so it can cross the process boundary and be
            rebuilt inside the worker.

    Returns:
        dict with 'sample' and 'status' keys; on success also
        'n_cells'/'n_genes', on failure an 'error' message.
    """
    # Imports live inside the worker so the function is self-contained
    # under the 'spawn' start method (workers re-import dependencies).
    # BUGFIX: Path was used below but never imported in this snippet.
    from pathlib import Path
    from heartmap import Config
    from heartmap.pipelines import ComprehensivePipeline

    # Reconstruct config from its picklable dict form
    config = Config.from_dict(config_dict)
    pipeline = ComprehensivePipeline(config)

    sample_name = Path(sample_path).stem
    output_dir = f'results/{sample_name}/'
    try:
        results = pipeline.run(sample_path, output_dir)
        return {
            'sample': sample_name,
            'status': 'success',
            'n_cells': results['adata'].n_obs,
            'n_genes': results['adata'].n_vars
        }
    except Exception as e:
        # Return (not raise) the failure so the parent process can tally it
        return {
            'sample': sample_name,
            'status': 'failed',
            'error': str(e)
        }
# Parallel execution driver.
# BUGFIX: the __main__ guard is required — under the 'spawn' start method
# (default on Windows/macOS) each worker re-imports this module, and
# unguarded executor code would recursively spawn processes.
if __name__ == '__main__':
    from pathlib import Path  # BUGFIX: Path was used below but never imported

    config = Config.default()
    config_dict = config.to_dict()  # plain dict is picklable across processes

    data_files = list(Path('data/samples/').glob('*.h5ad'))
    # Leave one CPU free; max(1, ...) guards the single-core case where
    # cpu_count() - 1 would be 0 and ProcessPoolExecutor would raise.
    n_workers = max(1, min(4, mp.cpu_count() - 1))
    print(f"Processing {len(data_files)} samples with {n_workers} workers...")

    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        # Submit all jobs, mapping each future back to its file name
        futures = {
            executor.submit(process_single_sample, str(f), config_dict): f.name
            for f in data_files
        }
        # Collect results as they complete (not in submission order)
        results = []
        for future in as_completed(futures):
            sample_name = futures[future]
            try:
                result = future.result()
                results.append(result)
                print(f"✓ Completed: {result['sample']}")
            except Exception as e:
                print(f"✗ Failed: {sample_name} - {e}")

    # Save summary of all worker results
    import pandas as pd
    df = pd.DataFrame(results)
    df.to_csv('results/parallel_batch_summary.csv', index=False)
    print("\nParallel processing complete!")
    print(df)
Multi-Sample Integration
Combining Multiple Datasets
import scanpy as sc
import pandas as pd
from pathlib import Path

# Load every processed sample from its per-sample results directory
samples = []
for sample_dir in sorted(Path('results/').glob('sample_*/')):
    adata_path = sample_dir / 'heartmap_complete.h5ad'
    if adata_path.exists():
        adata = sc.read_h5ad(adata_path)
        adata.obs['sample_id'] = sample_dir.name  # tag origin before merging
        samples.append(adata)

# BUGFIX: fail fast with a clear message instead of letting sc.concat
# crash on an empty list.
if not samples:
    raise FileNotFoundError(
        "No processed samples found under results/sample_*/ - "
        "run the pipeline first"
    )
print(f"Loading {len(samples)} samples...")

# Concatenate samples, recording each sample's id as the batch key
adata_combined = sc.concat(
    samples,
    label='batch',
    keys=[s.obs['sample_id'].iloc[0] for s in samples],
)
print(f"Combined: {adata_combined.n_obs} cells × {adata_combined.n_vars} genes")

# Batch correction (ComBat) on the concatenated expression matrix
print("Performing batch correction...")
sc.pp.combat(adata_combined, key='batch')

# Re-analyze integrated data: HVGs -> PCA -> kNN graph -> Leiden -> UMAP
print("Re-clustering integrated data...")
sc.pp.highly_variable_genes(adata_combined, n_top_genes=3000)
sc.tl.pca(adata_combined)
sc.pp.neighbors(adata_combined)
sc.tl.leiden(adata_combined, resolution=0.8)
sc.tl.umap(adata_combined)

# Visualize residual batch effects next to the new clustering
import matplotlib.pyplot as plt
fig, axes = plt.subplots(1, 2, figsize=(16, 6))
sc.pl.umap(adata_combined, color='batch', ax=axes[0], show=False, title='By Sample')
sc.pl.umap(adata_combined, color='leiden', ax=axes[1], show=False, title='By Cluster')
plt.tight_layout()
plt.savefig('results/integrated_analysis.png', dpi=300)

# Save integrated data
adata_combined.write('results/integrated_all_samples.h5ad')
print("Integration complete!")
Cross-Sample Comparison
import scanpy as sc
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Load the integrated dataset produced by the multi-sample workflow
adata = sc.read_h5ad('results/integrated_all_samples.h5ad')

# Per-sample cell-type composition; normalize='index' makes rows sum to 1
cell_type_props = pd.crosstab(
    adata.obs['sample_id'],
    adata.obs['leiden'],
    normalize='index'
)

plt.figure(figsize=(14, 6))
sns.heatmap(cell_type_props, annot=True, fmt='.2%', cmap='YlOrRd')
plt.title('Cell Type Proportions by Sample')
plt.xlabel('Cluster')
plt.ylabel('Sample')
plt.tight_layout()
plt.savefig('results/cell_type_comparison.png', dpi=300)
plt.close()

# Chi-square test on the raw (unnormalized) sample x cluster counts
from scipy.stats import chi2_contingency
contingency = pd.crosstab(adata.obs['sample_id'], adata.obs['leiden'])
chi2, pval, dof, expected = chi2_contingency(contingency)
print(f"Chi-square test for cell type distribution:")
print(f" χ² = {chi2:.2f}, p-value = {pval:.2e}")
if pval < 0.05:
    print(" → Significant difference in cell type distribution across samples")
else:
    print(" → No significant difference in cell type distribution")

# Within each cluster, run DE between samples (needs >= 2 samples present)
print("\nPerforming differential expression between samples...")
for cluster in adata.obs['leiden'].unique():
    cluster_data = adata[adata.obs['leiden'] == cluster].copy()
    if cluster_data.obs['sample_id'].nunique() > 1:
        de_key = f'de_sample_cluster{cluster}'
        sc.tl.rank_genes_groups(
            cluster_data,
            groupby='sample_id',
            method='wilcoxon',
            key_added=de_key
        )
        # Save top DE genes for this cluster
        de_results = sc.get.rank_genes_groups_df(
            cluster_data,
            group=None,
            key=de_key
        )
        de_results.to_csv(f'results/de_cluster{cluster}_samples.csv', index=False)
print("Cross-sample comparison complete!")
Comparative Analysis Workflows
Healthy vs Disease Comparison
import scanpy as sc
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import mannwhitneyu
from pathlib import Path

# Load datasets
print("Loading healthy and disease datasets...")
healthy = sc.read_h5ad('data/healthy_heart.h5ad')
disease = sc.read_h5ad('data/diseased_heart.h5ad')

# Add condition labels
healthy.obs['condition'] = 'healthy'
disease.obs['condition'] = 'disease'

# Combine for integrated analysis.
# BUGFIX: the original used sc.concat(..., label='condition'), which makes
# concat write its own batch keys ('0'/'1') into obs['condition'] and
# clobber the labels set above — every downstream disease-vs-healthy
# comparison then fails. Record the batch in its own column instead.
adata_combined = sc.concat(
    [healthy, disease],
    label='batch',
    keys=['healthy', 'disease'],
)

# Process combined data
print("Processing combined data...")
from heartmap import Config
from heartmap.pipelines import ComprehensivePipeline
config = Config.default()
pipeline = ComprehensivePipeline(config)

# The pipeline reads from disk, so round-trip through a temporary file;
# remove it afterwards instead of leaving it behind.
temp_path = Path('temp_combined.h5ad')
adata_combined.write(temp_path)
try:
    results = pipeline.run(str(temp_path), 'results/comparison/')
finally:
    temp_path.unlink(missing_ok=True)
adata = results['adata']

# Compare cell type proportions; normalize='columns' gives per-condition
# proportions so healthy and disease bars are directly comparable.
print("\nComparing cell type proportions...")
cell_type_comparison = pd.crosstab(
    adata.obs['leiden'],
    adata.obs['condition'],
    normalize='columns'
)

fig, ax = plt.subplots(figsize=(10, 6))
cell_type_comparison.plot(kind='bar', ax=ax)
ax.set_ylabel('Proportion')
ax.set_xlabel('Cell Type (Cluster)')
ax.set_title('Cell Type Distribution: Healthy vs Disease')
ax.legend(title='Condition')
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('results/comparison/cell_type_props.png', dpi=300)

# Differential expression: disease vs healthy (Wilcoxon rank-sum)
print("\nPerforming differential expression analysis...")
sc.tl.rank_genes_groups(
    adata,
    groupby='condition',
    groups=['disease'],
    reference='healthy',
    method='wilcoxon',
    key_added='disease_vs_healthy'
)

# Extract and keep only significant genes (BH-adjusted p < 0.05)
de_genes = sc.get.rank_genes_groups_df(adata, group='disease', key='disease_vs_healthy')
de_genes_sig = de_genes[de_genes['pvals_adj'] < 0.05]
print(f"Found {len(de_genes_sig)} significantly DE genes")
de_genes_sig.to_csv('results/comparison/de_genes_disease_vs_healthy.csv', index=False)

# Plot top DE genes
top_de = de_genes_sig.head(20)['names'].tolist()
sc.pl.heatmap(
    adata,
    var_names=top_de,
    groupby='condition',
    cmap='RdBu_r',
    save='_top_de_genes_comparison.png'
)

# Split by direction (sign of the test score) for pathway analysis
print("\nAnalyzing disease-enriched pathways...")
upregulated = de_genes_sig[de_genes_sig['scores'] > 0]['names'].tolist()
downregulated = de_genes_sig[de_genes_sig['scores'] < 0]['names'].tolist()
print(f"Upregulated in disease: {len(upregulated)} genes")
print(f"Downregulated in disease: {len(downregulated)} genes")

# Save gene lists for external pathway analysis
with open('results/comparison/upregulated_disease.txt', 'w') as f:
    f.write('\n'.join(upregulated))
with open('results/comparison/downregulated_disease.txt', 'w') as f:
    f.write('\n'.join(downregulated))
print("\nComparative analysis complete!")
Chamber-Specific Disease Effects
import scanpy as sc
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Load data carrying both chamber and condition annotations
adata = sc.read_h5ad('results/comparison/heartmap_complete.h5ad')
if 'chamber' not in adata.obs or 'condition' not in adata.obs:
    raise ValueError("Data must have both 'chamber' and 'condition' columns")

chambers = ['RA', 'RV', 'LA', 'LV']

# Within each chamber, run disease-vs-healthy differential expression
print("Analyzing chamber-specific disease effects...")
chamber_de_results = {}
for chamber in chambers:
    print(f"\nAnalyzing {chamber}...")
    chamber_data = adata[adata.obs['chamber'] == chamber].copy()
    # Both conditions must be represented in this chamber to compare
    if chamber_data.obs['condition'].nunique() < 2:
        print(f" Skipping {chamber} - only one condition present")
        continue
    sc.tl.rank_genes_groups(
        chamber_data,
        groupby='condition',
        groups=['disease'],
        reference='healthy',
        method='wilcoxon',
        key_added='de'
    )
    # Keep only significant genes (BH-adjusted p < 0.05)
    de_df = sc.get.rank_genes_groups_df(chamber_data, group='disease', key='de')
    de_df_sig = de_df[de_df['pvals_adj'] < 0.05]
    chamber_de_results[chamber] = de_df_sig
    print(f" Found {len(de_df_sig)} DE genes in {chamber}")
    de_df_sig.to_csv(f'results/comparison/de_{chamber}_disease_vs_healthy.csv', index=False)

# Compare DE genes across chambers
print("\nComparing DE genes across chambers...")
de_gene_sets = {chamber: set(df['names'].tolist()) for chamber, df in chamber_de_results.items()}

# Shared vs unique genes
all_de_genes = set.union(*de_gene_sets.values()) if de_gene_sets else set()
shared_genes = set.intersection(*de_gene_sets.values()) if len(de_gene_sets) > 1 else set()
print(f"Total unique DE genes: {len(all_de_genes)}")
print(f"Shared across all chambers: {len(shared_genes)}")
for chamber, genes in de_gene_sets.items():
    # BUGFIX: set.union() with an empty argument list raises TypeError when
    # only one chamber produced DE results; guard against that case.
    other_sets = [s for c, s in de_gene_sets.items() if c != chamber]
    unique = genes - (set.union(*other_sets) if other_sets else set())
    print(f" {chamber} unique: {len(unique)}")

# Pairwise Jaccard similarity of DE gene sets (chambers with no results
# stay NaN in the matrix and render blank in the heatmap)
comparison_matrix = pd.DataFrame(index=chambers, columns=chambers)
for c1 in chambers:
    for c2 in chambers:
        if c1 in de_gene_sets and c2 in de_gene_sets:
            overlap = len(de_gene_sets[c1] & de_gene_sets[c2])
            union = len(de_gene_sets[c1] | de_gene_sets[c2])
            comparison_matrix.loc[c1, c2] = overlap / union if union > 0 else 0

plt.figure(figsize=(10, 8))
sns.heatmap(comparison_matrix.astype(float), annot=True, fmt='.2f', cmap='YlOrRd')
plt.title('DE Gene Overlap Between Chambers (Jaccard Index)')
plt.tight_layout()
plt.savefig('results/comparison/chamber_de_overlap.png', dpi=300)
print("\nChamber-specific analysis complete!")
Integration with External Tools
CellPhoneDB Integration
import scanpy as sc
import pandas as pd
from pathlib import Path

# Load HeartMAP results
adata = sc.read_h5ad('results/heartmap_complete.h5ad')

# Prepare CellPhoneDB input files
print("Preparing CellPhoneDB input files...")
# BUGFIX: create the output directory first — to_csv fails if it is missing
Path('cellphonedb_input').mkdir(parents=True, exist_ok=True)

# 1. Metadata file: one row per cell with its cluster label
meta = pd.DataFrame({
    'Cell': adata.obs_names,
    'cell_type': adata.obs['leiden'].astype(str)
})
meta.to_csv('cellphonedb_input/meta.txt', sep='\t', index=False)

# 2. Counts matrix, transposed to genes x cells as CellPhoneDB expects;
# prefer the .raw (normalized, unfiltered) layer when it exists.
# hasattr(X, 'toarray') detects a sparse matrix needing densification.
if adata.raw is not None:
    counts = pd.DataFrame(
        adata.raw.X.toarray() if hasattr(adata.raw.X, 'toarray') else adata.raw.X,
        index=adata.obs_names,
        columns=adata.raw.var_names
    ).T
else:
    counts = pd.DataFrame(
        adata.X.toarray() if hasattr(adata.X, 'toarray') else adata.X,
        index=adata.obs_names,
        columns=adata.var_names
    ).T
counts.index.name = 'Gene'
counts.to_csv('cellphonedb_input/counts.txt', sep='\t')

print("Files prepared for CellPhoneDB:")
print(" - cellphonedb_input/meta.txt")
print(" - cellphonedb_input/counts.txt")
print("\nRun CellPhoneDB with:")
print(" cellphonedb method statistical_analysis \\")
print(" cellphonedb_input/meta.txt \\")
print(" cellphonedb_input/counts.txt \\")
print(" --output-path cellphonedb_results/")
Seurat Integration
import scanpy as sc
import anndata as ad
from pathlib import Path

# Load HeartMAP processed data
adata = sc.read_h5ad('results/heartmap_complete.h5ad')

# Convert to Seurat-compatible format
print("Converting to Seurat format...")

# Work on a copy so the in-memory object is left untouched
adata_seurat = adata.copy()
# SeuratDisk expects a 'counts' layer; take it from .raw when available
if adata_seurat.raw is not None:
    adata_seurat.layers['counts'] = adata_seurat.raw.X

# BUGFIX: create the output directory first — .write fails if it is missing
Path('seurat_input').mkdir(parents=True, exist_ok=True)
adata_seurat.write('seurat_input/heartmap_for_seurat.h5ad')

print("\nConversion complete! Use in R with:")
print("""library(Seurat)
library(SeuratDisk)
# Load h5ad file
Convert('heartmap_for_seurat.h5ad', dest='h5seurat', overwrite=TRUE)
seurat_obj <- LoadH5Seurat('heartmap_for_seurat.h5seurat')
# Continue with Seurat analysis
DimPlot(seurat_obj, reduction='umap', group.by='leiden')
""")
SCENIC Integration
import scanpy as sc
import pandas as pd
import numpy as np
from pathlib import Path

# Load HeartMAP results
adata = sc.read_h5ad('results/heartmap_complete.h5ad')

# Prepare loom file for SCENIC
print("Preparing data for SCENIC...")

# SCENIC requires raw counts; fall back to .X when no .raw layer is stored
counts_adata = adata.raw.to_adata() if adata.raw is not None else adata.copy()

# Carry embeddings and metadata over so they survive in the loom file
counts_adata.obsm['X_umap'] = adata.obsm['X_umap']
counts_adata.obs['leiden'] = adata.obs['leiden']
if 'chamber' in adata.obs:
    counts_adata.obs['chamber'] = adata.obs['chamber']

# Export to loom for SCENIC
scenic_dir = Path('scenic_input/')
scenic_dir.mkdir(exist_ok=True)
counts_adata.write_loom(scenic_dir / 'heartmap_for_scenic.loom')

print("\nLoom file created for SCENIC!")
print("\nRun SCENIC with:")
print("""# Python/pySCENIC
from pyscenic.cli.pyscenic import run_scenic
run_scenic(
expr_matrix='scenic_input/heartmap_for_scenic.loom',
grn_output='scenic_output/grn.tsv',
ctx_output='scenic_output/regulons.csv',
auc_output='scenic_output/auc.csv'
)
""")
Automated Reporting
Generate Comprehensive Report
import scanpy as sc
import pandas as pd
import matplotlib.pyplot as plt
from jinja2 import Template
from pathlib import Path
import base64
from io import BytesIO
def fig_to_base64(fig):
    """Render a matplotlib figure to a base64 data URI for HTML embedding.

    Closes the figure after encoding to free its memory.
    """
    buffer = BytesIO()
    fig.savefig(buffer, format='png', dpi=150, bbox_inches='tight')
    buffer.seek(0)
    encoded = base64.b64encode(buffer.read()).decode()
    plt.close(fig)
    return f"data:image/png;base64,{encoded}"
def generate_html_report(adata, output_path):
    """Generate comprehensive HTML report.

    Renders summary statistics plus UMAP/bar-chart figures (embedded as
    base64 data URIs) into a standalone HTML file at *output_path*.
    """
    # Build the embedded figures first
    plots = {}

    # UMAP colored by Leiden cluster
    fig, ax = plt.subplots(figsize=(10, 8))
    sc.pl.umap(adata, color='leiden', ax=ax, show=False, legend_loc='on data')
    plots['umap_cluster'] = fig_to_base64(fig)

    # UMAP colored by chamber — only when the annotation exists
    if 'chamber' in adata.obs:
        fig, ax = plt.subplots(figsize=(10, 8))
        sc.pl.umap(adata, color='chamber', ax=ax, show=False, legend_loc='on data')
        plots['umap_chamber'] = fig_to_base64(fig)

    # Bar chart of cells per cluster
    fig, ax = plt.subplots(figsize=(10, 6))
    adata.obs['leiden'].value_counts().plot(kind='bar', ax=ax)
    ax.set_xlabel('Cluster')
    ax.set_ylabel('Number of Cells')
    ax.set_title('Cell Type Distribution')
    plt.xticks(rotation=45)
    plots['cell_dist'] = fig_to_base64(fig)

    # Header statistics shown in the report summary box
    summary = {
        'n_cells': f"{adata.n_obs:,}",
        'n_genes': f"{adata.n_vars:,}",
        'n_clusters': adata.obs['leiden'].nunique(),
        'has_chamber': 'chamber' in adata.obs,
        'chambers': list(adata.obs['chamber'].unique()) if 'chamber' in adata.obs else []
    }

    # Jinja2 HTML template; optional sections guard on the flags above
    html_template = """
<!DOCTYPE html>
<html>
<head>
<title>HeartMAP Analysis Report</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
h1 { color: #2c3e50; }
h2 { color: #34495e; margin-top: 30px; }
.summary { background: #ecf0f1; padding: 20px; border-radius: 5px; }
.stat { display: inline-block; margin: 10px 20px; }
.stat-label { font-weight: bold; }
img { max-width: 100%; height: auto; margin: 20px 0; }
</style>
</head>
<body>
<h1>🫀 HeartMAP Analysis Report</h1>
<div class="summary">
<h2>Dataset Overview</h2>
<div class="stat">
<span class="stat-label">Cells:</span> {{ summary.n_cells }}
</div>
<div class="stat">
<span class="stat-label">Genes:</span> {{ summary.n_genes }}
</div>
<div class="stat">
<span class="stat-label">Clusters:</span> {{ summary.n_clusters }}
</div>
{% if summary.has_chamber %}
<div class="stat">
<span class="stat-label">Chambers:</span> {{ summary.chambers|join(', ') }}
</div>
{% endif %}
</div>
<h2>Cell Type Distribution</h2>
<img src="{{ plots.cell_dist }}" alt="Cell Distribution">
<h2>UMAP Visualization</h2>
<h3>By Cluster</h3>
<img src="{{ plots.umap_cluster }}" alt="UMAP by Cluster">
{% if plots.umap_chamber %}
<h3>By Chamber</h3>
<img src="{{ plots.umap_chamber }}" alt="UMAP by Chamber">
{% endif %}
<hr>
<p><em>Generated by HeartMAP</em></p>
</body>
</html>
"""

    # Render the template and write the finished report
    template = Template(html_template)
    html = template.render(summary=summary, plots=plots)
    with open(output_path, 'w') as f:
        f.write(html)
    print(f"Report generated: {output_path}")
# Usage: load a processed HeartMAP dataset and render its HTML report
adata = sc.read_h5ad('results/heartmap_complete.h5ad')
generate_html_report(adata, 'results/analysis_report.html')
Next Steps
Quick Analysis
Start with simple 30-second examples
Python API
Comprehensive API examples
Best Practices
Learn analysis best practices
Troubleshooting
Common issues and solutions