Understanding BasePipeline
All HeartMAP pipelines inherit from BasePipeline, which provides:
- Configuration management
- Data processing utilities
- Visualization tools
- Results export functionality
from typing import Any, Dict, List, Optional

from heartmap.config import Config
from heartmap.pipelines import BasePipeline
class BasePipeline:
    """Base class for HeartMAP analysis pipelines.

    Provides the shared services every concrete pipeline uses:
    configuration, data processing, visualization, and results export.
    Subclasses implement :meth:`run`.
    """

    def __init__(self, config: Config):
        self.config = config
        # Shared project helpers, all driven by the same config object.
        # NOTE(review): DataProcessor/Visualizer/ResultsExporter are project
        # types defined elsewhere in heartmap — confirm their module paths.
        self.data_processor = DataProcessor(config)
        self.visualizer = Visualizer(config)
        self.exporter = ResultsExporter(config)
        # Populated by run(); the exact keys vary per pipeline.
        self.results: Dict[str, Any] = {}

    def run(self, data_path: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """Run the complete pipeline. Subclasses must override this.

        Args:
            data_path: Path to the input data file.
            output_dir: Optional directory for exported results/figures.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        # Fix: the original body was `pass`, so calling run() on the base
        # class silently returned None. Raising makes the abstract contract
        # explicit and fails fast on misuse.
        raise NotImplementedError(f"{type(self).__name__} must implement run()")

    def save_results(self, output_dir: str) -> None:
        """Export self.results to output_dir via the configured exporter."""
        self.exporter.export_results(self.results, output_dir)
Creating a Custom Pipeline
Simple QC Pipeline
from heartmap.pipelines import BasePipeline
from heartmap.config import Config
import scanpy as sc
import matplotlib.pyplot as plt
from pathlib import Path
class QualityControlPipeline(BasePipeline):
    """Custom pipeline for quality control only.

    Loads an .h5ad file, computes per-cell QC metrics, filters low-quality
    cells, optionally writes QC figures and the filtered data, and returns
    a results dict with the filtered AnnData plus summary metrics.
    """

    # Filtering thresholds (hard-coded inline in the original); surfaced as
    # class attributes so subclasses can tune them without copying run().
    MIN_GENES_PER_CELL = 200
    MAX_PCT_MT = 20

    def __init__(self, config: Config):
        super().__init__(config)

    def run(self, data_path: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """Run QC analysis.

        Args:
            data_path: Path to an .h5ad file.
            output_dir: Optional directory; when given, figures are written
                under <output_dir>/figures and the filtered data is saved.

        Returns:
            Dict with 'adata' (filtered AnnData) and a 'qc_metrics' summary.
        """
        print("=== Quality Control Pipeline ===")

        print("1. Loading data...")
        adata = self._load(data_path)

        print("2. Calculating QC metrics...")
        self._compute_qc_metrics(adata)

        print("3. Filtering cells...")
        initial_cells = adata.n_obs
        adata = self._filter_cells(adata)
        filtered_cells = initial_cells - adata.n_obs
        # Fix: guard the percentage against an empty input dataset — the
        # original divided by zero when initial_cells == 0.
        pct = (filtered_cells / initial_cells * 100) if initial_cells else 0.0
        print(f"   Filtered {filtered_cells} cells ({pct:.1f}%)")

        print("4. Generating QC visualizations...")
        if output_dir:
            self._plot_qc(adata, output_dir)

        self.results = {
            'adata': adata,
            'qc_metrics': {
                'initial_cells': initial_cells,
                'filtered_cells': filtered_cells,
                'final_cells': adata.n_obs,
                'mean_genes': adata.obs.n_genes_by_counts.mean(),
                'mean_counts': adata.obs.total_counts.mean(),
                'mean_mt_pct': adata.obs.pct_counts_mt.mean()
            }
        }

        if output_dir:
            self.save_results(output_dir)
            adata.write(Path(output_dir) / "qc_filtered.h5ad")

        print("QC pipeline completed!")
        return self.results

    def _load(self, data_path: str):
        """Read the .h5ad file and de-duplicate gene names."""
        adata = sc.read_h5ad(data_path)
        adata.var_names_make_unique()
        return adata

    def _compute_qc_metrics(self, adata) -> None:
        """Flag mitochondrial/ribosomal genes and compute QC metrics in place."""
        adata.var['mt'] = adata.var_names.str.startswith('MT-')
        adata.var['ribo'] = adata.var_names.str.startswith(('RPS', 'RPL'))
        sc.pp.calculate_qc_metrics(
            adata,
            qc_vars=['mt', 'ribo'],
            percent_top=None,
            log1p=False,
            inplace=True
        )

    def _filter_cells(self, adata):
        """Drop cells with too few genes or too much mitochondrial signal."""
        adata = adata[adata.obs.n_genes_by_counts > self.MIN_GENES_PER_CELL, :]
        adata = adata[adata.obs.pct_counts_mt < self.MAX_PCT_MT, :]
        return adata

    def _plot_qc(self, adata, output_dir: str) -> None:
        """Write violin and scatter QC figures under <output_dir>/figures."""
        viz_dir = Path(output_dir) / "figures"
        viz_dir.mkdir(parents=True, exist_ok=True)

        # Violin plots of the main per-cell QC metrics.
        sc.pl.violin(
            adata,
            ['n_genes_by_counts', 'total_counts', 'pct_counts_mt'],
            jitter=0.4,
            multi_panel=True,
            show=False
        )
        plt.savefig(viz_dir / "qc_violin.png", dpi=300, bbox_inches='tight')
        plt.close()

        # Counts-vs-genes scatter to spot doublets/empty droplets.
        sc.pl.scatter(adata, x='total_counts', y='n_genes_by_counts', show=False)
        plt.savefig(viz_dir / "qc_scatter.png", dpi=300, bbox_inches='tight')
        plt.close()
# Usage
# Build a default configuration, then run the QC-only pipeline; results are
# written under results/qc/ and also returned as a dict (see run()).
config = Config.default()
pipeline = QualityControlPipeline(config)
results = pipeline.run('data.h5ad', 'results/qc/')
Chamber-Specific Marker Pipeline
class ChamberMarkerPipeline(BasePipeline):
    """Find chamber-specific marker genes via one-vs-rest differential tests."""

    def __init__(self, config: Config, chambers=None):
        """
        Args:
            config: HeartMAP configuration object.
            chambers: Chamber labels to analyse; defaults to the four
                cardiac chambers ['RA', 'RV', 'LA', 'LV'].
        """
        super().__init__(config)
        self.chambers = chambers or ['RA', 'RV', 'LA', 'LV']

    def run(self, data_path: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """Rank one-vs-rest markers for each chamber and export CSVs/heatmap."""
        print("=== Chamber Marker Pipeline ===")

        # Load data; the 'chamber' annotation is required for grouping.
        adata = sc.read_h5ad(data_path)
        if 'chamber' not in adata.obs:
            raise ValueError("Data must have 'chamber' column")

        # Fix: the original wrote CSVs into output_dir without creating it,
        # raising FileNotFoundError when the directory did not exist yet.
        if output_dir:
            Path(output_dir).mkdir(parents=True, exist_ok=True)

        chamber_markers = {}
        for chamber in self.chambers:
            print(f"Finding markers for {chamber}...")
            # One-vs-rest label; stringified ('True'/'False') so scanpy
            # treats it as a categorical grouping.
            adata.obs['is_chamber'] = (adata.obs['chamber'] == chamber).astype(str)

            sc.tl.rank_genes_groups(
                adata,
                groupby='is_chamber',
                method='wilcoxon',
                key_added=f'{chamber}_markers'
            )

            # Keep significant genes only, then the top 50 by score.
            markers = sc.get.rank_genes_groups_df(adata, group='True', key=f'{chamber}_markers')
            markers = markers[markers['pvals_adj'] < 0.05]
            markers = markers.nlargest(50, 'scores')
            chamber_markers[chamber] = markers

            if output_dir:
                markers.to_csv(
                    Path(output_dir) / f"markers_{chamber}.csv",
                    index=False
                )

        if output_dir:
            self._plot_chamber_comparison(adata, chamber_markers, output_dir)

        self.results = {
            'adata': adata,
            'chamber_markers': chamber_markers
        }
        if output_dir:
            self.save_results(output_dir)
        return self.results

    def _plot_chamber_comparison(self, adata, chamber_markers, output_dir):
        """Plot a heatmap of mean expression for the top chamber markers."""
        import seaborn as sns
        import pandas as pd

        # Union of the top-10 markers of every chamber.
        top_markers = []
        for markers in chamber_markers.values():
            top_markers.extend(markers.head(10)['names'].tolist())
        top_markers = list(set(top_markers))  # de-duplicate

        # Mean expression of each marker per chamber.
        expr_data = {}
        for chamber in self.chambers:
            chamber_cells = adata[adata.obs['chamber'] == chamber]
            mean_expr = chamber_cells[:, top_markers].X.mean(axis=0)
            if hasattr(mean_expr, 'A1'):  # flatten a sparse-matrix result
                mean_expr = mean_expr.A1
            expr_data[chamber] = mean_expr

        expr_df = pd.DataFrame(expr_data, index=top_markers)
        plt.figure(figsize=(10, 12))
        sns.heatmap(expr_df, cmap='RdBu_r', center=0, cbar_kws={'label': 'Mean Expression'})
        plt.title('Chamber-Specific Marker Expression')
        plt.tight_layout()
        plt.savefig(Path(output_dir) / 'chamber_markers_heatmap.png', dpi=300)
        plt.close()
# Usage
# Requires a dataset whose .obs carries a 'chamber' column; markers are
# saved as one CSV per chamber plus a comparison heatmap.
config = Config.default()
pipeline = ChamberMarkerPipeline(config)
results = pipeline.run('multi_chamber_data.h5ad', 'results/chamber_markers/')
Cell Type Annotation Pipeline
class CustomAnnotationPipeline(BasePipeline):
    """Custom cell type annotation with manual markers."""

    def __init__(self, config: Config, marker_dict: Dict[str, List[str]]):
        """
        Args:
            config: HeartMAP configuration object.
            marker_dict: Mapping of cell-type name -> list of marker genes.
        """
        super().__init__(config)
        self.marker_dict = marker_dict

    def run(self, data_path: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """Preprocess, cluster if needed, annotate cells, and plot results."""
        print("=== Custom Annotation Pipeline ===")

        # Delegate raw preprocessing to the shared project processor.
        adata = self.data_processor.process_from_raw(data_path)

        # Ensure clustering (and a UMAP embedding for plotting) exists.
        if 'leiden' not in adata.obs:
            print("Computing neighbors and clustering...")
            if 'X_pca' not in adata.obsm:
                sc.tl.pca(adata)
            sc.pp.neighbors(adata, n_neighbors=15)
            sc.tl.leiden(adata, resolution=0.5)
            sc.tl.umap(adata)

        print("Annotating cell types...")
        adata.obs['cell_type'] = self._annotate_clusters(adata)

        if output_dir:
            viz_dir = Path(output_dir) / "figures"
            viz_dir.mkdir(parents=True, exist_ok=True)

            sc.pl.umap(adata, color='cell_type', legend_loc='on data', show=False)
            plt.savefig(viz_dir / 'annotated_umap.png', dpi=300, bbox_inches='tight')
            plt.close()

            # One marker-expression UMAP panel per cell type (<= 4 genes each).
            for cell_type, markers in self.marker_dict.items():
                available_markers = [m for m in markers if m in adata.var_names]
                if available_markers:
                    sc.pl.umap(adata, color=available_markers[:4], show=False)
                    plt.savefig(viz_dir / f'markers_{cell_type}.png', dpi=300)
                    plt.close()

        self.results = {
            'adata': adata,
            'cell_type_counts': adata.obs['cell_type'].value_counts().to_dict()
        }
        if output_dir:
            self.save_results(output_dir)
            adata.write(Path(output_dir) / 'annotated.h5ad')
        return self.results

    def _annotate_clusters(self, adata):
        """Assign a cell type to every cell by mean marker expression.

        Note: despite the name this scores each *cell* individually (the
        original behaviour), not each Leiden cluster as a unit.
        """
        # Fix (performance): the original resolved marker availability and
        # sliced the AnnData once per cell, inside the per-cell loop. Both
        # are loop-invariant per cell type, so compute each type's per-cell
        # mean marker expression once up front.
        per_type_scores = {}
        for cell_type, markers in self.marker_dict.items():
            available = [m for m in markers if m in adata.var_names]
            if available:
                expr = adata[:, available].X
                if hasattr(expr, 'toarray'):
                    expr = expr.toarray()
                per_type_scores[cell_type] = expr.mean(axis=1)
            else:
                # No markers present in the data: score 0 for every cell,
                # matching the original behaviour.
                per_type_scores[cell_type] = None

        annotations = []
        for idx in range(adata.n_obs):
            scores = {
                cell_type: (vals[idx] if vals is not None else 0)
                for cell_type, vals in per_type_scores.items()
            }
            # Fix (robustness): an empty marker_dict made max() raise
            # ValueError; label such cells explicitly instead.
            if not scores:
                annotations.append('Unknown')
                continue
            # First key with the maximum score (dict insertion order),
            # same tie-breaking as the original max(scores, key=...).
            annotations.append(max(scores, key=scores.get))
        return annotations
# Usage with cardiac markers
# Canonical human marker genes per cardiac cell type; genes absent from the
# dataset are skipped by the pipeline.
cardiac_markers = {
    'Cardiomyocytes': ['MYH7', 'TNNT2', 'ACTC1', 'NPPA'],
    'Fibroblasts': ['COL1A1', 'COL1A2', 'DCN', 'LUM'],
    'Endothelial': ['PECAM1', 'VWF', 'CDH5', 'CLDN5'],
    'Smooth_Muscle': ['ACTA2', 'TAGLN', 'MYH11', 'MYLK'],
    'Immune': ['PTPRC', 'CD68', 'LYZ', 'CD14'],
    'Adipocytes': ['ADIPOQ', 'LEP', 'FABP4', 'PPARG']
}
config = Config.default()
pipeline = CustomAnnotationPipeline(config, cardiac_markers)
results = pipeline.run('preprocessed_data.h5ad', 'results/annotation/')
Advanced Pipeline Patterns
Multi-Step Pipeline
class MultiStepPipeline(BasePipeline):
    """Pipeline with multiple configurable steps.

    Steps are named strings resolved through a registry; unknown names are
    skipped with a warning rather than aborting the run.
    """

    def __init__(self, config: Config, steps: List[str]):
        super().__init__(config)
        self.steps = steps
        # Registry mapping step names to their implementations.
        self.available_steps = {
            'qc': self._step_qc,
            'normalize': self._step_normalize,
            'clustering': self._step_clustering,
            'markers': self._step_markers,
            'communication': self._step_communication,
        }

    def run(self, data_path: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """Execute the configured steps in order on the data at data_path."""
        print(f"=== Multi-Step Pipeline: {' → '.join(self.steps)} ===")

        adata = sc.read_h5ad(data_path)
        self.results = {'adata': adata}

        for name in self.steps:
            step_fn = self.available_steps.get(name)
            if step_fn is None:
                print(f"Warning: Unknown step '{name}' skipped")
                continue
            print(f"\nExecuting step: {name}")
            adata = step_fn(adata, output_dir)
            self.results['adata'] = adata

        if output_dir:
            self.save_results(output_dir)
            adata.write(Path(output_dir) / 'final_output.h5ad')
        return self.results

    def _step_qc(self, adata, output_dir):
        """Quality control: basic cell/gene filters plus mito QC metrics."""
        sc.pp.filter_cells(adata, min_genes=200)
        sc.pp.filter_genes(adata, min_cells=3)
        adata.var['mt'] = adata.var_names.str.startswith('MT-')
        sc.pp.calculate_qc_metrics(adata, qc_vars=['mt'], inplace=True)
        return adata

    def _step_normalize(self, adata, output_dir):
        """Normalization: depth-normalize then log1p-transform."""
        sc.pp.normalize_total(adata, target_sum=1e4)
        sc.pp.log1p(adata)
        return adata

    def _step_clustering(self, adata, output_dir):
        """Clustering: HVG selection, PCA, neighbors, Leiden, UMAP."""
        sc.pp.highly_variable_genes(adata, n_top_genes=2000)
        sc.tl.pca(adata)
        sc.pp.neighbors(adata)
        sc.tl.leiden(adata)
        sc.tl.umap(adata)
        return adata

    def _step_markers(self, adata, output_dir):
        """Marker identification: Wilcoxon rank test per Leiden cluster."""
        sc.tl.rank_genes_groups(adata, 'leiden', method='wilcoxon')
        self.results['markers'] = sc.get.rank_genes_groups_df(adata, group=None)
        return adata

    def _step_communication(self, adata, output_dir):
        """Cell-cell communication analysis (placeholder).

        Simplified stand-in; in practice, use LIANA or a similar tool.
        """
        return adata
# Usage - flexible workflow
config = Config.default()
# Quick QC and clustering
pipeline = MultiStepPipeline(config, steps=['qc', 'normalize', 'clustering'])
results = pipeline.run('raw_data.h5ad', 'results/quick/')
# Full analysis
pipeline = MultiStepPipeline(config, steps=['qc', 'normalize', 'clustering', 'markers', 'communication'])
results = pipeline.run('raw_data.h5ad', 'results/full/')
Pipeline Composition
class CompositePipeline(BasePipeline):
    """Combine multiple pipelines.

    Sub-pipelines run in order; when one writes a 'final_output.h5ad' into
    its per-step output directory, that file becomes the next input.
    """

    def __init__(self, config: Config, pipelines: List[BasePipeline]):
        super().__init__(config)
        self.pipelines = pipelines

    def run(self, data_path: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """Run each sub-pipeline in sequence, chaining outputs to inputs."""
        print("=== Composite Pipeline ===")

        current_input = data_path
        collected: Dict[str, Any] = {}
        total = len(self.pipelines)

        for step_no, sub_pipeline in enumerate(self.pipelines, start=1):
            label = sub_pipeline.__class__.__name__
            print(f"\nRunning pipeline {step_no}/{total}: {label}")

            if output_dir:
                step_dir = Path(output_dir) / f"step_{step_no}_{label}"
                collected[label] = sub_pipeline.run(current_input, str(step_dir))
                # Chain: feed this step's saved output into the next step.
                chained = step_dir / "final_output.h5ad"
                if chained.exists():
                    current_input = str(chained)
            else:
                collected[label] = sub_pipeline.run(current_input, None)

        self.results = collected
        return self.results
# Usage - chain pipelines
# QC output feeds annotation, whose output feeds chamber-marker discovery.
config = Config.default()
qc_pipeline = QualityControlPipeline(config)
annotation_pipeline = CustomAnnotationPipeline(config, cardiac_markers)
chamber_pipeline = ChamberMarkerPipeline(config)
composite = CompositePipeline(config, [
    qc_pipeline,
    annotation_pipeline,
    chamber_pipeline
])
results = composite.run('raw_data.h5ad', 'results/composite/')
Testing Custom Pipelines
import pytest
from pathlib import Path
def test_qc_pipeline():
    """Test the custom QC pipeline end to end on synthetic data."""
    import anndata as ad
    import numpy as np
    import tempfile

    # Synthetic counts: 100 cells x 50 genes, negative-binomial like scRNA-seq.
    X = np.random.negative_binomial(5, 0.3, (100, 50))
    adata = ad.AnnData(X)

    # Fix: the original wrote 'test_data.h5ad' and 'test_results/' into the
    # CWD and only unlinked the input on the success path, leaking files
    # whenever an assertion failed (and never removing the results dir).
    # A temporary directory guarantees cleanup on every exit path.
    with tempfile.TemporaryDirectory() as tmp:
        test_file = str(Path(tmp) / 'test_data.h5ad')
        out_dir = str(Path(tmp) / 'results')
        adata.write(test_file)

        config = Config.default()
        pipeline = QualityControlPipeline(config)
        results = pipeline.run(test_file, out_dir)

        # Some cells must survive filtering, and the filtered file must exist.
        assert results['qc_metrics']['final_cells'] > 0
        assert (Path(out_dir) / 'qc_filtered.h5ad').exists()
if __name__ == '__main__':
    # Allow running this test directly without pytest.
    test_qc_pipeline()
    print("✓ All tests passed")
Next Steps
Advanced Workflows
Complex batch processing and integration patterns
API Reference
Complete pipeline API documentation
Contributing
Contribute your custom pipelines
Best Practices
Pipeline development guidelines