Skip to main content
Fenic provides robust error handling mechanisms for production workloads. Learn how to handle failures gracefully, configure retries, and ensure data quality.

Error Behavior

Fenic’s error handling follows these principles:

1. Graceful degradation — failed operations return null rather than crashing the entire pipeline.
2. Automatic retries — transient failures are automatically retried with exponential backoff.
3. Row-level isolation — a failure in one row doesn’t affect other rows in the batch.
4. Observable failures — track which rows failed and why through filtering and logging.

Semantic Operation Failures

When semantic operations fail, they return null for that row.

Detecting Failures

import fenic as fc

# Extract structured data
df_result = df.select(
    "id",
    "text",
    fc.semantic.extract(
        "text",
        MySchema
    ).alias("extracted_data")
)

# Check for failures.
# Failed extractions surface as null, so is_null()/is_not_null()
# partitions the result into failures and successes.
failures = df_result.filter(fc.col("extracted_data").is_null())
successes = df_result.filter(fc.col("extracted_data").is_not_null())

print(f"Successes: {successes.count()}")
print(f"Failures: {failures.count()}")

# Inspect failed rows
failures.select("id", "text").show()

Handling Failures

# Option 1: Filter out failures
df_clean = df_result.filter(
    fc.col("extracted_data").is_not_null()
)

# Option 2: Retry with different model
df_retried = failures.select(
    "id",
    "text",
    fc.semantic.extract(
        "text",
        MySchema,
        model_alias="gpt-4o",  # More capable model
        max_output_tokens=2048  # More tokens
    ).alias("extracted_data")
)

# Combine successes and retries.
# NOTE(review): assumes both frames share the same column order
# ("id", "text", "extracted_data") — verify union resolves by position.
df_final = successes.union(df_retried)

# Option 3: Use default values
# NOTE(review): the struct literal must mirror MySchema's fields
# ("category", "confidence") — confirm against the schema definition.
df_with_defaults = df_result.select(
    "id",
    "text",
    fc.when(
        fc.col("extracted_data").is_null(),
        fc.struct(
            fc.lit("unknown").alias("category"),
            fc.lit(0).alias("confidence")
        )
    ).otherwise(fc.col("extracted_data")).alias("extracted_data")
)

Timeouts

Control how long to wait for LLM responses.

Default Timeouts

All semantic operations have a default timeout of 120 seconds.

Custom Timeouts

# request_timeout is expressed in seconds and overrides the 120-second default.

# Short timeout for simple operations
df.select(
    fc.semantic.classify(
        "text",
        ["positive", "negative", "neutral"],
        request_timeout=30  # 30 seconds
    ).alias("sentiment")
)

# Longer timeout for complex extraction
df.select(
    fc.semantic.extract(
        "long_document",
        ComplexSchema,
        request_timeout=180  # 3 minutes
    ).alias("data")
)

# Very short timeout for testing
df_test = df.limit(10).select(
    fc.semantic.map(
        "Summarize: {{ text }}",
        text=fc.col("text"),
        request_timeout=10  # Fail fast for testing
    ).alias("summary")
)

Timeout Handling

# Operations that timeout return null
df_result = df.select(
    "id",
    fc.semantic.extract(
        "text",
        Schema,
        request_timeout=60
    ).alias("data")
)

# Identify timeouts (null results).
# NOTE(review): null marks *any* failure, not just timeouts, so this set
# may also include rows that failed for other reasons.
timeouts = df_result.filter(fc.col("data").is_null())

# Retry with longer timeout
df_retried = timeouts.select(
    "id",
    "text",
    fc.semantic.extract(
        "text",
        Schema,
        request_timeout=180  # Longer timeout
    ).alias("data")
)

Async UDF Error Handling

Custom async functions have configurable retry and timeout behavior.

Retry Configuration

import aiohttp
import fenic as fc

@fc.async_udf(
    return_type=fc.StringType,
    max_concurrency=50,
    timeout_seconds=10,     # Per-request timeout
    num_retries=3          # Retry up to 3 times
)
async def fetch_data(id: str) -> str:
    """Fetch the payload for *id* from the example API.

    Any non-200 response raises ValueError so the async-UDF machinery can
    retry; once retries are exhausted the row's result becomes null.
    """
    url = f"https://api.example.com/data/{id}"
    async with aiohttp.ClientSession() as http, http.get(url) as response:
        if response.status != 200:
            raise ValueError(f"API returned {response.status}")
        return await response.text()

# Use in pipeline — the async UDF is applied like any other column expression.
df_result = df.select(
    "id",
    fetch_data(fc.col("id")).alias("api_data")
)

