
Overview

The Application SDK provides a powerful I/O system with Readers and Writers that abstract away the complexity of data storage and retrieval. The system automatically handles object store downloads and uploads, making your code simple and portable.

Key Features

  • Automatic object store downloads and uploads
  • Support for Parquet and JSON formats
  • Memory-efficient batched reading and writing
  • Context manager support for automatic cleanup
  • Flexible DataFrame backends (Pandas and Daft)
  • Transparent caching of downloaded files

Architecture

The I/O system follows a clean separation between reading and writing:

Readers

Read data from various sources with automatic object store downloads

Writers

Write data to various destinations with automatic object store uploads

Data Flow

You never need to manually download or upload files; the readers and writers handle it automatically.

Readers

Readers implement a common interface for reading data from various sources.

Reader Types

ParquetFileReader

Read data from Parquet files with automatic download

JsonFileReader

Read data from JSON Lines files with automatic download

ParquetFileReader

Read Parquet files with automatic object store downloads:
from application_sdk.io.parquet import ParquetFileReader
from application_sdk.io import DataframeType
from temporalio import activity

class MyActivities:
    @activity.defn
    async def transform_data(self, workflow_args: dict):
        output_path = workflow_args["output_path"]
        typename = workflow_args.get("typename", "data")
        
        # Path where previous activity wrote files
        input_path = f"{output_path}/raw/{typename}"
        
        # Use context manager for automatic cleanup
        async with ParquetFileReader(
            path=input_path,
            dataframe_type=DataframeType.daft,  # Better for large datasets
            chunk_size=50000  # Process 50K rows at a time
        ) as reader:
            # Read data in batches for memory efficiency
            async for batch_df in reader.read_batches():
                # Process each batch
                transformed = await self.process_batch(batch_df)
                # Write transformed data...
        # Temp files automatically cleaned up here
Files are automatically downloaded from object store if they don’t exist locally. Downloaded files are cached for subsequent reads.

JsonFileReader

Read JSON Lines files:
from application_sdk.io.json import JsonFileReader
from application_sdk.io import DataframeType

@activity.defn
async def analyze_logs(self, workflow_args: dict):
    log_path = workflow_args["log_path"]
    
    async with JsonFileReader(
        path=log_path,
        dataframe_type=DataframeType.pandas,  # Good for smaller datasets
        chunk_size=10000
    ) as reader:
        # Read all data at once
        df = await reader.read()
        
        # Analyze the data
        error_count = len(df[df['level'] == 'ERROR'])
        return {"error_count": error_count}

Reader API

# Recommended: Use async with for automatic cleanup
async with ParquetFileReader(path="/data/input") as reader:
    df = await reader.read()
    # Process data
# close() called automatically, temp files cleaned up

Reader Parameters

  • path (str, required): Local path where files are, or should be downloaded to
  • file_names (list[str], optional): Specific files to read. If not provided, reads all files in the directory
  • chunk_size (int, default 100000): Number of rows per batch when using read_batches()
  • dataframe_type (DataframeType, default pandas): DataFrame backend, DataframeType.pandas or DataframeType.daft
  • cleanup_on_close (bool, default true): Whether to delete downloaded files when the reader is closed
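As a rough illustration of the file_names semantics above, the selection logic behaves like the following stand-alone sketch (a hypothetical helper, not the SDK's internal code):

```python
from typing import Iterable, Optional

def select_files(all_files: Iterable[str], file_names: Optional[list] = None) -> list:
    """Mirror the documented file_names behaviour."""
    # No explicit file_names: read every file in the directory
    if file_names is None:
        return list(all_files)
    # Otherwise keep only the requested files, preserving directory order
    wanted = set(file_names)
    return [f for f in all_files if f in wanted]
```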

DataFrame Type Selection

Pandas (DataframeType.pandas)

Best for:
  • Small to medium datasets (< 1GB)
  • Rich API and ecosystem
  • Complex transformations
  • Data analysis and exploration

Daft (DataframeType.daft)

Best for:
  • Large datasets (> 1GB)
  • Distributed processing
  • Memory efficiency
  • Parallel operations

Writers

Writers implement a common interface for writing data to various destinations.

Writer Types

ParquetFileWriter

Write data to Parquet files with automatic upload

JsonFileWriter

Write data to JSON Lines files with automatic upload

ParquetFileWriter

Write Parquet files with automatic object store uploads:
from application_sdk.io.parquet import ParquetFileWriter
import pandas as pd

@activity.defn
async def extract_data(self, workflow_args: dict):
    output_path = workflow_args["output_path"]
    typename = "users"
    
    # Create writer
    async with ParquetFileWriter(
        path=f"{output_path}/raw/{typename}",
        typename=typename,
        chunk_size=50000  # Split into 50K record files
    ) as writer:
        # Fetch data from source
        async for batch in self.fetch_users_batched():
            df = pd.DataFrame(batch)
            await writer.write(df)
        
        # Get statistics
        stats = writer.statistics
        return {
            "total_records": stats.total_record_count,
            "chunks": stats.chunk_count
        }
    # Files automatically uploaded to object store here
Files are written locally first, then automatically uploaded to object store when the writer is closed.

JsonFileWriter

Write JSON Lines files:
from application_sdk.io.json import JsonFileWriter

@activity.defn
async def export_results(self, workflow_args: dict):
    output_path = workflow_args["output_path"]
    
    async with JsonFileWriter(
        path=f"{output_path}/results",
        typename="analysis",
        chunk_size=10000
    ) as writer:
        # Write DataFrame
        await writer.write(results_df)
        
        # Write dict directly
        await writer.write({"summary": "Analysis complete"})
        
        # Write list of dicts
        await writer.write([
            {"id": 1, "value": 100},
            {"id": 2, "value": 200}
        ])
        
        # Get statistics
        stats = writer.statistics
        return stats.model_dump()

Writer API

# Recommended: Use async with
async with JsonFileWriter(path="/data/output") as writer:
    await writer.write(dataframe)
    await writer.write({"key": "value"})
# close() called automatically, files uploaded

Writer Parameters

  • path (str, required): Full path where files will be written (e.g., /data/workflow_run_123/transformed)
  • typename (str, optional): Subdirectory name under path for organizing output (e.g., tables, columns)
  • chunk_start (int, optional): Starting index for chunk numbering in filenames
  • chunk_size (int, default 50000): Maximum number of records per output file chunk
  • chunk_part (int, optional): Part number for file naming (useful for parallel writes)
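The chunk_size behaviour above can be sketched with a plain generator (a hypothetical illustration, assuming the writer simply splits the record stream into files of at most chunk_size records):

```python
from typing import Iterable, Iterator, List

def chunk_records(records: Iterable[dict], chunk_size: int = 50000) -> Iterator[List[dict]]:
    """Yield lists of at most chunk_size records, one list per output file."""
    chunk: List[dict] = []
    for record in records:
        chunk.append(record)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk  # final partial chunk
```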

Writer Output

Writers automatically create organized output:
/data/workflow_run_123/
├── raw/
│   ├── tables/
│   │   ├── 1.parquet
│   │   ├── 2-50000.parquet
│   │   ├── 50001-100000.parquet
│   │   └── statistics.json.ignore
│   └── columns/
│       ├── 1.json
│       └── statistics.json.ignore
└── transformed/
    └── enriched_tables/
        ├── 1.parquet
        └── statistics.json.ignore
.ignore files contain metadata but are not uploaded to object store.
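The note about .ignore files amounts to a simple upload filter; a hypothetical sketch of that behaviour (not the SDK's actual upload code):

```python
def uploadable_files(files: list) -> list:
    """Drop local-only metadata files before uploading to the object store."""
    # .ignore files (e.g. statistics.json.ignore) hold metadata and stay local
    return [f for f in files if not f.endswith(".ignore")]
```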

Object Store Integration

How It Works

The SDK provides transparent object store integration.

For writing:
  1. Write Locally: the writer writes files to the local filesystem first
  2. Auto-Upload: on close(), files are automatically uploaded to the object store
  3. Optional Cleanup: local files can be retained or deleted after upload

For reading:
  1. Check Local: the reader checks whether files exist locally
  2. Auto-Download: if not found, files are automatically downloaded from the object store
  3. Cache Locally: downloads are cached for subsequent reads
  4. Cleanup: downloaded files are cleaned up on close() (if enabled)
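The read-side flow amounts to a check-then-download step. A minimal stdlib sketch of that idea (the download callable is a placeholder, not the SDK's API):

```python
import os
from typing import Callable

def ensure_local(path: str, download: Callable[[str], None]) -> str:
    """Return path, fetching from the object store only when it is absent locally."""
    if not os.path.exists(path):
        download(path)  # fetch from object store into the local cache
    return path
```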

Path Normalization

The SDK automatically normalizes paths for object store operations:
# Both of these are equivalent
path1 = "./local/tmp/artifacts/apps/my-app/workflows/wf-123/run-456"
path2 = "artifacts/apps/my-app/workflows/wf-123/run-456"

# Both normalize to: artifacts/apps/my-app/workflows/wf-123/run-456
See Output Paths for details on path structure and conventions.
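A hypothetical sketch of this normalization (the exact prefixes the SDK strips may differ; ./local/tmp/ is taken from the example above):

```python
def normalize_store_key(path: str) -> str:
    """Strip local filesystem prefixes so both path forms map to the same key."""
    key = path
    # Strip the leading "./" first, then the local scratch directory prefix
    for prefix in ("./", "local/tmp/"):
        if key.startswith(prefix):
            key = key[len(prefix):]
    return key
```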

Advanced Patterns

File Filtering

Read only specific files from a directory:
async with ParquetFileReader(
    path="/data/partitioned",
    file_names=[
        "chunk-0-0.parquet",
        "chunk-0-1.parquet",
        "chunk-0-2.parquet"
    ]
) as reader:
    df = await reader.read()

Multi-Stage Pipeline

Chain readers and writers for data pipelines:
@activity.defn
async def multi_stage_pipeline(self, workflow_args: dict):
    output_path = workflow_args["output_path"]
    
    # Stage 1: Read raw data
    async with JsonFileReader(
        path=f"{output_path}/raw/data"
    ) as reader:
        raw_df = await reader.read()
    
    # Stage 2: Transform
    transformed_df = await self.transform(raw_df)
    
    # Stage 3: Write transformed data
    async with ParquetFileWriter(
        path=f"{output_path}/transformed/data",
        typename="enriched"
    ) as writer:
        await writer.write(transformed_df)
        return writer.statistics.model_dump()

Parallel Processing

Process large datasets in parallel:
import asyncio

@activity.defn
async def parallel_processing(self, workflow_args: dict):
    input_path = workflow_args["input_path"]
    output_path = workflow_args["output_path"]
    
    async with ParquetFileReader(
        path=input_path,
        chunk_size=10000
    ) as reader:
        # Process batches in parallel (with concurrency limit)
        async def process_batch(batch_num, batch_df):
            processed = await self.process(batch_df)
            
            # Write each batch to separate file
            async with ParquetFileWriter(
                path=output_path,
                chunk_part=batch_num
            ) as writer:
                await writer.write(processed)
        
        # Create tasks for parallel processing.
        # Note: enumerate() does not support async iterators, so count manually.
        tasks = []
        batch_num = 0
        async for batch in reader.read_batches():
            tasks.append(process_batch(batch_num, batch))
            batch_num += 1
            
            # Limit concurrency to 5 batches at a time
            if len(tasks) >= 5:
                await asyncio.gather(*tasks)
                tasks = []
        
        # Process remaining batches
        if tasks:
            await asyncio.gather(*tasks)

Conditional Cleanup

Control when downloaded files are cleaned up:
# Keep downloaded files for debugging
async with ParquetFileReader(
    path=input_path,
    cleanup_on_close=False  # Don't delete on close
) as reader:
    df = await reader.read()
    # Files remain after this block

# Later, manually clean up if needed
import shutil
shutil.rmtree(input_path)

Retry on Failure

Wrap writes in retry logic to handle transient network and storage failures:
import asyncio
import logging

import pandas as pd

from application_sdk.io.parquet import ParquetFileWriter

logger = logging.getLogger(__name__)

async def write_with_retry(
    data: pd.DataFrame,
    output_path: str,
    max_retries: int = 3
):
    """Write data with retry logic."""
    for attempt in range(max_retries):
        try:
            async with ParquetFileWriter(path=output_path) as writer:
                await writer.write(data)
                return writer.statistics
        except Exception as e:
            if attempt < max_retries - 1:
                logger.warning(
                    f"Write attempt {attempt + 1} failed: {e}. Retrying..."
                )
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
            else:
                logger.error(f"Write failed after {max_retries} attempts")
                raise

Best Practices

  • Use context managers: always use async with for automatic cleanup and resource management
  • Batch large datasets: use read_batches() and write() in loops for memory efficiency
  • Choose the right DataFrame backend: Pandas for small data, Daft for large distributed processing
  • Monitor statistics: use writer statistics to track record counts and chunks
  • Handle errors: implement retry logic for network and storage failures
  • Organize output: use the typename parameter to separate different data types

Performance Tips

Adjust chunk size based on your data:
# Small records (< 1KB each): Larger chunks
chunk_size = 100000

# Medium records (1-10KB): Default chunks
chunk_size = 50000

# Large records (> 10KB): Smaller chunks
chunk_size = 10000
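The guidance above can be wrapped in a small helper (a hypothetical convenience function using the tiers listed; tune the thresholds for your workload):

```python
def suggest_chunk_size(avg_record_bytes: int) -> int:
    """Pick a chunk size from the average serialized record size."""
    if avg_record_bytes < 1_024:        # small records (< 1KB): larger chunks
        return 100_000
    if avg_record_bytes <= 10 * 1_024:  # medium records (1-10KB): default chunks
        return 50_000
    return 10_000                       # large records (> 10KB): smaller chunks
```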
Cache downloaded files when processing in multiple stages:
# Stage 1: Download and keep files
async with ParquetFileReader(
    path=input_path,
    cleanup_on_close=False
) as reader:
    df = await reader.read()

# Stage 2: Reuse cached files
async with ParquetFileReader(path=input_path) as reader:
    df2 = await reader.read()  # No download needed
Write to different paths in parallel:
async def write_multiple(
    tables_df: pd.DataFrame,
    columns_df: pd.DataFrame,
    output_path: str
):
    # write_data is a small helper (not shown) that wraps a writer for one path
    # Write tables and columns in parallel
    await asyncio.gather(
        write_data(tables_df, f"{output_path}/tables"),
        write_data(columns_df, f"{output_path}/columns")
    )

Troubleshooting

If reader can’t find files:
  1. Check the path is correct
  2. Verify files exist in object store
  3. Check object store configuration
  4. Enable debug logging to see download attempts
import logging
logging.getLogger("application_sdk.io").setLevel(logging.DEBUG)
If writer fails to upload:
  1. Check object store credentials
  2. Verify network connectivity
  3. Check object store permissions
  4. Look for error logs in writer output
If running out of memory:
  1. Use read_batches() instead of read()
  2. Reduce chunk size
  3. Use Daft instead of Pandas
  4. Process data in smaller batches

Related Pages

  • Output Paths: path structure and organization
  • Activities: using I/O in activities
  • Object Store: direct object store operations
  • State Store: persistent state management
