Fenic provides robust error handling mechanisms for production workloads. Learn how to handle failures gracefully, configure retries, and ensure data quality.
Error Behavior
Fenic’s error handling follows these principles:
Graceful degradation
Failed operations return null rather than crashing the entire pipeline.
Automatic retries
Transient failures are automatically retried with exponential backoff.
Row-level isolation
A failure in one row doesn’t affect other rows in the batch.
Observable failures
Track which rows failed and why through filtering and logging.
Semantic Operation Failures
When semantic operations fail, they return null for that row.
Detecting Failures
import fenic as fc
# Extract structured data
df_result = df.select(
"id" ,
"text" ,
fc.semantic.extract(
"text" ,
MySchema
).alias( "extracted_data" )
)
# Check for failures
failures = df_result.filter(fc.col( "extracted_data" ).is_null())
successes = df_result.filter(fc.col( "extracted_data" ).is_not_null())
print ( f "Successes: { successes.count() } " )
print ( f "Failures: { failures.count() } " )
# Inspect failed rows
failures.select( "id" , "text" ).show()
Handling Failures
# Option 1: Filter out failures
df_clean = df_result.filter(
fc.col( "extracted_data" ).is_not_null()
)
# Option 2: Retry with different model
df_retried = failures.select(
"id" ,
"text" ,
fc.semantic.extract(
"text" ,
MySchema,
model_alias = "gpt-4o" , # More capable model
max_output_tokens = 2048 # More tokens
).alias( "extracted_data" )
)
# Combine successes and retries
df_final = successes.union(df_retried)
# Option 3: Use default values
df_with_defaults = df_result.select(
"id" ,
"text" ,
fc.when(
fc.col( "extracted_data" ).is_null(),
fc.struct(
fc.lit( "unknown" ).alias( "category" ),
fc.lit( 0 ).alias( "confidence" )
)
).otherwise(fc.col( "extracted_data" )).alias( "extracted_data" )
)
Timeouts
Control how long to wait for LLM responses.
Default Timeouts
All semantic operations have a default timeout of 120 seconds.
Custom Timeouts
# Short timeout for simple operations
df.select(
fc.semantic.classify(
"text" ,
[ "positive" , "negative" , "neutral" ],
request_timeout = 30 # 30 seconds
).alias( "sentiment" )
)
# Longer timeout for complex extraction
df.select(
fc.semantic.extract(
"long_document" ,
ComplexSchema,
request_timeout = 180 # 3 minutes
).alias( "data" )
)
# Very short timeout for testing
df_test = df.limit( 10 ).select(
fc.semantic.map(
"Summarize: {{ text }} " ,
text = fc.col( "text" ),
request_timeout = 10 # Fail fast for testing
).alias( "summary" )
)
Timeout Handling
# Operations that timeout return null
df_result = df.select(
"id" ,
fc.semantic.extract(
"text" ,
Schema,
request_timeout = 60
).alias( "data" )
)
# Identify timeouts (null results)
timeouts = df_result.filter(fc.col( "data" ).is_null())
# Retry with longer timeout
df_retried = timeouts.select(
"id" ,
"text" ,
fc.semantic.extract(
"text" ,
Schema,
request_timeout = 180 # Longer timeout
).alias( "data" )
)
Async UDF Error Handling
Custom async functions have configurable retry and timeout behavior.
Retry Configuration
import aiohttp
import fenic as fc
@fc.async_udf(
    return_type=fc.StringType,
    max_concurrency=50,   # at most 50 requests in flight at once
    timeout_seconds=10,   # per-request timeout
    num_retries=3,        # retry up to 3 times before returning null
)
async def fetch_data(id: str) -> str:
    """Fetch one record from the example API; raises ValueError on non-200 status."""
    url = f"https://api.example.com/data/{id}"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # Raising here lets the async_udf retry machinery decide whether
            # to retry; after all retries fail the row's result becomes null.
            if response.status != 200:
                raise ValueError(f"API returned {response.status}")
            return await response.text()
# Use in pipeline
df_result = df.select(
"id" ,
fetch_data(fc.col( "id" )).alias( "api_data" )
)
# Failed requests return null after retries
failures = df_result.filter(fc.col( "api_data" ).is_null())
Error Behavior
Failed async UDFs return null for that row.
Retries use automatic exponential backoff.
Only transient errors (network failures, timeouts) are retried.
Permanent errors (4xx responses) are not retried.
Schema Validation
Validate Pydantic schemas before execution to catch errors early.
Invalid Schema Example
from pydantic import BaseModel, Field
# Bad: Missing descriptions
class BadSchema(BaseModel):
    """Deliberately invalid schema: fenic requires a description on every
    field, so using this schema raises fc.ValidationError (shown below)."""
    name: str  # No description!
    value: int
