Fenic provides sophisticated rate limiting and batch processing capabilities to maximize throughput while respecting API provider limits. Configure models, control concurrency, and optimize costs.

Session Configuration

All rate limiting and batching behavior is configured through SessionConfig and SemanticConfig.

Basic Setup

import fenic as fc

config = fc.SessionConfig(
    app_name="production_app",
    semantic=fc.SemanticConfig(
        language_models={
            "fast": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,      # Requests per minute
                tpm=200_000   # Tokens per minute
            )
        },
        embedding_models={
            "embed": fc.OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=500,
                tpm=1_000_000
            )
        },
        default_language_model="fast",
        default_embedding_model="embed"
    )
)

session = fc.Session.get_or_create(config)

Rate Limiting

Understanding RPM and TPM

Fenic respects both requests per minute (RPM) and tokens per minute (TPM) limits:
  • RPM: Maximum number of API requests per minute
  • TPM: Maximum number of tokens (input + output) per minute
Fenic automatically throttles requests to stay within both limits.
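Conceptually, this throttling behaves like two sliding budgets, one for requests and one for tokens, that both must have capacity before a call goes out. A minimal sketch of the idea (illustrative only, not Fenic's internal implementation):

```python
import time

class DualRateLimiter:
    """Illustrative dual-limit throttle: a request is admitted only when
    both the RPM and TPM budgets for the current 60-second window have room."""

    def __init__(self, rpm: int, tpm: int):
        self.rpm, self.tpm = rpm, tpm
        self.window_start = time.monotonic()
        self.requests = 0
        self.tokens = 0

    def acquire(self, estimated_tokens: int) -> None:
        """Block until the request fits under both limits, then record it."""
        while True:
            now = time.monotonic()
            if now - self.window_start >= 60:
                # New minute: reset both budgets
                self.window_start, self.requests, self.tokens = now, 0, 0
            if self.requests < self.rpm and self.tokens + estimated_tokens <= self.tpm:
                self.requests += 1
                self.tokens += estimated_tokens
                return
            time.sleep(0.05)  # wait for the window to roll over
```

Fenic performs this accounting for you; the sketch is only to show why a single request can be delayed by either limit independently.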
1. Set conservative limits: start with limits 20-30% below your tier’s maximum to account for other workloads.
2. Monitor actual usage: check metrics after runs to see if you’re hitting limits.
3. Adjust based on cost: increase limits if you have budget, decrease if costs are too high.
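A small helper for step 1 (a hypothetical convenience, not part of Fenic) that derives conservative `rpm`/`tpm` values from your tier's published maximums:

```python
def conservative_limits(tier_rpm: int, tier_tpm: int, headroom: float = 0.25) -> tuple[int, int]:
    """Scale tier maximums down by `headroom` (default 25%) to leave room
    for other workloads sharing the same API key."""
    return int(tier_rpm * (1 - headroom)), int(tier_tpm * (1 - headroom))

# e.g. a 500 RPM / 200k TPM tier -> configure 375 RPM / 150k TPM
rpm, tpm = conservative_limits(500, 200_000)
```

Pass the resulting values into the model constructor (e.g. `fc.OpenAILanguageModel(model_name=..., rpm=rpm, tpm=tpm)`).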

Multi-Model Configuration

config = fc.SessionConfig(
    app_name="multi_model_app",
    semantic=fc.SemanticConfig(
        language_models={
            # Fast model for simple tasks
            "mini": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=1000,
                tpm=500_000
            ),
            # Powerful model for complex tasks
            "powerful": fc.OpenAILanguageModel(
                model_name="gpt-4o",
                rpm=100,
                tpm=100_000
            ),
            # Reasoning model with profiles
            "reasoning": fc.OpenAILanguageModel(
                model_name="o4-mini",
                rpm=50,
                tpm=50_000,
                profiles={
                    "fast": fc.OpenAILanguageModel.Profile(reasoning_effort="low"),
                    "thorough": fc.OpenAILanguageModel.Profile(reasoning_effort="high")
                },
                default_profile="fast"
            )
        },
        embedding_models={
            # Small, fast embeddings
            "small": fc.OpenAIEmbeddingModel(
                model_name="text-embedding-3-small",
                rpm=1000,
                tpm=2_000_000
            ),
            # Large, high-quality embeddings
            "large": fc.OpenAIEmbeddingModel(
                model_name="text-embedding-3-large",
                rpm=500,
                tpm=1_000_000
            )
        },
        default_language_model="mini",
        default_embedding_model="small"
    )
)

# Use different models for different tasks
df.select(
    # Simple classification with fast model
    fc.semantic.classify(
        "text",
        ["positive", "negative", "neutral"],
        model_alias="mini"
    ).alias("sentiment"),
    
    # Complex extraction with powerful model
    fc.semantic.extract(
        "text",
        ComplexSchema,
        model_alias="powerful"
    ).alias("structured_data")
)

Provider-Specific Configuration

OpenAI

fc.OpenAILanguageModel(
    model_name="gpt-4o",
    rpm=500,      # Tier-based limit
    tpm=200_000   # Tier-based limit
)

Anthropic

# Anthropic has separate input and output token limits
fc.AnthropicLanguageModel(
    model_name="claude-3-5-sonnet-latest",
    rpm=100,
    input_tpm=100_000,   # Input token limit
    output_tpm=50_000    # Output token limit
)

Google (Developer)

fc.GoogleDeveloperLanguageModel(
    model_name="gemini-2.0-flash",
    rpm=100,
    tpm=1_000_000,
    profiles={
        "fast": fc.GoogleDeveloperLanguageModel.Profile(
            thinking_token_budget=1024
        ),
        "thorough": fc.GoogleDeveloperLanguageModel.Profile(
            thinking_token_budget=8192
        )
    }
)

Google (Vertex AI)

fc.GoogleVertexLanguageModel(
    model_name="gemini-2.0-flash",
    rpm=100,
    tpm=1_000_000
)

Cohere

fc.CohereEmbeddingModel(
    model_name="embed-v4.0",
    rpm=100,
    tpm=500_000
)

OpenRouter

from fenic.core.types.provider_routing import (
    ProviderRouting,
    ProviderSort,
    DataCollection
)

fc.OpenRouterLanguageModel(
    model_name="anthropic/claude-3.5-sonnet",
    rpm=100,
    tpm=100_000,
    provider_routing=ProviderRouting(
        order=[ProviderSort.THROUGHPUT],  # or LATENCY, COST
        data_collection=DataCollection.DENY
    )
)

Request Timeouts

Control how long to wait for individual LLM requests.

Global Defaults

Default timeout is 120 seconds for all semantic operations.

Per-Operation Timeouts

# Extract with custom timeout
df.select(
    fc.semantic.extract(
        "text",
        MySchema,
        request_timeout=60  # 60 second timeout
    ).alias("data")
)

# Map with timeout
df.select(
    fc.semantic.map(
        "Analyze: {{ text }}",
        text=fc.col("text"),
        request_timeout=90
    ).alias("analysis")
)

# Predicate with timeout
df.filter(
    fc.semantic.predicate(
        "Is this urgent: {{ text }}",
        text=fc.col("text"),
        request_timeout=30
    )
)

# Reduce with timeout
df.group_by("category").agg(
    fc.semantic.reduce(
        "Summarize",
        fc.col("text"),
        request_timeout=120
    ).alias("summary")
)

Model Profiles

Use profiles to configure different reasoning/thinking settings for the same model.

OpenAI Reasoning Models

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "o4": fc.OpenAILanguageModel(
                model_name="o4-mini",
                rpm=100,
                tpm=100_000,
                profiles={
                    "fast": fc.OpenAILanguageModel.Profile(
                        reasoning_effort="low"
                    ),
                    "balanced": fc.OpenAILanguageModel.Profile(
                        reasoning_effort="medium"
                    ),
                    "thorough": fc.OpenAILanguageModel.Profile(
                        reasoning_effort="high"
                    )
                },
                default_profile="balanced"
            )
        }
    )
)

# Use default profile
fc.semantic.extract("text", Schema, model_alias="o4")

# Use specific profile
from fenic.core.types.semantic import ModelAlias

fc.semantic.extract(
    "text",
    Schema,
    model_alias=ModelAlias(name="o4", profile="thorough")
)

Google Gemini Thinking Models

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "gemini": fc.GoogleDeveloperLanguageModel(
                model_name="gemini-2.5-flash",
                rpm=100,
                tpm=1_000_000,
                profiles={
                    "thinking_disabled": fc.GoogleDeveloperLanguageModel.Profile(),
                    "fast": fc.GoogleDeveloperLanguageModel.Profile(
                        thinking_token_budget=1024
                    ),
                    "thorough": fc.GoogleDeveloperLanguageModel.Profile(
                        thinking_token_budget=8192
                    )
                },
                default_profile="fast"
            )
        }
    )
)

Anthropic Extended Thinking

config = fc.SessionConfig(
    semantic=fc.SemanticConfig(
        language_models={
            "claude": fc.AnthropicLanguageModel(
                model_name="claude-opus-4-0",
                rpm=100,
                input_tpm=100_000,
                output_tpm=50_000,
                profiles={
                    "thinking_disabled": fc.AnthropicLanguageModel.Profile(),
                    "fast": fc.AnthropicLanguageModel.Profile(
                        thinking_token_budget=1024
                    ),
                    "thorough": fc.AnthropicLanguageModel.Profile(
                        thinking_token_budget=4096
                    )
                },
                default_profile="fast"
            )
        }
    )
)

Temperature Restrictions: When reasoning/thinking is enabled, temperature cannot be customized. The exception is GPT-5.1 with reasoning_effort="none", where temperature can still be set.

Async UDFs and Concurrency

For custom async operations, control concurrency and retry behavior.

import aiohttp
import fenic as fc

@fc.async_udf(
    return_type=fc.StringType,
    max_concurrency=50,    # Max parallel requests
    timeout_seconds=10,     # Per-request timeout
    num_retries=3          # Retry on failure
)
async def fetch_user_profile(user_id: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/users/{user_id}") as resp:
            return await resp.text()

# Use in DataFrame
df.select(
    "user_id",
    fetch_user_profile(fc.col("user_id")).alias("profile")
)

Concurrency Parameters

  • max_concurrency: Maximum number of concurrent async operations
  • timeout_seconds: Timeout for individual operations
  • num_retries: Number of retry attempts on failure
Concurrency Limits: Set max_concurrency based on your API’s rate limits and available resources. Too high can cause throttling.
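To see what these three parameters mean mechanically, here is a rough sketch of semaphore-bounded concurrency with timeouts and retries (illustrative only; `async_udf` handles all of this for you):

```python
import asyncio

async def bounded_gather(items, worker, max_concurrency=50, timeout_seconds=10, num_retries=3):
    """Run `worker` over `items` with at most `max_concurrency` in flight.
    Each call gets a per-attempt timeout and simple retry-on-failure."""
    sem = asyncio.Semaphore(max_concurrency)

    async def run(item):
        async with sem:  # caps parallelism, like max_concurrency
            for attempt in range(num_retries + 1):
                try:
                    return await asyncio.wait_for(worker(item), timeout_seconds)
                except Exception:
                    if attempt == num_retries:
                        raise  # out of retries

    # Results come back in input order
    return await asyncio.gather(*(run(i) for i in items))
```

The semaphore is why raising `max_concurrency` increases throughput only up to the point where the provider starts throttling you.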

Response Caching

Cache LLM responses to reduce costs and improve performance.

Enable Caching

config = fc.SessionConfig(
    app_name="cached_app",
    semantic=fc.SemanticConfig(
        language_models={"gpt": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=500, tpm=200_000)},
        cache_config=fc.LLMResponseCacheConfig(
            backend="sqlite",
            cache_dir="~/.fenic/cache"
        )
    )
)

Cache Behavior

  • Responses are cached by request fingerprint (model, prompt, parameters)
  • Cache hits return instantly without consuming API quota
  • Cache is persistent across sessions
  • Cache directory can be shared across projects
# First run: makes API call
result1 = df.select(
    fc.semantic.classify("text", ["positive", "negative"])
).collect()

# Second run: uses cache (instant, free)
result2 = df.select(
    fc.semantic.classify("text", ["positive", "negative"])
).collect()
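The "request fingerprint" is essentially a stable hash over everything that affects the response. A minimal sketch of the idea (Fenic computes its own fingerprint internally; this is only to show why changing any parameter invalidates the cache entry):

```python
import hashlib
import json

def request_fingerprint(model: str, prompt: str, params: dict) -> str:
    """Hash model + prompt + sorted parameters into a deterministic cache key."""
    payload = json.dumps(
        {"model": model, "prompt": prompt, "params": params},
        sort_keys=True,  # same params in any order -> same key
    )
    return hashlib.sha256(payload.encode()).hexdigest()
```

Identical requests map to the same key and hit the cache; changing the model, prompt, or any parameter produces a new key and a fresh API call.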

Cost Tracking

Monitor token usage and costs throughout your pipelines.

Execution Metrics

# Execute and capture metrics
metrics = df.select(
    fc.semantic.extract("text", MySchema).alias("data")
).write.save_as_table("output", mode="overwrite")

# Access aggregated metrics
lm_metrics = metrics.total_lm_metrics
print(f"Total cost: ${lm_metrics.cost:.4f}")
print(f"Input tokens: {lm_metrics.num_uncached_input_tokens:,}")
print(f"Output tokens: {lm_metrics.num_output_tokens:,}")
print(f"Cached input tokens: {lm_metrics.num_cached_input_tokens:,}")

# Detailed execution plan
print(metrics.get_execution_plan_details())

Token Estimation

# Estimate before running
token_count = df.select(
    fc.text.count_tokens(fc.col("text")).alias("tokens")
).agg(fc.sum("tokens")).collect()[0][0]

print(f"Estimated input tokens: {token_count:,}")
print(f"Estimated cost: ${token_count * 0.15 / 1_000_000:.2f}")  # gpt-4o-mini input rate: $0.15 per 1M tokens

Batching Strategies

Process in Chunks

# Process large dataset in batches
batch_size = 1000
total_rows = df.count()

for offset in range(0, total_rows, batch_size):
    batch = session.sql(
        f"SELECT * FROM {{df}} LIMIT {batch_size} OFFSET {offset}",
        df=df
    )
    
    result = batch.select(
        fc.semantic.extract("text", Schema).alias("data")
    )
    
    result.write.save_as_table(
        "output",
        mode="append" if offset > 0 else "overwrite"
    )
    
    print(f"Processed {min(offset + batch_size, total_rows)}/{total_rows}")

Parallel Processing

Fenic automatically parallelizes within rate limits.

# This processes 10,000 rows with automatic batching
large_df = session.create_dataframe([{"text": f"text {i}"} for i in range(10_000)])

result = large_df.select(
    fc.semantic.classify(
        "text",
        ["category_a", "category_b", "category_c"]
    ).alias("category")
)

# Fenic handles:
# - Batching requests to respect RPM/TPM
# - Parallel execution within limits
# - Automatic retries on transient failures
result.write.save_as_table("classified", mode="overwrite")

Best Practices

Start Conservative: Begin with rate limits 20-30% below your tier maximum. Increase gradually based on actual usage.
Monitor Costs: Always check metrics.total_lm_metrics.cost after runs. Set alerts for unexpected cost spikes.
Cache Aggressively: Enable caching for development and testing to avoid repeated API calls for the same inputs.

Rate Limit Tuning

# Development: Lower limits, enable caching
dev_config = fc.SessionConfig(
    app_name="dev",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=50,      # Lower for cost control
                tpm=10_000
            )
        },
        cache_config=fc.LLMResponseCacheConfig(backend="sqlite")
    )
)

# Production: Higher limits, optional caching
prod_config = fc.SessionConfig(
    app_name="prod",
    semantic=fc.SemanticConfig(
        language_models={
            "gpt": fc.OpenAILanguageModel(
                model_name="gpt-4o-mini",
                rpm=500,     # Max for tier
                tpm=200_000
            )
        }
    )
)

Model Selection Strategy

# Route by complexity (aliases defined in the multi-model config above)
def select_model(complexity: str) -> str:
    return {
        "simple": "mini",          # Fast, cheap
        "moderate": "powerful",    # Balanced (gpt-4o)
        "complex": "reasoning"     # Reasoning (o4-mini)
    }[complexity]

# Classify complexity first
df_classified = df.select(
    "text",
    fc.semantic.classify(
        "text",
        ["simple", "moderate", "complex"],
        model_alias="mini"
    ).alias("complexity")
)

# Process by complexity tier
for complexity in ["simple", "moderate", "complex"]:
    subset = df_classified.filter(fc.col("complexity") == complexity)
    
    result = subset.select(
        fc.semantic.extract(
            "text",
            Schema,
            model_alias=select_model(complexity)
        ).alias("data")
    )
    
    result.write.save_as_table("output", mode="append")

Troubleshooting

Rate Limit Errors

If you see rate limit errors:
  1. Reduce RPM/TPM in configuration
  2. Check other processes using the same API key
  3. Verify you’re on the correct tier
  4. Add delays between large batches
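For step 4, a simple way to add delays is a cooldown between batches so the per-minute windows can reset. A generic sketch (the `process_with_cooldown` helper and its arguments are hypothetical, not a Fenic API):

```python
import time

def process_with_cooldown(batches, process, cooldown_seconds: float = 5.0):
    """Process batches sequentially, pausing between them to let
    rate-limit windows recover before the next burst of requests."""
    results = []
    for i, batch in enumerate(batches):
        results.append(process(batch))
        if i < len(batches) - 1:  # no need to sleep after the last batch
            time.sleep(cooldown_seconds)
    return results
```

In the chunked-processing loop shown earlier, the equivalent is a `time.sleep(...)` at the end of each iteration.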

Timeout Issues

If operations timeout:
  1. Increase request_timeout parameter
  2. Reduce batch size
  3. Check network connectivity
  4. Consider using faster models

High Costs

# Audit token usage
metrics = pipeline.write.save_as_table("output", mode="overwrite")

total_tokens = (
    metrics.total_lm_metrics.num_uncached_input_tokens +
    metrics.total_lm_metrics.num_output_tokens
)

print(f"Total tokens: {total_tokens:,}")
print(f"Cost: ${metrics.total_lm_metrics.cost:.4f}")
print(f"Cost per row: ${metrics.total_lm_metrics.cost / df.count():.6f}")

# Reduce costs:
# 1. Use smaller models for simple tasks
# 2. Enable caching
# 3. Reduce max_output_tokens
# 4. Pre-filter data to reduce volume

Next Steps

Error Handling

Handle failures, retries, and graceful degradation

Semantic Operators

Learn about semantic operations that use these configs
