Fenic provides sophisticated rate limiting and batch processing capabilities to maximize throughput while respecting API provider limits. Configure models, control concurrency, and optimize costs.
Session Configuration
All rate limiting and batching behavior is configured through SessionConfig and SemanticConfig.
Basic Setup
import fenic as fc

config = fc.SessionConfig(
    app_name="production_app",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,       # Requests per minute
                tpm=200_000,   # Tokens per minute
            )
        },
        embedding_models={
            "embed": fc.OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=500,
                tpm=1_000_000,
            )
        },
        default_language_model="fast",
        default_embedding_model="embed",
    )
)

session = fc.Session.get_or_create(config)
Rate Limiting
Understanding RPM and TPM
Fenic respects both requests per minute (RPM) and tokens per minute (TPM) limits:
RPM: Maximum number of API requests per minute
TPM: Maximum number of tokens (input + output) per minute
Fenic automatically throttles requests to stay within both limits.
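To build intuition for what dual-limit throttling means, here is a minimal sketch of a sliding-window limiter that enforces both an RPM and a TPM budget. This is illustrative only, not Fenic's actual implementation; the `DualRateLimiter` class is a hypothetical example.

```python
import time
from collections import deque

class DualRateLimiter:
    """Illustrative sliding-window limiter enforcing both RPM and TPM.

    A sketch of the general technique; Fenic's internal throttling
    may work differently.
    """

    def __init__(self, rpm: int, tpm: int):
        self.rpm = rpm
        self.tpm = tpm
        self.events = deque()  # (timestamp, tokens) within the last 60s

    def acquire(self, tokens: int) -> None:
        """Block until sending a request of `tokens` tokens fits both limits."""
        while True:
            now = time.monotonic()
            # Drop events that have aged out of the 60-second window
            while self.events and now - self.events[0][0] >= 60:
                self.events.popleft()
            used_requests = len(self.events)
            used_tokens = sum(t for _, t in self.events)
            if used_requests < self.rpm and used_tokens + tokens <= self.tpm:
                self.events.append((now, tokens))
                return
            time.sleep(0.05)  # wait for the window to free up
```

A request is admitted only when it fits under both budgets simultaneously, which is why a low TPM can throttle you well before RPM is reached (and vice versa).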
Set conservative limits
Start with limits 20-30% below your tier’s maximum to account for other workloads.
Monitor actual usage
Check metrics after runs to see if you’re hitting limits.
Adjust based on cost
Increase limits if you have budget, decrease if costs are too high.
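The "20-30% below tier maximum" tip is simple arithmetic; a hypothetical helper (not a Fenic API) might compute the conservative starting values like this:

```python
def conservative_limits(
    tier_rpm: int, tier_tpm: int, headroom: float = 0.25
) -> tuple[int, int]:
    """Leave `headroom` (here 25%, i.e. within the suggested 20-30%)
    below the tier maximum to account for other workloads."""
    return int(tier_rpm * (1 - headroom)), int(tier_tpm * (1 - headroom))

# For a tier allowing 500 RPM / 200,000 TPM:
rpm, tpm = conservative_limits(500, 200_000)  # -> (375, 150000)
```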
Multi-Model Configuration
config = fc.SessionConfig(
    app_name="multi_model_app",
    semantic=fc.SemanticConfig(
        language_models={
            # Fast model for simple tasks
            "mini": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=1000,
                tpm=500_000,
            ),
            # Powerful model for complex tasks
            "powerful": fc.OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=100,
                tpm=100_000,
            ),
            # Reasoning model with profiles
            "reasoning": fc.OpenAILanguageModel(
                model_name="o4-mini",
                rpm=50,
                tpm=50_000,
                profiles={
                    "fast": fc.OpenAILanguageModel.Profile(reasoning_effort="low"),
                    "thorough": fc.OpenAILanguageModel.Profile(reasoning_effort="high"),
                },
                default_profile="fast",
            ),
        },
        embedding_models={
            # Small, fast embeddings
            "small": fc.OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=1000,
                tpm=2_000_000,
            ),
            # Large, high-quality embeddings
            "large": fc.OpenAIEmbeddingModel(
                model_name="text-embedding-3-large",
                rpm=500,
                tpm=1_000_000,
            ),
        },
        default_language_model="mini",
        default_embedding_model="small",
    )
)

# Use different models for different tasks
df.select(
    # Simple classification with fast model
    fc.semantic.classify(
        "text",
        ["positive", "negative", "neutral"],
        model_alias="mini",
    ).alias("sentiment"),
    # Complex extraction with powerful model
    fc.semantic.extract(
        "text",
        ComplexSchema,
        model_alias="powerful",
    ).alias("structured_data"),
)
Provider-Specific Configuration
OpenAI
fc.OpenAILanguageModel(
    model_name="gpt-4o",
    rpm=500,       # Tier-based limit
    tpm=200_000,   # Tier-based limit
)
Anthropic
# Anthropic has separate input and output token limits
fc.AnthropicLanguageModel(
    model_name="claude-3-5-sonnet-latest",
    rpm=100,
    input_tpm=100_000,   # Input token limit
    output_tpm=50_000,   # Output token limit
)
Google (Developer)
fc.GoogleDeveloperLanguageModel(
    model_name="gemini-2.0-flash",
    rpm=100,
    tpm=1_000_000,
    profiles={
        "fast": fc.GoogleDeveloperLanguageModel.Profile(
            thinking_token_budget=1024
        ),
        "thorough": fc.GoogleDeveloperLanguageModel.Profile(
            thinking_token_budget=8192
        ),
    },
)
Google (Vertex AI)
fc.GoogleVertexLanguageModel(
    model_name="gemini-2.0-flash",
    rpm=100,
    tpm=1_000_000,
)
Cohere
fc.CohereEmbeddingModel(
    model_name="embed-v4.0",
    rpm=100,
    tpm=500_000,
)
OpenRouter
from fenic.core.types.provider_routing import (
    ProviderRouting,
    ProviderSort,
    DataCollection,
)

fc.OpenRouterLanguageModel(
    model_name="anthropic/claude-3.5-sonnet",
    rpm=100,
    tpm=100_000,
    provider_routing=ProviderRouting(
        order=[ProviderSort.THROUGHPUT],  # or LATENCY, COST
        data_collection=DataCollection.DENY,
    ),
)
Request Timeouts
Control how long to wait for individual LLM requests.
Global Defaults
Default timeout is 120 seconds for all semantic operations.
Per-Operation Timeouts
# Extract with custom timeout
df.select(
    fc.semantic.extract(
        "text",
        MySchema,
        request_timeout=60,  # 60-second timeout
    ).alias("data")
)

# Map with timeout
df.select(
    fc.semantic.map(
        "Analyze: {{ text }}",
        text=fc.col("text"),
        request_timeout=90,
    ).alias("analysis")
)

# Predicate with timeout
df.filter(
    fc.semantic.predicate(
        "Is this urgent: {{ text }}",
        text=fc.col("text"),
        request_timeout=30,
    )
)

# Reduce with timeout
df.group_by("category").agg(
    fc.semantic.reduce(
        "Summarize",
        fc.col("text"),
        request_timeout=120,
    ).alias("summary")
)
Model Profiles
Use profiles to configure different reasoning/thinking settings for the same model.
OpenAI Reasoning Models
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "o4": fc.OpenAILanguageModel(
                model_name="o4-mini",
                rpm=100,
                tpm=100_000,
                profiles={
                    "fast": fc.OpenAILanguageModel.Profile(
                        reasoning_effort="low"
                    ),
                    "balanced": fc.OpenAILanguageModel.Profile(
                        reasoning_effort="medium"
                    ),
                    "thorough": fc.OpenAILanguageModel.Profile(
                        reasoning_effort="high"
                    ),
                },
                default_profile="balanced",
            )
        }
    )
)

# Use default profile
fc.semantic.extract("text", Schema, model_alias="o4")

# Use specific profile
from fenic.core.types.semantic import ModelAlias

fc.semantic.extract(
    "text",
    Schema,
    model_alias=ModelAlias(name="o4", profile="thorough"),
)
Google Gemini Thinking Models
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "gemini": fc.GoogleDeveloperLanguageModel(
                model_name="gemini-2.5-flash",
                rpm=100,
                tpm=1_000_000,
                profiles={
                    "thinking_disabled": fc.GoogleDeveloperLanguageModel.Profile(),
                    "fast": fc.GoogleDeveloperLanguageModel.Profile(
                        thinking_token_budget=1024
                    ),
                    "thorough": fc.GoogleDeveloperLanguageModel.Profile(
                        thinking_token_budget=8192
                    ),
                },
                default_profile="fast",
            )
        }
    )
)
Anthropic Extended Thinking
config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "claude": fc.AnthropicLanguageModel(
                model_name="claude-opus-4-0",
                rpm=100,
                input_tpm=100_000,
                output_tpm=50_000,
                profiles={
                    "thinking_disabled": fc.AnthropicLanguageModel.Profile(),
                    "fast": fc.AnthropicLanguageModel.Profile(
                        thinking_token_budget=1024
                    ),
                    "thorough": fc.AnthropicLanguageModel.Profile(
                        thinking_token_budget=4096
                    ),
                },
                default_profile="fast",
            )
        }
    )
)
Temperature Restrictions: When reasoning/thinking is enabled, temperature cannot be customized. The exception is GPT-5.1 with reasoning_effort='none', where temperature can still be set.
Async UDFs and Concurrency
For custom async operations, control concurrency and retry behavior.
import aiohttp
import fenic as fc

@fc.async_udf(
    return_type=fc.StringType,
    max_concurrency=50,   # Max parallel requests
    timeout_seconds=10,   # Per-request timeout
    num_retries=3,        # Retry on failure
)
async def fetch_user_profile(user_id: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/users/{user_id}") as resp:
            return await resp.text()

# Use in DataFrame
df.select(
    "user_id",
    fetch_user_profile(fc.col("user_id")).alias("profile"),
)
Concurrency Parameters
max_concurrency: Maximum number of concurrent async operations
timeout_seconds: Timeout for individual operations
num_retries: Number of retry attempts on failure
Concurrency Limits : Set max_concurrency based on your API’s rate limits and available resources. Too high can cause throttling.
Response Caching
Cache LLM responses to reduce costs and improve performance.
Enable Caching
config = fc.SessionConfig(
    app_name="cached_app",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,
                tpm=200_000,
            )
        },
        cache_config=fc.LLMResponseCacheConfig(
            backend="sqlite",
            cache_dir="~/.fenic/cache",
        )
    )
)
Cache Behavior
Responses are cached by request fingerprint (model, prompt, parameters)
Cache hits return instantly without consuming API quota
Cache is persistent across sessions
Cache directory can be shared across projects
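As a rough illustration of fingerprint-based caching (a sketch of the general idea; Fenic's actual fingerprinting may differ), a cache key can be derived by hashing the model, prompt, and parameters together:

```python
import hashlib
import json

def request_fingerprint(model: str, prompt: str, params: dict) -> str:
    """Hypothetical cache key: SHA-256 over model + prompt + sorted params.
    Identical requests produce identical keys, so a lookup can short-circuit
    the API call entirely."""
    payload = json.dumps(
        {"model": model, "prompt": prompt, "params": params},
        sort_keys=True,  # make the key order-independent
    )
    return hashlib.sha256(payload.encode()).hexdigest()
```

Because the key covers parameters too, changing temperature or the model alias produces a cache miss even for an identical prompt.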
# First run: makes API call
result1 = df.select(
    fc.semantic.classify("text", ["positive", "negative"])
).collect()

# Second run: uses cache (instant, free)
result2 = df.select(
    fc.semantic.classify("text", ["positive", "negative"])
).collect()
Cost Tracking
Monitor token usage and costs throughout your pipelines.
Execution Metrics
# Execute and capture metrics
metrics = df.select(
    fc.semantic.extract("text", MySchema).alias("data")
).write.save_as_table("output", mode="overwrite")

# Access aggregated metrics
lm_metrics = metrics.total_lm_metrics
print(f"Total cost: ${lm_metrics.cost:.4f}")
print(f"Input tokens: {lm_metrics.num_uncached_input_tokens:,}")
print(f"Output tokens: {lm_metrics.num_output_tokens:,}")
print(f"Cached input tokens: {lm_metrics.num_cached_input_tokens:,}")

# Detailed execution plan
print(metrics.get_execution_plan_details())
Token Estimation
# Estimate before running
token_count = df.select(
    fc.text.count_tokens(fc.col("text")).alias("tokens")
).agg(fc.sum("tokens")).collect()[0][0]

print(f"Estimated input tokens: {token_count:,}")
# gpt-4o-mini input rate: $0.15 per 1M tokens
print(f"Estimated cost: ${token_count / 1_000_000 * 0.15:.2f}")
Batching Strategies
Process in Chunks
# Process large dataset in batches
batch_size = 1000
total_rows = df.count()

for offset in range(0, total_rows, batch_size):
    batch = session.sql(
        f"SELECT * FROM {{df}} LIMIT {batch_size} OFFSET {offset}",
        df=df,
    )
    result = batch.select(
        fc.semantic.extract("text", Schema).alias("data")
    )
    result.write.save_as_table(
        "output",
        mode="append" if offset > 0 else "overwrite",
    )
    print(f"Processed {min(offset + batch_size, total_rows)}/{total_rows}")
Parallel Processing
Fenic automatically parallelizes within rate limits.
# This processes 10,000 rows with automatic batching
large_df = session.create_dataframe([{"text": f"text {i}"} for i in range(10_000)])

result = large_df.select(
    fc.semantic.classify(
        "text",
        ["category_a", "category_b", "category_c"],
    ).alias("category")
)

# Fenic handles:
# - Batching requests to respect RPM/TPM
# - Parallel execution within limits
# - Automatic retries on transient failures
result.write.save_as_table("classified", mode="overwrite")
Best Practices
Start Conservative : Begin with rate limits 20-30% below your tier maximum. Increase gradually based on actual usage.
Monitor Costs : Always check metrics.total_lm_metrics.cost after runs. Set alerts for unexpected cost spikes.
Cache Aggressively : Enable caching for development and testing to avoid repeated API calls for the same inputs.
Rate Limit Tuning
# Development: lower limits, caching enabled
dev_config = fc.SessionConfig(
    app_name="dev",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=50,      # Lower for cost control
                tpm=10_000,
            )
        },
        cache_config=fc.LLMResponseCacheConfig(backend="sqlite"),
    )
)

# Production: higher limits, optional caching
prod_config = fc.SessionConfig(
    app_name="prod",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,      # Max for tier
                tpm=200_000,
            )
        }
    )
)
Model Selection Strategy
# Route by complexity, using the aliases from the multi-model configuration
def select_model(complexity: str) -> str:
    return {
        "simple": "mini",         # Fast, cheap (gpt-4o-mini)
        "moderate": "powerful",   # Balanced (gpt-4o)
        "complex": "reasoning",   # Reasoning (o4-mini)
    }[complexity]

# Classify complexity first
df_classified = df.select(
    "text",
    fc.semantic.classify(
        "text",
        ["simple", "moderate", "complex"],
        model_alias="mini",
    ).alias("complexity"),
)

# Process by complexity tier
for complexity in ["simple", "moderate", "complex"]:
    subset = df_classified.filter(fc.col("complexity") == complexity)
    result = subset.select(
        fc.semantic.extract(
            "text",
            Schema,
            model_alias=select_model(complexity),
        ).alias("data")
    )
    result.write.save_as_table(
        "output",
        mode="overwrite" if complexity == "simple" else "append",
    )
Troubleshooting
Rate Limit Errors
If you see rate limit errors:
Reduce RPM/TPM in configuration
Check other processes using the same API key
Verify you’re on the correct tier
Add delays between large batches
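The last tip, adding delays between batches, generalizes to retrying with exponential backoff when a rate-limit error does slip through. A minimal sketch (a hypothetical helper, not a Fenic API):

```python
import time

def with_backoff(fn, max_attempts: int = 5, base_delay: float = 1.0):
    """Call `fn`, retrying on failure with exponentially growing delays
    (base_delay, 2x, 4x, ...). Re-raises after the final attempt."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
```

Exponential backoff gives the provider's rate-limit window time to clear instead of hammering it with immediate retries.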
Timeout Issues
If operations timeout:
Increase request_timeout parameter
Reduce batch size
Check network connectivity
Consider using faster models
High Costs
# Audit token usage
metrics = pipeline.write.save_as_table("output", mode="overwrite")

total_tokens = (
    metrics.total_lm_metrics.num_uncached_input_tokens +
    metrics.total_lm_metrics.num_output_tokens
)
print(f"Total tokens: {total_tokens:,}")
print(f"Cost: ${metrics.total_lm_metrics.cost:.4f}")
print(f"Cost per row: ${metrics.total_lm_metrics.cost / pipeline.count():.6f}")

# Reduce costs:
# 1. Use smaller models for simple tasks
# 2. Enable caching
# 3. Reduce max_output_tokens
# 4. Pre-filter data to reduce volume
Next Steps
Error Handling Handle failures, retries, and graceful degradation
Semantic Operators Learn about semantic operations that use these configs