Python’s Global Interpreter Lock (GIL) limits native multi-threading, but we can still leverage multi-threading effectively for I/O-bound tasks since the GIL is released during I/O operations.
The SDK combines Python’s I/O capabilities with Temporal’s workflow orchestration to enable scalable execution across multiple workers.
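As a minimal illustration of the I/O point, a thread pool overlaps many I/O waits: while one thread is blocked, the GIL is released and the others make progress. This sketch simulates I/O with `time.sleep`; `fetch` and the source names are hypothetical placeholders.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(source: str) -> str:
    # Simulated I/O wait; the GIL is released while this thread blocks,
    # so the other threads run concurrently.
    time.sleep(0.1)
    return f"data from {source}"

sources = ["db1", "db2", "db3", "db4"]
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(fetch, sources))
elapsed = time.perf_counter() - start
# With 4 workers, total wall time is close to a single 0.1 s wait,
# not four sequential waits.
```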
For additional scaling beyond Temporal activity parallelism, use Python’s multiprocessing library:
```python
import multiprocessing as mp
from typing import Any, Dict, List

def process_table_chunk(tables: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Process a chunk of tables."""
    results = []
    for table in tables:
        # Perform CPU-intensive transformation
        transformed = transform_table(table)
        results.append(transformed)
    return results

def parallel_table_processing(
    all_tables: List[Dict[str, Any]], num_processes: int = 4
) -> List[Dict[str, Any]]:
    """Process tables in parallel across multiple CPU cores."""
    # Split tables into chunks (guard against a zero chunk size
    # when there are fewer tables than processes)
    chunk_size = max(1, len(all_tables) // num_processes)
    chunks = [
        all_tables[i:i + chunk_size]
        for i in range(0, len(all_tables), chunk_size)
    ]

    # Process chunks in parallel
    with mp.Pool(processes=num_processes) as pool:
        results = pool.map(process_table_chunk, chunks)

    # Flatten results
    return [item for sublist in results for item in sublist]
```
Be mindful of memory usage when using multiprocessing with large datasets. Consider processing data in batches.
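One way to keep memory bounded is to feed the pool from a lazy batch generator instead of materializing every chunk up front. This is a sketch; `batched` is a hypothetical helper (Python 3.12+ ships an equivalent `itertools.batched` for iterables):

```python
from typing import Any, Dict, Iterator, List

def batched(
    items: List[Dict[str, Any]], batch_size: int
) -> Iterator[List[Dict[str, Any]]]:
    """Yield successive fixed-size batches so only one batch (plus its
    results) needs to be resident in memory at a time."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

# Example: 10 tables in batches of 3 -> batch sizes 3, 3, 3, 1
tables = [{"name": f"t{i}"} for i in range(10)]
sizes = [len(batch) for batch in batched(tables, 3)]
```

Passing such a generator to `pool.imap` (rather than `pool.map` over a prebuilt list) lets chunks be consumed and released incrementally.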
Use the state store system to save intermediate results at regular intervals:
```python
import time

from application_sdk.clients.state_store import StateStoreClient

class WorkflowActivity:
    def __init__(self):
        self.state_store = StateStoreClient()

    async def extract_large_dataset(
        self, total_records: int, checkpoint_interval: int = 1000
    ):
        """Extract data with checkpointing."""
        processed_count = 0
        checkpoint_key = "extraction_checkpoint"

        # Try to resume from a previous checkpoint
        checkpoint = await self.state_store.get(checkpoint_key)
        start_index = checkpoint.get("last_processed", 0) if checkpoint else 0

        for i in range(start_index, total_records):
            # Process record
            process_record(i)
            processed_count += 1

            # Save checkpoint periodically
            if processed_count % checkpoint_interval == 0:
                await self.state_store.set(checkpoint_key, {
                    "last_processed": i,
                    "timestamp": time.time(),
                })
```
Recovery Benefits
Checkpointing lets a re-run activity resume from the state store rather than starting over if its pod is deleted mid-run.
Manual heartbeating lets you attach progress details to each heartbeat:

```python
from typing import Dict, List

from temporalio import activity

class MetadataActivities:
    @activity.defn
    async def extract_tables_with_progress(self, schema_name: str) -> List[Dict]:
        """Extract tables with manual heartbeating and progress reporting."""
        tables = []
        total = 1000
        for i in range(total):
            # Send heartbeat with progress information
            activity.heartbeat({"processed": i, "total": total})
            table = await self.fetch_table(schema_name, i)
            tables.append(table)
        return tables
```
The auto_heartbeater decorator sends heartbeats 3x per timeout interval by default.
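The decorator's behavior can be modeled with a small sketch (this is not the SDK's implementation; `auto_heartbeater_sketch` is hypothetical, `heartbeat_fn` stands in for `activity.heartbeat`, and `interval` stands in for one-third of the heartbeat timeout):

```python
import asyncio
import functools

def auto_heartbeater_sketch(heartbeat_fn, interval: float):
    """Simplified model of auto-heartbeating: a background task calls
    heartbeat_fn every `interval` seconds while the wrapped coroutine runs."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            async def beat():
                while True:
                    heartbeat_fn()  # stands in for activity.heartbeat()
                    await asyncio.sleep(interval)
            task = asyncio.create_task(beat())
            try:
                return await fn(*args, **kwargs)
            finally:
                # Stop heartbeating once the activity body finishes
                task.cancel()
        return wrapper
    return decorator

beats = []

@auto_heartbeater_sketch(lambda: beats.append(1), interval=0.05)
async def slow_activity():
    await asyncio.sleep(0.2)  # simulated long-running work
    return "done"

result = asyncio.run(slow_activity())
```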
1. Batch Processing

Process large collections in fixed-size batches rather than all at once:

```python
BATCH_SIZE = 1000

for i in range(0, len(tables), BATCH_SIZE):
    batch = tables[i:i + BATCH_SIZE]
    await process_batch(batch)
```
2. Connection Pooling
Use connection pooling for database clients:
```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    connection_string,
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20,
)
```
3. Lazy Loading
Load data only when needed:
```python
def get_table_details(table_name: str, include_columns: bool = False):
    # Fetch basic info first
    table = get_table_info(table_name)

    # Load detailed info only if needed
    if include_columns:
        table.columns = get_column_details(table_name)

    return table
```
4. Parallel Execution
Use asyncio.gather() for concurrent operations:
```python
import asyncio

schemas = ["public", "sales", "analytics"]
results = await asyncio.gather(*[
    extract_schema(schema) for schema in schemas
])
```