Basic Usage
Loading and Preprocessing
import scanpy as sc
from heartmap import Config
from heartmap.data import DataProcessor
# Initialize data processor
config = Config.default()
processor = DataProcessor(config)
# Load and preprocess raw data
adata = processor.process_from_raw('heart_data.h5ad')
print(f"Loaded: {adata.n_obs} cells × {adata.n_vars} genes")
- Quality control filtering (min_genes=200, min_cells=3)
- Mitochondrial/ribosomal gene annotation
- Normalization (target_sum=10000)
- Log transformation
- Highly variable gene selection
Running Complete Analysis
from heartmap.pipelines import ComprehensivePipeline
# Run comprehensive analysis pipeline
pipeline = ComprehensivePipeline(config)
results = pipeline.run('heart_data.h5ad', 'results/')
# Access results
adata = results['adata'] # Processed AnnData object
annotation_results = results['results']['annotation']
communication_results = results['results']['communication']
chamber_results = results['results']['multi_chamber']
print("✓ Analysis complete!")
Configuration Management
Loading from YAML
from heartmap import Config
# Load from file
config = Config.from_yaml('config.yaml')
# Access configuration
print(config.data.max_cells_subset) # 50000
print(config.analysis.resolution) # 0.5
print(config.paths.results_dir) # 'results'
Programmatic Configuration
config = Config.default()
# Customize data parameters
config.data.max_cells_subset = 30000
config.data.max_genes_subset = 4000
config.data.target_sum = 10000.0
config.data.n_top_genes = 2000
# Customize analysis parameters
config.analysis.n_components_pca = 50
config.analysis.n_neighbors = 15
config.analysis.resolution = 0.8
config.analysis.use_leiden = True
# Model settings
config.model.save_intermediate = True
config.model.use_gpu = False
# Save custom config
config.save('my_config.yaml')
Pipeline Examples
Basic Pipeline
- Complete Example
- Step by Step
from heartmap import Config
from heartmap.pipelines import BasicPipeline
import scanpy as sc
# Setup
config = Config.default()
pipeline = BasicPipeline(config)
# Run analysis
results = pipeline.run(
data_path='data/heart_data.h5ad',
output_dir='results/basic/'
)
# Access results
adata = results['adata']
cluster_labels = results['results']['cluster_labels']
# Explore clusters
sc.pl.umap(adata, color='leiden', legend_loc='on data')
from heartmap.data import DataProcessor
import scanpy as sc
# Step 1: Load data
config = Config.default()
processor = DataProcessor(config)
adata = processor.process_from_raw('heart_data.h5ad')
# Step 2: Compute neighbors graph
if 'neighbors' not in adata.uns:
if 'X_pca' not in adata.obsm:
sc.tl.pca(adata, svd_solver='arpack')
sc.pp.neighbors(adata, n_neighbors=15, n_pcs=40)
# Step 3: Clustering
sc.tl.leiden(adata, resolution=0.5)
# Step 4: UMAP
sc.tl.umap(adata)
# Step 5: Find marker genes
sc.tl.rank_genes_groups(adata, 'leiden', method='wilcoxon')
# Save
adata.write('results/annotated_data.h5ad')
Communication Pipeline
from heartmap.pipelines import AdvancedCommunicationPipeline
import pandas as pd
# Must start with annotated data (with cell type clusters)
config = Config.default()
pipeline = AdvancedCommunicationPipeline(config)
results = pipeline.run(
data_path='results/annotated_data.h5ad',
output_dir='results/communication/'
)
# Access communication results
comm_scores = results['results']['communication_scores']
hub_scores = results['results']['hub_scores']
# Analyze top interactions
top_interactions = comm_scores.nlargest(10, 'communication_score')
for _, row in top_interactions.iterrows():
print(f"{row['source']} → {row['target']}")
print(f" {row['ligand']} → {row['receptor']}")
print(f" Score: {row['communication_score']:.3f}")
- Annotated data with cell type clusters (a `leiden` or `cell_type` column)
- Raw or normalized counts in `adata.raw` or `adata.X`
- Optional: ligand-receptor database (auto-loaded from CellPhoneDB/LIANA)
Multi-Chamber Pipeline
from heartmap.pipelines import MultiChamberPipeline
config = Config.default()
pipeline = MultiChamberPipeline(config)
results = pipeline.run(
data_path='data/multi_chamber_heart.h5ad',
output_dir='results/multi_chamber/'
)
# Access chamber-specific results
chamber_markers = results['results']['chamber_markers']
cross_chamber_corr = results['results']['cross_chamber_correlations']
# View chamber-specific markers
for chamber, markers in chamber_markers.items():
print(f"\n{chamber} top markers:")
print(markers['names'].head())
- Data must have a `chamber` column in `adata.obs`
- Valid chamber values: `RA`, `RV`, `LA`, `LV`
- Minimum 100 cells per chamber recommended
Working with Results
Accessing Processed Data
import scanpy as sc
import pandas as pd
# Load processed data
adata = sc.read_h5ad('results/heartmap_complete.h5ad')
# View metadata
print(adata.obs.columns.tolist())
# ['leiden', 'chamber', 'hub_score', 'n_genes', 'n_counts', ...]
# Cell type distribution
print(adata.obs['leiden'].value_counts())
# Chamber distribution (if multi-chamber data)
if 'chamber' in adata.obs:
print(adata.obs['chamber'].value_counts())
Extracting Specific Results
# Marker genes per cluster
marker_df = pd.DataFrame(adata.uns['rank_genes_groups']['names'])
print(marker_df.head(10)) # Top 10 markers per cluster
# Get markers for specific cluster
cluster_0_markers = marker_df['0'].tolist()
# Hub scores (from communication analysis)
if 'hub_score' in adata.obs:
hub_df = pd.DataFrame({
'cell_type': adata.obs['leiden'],
'hub_score': adata.obs['hub_score']
}).groupby('cell_type').mean()
print(hub_df.sort_values('hub_score', ascending=False))
Visualization
import matplotlib.pyplot as plt
import seaborn as sns
# UMAP colored by different features
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
sc.pl.umap(adata, color='leiden', ax=axes[0], show=False)
sc.pl.umap(adata, color='chamber', ax=axes[1], show=False)
sc.pl.umap(adata, color='hub_score', ax=axes[2], show=False)
plt.tight_layout()
plt.savefig('results/umap_overview.png', dpi=300)
# Gene expression heatmap
sc.pl.heatmap(
adata,
var_names=['NPPA', 'MYH7', 'CD36', 'FHL2', 'VEGFA'],
groupby='leiden',
cmap='viridis',
save='_marker_heatmap.png'
)
Data Integrity and Reproducibility
Using Fixed Random Seeds
import numpy as np
import random
# Set all random seeds for reproducibility
np.random.seed(42)
random.seed(42)
config = Config.default()
config.data.random_seed = 42
# All stochastic operations will be reproducible
pipeline = ComprehensivePipeline(config)
results = pipeline.run('data.h5ad', 'results/')
Verifying Data Checksums
from heartmap.utils import verify_checksum
from pathlib import Path
# Calculate checksum
data_path = Path('data/heart_data.h5ad')
checksum = verify_checksum(data_path)
print(f"SHA-256: {checksum}")
# Verify against known checksum
expected = "abc123..."
if checksum == expected:
print("✓ Data integrity verified")
else:
print("✗ Checksum mismatch - data may be corrupted")
Batch Processing
Multiple Datasets
from pathlib import Path
from heartmap.pipelines import ComprehensivePipeline
config = Config.default()
pipeline = ComprehensivePipeline(config)
# Process multiple files
data_dir = Path('data/raw/')
for data_file in data_dir.glob('*.h5ad'):
print(f"Processing {data_file.name}...")
output_dir = f'results/{data_file.stem}/'
results = pipeline.run(str(data_file), output_dir)
print(f" ✓ Completed: {output_dir}")
Parallel Processing
from concurrent.futures import ProcessPoolExecutor
from functools import partial
def process_sample(data_file, config):
    """Run the comprehensive HeartMAP pipeline on a single sample.

    Args:
        data_file: Path to an ``.h5ad`` input file (``pathlib.Path``).
        config: Shared HeartMAP ``Config`` object.

    Returns:
        Tuple of ``(input_file_name, output_dir)`` for progress reporting.
    """
    # Import inside the worker so each subprocess constructs its own pipeline.
    from heartmap.pipelines import ComprehensivePipeline

    pipeline = ComprehensivePipeline(config)
    output_dir = f'results/{data_file.stem}/'
    # Run for its side effects; outputs are written under output_dir,
    # so the returned results object is not needed here.
    pipeline.run(str(data_file), output_dir)
    return data_file.name, output_dir