try :
df.select(
fc.semantic.extract( "text" , BadSchema).alias( "data" )
)
except fc.ValidationError as e:
print ( f "Schema validation failed: { e } " )
# Error: All fields must have descriptions
# Good: All fields described
class GoodSchema(BaseModel):
    """Valid extraction schema: every field carries a description,
    which fenic passes to the LLM as extraction guidance."""
    name: str = Field(description="The name of the entity")
    value: int = Field(description="The numeric value")
df.select(
fc.semantic.extract( "text" , GoodSchema).alias( "data" )
)
Supported Types
Fenic validates that your schema uses supported types:
✅ Primitives: str, int, float, bool
✅ Optional: Optional[T]
✅ Lists: List[T], list[T]
✅ Literals: Literal["a", "b", "c"]
✅ Nested models
❌ Unions (except Optional)
❌ Custom classes
❌ Circular references
from typing import List, Optional, Literal
# MetadataSchema must be defined before ValidSchema references it:
# class-body annotations are evaluated eagerly at class-creation time,
# so a forward reference to a not-yet-defined model raises NameError.
class MetadataSchema(BaseModel):
    """Nested model used by ValidSchema.metadata."""
    key: str = Field(description="Metadata key")
    value: str = Field(description="Metadata value")

class ValidSchema(BaseModel):
    """Schema using only fenic-supported types:
    primitives, Optional, List, Literal, and nested models."""
    name: str = Field(description="Name")
    count: Optional[int] = Field(description="Count if available")
    tags: List[str] = Field(description="List of tags")
    status: Literal["active", "inactive"] = Field(description="Status")
    metadata: Optional[MetadataSchema] = Field(description="Nested metadata")
Data Quality Checks
Validate results after semantic operations.
Null Checks
# Extract and validate
df_result = df.select(
"id" ,
fc.semantic.extract( "text" , Schema).alias( "data" )
)
# Check success rate
total = df_result.count()
successes = df_result.filter(fc.col( "data" ).is_not_null()).count()
failure_rate = (total - successes) / total * 100
print ( f "Failure rate: { failure_rate :.2f} %" )
if failure_rate > 5.0 : # Alert if >5% failures
print ( "WARNING: High failure rate detected!" )
Field Validation
# Validate extracted fields
df_validated = df_result.select(
"id" ,
"data" ,
# Check for reasonable values
fc.when(
(fc.col( "data" ).get_item( "confidence" ) >= 0 ) &
(fc.col( "data" ).get_item( "confidence" ) <= 1.0 ),
fc.lit( True )
).otherwise(fc.lit( False )).alias( "valid_confidence" )
)
# Filter invalid results
invalid = df_validated.filter(
~ fc.col( "valid_confidence" ) |
fc.col( "data" ).is_null()
)
if invalid.count() > 0 :
print ( f "Found { invalid.count() } invalid results" )
invalid.show()
# Validate all expected fields are present
df_result = df.select(
"id" ,
fc.semantic.extract( "text" , Schema).alias( "data" )
).unnest( "data" )
# Check for nulls in required fields
required_fields = [ "name" , "category" , "value" ]
for field in required_fields:
null_count = df_result.filter(fc.col(field).is_null()).count()
if null_count > 0 :
print ( f "WARNING: { null_count } rows with null { field } " )
Rate Limit Handling
Fenic automatically handles rate limits, but you can tune behavior.
Automatic Throttling
config = fc.SessionConfig(
semantic = fc.SemanticConfig(
language_models = {
"gpt" : fc.OpenAILanguageModel(
model_name = "gpt-4o-mini" ,
rpm = 500 , # Fenic throttles to stay under this
tpm = 200_000 # And this
)
}
)
)
# Process large dataset - Fenic handles throttling
large_df = session.create_dataframe([{ "text" : f "text { i } " } for i in range ( 10_000 )])
result = large_df.select(
fc.semantic.classify( "text" , [ "a" , "b" , "c" ]).alias( "category" )
)
# No rate limit errors - automatically throttled
result.write.save_as_table( "output" , mode = "overwrite" )
Conservative Limits
# Set limits below API maximum to account for other workloads
config = fc.SessionConfig(
semantic = fc.SemanticConfig(
language_models = {
"gpt" : fc.OpenAILanguageModel(
model_name = "gpt-4o-mini" ,
rpm = 400 , # 20% below max of 500
tpm = 160_000 # 20% below max of 200k
)
}
)
)
Pipeline Error Handling
Try Multiple Approaches
def extract_with_fallback(df, schema):
    """Extract `schema` from df["text"], retrying failures with a stronger model.

    Tries gpt-4o-mini first; rows whose extraction returns null are retried
    once with gpt-4o and a larger output-token budget. Returns a DataFrame
    of (id, text, data) rows where extraction ultimately succeeded.
    """
    # Try 1: fast, cheap model.
    result = df.select(
        "id",
        "text",
        fc.semantic.extract(
            "text",
            schema,
            model_alias="gpt-4o-mini",
        ).alias("data"),
    )
    successes = result.filter(fc.col("data").is_not_null())
    failures = result.filter(fc.col("data").is_null())

    # Count once and reuse: each .count() re-executes the plan
    # (and with semantic operators that means re-calling the LLM).
    failure_count = failures.count()
    if failure_count == 0:
        return successes

    # Try 2: more capable model, larger output budget, for the failed rows.
    print(f"Retrying {failure_count} failures with gpt-4o")
    retried = failures.select(
        "id",
        "text",
        fc.semantic.extract(
            "text",
            schema,
            model_alias="gpt-4o",
            max_output_tokens=2048,
        ).alias("data"),
    )

    # Keep only retries that actually succeeded before combining.
    return successes.union(retried.filter(fc.col("data").is_not_null()))