# Failed requests return null after retries
failures = df_result.filter(fc.col("api_data").is_null())

Error Behavior

  • Failed async UDFs return null for that row
  • Automatic exponential backoff between retries
  • Retries only for transient errors (network, timeout)
  • Permanent errors (4xx responses) not retried

Schema Validation

Validate Pydantic schemas before execution to catch errors early.

Invalid Schema Example

from pydantic import BaseModel, Field

# Bad: Missing descriptions
class BadSchema(BaseModel):
    name: str  # No description!
    value: int

# Validation is performed before execution, so the error is raised when the
# expression is built rather than mid-pipeline.
try:
    df.select(
        fc.semantic.extract("text", BadSchema).alias("data")
    )
except fc.ValidationError as e:
    print(f"Schema validation failed: {e}")
    # Error: All fields must have descriptions

# Good: All fields described
class GoodSchema(BaseModel):
    name: str = Field(description="The name of the entity")
    value: int = Field(description="The numeric value")

df.select(
    fc.semantic.extract("text", GoodSchema).alias("data")
)

Supported Types

Fenic validates that your schema uses supported types:
  • ✅ Primitives: str, int, float, bool
  • ✅ Optional: Optional[T]
  • ✅ Lists: List[T], list[T]
  • ✅ Literals: Literal["a", "b", "c"]
  • ✅ Nested models
  • ❌ Unions (except Optional)
  • ❌ Custom classes
  • ❌ Circular references
from typing import List, Optional, Literal

class MetadataSchema(BaseModel):
    """Nested model for key/value metadata.

    Defined *before* ValidSchema: the `metadata` annotation below is
    evaluated at class-creation time, so referencing MetadataSchema
    before it exists would raise NameError.
    """
    key: str = Field(description="Metadata key")
    value: str = Field(description="Metadata value")

class ValidSchema(BaseModel):
    """Demonstrates every field type Fenic supports for extraction."""
    name: str = Field(description="Name")
    count: Optional[int] = Field(description="Count if available")
    tags: List[str] = Field(description="List of tags")
    status: Literal["active", "inactive"] = Field(description="Status")
    metadata: Optional[MetadataSchema] = Field(description="Nested metadata")

Data Quality Checks

Validate results after semantic operations.

Null Checks

# Extract and validate
df_result = df.select(
    "id",
    fc.semantic.extract("text", Schema).alias("data")
)

# Check success rate
total = df_result.count()
successes = df_result.filter(fc.col("data").is_not_null()).count()
# Guard against an empty result set so the rate computation never
# divides by zero.
failure_rate = (total - successes) / total * 100 if total else 0.0

print(f"Failure rate: {failure_rate:.2f}%")

if failure_rate > 5.0:  # Alert if >5% failures
    print("WARNING: High failure rate detected!")

Field Validation

# Validate extracted fields
df_validated = df_result.select(
    "id",
    "data",
    # Check for reasonable values: confidence must lie in [0, 1].
    fc.when(
        (fc.col("data").get_item("confidence") >= 0) &
        (fc.col("data").get_item("confidence") <= 1.0),
        fc.lit(True)
    ).otherwise(fc.lit(False)).alias("valid_confidence")
)

# Filter invalid results (out-of-range confidence or failed extraction)
invalid = df_validated.filter(
    ~fc.col("valid_confidence") |
    fc.col("data").is_null()
)

# Count once — each .count() may re-execute the pipeline.
invalid_count = invalid.count()
if invalid_count > 0:
    print(f"Found {invalid_count} invalid results")
    invalid.show()

Schema Conformance

# Validate all expected fields are present
df_result = df.select(
    "id",
    fc.semantic.extract("text", Schema).alias("data")
).unnest("data")

# Check for nulls in required fields
required_fields = ["name", "category", "value"]

# NOTE(review): each iteration issues its own count() — one scan per field.
# Fine for a handful of fields; prefer a single aggregation for many.
for field in required_fields:
    null_count = df_result.filter(fc.col(field).is_null()).count()
    if null_count > 0:
        print(f"WARNING: {null_count} rows with null {field}")

Rate Limit Handling

Fenic automatically handles rate limits, but you can tune behavior.

Automatic Throttling

# rpm/tpm are requests-per-minute and tokens-per-minute budgets that Fenic
# throttles against automatically.
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "gpt": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,      # Fenic throttles to stay under this
                tpm=200_000   # And this
            )
        }
    )
)

# Process large dataset - Fenic handles throttling
large_df = session.create_dataframe([{"text": f"text {i}"} for i in range(10_000)])

