The Application SDK provides a powerful I/O system with Readers and Writers that abstract away the complexity of data storage and retrieval. The system automatically handles object store downloads and uploads, making your code simple and portable.
Read Parquet files with automatic object store downloads:
from application_sdk.io.parquet import ParquetFileReader
from application_sdk.io import DataframeType
from temporalio import activity


class MyActivities:
    @activity.defn
    async def transform_data(self, workflow_args: dict):
        """Read previously-written Parquet data in batches and transform it."""
        output_path = workflow_args["output_path"]
        typename = workflow_args.get("typename", "data")

        # Path where previous activity wrote files
        input_path = f"{output_path}/raw/{typename}"

        # Use context manager for automatic cleanup
        async with ParquetFileReader(
            path=input_path,
            dataframe_type=DataframeType.daft,  # Better for large datasets
            chunk_size=50000,  # Process 50K rows at a time
        ) as reader:
            # Read data in batches for memory efficiency
            async for batch_df in reader.read_batches():
                # Process each batch
                transformed = await self.process_batch(batch_df)
                # Write transformed data...
        # Temp files automatically cleaned up here
Files are automatically downloaded from the object store if they don't exist locally, and downloaded files are cached for subsequent reads.
from application_sdk.io.json import JsonFileReader
from application_sdk.io import DataframeType


@activity.defn
async def analyze_logs(self, workflow_args: dict):
    """Load a JSON log file into a pandas DataFrame and count ERROR entries."""
    log_path = workflow_args["log_path"]

    async with JsonFileReader(
        path=log_path,
        dataframe_type=DataframeType.pandas,  # Good for smaller datasets
        chunk_size=10000,
    ) as reader:
        # Read all data at once
        df = await reader.read()

        # Analyze the data
        error_count = len(df[df['level'] == 'ERROR'])
        return {"error_count": error_count}
# Recommended: Use async with for automatic cleanupasync with ParquetFileReader(path="/data/input") as reader: df = await reader.read() # Process data# close() called automatically, temp files cleaned up
# Memory-efficient batched readingasync with ParquetFileReader( path="/data/large_dataset", chunk_size=100000) as reader: async for batch in reader.read_batches(): # Process each batch independently await process_batch(batch)
# Recommended: Use async withasync with JsonFileWriter(path="/data/output") as writer: await writer.write(dataframe) await writer.write({"key": "value"})# close() called automatically, files uploaded
# Write data in batchesasync with ParquetFileWriter(path="/data/output") as writer: async for batch in fetch_data_batches(): await writer.write(batch) # Each batch written incrementally
The SDK automatically normalizes paths for object store operations:
# Both of these are equivalent
path1 = "./local/tmp/artifacts/apps/my-app/workflows/wf-123/run-456"
path2 = "artifacts/apps/my-app/workflows/wf-123/run-456"
# Both normalize to: artifacts/apps/my-app/workflows/wf-123/run-456
See Output Paths for details on path structure and conventions.
# Keep downloaded files for debuggingasync with ParquetFileReader( path=input_path, cleanup_on_close=False # Don't delete on close) as reader: df = await reader.read() # Files remain after this block# Later, manually clean up if neededimport shutilshutil.rmtree(input_path)
# Small records (< 1KB each): Larger chunks
chunk_size = 100000

# Medium records (1-10KB): Default chunks
chunk_size = 50000

# Large records (> 10KB): Smaller chunks
chunk_size = 10000
Minimize Downloads
Cache downloaded files when processing in multiple stages:
# Stage 1: Download and keep filesasync with ParquetFileReader( path=input_path, cleanup_on_close=False) as reader: df = await reader.read()# Stage 2: Reuse cached filesasync with ParquetFileReader(path=input_path) as reader: df2 = await reader.read() # No download needed