df_final = extract_with_fallback(df, MySchema)
Checkpoint Progress
# Process in batches with checkpoints.
batch_size = 1000
total = df.count()
# Overwrite on the first *successful* write only. Keying the mode on
# `i > 0` is wrong: if batch 0 fails, batch 1 would append to whatever
# stale "output" table a previous run left behind.
table_created = False
for i in range(0, total, batch_size):
    batch = session.sql(
        f"SELECT * FROM {{df}} LIMIT {batch_size} OFFSET {i}",
        df=df,
    )
    try:
        result = batch.select(
            fc.semantic.extract("text", Schema).alias("data")
        )
        # Checkpoint each successful batch as soon as it completes.
        result.write.save_as_table(
            "output",
            mode="append" if table_created else "overwrite",
        )
        table_created = True
        print(f"Completed batch {i // batch_size + 1}/{(total + batch_size - 1) // batch_size}")
    except Exception as e:
        print(f"Batch {i // batch_size + 1} failed: {e}")
        # Progress so far is already checkpointed; move on.
        print("Continuing with next batch...")
        continue
Logging and Debugging
Execution Metrics
# Capture detailed metrics
metrics = df.select(
fc.semantic.extract( "text" , Schema).alias( "data" )
).write.save_as_table( "output" , mode = "overwrite" )
# Print execution details
print ( " \n === Execution Metrics ===" )
print (metrics.get_execution_plan_details())
lm = metrics.total_lm_metrics
print ( f " \n Token Usage:" )
print ( f " Input tokens: { lm.num_uncached_input_tokens :,} " )
print ( f " Output tokens: { lm.num_output_tokens :,} " )
print ( f " Cached tokens: { lm.num_cached_input_tokens :,} " )
print ( f " Total cost: $ { lm.cost :.4f} " )
Debug Failed Rows
# Add debug info to results
df_debug = df.select(
"id" ,
"text" ,
fc.text.length(fc.col( "text" )).alias( "text_length" ),
fc.text.count_tokens(fc.col( "text" )).alias( "token_count" ),
fc.semantic.extract( "text" , Schema).alias( "data" )
)
# Analyze failures
failures = df_debug.filter(fc.col( "data" ).is_null())
print ( " \n Failure Analysis:" )
failures.select(
"id" ,
"text_length" ,
"token_count"
).show()
# Check if failures correlate with length
avg_token_count_success = df_debug.filter(
fc.col( "data" ).is_not_null()
).select(
fc.avg( "token_count" ).alias( "avg_tokens" )
).collect()[ 0 ][ 0 ]
avg_token_count_failure = df_debug.filter(
fc.col( "data" ).is_null()
).select(
fc.avg( "token_count" ).alias( "avg_tokens" )
).collect()[ 0 ][ 0 ]
print ( f "Avg tokens (success): { avg_token_count_success :.0f} " )
print ( f "Avg tokens (failure): { avg_token_count_failure :.0f} " )
Best Practices
Always Check for Nulls : Filter out null results from semantic operations before downstream processing.
Set Realistic Timeouts : Balance between catching real failures and allowing complex operations to complete.
Use Checkpointing : For large pipelines, save progress periodically so failures don’t require full reruns.
Error Handling Checklist
def robust_semantic_pipeline(df, schema):
    """Production-ready semantic extraction pipeline.

    Steps: validate input, pre-filter rows by token count, extract with
    the default model, and retry failed rows with a stronger model when
    the failure rate exceeds 10%. Returns only rows whose extraction
    succeeded.

    Raises:
        ValueError: if the input is empty, the token filter removes every
            row, or all extractions fail.
    """
    # 1. Validate inputs.
    if df.count() == 0:
        raise ValueError("Empty DataFrame")

    # 2. Add a token count column for pre-filtering and debugging.
    df_prep = df.select(
        "id",
        "text",
        fc.text.count_tokens(fc.col("text")).alias("tokens"),
    )

    # 3. Keep only rows that are neither too short (<= 10 tokens)
    #    nor too long (>= 8000 tokens).
    df_valid = df_prep.filter(
        (fc.col("tokens") > 10) &
        (fc.col("tokens") < 8000)
    )

    # 4. Extract with an appropriate timeout.
    df_result = df_valid.select(
        "id",
        "text",
        fc.semantic.extract(
            "text",
            schema,
            request_timeout=120,
        ).alias("data"),
    )

    # 5. Check the failure rate. Guard against division by zero when the
    #    token filter removed every row (the original code crashed here).
    total = df_result.count()
    if total == 0:
        raise ValueError("No rows left after token-count filtering")
    successes = df_result.filter(fc.col("data").is_not_null()).count()
    failure_rate = (total - successes) / total * 100
    print(f"Success rate: {100 - failure_rate:.1f}%")

    # 6. Retry failures with a stronger model if the rate is too high.
    if failure_rate > 10.0:
        print(f"High failure rate ({failure_rate:.1f}%), retrying...")
        failures = df_result.filter(fc.col("data").is_null())
        retried = failures.select(
            "id",
            "text",
            fc.semantic.extract(
                "text",
                schema,
                model_alias="gpt-4o",
                max_output_tokens=2048,
                request_timeout=180,
            ).alias("data"),
        )
        successes_df = df_result.filter(fc.col("data").is_not_null())
        df_result = successes_df.union(retried)

    # 7. Final validation: keep only successful extractions.
    final = df_result.filter(fc.col("data").is_not_null())
    if final.count() == 0:
        raise ValueError("All extractions failed")
    return final