result = large_df.select(
    fc.semantic.classify("text", ["a", "b", "c"]).alias("category")
)

# No rate limit errors - automatically throttled
result.write.save_as_table("output", mode="overwrite")

Conservative Limits

# Set limits below API maximum to account for other workloads
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "gpt": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=400,      # 20% below max of 500
                tpm=160_000   # 20% below max of 200k
            )
        }
    )
)

Pipeline Error Handling

Try Multiple Approaches

def extract_with_fallback(df, schema):
    """Extract `schema` from df["text"], escalating models on failure.

    Strategy:
      1. Extract with the fast/cheap model (gpt-4o-mini).
      2. Retry only the failed rows (null ``data``) with the stronger
         gpt-4o model and a larger output-token budget.

    Returns a DataFrame of ("id", "text", "data") containing only rows
    whose extraction succeeded on one of the two attempts.
    """

    # Try 1: Fast model
    result = df.select(
        "id",
        "text",
        fc.semantic.extract(
            "text",
            schema,
            model_alias="gpt-4o-mini"
        ).alias("data")
    )

    successes = result.filter(fc.col("data").is_not_null())
    failures = result.filter(fc.col("data").is_null())

    # Count once — each .count() may re-execute the (expensive) pipeline.
    failure_count = failures.count()
    if failure_count == 0:
        return successes

    # Try 2: Powerful model for failures
    print(f"Retrying {failure_count} failures with gpt-4o")

    retried = failures.select(
        "id",
        "text",
        fc.semantic.extract(
            "text",
            schema,
            model_alias="gpt-4o",
            max_output_tokens=2048
        ).alias("data")
    )

    # Combine first-pass successes with successful retries only.
    return successes.union(retried.filter(fc.col("data").is_not_null()))

df_final = extract_with_fallback(df, MySchema)

Checkpoint Progress

# Process in batches with checkpoints so a failure only loses one batch.
batch_size = 1000
total = df.count()

# Overwrite only on the first *successful* write. Keying the mode off the
# loop index (i > 0) is wrong when batch 0 fails: the first batch that does
# succeed would then append to a table that was never reset.
first_write = True

for i in range(0, total, batch_size):
    batch = session.sql(
        f"SELECT * FROM {{df}} LIMIT {batch_size} OFFSET {i}",
        df=df
    )

    try:
        result = batch.select(
            fc.semantic.extract("text", Schema).alias("data")
        )

        # Checkpoint successful batches
        result.write.save_as_table(
            "output",
            mode="overwrite" if first_write else "append"
        )
        first_write = False

        print(f"Completed batch {i//batch_size + 1}/{(total + batch_size - 1)//batch_size}")

    except Exception as e:
        print(f"Batch {i//batch_size + 1} failed: {e}")
        # Progress so far is already checkpointed in "output".
        print("Continuing with next batch...")
        continue

Logging and Debugging

Execution Metrics

# Capture detailed metrics.
# NOTE(review): save_as_table here presumably returns a metrics object with
# plan details and token accounting — confirm against the fenic API.
metrics = df.select(
    fc.semantic.extract("text", Schema).alias("data")
).write.save_as_table("output", mode="overwrite")

# Print execution details
print("\n=== Execution Metrics ===")
print(metrics.get_execution_plan_details())

# Aggregate language-model usage across the whole query.
lm = metrics.total_lm_metrics
print(f"\nToken Usage:")
print(f"  Input tokens: {lm.num_uncached_input_tokens:,}")
print(f"  Output tokens: {lm.num_output_tokens:,}")
print(f"  Cached tokens: {lm.num_cached_input_tokens:,}")
print(f"  Total cost: ${lm.cost:.4f}")

Debug Failed Rows

# Add debug info to results
df_debug = df.select(
    "id",
    "text",
    fc.text.length(fc.col("text")).alias("text_length"),
    fc.text.count_tokens(fc.col("text")).alias("token_count"),
    fc.semantic.extract("text", Schema).alias("data")
)

# Analyze failures
failures = df_debug.filter(fc.col("data").is_null())

print("\nFailure Analysis:")
failures.select(
    "id",
    "text_length",
    "token_count"
).show()

# Check if failures correlate with length.
# NOTE(review): if either filter matches no rows, the average may come back
# as null/None and the :.0f format below would raise — guard in production.
avg_token_count_success = df_debug.filter(
    fc.col("data").is_not_null()
).select(
    fc.avg("token_count").alias("avg_tokens")
).collect()[0][0]

avg_token_count_failure = df_debug.filter(
    fc.col("data").is_null()
).select(
    fc.avg("token_count").alias("avg_tokens")
).collect()[0][0]