# Parallel processing: one worker process per sample. The pipelines are
# CPU-bound, so ProcessPoolExecutor gives true parallelism across cores.
config = Config.default()
data_files = list(Path('data/').glob('*.h5ad'))

with ProcessPoolExecutor(max_workers=4) as executor:
    # partial() binds the shared config; executor.map supplies each data_file.
    process_func = partial(process_sample, config=config)
    results = list(executor.map(process_func, data_files))

for filename, output_dir in results:
    # Report the actual sample file name (was a "(unknown)" placeholder).
    print(f"✓ {filename} → {output_dir}")
Error Handling
from heartmap import Config
from heartmap.pipelines import ComprehensivePipeline
try:
config = Config.from_yaml('config.yaml')
pipeline = ComprehensivePipeline(config)
results = pipeline.run('data.h5ad', 'output/')
except FileNotFoundError as e:
print(f"Data file not found: {e}")
except ImportError as e:
print(f"Missing dependency: {e}")
print("Install with: pip install heartmap[all]")
except MemoryError:
print("Out of memory!")
print("Reduce config.data.max_cells_subset and max_genes_subset")
except Exception as e:
print(f"Unexpected error: {e}")
import traceback
traceback.print_exc()
Integration Examples
With Scanpy Workflow
import scanpy as sc
from heartmap import Config
from heartmap.pipelines import BasicPipeline
# Start with scanpy preprocessing
adata = sc.read_h5ad('raw_data.h5ad')
sc.pp.filter_cells(adata, min_genes=200)
sc.pp.filter_genes(adata, min_cells=3)
# Continue with HeartMAP
config = Config.default()
pipeline = BasicPipeline(config)
# Save preprocessed data temporarily
adata.write('temp_preprocessed.h5ad')
# Run HeartMAP pipeline
results = pipeline.run('temp_preprocessed.h5ad', 'results/')
# Continue with custom scanpy analysis
adata = results['adata']
sc.tl.rank_genes_groups(adata, 'leiden', method='t-test')
With CellPhoneDB
from heartmap.pipelines import AdvancedCommunicationPipeline
import pandas as pd
# Run HeartMAP communication analysis
config = Config.default()
pipeline = AdvancedCommunicationPipeline(config)
results = pipeline.run('annotated_data.h5ad', 'results/')
# Export for CellPhoneDB
adata = results['adata']
# Save metadata
meta = adata.obs[['leiden']].copy()
meta.columns = ['cell_type']
meta.to_csv('cellphonedb_meta.txt', sep='\t')
# Save counts
counts = pd.DataFrame(
adata.raw.X.toarray() if hasattr(adata.raw.X, 'toarray') else adata.raw.X,
index=adata.obs_names,
columns=adata.raw.var_names
).T
counts.to_csv('cellphonedb_counts.txt', sep='\t')
Next Steps
Custom Pipelines
Build custom analysis workflows
Advanced Workflows
Complex batch processing and integrations
API Reference
Complete API documentation
Configuration
All configuration options