Common Error Patterns
Context Length Exceeded
# Problem: Input too long
df.select(
fc.semantic.extract(
"very_long_text" , # 50k tokens
Schema
)
)
# Returns null - context length exceeded
# Solution: Chunk first
df_chunked = df.select(
"id" ,
fc.text.recursive_token_chunk(
fc.col( "very_long_text" ),
chunk_size = 4000 ,
chunk_overlap_percentage = 10
).alias( "chunks" )
).explode( "chunks" )
df_extracted = df_chunked.select(
"id" ,
fc.semantic.extract(
"chunks" ,
Schema
).alias( "data" )
)
Invalid JSON in Schema
# Problem: LLM returns invalid JSON
class Schema(BaseModel):
    """Problem case: the terse field description gives the LLM little
    guidance, so extraction may return null if the model emits invalid JSON
    (see the solution variant below)."""
    value: int = Field(description="A number")
df.select(
fc.semantic.extract( "text" , Schema)
)
# May return null if LLM generates invalid JSON
# Solution: Use simpler schema or more capable model
class Schema(BaseModel):
    """Solution variant: an explicit, instructive field description steers
    the LLM toward parseable output."""
    value: int = Field(description="Extract the numeric value. Return only the number with no formatting.")
df.select(
fc.semantic.extract(
"text" ,
Schema,
model_alias = "gpt-4o" # More reliable
)
)
Rate Limit Errors
# Problem: Exceeding API limits
config = fc.SessionConfig(
semantic = fc.SemanticConfig(
language_models = {
"gpt" : fc.OpenAILanguageModel(
model_name = "gpt-4o-mini" ,
rpm = 1000 , # Too high for tier
tpm = 500_000
)
}
)
)
# Solution: Set conservative limits
config = fc.SessionConfig(
semantic = fc.SemanticConfig(
language_models = {
"gpt" : fc.OpenAILanguageModel(
model_name = "gpt-4o-mini" ,
rpm = 400 , # 20% below tier limit
tpm = 160_000
)
}
)
)
Next Steps
Batch Inference Configure rate limiting and throughput optimization
Semantic Operators Learn about semantic operations and their error behavior