print(f"Avg tokens (success): {avg_token_count_success:.0f}")
print(f"Avg tokens (failure): {avg_token_count_failure:.0f}")

Best Practices

Always Check for Nulls: Filter out null results from semantic operations before downstream processing.
Set Realistic Timeouts: Balance between catching real failures and allowing complex operations to complete.
Use Checkpointing: For large pipelines, save progress periodically so failures don’t require full reruns.

Error Handling Checklist

def robust_semantic_pipeline(df, schema):
    """Production-ready semantic extraction pipeline.

    Validates input, filters out rows unlikely to extract cleanly,
    extracts with the default model, retries failures with a stronger
    model when the failure rate is high, and returns only rows whose
    extraction succeeded.

    Args:
        df: Input DataFrame with at least "id" and "text" columns.
        schema: Pydantic model describing the structure to extract.

    Raises:
        ValueError: if the input is empty, if token filtering removes
            every row, or if every extraction fails.
    """

    # 1. Validate inputs
    if df.count() == 0:
        raise ValueError("Empty DataFrame")

    # 2. Add debug columns
    df_prep = df.select(
        "id",
        "text",
        fc.text.count_tokens(fc.col("text")).alias("tokens")
    )

    # 3. Filter out obviously problematic rows
    df_valid = df_prep.filter(
        (fc.col("tokens") > 10) &  # drop rows that are too short
        (fc.col("tokens") < 8000)  # drop rows that are too long
    )

    # 4. Extract with appropriate timeout
    df_result = df_valid.select(
        "id",
        "text",
        fc.semantic.extract(
            "text",
            schema,
            request_timeout=120
        ).alias("data")
    )

    # 5. Check failure rate. Guard: the token filter may have removed
    # every row, which would otherwise divide by zero below.
    total = df_result.count()
    if total == 0:
        raise ValueError("No rows left after token-count filtering")
    successes = df_result.filter(fc.col("data").is_not_null()).count()
    failure_rate = (total - successes) / total * 100

    print(f"Success rate: {100 - failure_rate:.1f}%")

    # 6. Retry failures with stronger model if needed
    if failure_rate > 10.0:
        print(f"High failure rate ({failure_rate:.1f}%), retrying...")

        failures = df_result.filter(fc.col("data").is_null())
        retried = failures.select(
            "id",
            "text",
            fc.semantic.extract(
                "text",
                schema,
                model_alias="gpt-4o",
                max_output_tokens=2048,
                request_timeout=180
            ).alias("data")
        )

        successes_df = df_result.filter(fc.col("data").is_not_null())
        df_result = successes_df.union(retried)

    # 7. Final validation: keep only rows that extracted successfully.
    final = df_result.filter(fc.col("data").is_not_null())

    if final.count() == 0:
        raise ValueError("All extractions failed")

    return final

Common Error Patterns

Context Length Exceeded

# Problem: Input too long
df.select(
    fc.semantic.extract(
        "very_long_text",  # 50k tokens
        Schema
    )
)
# Returns null - context length exceeded

# Solution: Chunk first
df_chunked = df.select(
    "id",
    fc.text.recursive_token_chunk(
        fc.col("very_long_text"),
        chunk_size=4000,
        chunk_overlap_percentage=10
    ).alias("chunks")
).explode("chunks")

# One output row per chunk — "id" repeats, so downstream consumers may
# need to aggregate per-id results.
df_extracted = df_chunked.select(
    "id",
    fc.semantic.extract(
        "chunks",
        Schema
    ).alias("data")
)

Invalid JSON in Schema

# Problem: LLM returns invalid JSON
class Schema(BaseModel):
    value: int = Field(description="A number")

df.select(
    fc.semantic.extract("text", Schema)
)
# May return null if LLM generates invalid JSON

# Solution: Use simpler schema or more capable model.
# (This redefinition shadows the class above — shown only for
# before/after contrast.)
class Schema(BaseModel):
    value: int = Field(description="Extract the numeric value. Return only the number with no formatting.")

df.select(
    fc.semantic.extract(
        "text",
        Schema,
        model_alias="gpt-4o"  # More reliable
    )
)

Rate Limit Errors

# Problem: Exceeding API limits
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "gpt": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=1000,  # Too high for tier
                tpm=500_000
            )
        }
    )
)

# Solution: Set conservative limits
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "gpt": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=400,    # 20% below tier limit
                tpm=160_000
            )
        }
    )
)

Next Steps

Batch Inference

Configure rate limiting and throughput optimization

Semantic Operators

Learn about semantic operations and their error behavior

Build docs developers (and LLMs) love