Skip to main content

Understanding BasePipeline

All HeartMAP pipelines inherit from BasePipeline, which provides:
  • Configuration management
  • Data processing utilities
  • Visualization tools
  • Results export functionality
from typing import Any, Dict, List, Optional

from heartmap.config import Config
from heartmap.pipelines import BasePipeline

class BasePipeline:
    """Base class for analysis pipelines.

    Wires up the shared helpers every pipeline needs (data processing,
    visualization, results export) and holds the ``results`` dict that
    subclasses populate from ``run``.
    """

    def __init__(self, config: Config):
        self.config = config
        # Shared helpers, all driven by the same Config instance.
        self.data_processor = DataProcessor(config)
        self.visualizer = Visualizer(config)
        self.exporter = ResultsExporter(config)
        # Populated by a subclass's run(); consumed by save_results().
        self.results: Dict[str, Any] = {}

    def run(self, data_path: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """Run the complete pipeline.

        Subclasses must override this. The base implementation raises
        instead of silently returning None (the annotated return type
        is ``Dict[str, Any]``).

        Args:
            data_path: Path to the input data file.
            output_dir: Optional directory for saved outputs.

        Returns:
            The pipeline's results dictionary.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError("Subclasses must implement run()")

    def save_results(self, output_dir: str) -> None:
        """Export ``self.results`` to ``output_dir`` via the exporter."""
        self.exporter.export_results(self.results, output_dir)

Creating a Custom Pipeline

Simple QC Pipeline

from heartmap.pipelines import BasePipeline
from heartmap.config import Config
import scanpy as sc
import matplotlib.pyplot as plt
from pathlib import Path

class QualityControlPipeline(BasePipeline):
    """Custom pipeline for quality control only.

    Loads an .h5ad file, computes standard QC metrics (mitochondrial and
    ribosomal content), filters low-quality cells, and optionally writes
    QC figures plus the filtered AnnData to ``output_dir``.
    """

    def run(self, data_path: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """Run QC analysis.

        Args:
            data_path: Path to an .h5ad file readable by scanpy.
            output_dir: If given, figures and filtered data are saved here.

        Returns:
            Dict with the filtered 'adata' and a 'qc_metrics' summary.
        """
        print("=== Quality Control Pipeline ===")

        # Step 1: Load data
        print("1. Loading data...")
        adata = sc.read_h5ad(data_path)
        adata.var_names_make_unique()

        # Step 2: Calculate QC metrics
        print("2. Calculating QC metrics...")
        # Human gene-name conventions: mitochondrial 'MT-', ribosomal 'RPS'/'RPL'.
        adata.var['mt'] = adata.var_names.str.startswith('MT-')
        adata.var['ribo'] = adata.var_names.str.startswith(('RPS', 'RPL'))

        sc.pp.calculate_qc_metrics(
            adata,
            qc_vars=['mt', 'ribo'],
            percent_top=None,
            log1p=False,
            inplace=True
        )

        # Step 3: Filter cells
        print("3. Filtering cells...")
        initial_cells = adata.n_obs

        # Apply both criteria in a single mask and materialize a copy:
        # chained AnnData views are fragile to mutate and slow to write.
        keep = (adata.obs.n_genes_by_counts > 200) & (adata.obs.pct_counts_mt < 20)
        adata = adata[keep, :].copy()

        filtered_cells = initial_cells - adata.n_obs
        # Guard against empty input so the percentage print cannot divide by zero.
        pct_removed = (filtered_cells / initial_cells * 100) if initial_cells else 0.0
        print(f"   Filtered {filtered_cells} cells ({pct_removed:.1f}%)")

        # Step 4: Generate QC plots
        print("4. Generating QC visualizations...")
        if output_dir:
            viz_dir = Path(output_dir) / "figures"
            viz_dir.mkdir(parents=True, exist_ok=True)

            # Violin plots of the main per-cell QC metrics
            sc.pl.violin(
                adata,
                ['n_genes_by_counts', 'total_counts', 'pct_counts_mt'],
                jitter=0.4,
                multi_panel=True,
                show=False
            )
            plt.savefig(viz_dir / "qc_violin.png", dpi=300, bbox_inches='tight')
            plt.close()

            # Scatter plot: library size vs. detected genes
            sc.pl.scatter(adata, x='total_counts', y='n_genes_by_counts', show=False)
            plt.savefig(viz_dir / "qc_scatter.png", dpi=300, bbox_inches='tight')
            plt.close()

        # Step 5: Store results
        self.results = {
            'adata': adata,
            'qc_metrics': {
                'initial_cells': initial_cells,
                'filtered_cells': filtered_cells,
                'final_cells': adata.n_obs,
                'mean_genes': adata.obs.n_genes_by_counts.mean(),
                'mean_counts': adata.obs.total_counts.mean(),
                'mean_mt_pct': adata.obs.pct_counts_mt.mean()
            }
        }

        # Step 6: Save results
        if output_dir:
            self.save_results(output_dir)
            adata.write(Path(output_dir) / "qc_filtered.h5ad")

        print("QC pipeline completed!")
        return self.results

# Usage: run QC on an .h5ad file and write outputs under results/qc/
config = Config.default()  # default HeartMAP configuration
pipeline = QualityControlPipeline(config)
results = pipeline.run('data.h5ad', 'results/qc/')  # returns {'adata', 'qc_metrics'}

Chamber-Specific Marker Pipeline

class ChamberMarkerPipeline(BasePipeline):
    """Find chamber-specific marker genes.

    For each chamber, cells are labelled chamber-vs-rest and a Wilcoxon
    rank-sum test produces differential genes; the significant top hits
    are kept as that chamber's markers.
    """

    def __init__(self, config: Config, chambers=None):
        """
        Args:
            config: Pipeline configuration.
            chambers: Chamber labels to analyse; defaults to the four
                cardiac chambers ['RA', 'RV', 'LA', 'LV'].
        """
        super().__init__(config)
        self.chambers = chambers or ['RA', 'RV', 'LA', 'LV']

    def run(self, data_path: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """Run the marker search.

        Args:
            data_path: .h5ad file whose ``obs`` contains a 'chamber' column.
            output_dir: If given, per-chamber CSVs and a comparison heatmap
                are written here (the directory is created if missing).

        Returns:
            Dict with 'adata' and 'chamber_markers' (chamber -> DataFrame).

        Raises:
            ValueError: If the data has no 'chamber' annotation.
        """
        print("=== Chamber Marker Pipeline ===")

        # Load data with chamber info
        adata = sc.read_h5ad(data_path)

        if 'chamber' not in adata.obs:
            raise ValueError("Data must have 'chamber' column")

        # Create the output directory up front; the CSV writes below
        # fail if it does not exist.
        if output_dir:
            Path(output_dir).mkdir(parents=True, exist_ok=True)

        # Find markers for each chamber
        chamber_markers = {}

        for chamber in self.chambers:
            print(f"Finding markers for {chamber}...")

            # Binary chamber-vs-rest annotation as 'True'/'False' strings,
            # so 'True' is a valid group name for rank_genes_groups.
            adata.obs['is_chamber'] = (adata.obs['chamber'] == chamber).astype(str)

            # Find differential genes (chamber vs. rest)
            sc.tl.rank_genes_groups(
                adata,
                groupby='is_chamber',
                method='wilcoxon',
                key_added=f'{chamber}_markers'
            )

            # Extract top markers
            markers = sc.get.rank_genes_groups_df(adata, group='True', key=f'{chamber}_markers')
            markers = markers[markers['pvals_adj'] < 0.05]  # Significant only
            markers = markers.nlargest(50, 'scores')  # Top 50

            chamber_markers[chamber] = markers

            # Save to CSV
            if output_dir:
                markers.to_csv(
                    Path(output_dir) / f"markers_{chamber}.csv",
                    index=False
                )

        # Create comparison heatmap
        if output_dir:
            self._plot_chamber_comparison(adata, chamber_markers, output_dir)

        self.results = {
            'adata': adata,
            'chamber_markers': chamber_markers
        }

        if output_dir:
            self.save_results(output_dir)

        return self.results

    def _plot_chamber_comparison(self, adata, chamber_markers, output_dir):
        """Plot a chamber x marker heatmap of mean expression."""
        import seaborn as sns

        # Top 10 markers per chamber, deduplicated while preserving order
        # (list(set(...)) made the heatmap row order nondeterministic).
        top_markers = []
        for chamber, markers in chamber_markers.items():
            top_markers.extend(markers.head(10)['names'].tolist())
        top_markers = list(dict.fromkeys(top_markers))

        # Calculate mean expression of each marker within each chamber
        expr_data = {}
        for chamber in self.chambers:
            chamber_cells = adata[adata.obs['chamber'] == chamber]
            mean_expr = chamber_cells[:, top_markers].X.mean(axis=0)
            if hasattr(mean_expr, 'A1'):  # sparse matrix mean -> flat array
                mean_expr = mean_expr.A1
            expr_data[chamber] = mean_expr

        # Create heatmap
        import pandas as pd
        expr_df = pd.DataFrame(expr_data, index=top_markers)

        plt.figure(figsize=(10, 12))
        sns.heatmap(expr_df, cmap='RdBu_r', center=0, cbar_kws={'label': 'Mean Expression'})
        plt.title('Chamber-Specific Marker Expression')
        plt.tight_layout()
        plt.savefig(Path(output_dir) / 'chamber_markers_heatmap.png', dpi=300)
        plt.close()

# Usage: data must carry a 'chamber' obs column (RA/RV/LA/LV by default)
config = Config.default()
pipeline = ChamberMarkerPipeline(config)
results = pipeline.run('multi_chamber_data.h5ad', 'results/chamber_markers/')

Cell Type Annotation Pipeline

class CustomAnnotationPipeline(BasePipeline):
    """Custom cell type annotation with manual markers.

    Clusters are annotated by comparing each Leiden cluster's mean
    expression of user-supplied marker genes across candidate cell types.
    """

    def __init__(self, config: Config, marker_dict: Dict[str, List[str]]):
        """
        Args:
            config: Pipeline configuration.
            marker_dict: Mapping of cell-type name -> list of marker genes.
        """
        super().__init__(config)
        self.marker_dict = marker_dict

    def run(self, data_path: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """Preprocess, cluster if needed, annotate, and plot.

        Args:
            data_path: Raw input path handed to the data processor.
            output_dir: If given, a UMAP and marker plots are saved here.

        Returns:
            Dict with the annotated 'adata' and 'cell_type_counts'.
        """
        print("=== Custom Annotation Pipeline ===")

        # Load preprocessed data
        adata = self.data_processor.process_from_raw(data_path)

        # Ensure clustering is done (PCA -> neighbors -> leiden -> UMAP)
        if 'leiden' not in adata.obs:
            print("Computing neighbors and clustering...")
            if 'X_pca' not in adata.obsm:
                sc.tl.pca(adata)
            sc.pp.neighbors(adata, n_neighbors=15)
            sc.tl.leiden(adata, resolution=0.5)
            sc.tl.umap(adata)

        # Annotate based on marker genes
        print("Annotating cell types...")
        annotations = self._annotate_clusters(adata)
        adata.obs['cell_type'] = annotations

        # Generate visualization
        if output_dir:
            viz_dir = Path(output_dir) / "figures"
            viz_dir.mkdir(parents=True, exist_ok=True)

            sc.pl.umap(adata, color='cell_type', legend_loc='on data', show=False)
            plt.savefig(viz_dir / 'annotated_umap.png', dpi=300, bbox_inches='tight')
            plt.close()

            # Plot up to four marker genes per cell type
            for cell_type, markers in self.marker_dict.items():
                available_markers = [m for m in markers if m in adata.var_names]
                if available_markers:
                    sc.pl.umap(adata, color=available_markers[:4], show=False)
                    plt.savefig(viz_dir / f'markers_{cell_type}.png', dpi=300)
                    plt.close()

        self.results = {
            'adata': adata,
            'cell_type_counts': adata.obs['cell_type'].value_counts().to_dict()
        }

        if output_dir:
            self.save_results(output_dir)
            adata.write(Path(output_dir) / 'annotated.h5ad')

        return self.results

    def _annotate_clusters(self, adata):
        """Annotate cells by scoring each *cluster* against the marker sets.

        The previous implementation looped over every cell (never using the
        cluster label) and sliced the AnnData once per cell per cell type —
        both wrong in intent and O(cells x types) slow. Here each Leiden
        cluster is scored once, and every cell inherits its cluster's label.

        Returns:
            List of cell-type labels, one per cell, aligned with adata.obs.
        """
        cluster_labels = adata.obs['leiden']
        cluster_to_type = {}

        for cluster in cluster_labels.unique():
            cluster_cells = adata[cluster_labels == cluster]

            # Mean expression of each type's available markers in this cluster
            scores = {}
            for cell_type, markers in self.marker_dict.items():
                available_markers = [m for m in markers if m in adata.var_names]
                if available_markers:
                    marker_expr = cluster_cells[:, available_markers].X
                    if hasattr(marker_expr, 'toarray'):  # densify sparse slice
                        marker_expr = marker_expr.toarray()
                    scores[cell_type] = float(marker_expr.mean())
                else:
                    scores[cell_type] = 0

            # Assign the cell type with the highest mean marker score
            cluster_to_type[cluster] = max(scores, key=scores.get)

        return [cluster_to_type[c] for c in cluster_labels]

# Usage with cardiac markers: cell-type name -> canonical marker genes
cardiac_markers = {
    'Cardiomyocytes': ['MYH7', 'TNNT2', 'ACTC1', 'NPPA'],
    'Fibroblasts': ['COL1A1', 'COL1A2', 'DCN', 'LUM'],
    'Endothelial': ['PECAM1', 'VWF', 'CDH5', 'CLDN5'],
    'Smooth_Muscle': ['ACTA2', 'TAGLN', 'MYH11', 'MYLK'],
    'Immune': ['PTPRC', 'CD68', 'LYZ', 'CD14'],
    'Adipocytes': ['ADIPOQ', 'LEP', 'FABP4', 'PPARG']
}

config = Config.default()
pipeline = CustomAnnotationPipeline(config, cardiac_markers)
results = pipeline.run('preprocessed_data.h5ad', 'results/annotation/')

Advanced Pipeline Patterns

Multi-Step Pipeline

class MultiStepPipeline(BasePipeline):
    """Pipeline with multiple configurable steps.

    Steps are selected by name at construction time and executed in order;
    names not present in the dispatch table are skipped with a warning.
    """

    def __init__(self, config: Config, steps: List[str]):
        """
        Args:
            config: Pipeline configuration.
            steps: Ordered step names; valid values are the keys of
                ``available_steps``: 'qc', 'normalize', 'clustering',
                'markers', 'communication'.
        """
        super().__init__(config)
        self.steps = steps
        # Name -> bound step method dispatch table.
        self.available_steps = {
            'qc': self._step_qc,
            'normalize': self._step_normalize,
            'clustering': self._step_clustering,
            'markers': self._step_markers,
            'communication': self._step_communication
        }

    def run(self, data_path: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """Load the data and execute the configured steps in order.

        Args:
            data_path: .h5ad input file.
            output_dir: If given, the final AnnData is written here
                (the directory is created if missing).

        Returns:
            Results dict; 'adata' always holds the latest AnnData.
        """
        print(f"=== Multi-Step Pipeline: {' → '.join(self.steps)} ===")

        # Load initial data
        adata = sc.read_h5ad(data_path)
        self.results = {'adata': adata}

        # Execute each step, threading the AnnData through
        for step_name in self.steps:
            if step_name in self.available_steps:
                print(f"\nExecuting step: {step_name}")
                adata = self.available_steps[step_name](adata, output_dir)
                self.results['adata'] = adata
            else:
                print(f"Warning: Unknown step '{step_name}' skipped")

        if output_dir:
            # Create the directory before writing; adata.write fails otherwise.
            out_path = Path(output_dir)
            out_path.mkdir(parents=True, exist_ok=True)
            self.save_results(output_dir)
            adata.write(out_path / 'final_output.h5ad')

        return self.results

    def _step_qc(self, adata, output_dir):
        """Quality control step: basic cell/gene filters plus mito metrics."""
        sc.pp.filter_cells(adata, min_genes=200)
        sc.pp.filter_genes(adata, min_cells=3)
        adata.var['mt'] = adata.var_names.str.startswith('MT-')
        sc.pp.calculate_qc_metrics(adata, qc_vars=['mt'], inplace=True)
        return adata

    def _step_normalize(self, adata, output_dir):
        """Normalization step: depth-normalize then log1p-transform."""
        sc.pp.normalize_total(adata, target_sum=1e4)
        sc.pp.log1p(adata)
        return adata

    def _step_clustering(self, adata, output_dir):
        """Clustering step: HVGs -> PCA -> neighbors -> Leiden -> UMAP."""
        sc.pp.highly_variable_genes(adata, n_top_genes=2000)
        sc.tl.pca(adata)
        sc.pp.neighbors(adata)
        sc.tl.leiden(adata)
        sc.tl.umap(adata)
        return adata

    def _step_markers(self, adata, output_dir):
        """Marker gene identification; stores the table in self.results."""
        sc.tl.rank_genes_groups(adata, 'leiden', method='wilcoxon')
        self.results['markers'] = sc.get.rank_genes_groups_df(adata, group=None)
        return adata

    def _step_communication(self, adata, output_dir):
        """Cell-cell communication analysis (placeholder).

        Simplified stub; in practice use LIANA or a similar tool.
        """
        return adata

# Usage - flexible workflow: pick any subset/order of the named steps
config = Config.default()

# Quick QC and clustering
pipeline = MultiStepPipeline(config, steps=['qc', 'normalize', 'clustering'])
results = pipeline.run('raw_data.h5ad', 'results/quick/')

# Full analysis (adds marker detection and communication)
pipeline = MultiStepPipeline(config, steps=['qc', 'normalize', 'clustering', 'markers', 'communication'])
results = pipeline.run('raw_data.h5ad', 'results/full/')

Pipeline Composition

class CompositePipeline(BasePipeline):
    """Run several pipelines in sequence, chaining outputs to inputs.

    Each sub-pipeline writes into its own ``step_N_<Name>`` subdirectory;
    when a step produces ``final_output.h5ad`` there, that file becomes
    the next step's input.
    """

    def __init__(self, config: Config, pipelines: List[BasePipeline]):
        super().__init__(config)
        self.pipelines = pipelines

    def run(self, data_path: str, output_dir: Optional[str] = None) -> Dict[str, Any]:
        """Execute every sub-pipeline in order and collect their results.

        Args:
            data_path: Input for the first sub-pipeline.
            output_dir: Root directory for per-step output subdirectories.

        Returns:
            Dict mapping each sub-pipeline's class name to its results.
        """
        print("=== Composite Pipeline ===")

        current_data = data_path
        all_results: Dict[str, Any] = {}
        total = len(self.pipelines)

        for index, sub_pipeline in enumerate(self.pipelines, start=1):
            name = type(sub_pipeline).__name__
            print(f"\nRunning pipeline {index}/{total}: {name}")

            if output_dir:
                step_dir = Path(output_dir) / f"step_{index}_{name}"
                all_results[name] = sub_pipeline.run(current_data, str(step_dir))
                # Chain: feed this step's h5ad output into the next step.
                chained = step_dir / "final_output.h5ad"
                if chained.exists():
                    current_data = str(chained)
            else:
                all_results[name] = sub_pipeline.run(current_data, None)

        self.results = all_results
        return self.results

# Usage - chain pipelines: QC output feeds annotation, which feeds markers
config = Config.default()

qc_pipeline = QualityControlPipeline(config)
annotation_pipeline = CustomAnnotationPipeline(config, cardiac_markers)
chamber_pipeline = ChamberMarkerPipeline(config)

composite = CompositePipeline(config, [
    qc_pipeline,
    annotation_pipeline,
    chamber_pipeline
])

results = composite.run('raw_data.h5ad', 'results/composite/')

Testing Custom Pipelines

import pytest
from pathlib import Path

def test_qc_pipeline():
    """End-to-end test of QualityControlPipeline on synthetic counts.

    Uses a temporary directory so the input file and all outputs are
    removed even when an assertion fails (the previous version left
    'test_data.h5ad' behind on failure and never cleaned 'test_results/').
    """
    import anndata as ad
    import numpy as np
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        tmp_path = Path(tmp)

        # Create test data: 100 cells x 50 genes of negative-binomial counts
        X = np.random.negative_binomial(5, 0.3, (100, 50))
        adata = ad.AnnData(X)
        test_file = tmp_path / 'test_data.h5ad'
        adata.write(test_file)

        # Run pipeline
        config = Config.default()
        pipeline = QualityControlPipeline(config)
        out_dir = tmp_path / 'test_results'
        results = pipeline.run(str(test_file), str(out_dir))

        # Assertions: some cells survive filtering; output file written
        assert results['qc_metrics']['final_cells'] > 0
        assert (out_dir / 'qc_filtered.h5ad').exists()

if __name__ == '__main__':
    # Allow running this test module directly, without pytest.
    test_qc_pipeline()
    print("✓ All tests passed")

Next Steps

Advanced Workflows

Complex batch processing and integration patterns

API Reference

Complete pipeline API documentation

Contributing

Contribute your custom pipelines

Best Practices

Pipeline development guidelines

Build docs developers (and LLMs) love