Overview

A DataFrame is a distributed collection of data organized into named columns. It provides a PySpark-inspired API for data manipulation with lazy evaluation and automatic optimization.
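As a quick orientation, here is a minimal sketch of the typical flow, using only APIs covered later in this guide and assuming an already-configured session and a local people.csv file:
from fenic.api.functions import col

# Build a lazy plan: read, filter, project
df = session.read.csv("people.csv")
adults = df.filter(col("age") >= 18).select("name", "age")

# Nothing has executed yet; collect() triggers the run
rows = adults.collect()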

Key Concepts

Lazy Evaluation

DataFrames use lazy evaluation - operations build a logical plan without executing immediately:
# These operations are lazy - no execution yet
df = session.read.csv("data.csv")
filtered = df.filter(col("age") > 25)
selected = filtered.select("name", "age")

# Execution happens here
result = selected.collect()  # Action triggers execution
Lazy evaluation allows Fenic to optimize your entire query plan before execution, potentially reordering operations, pushing down filters, and eliminating redundant computations.
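For example, the two pipelines below describe the same result. Because nothing runs until an action is called, the optimizer can rewrite the first plan into the second, applying the filter before the projection (the column names are illustrative):
# Filter written after the projection...
late = df.select("name", "age").filter(col("age") > 25)

# ...can be rewritten to the equivalent plan below, where the filter
# runs first so fewer rows flow through the rest of the query
early = df.filter(col("age") > 25).select("name", "age")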

Transformations vs Actions

Transformations are lazy and return a new DataFrame:
  • select(), filter(), with_column(), join(), group_by()
Actions trigger execution and return results:
  • collect(), show(), count(), to_pandas(), to_polars(), write()
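A minimal sketch of the distinction, using only methods from the lists above (the column and status values are illustrative):
from fenic.api.functions import col

# Transformations: each call returns a new, still-lazy DataFrame
active = df.filter(col("status") == "active")
renamed = active.with_column("name_upper", col("name").upper())

# Actions: these run the plan and return concrete results
renamed.show(n=5)            # prints rows
n_active = renamed.count()   # returns the row count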

Creating DataFrames

# Read CSV
df = session.read.csv("data.csv")

# Read Parquet
df = session.read.parquet("data.parquet")

# Read JSON
df = session.read.json("data.json")

# Read with glob patterns
df = session.read.csv("data/*.csv")

Schema and Columns

Inspecting Schema

# Get schema
schema = df.schema
print(schema)
# Schema([
#     ColumnField('name', StringType),
#     ColumnField('age', IntegerType)
# ])

# Get column names
columns = df.columns
print(columns)  # ['name', 'age']

# Print schema in human-readable format
df.print_schema()

Accessing Columns

from fenic.api.functions import col

# Method 1: Using col() function
df.select(col("name"))

# Method 2: Using string name
df.select("name")

# Method 3: Using bracket notation
df["name"]

Common Transformations

Selecting Columns

# Select specific columns
df.select("name", "age")

# Select with expressions
df.select(col("name"), col("age") + 1)

# Select all columns plus new ones
df.select("*", (col("age") + 1).alias("age_next_year"))

Filtering Rows

# Simple filter
df.filter(col("age") > 25)

# Multiple conditions
df.filter((col("age") > 25) & (col("city") == "NYC"))

# String operations
df.filter(col("name").startswith("A"))

Adding and Modifying Columns

# Add new column
df.with_column("age_doubled", col("age") * 2)

# Add multiple columns
df.with_columns({
    "age_doubled": col("age") * 2,
    "name_upper": col("name").upper()
})

# Rename column
df.with_column_renamed("age", "years")

Dropping Columns

# Drop single column
df.drop("age")

# Drop multiple columns
df.drop("age", "city")

Sorting

# Sort ascending
df.sort("age")

# Sort descending
df.sort(col("age").desc())

# Multiple columns
df.sort(col("city"), col("age").desc())

Removing Duplicates

# Drop all duplicate rows
df.drop_duplicates()

# Drop duplicates based on specific columns
df.drop_duplicates(["name", "email"])

Limiting Results

# Get first N rows
df.limit(10)

# Combine with sort for top N
df.sort(col("score").desc()).limit(10)

Joins

Standard Joins

df1 = session.create_dataframe({"id": [1, 2], "name": ["Alice", "Bob"]})
df2 = session.create_dataframe({"id": [2, 3], "city": ["NYC", "LA"]})

# Inner join
df1.join(df2, on="id", how="inner")

# Left join
df1.join(df2, on="id", how="left")

# Join on multiple shared key columns (assumes both DataFrames have them)
df1.join(df2, on=["id", "country"], how="inner")

# Join on differently named key columns
df1.join(df2, on=df1["user_id"] == df2["id"], how="left")

Semantic Joins

Join DataFrames using natural language predicates:
from fenic.api.functions import semantic
from textwrap import dedent

jobs = session.read.csv("jobs.csv")
resumes = session.read.csv("resumes.csv")

# Semantic join based on qualification
matches = jobs.semantic.join(
    resumes,
    predicate=dedent('''
        Job: {{left_on}}
        Experience: {{right_on}}
        The candidate is qualified for this job.'''),
    left_on=col("job_description"),
    right_on=col("work_experience")
)

Similarity Joins

Join based on embedding similarity:
from fenic.api.functions import semantic

# Find top 3 most similar documents
queries.semantic.sim_join(
    documents,
    left_on=semantic.embed(col("query_text")),
    right_on=semantic.embed(col("doc_text")),
    k=3,
    similarity_metric="cosine",
    similarity_score_column="score"
)

Aggregations

Group By

from fenic.api.functions import count, avg, sum, max, min

# Group and aggregate
df.group_by("city").agg(
    count("*").alias("total"),
    avg("age").alias("avg_age")
)

# Multiple grouping columns
df.group_by("city", "state").agg(
    sum("sales").alias("total_sales")
)

# Filter after aggregation
df.group_by("city").agg(
    count("*").alias("count")
).filter(col("count") > 10)

Semantic Reduce

Aggregate text within each group using an LLM:
from fenic.api.functions import semantic, col

# Summarize documents by category
df.group_by("category").agg(
    semantic.reduce(
        "Summarize these documents",
        col("document_text")
    )
)

# With ordering
df.group_by("conversation_id").agg(
    semantic.reduce(
        "Summarize this conversation",
        col("message_text"),
        order_by=[col("timestamp")]
    )
)

Union

Combine DataFrames with the same schema:
df1 = session.read.csv("2023_data.csv")
df2 = session.read.csv("2024_data.csv")

# Union two DataFrames
combined = df1.union(df2)

# Union multiple DataFrames (df3 defined similarly)
all_data = df1.union(df2).union(df3)

Actions

Collecting Results

# Collect as list of rows
rows = df.collect()

# Convert to Pandas
pdf = df.to_pandas()

# Convert to Polars
pldf = df.to_polars()

# Convert to PyArrow
table = df.to_arrow()

# Convert to dictionary
data = df.to_pydict()

# Convert to list of dicts
data = df.to_pylist()

Displaying Results

# Show first 20 rows
df.show()

# Show first N rows
df.show(n=10)

# Show with truncation disabled
df.show(truncate=False)

Counting

# Count rows
total = df.count()

# Count after filter
filtered_count = df.filter(col("age") > 25).count()

Writing Data

# Write to Parquet
df.write.parquet("output.parquet")

# Write to CSV
df.write.csv("output.csv")

# Write to JSON
df.write.json("output.json")

# Save as table
df.write.save_as_table("my_table")

Complex Data Types

Working with Arrays

from fenic.api.functions import array, col

# Create array column
df.with_column("tags", array(["tag1", "tag2"]))

# Explode array into rows
df.select("id", col("tags").explode())

# Array operations
df.select(col("tags").list.len().alias("tag_count"))

Working with Structs

# Access struct fields
df.select(col("address").struct.field("city"))

# Unnest struct fields
df.unnest("address")  # Expands struct into separate columns

Semantic Operations

DataFrames provide a semantic namespace for LLM-powered operations:
from fenic.api.functions import semantic, col

# Extract structured data into a user-defined Pydantic model
# (MyPydanticModel is a placeholder; see the sketch after this block)
df.with_column(
    "extracted",
    semantic.extract("text", response_format=MyPydanticModel)
)

# Classify text
df.with_column(
    "category",
    semantic.classify("text", classes=["A", "B", "C"])
)

# Generate embeddings
df.with_column(
    "embeddings",
    semantic.embed(col("text"))
)

# Summarize text
df.with_column(
    "summary",
    semantic.summarize(col("text"))
)
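The response_format argument to semantic.extract expects a Pydantic model that you define yourself; MyPydanticModel above is a placeholder. A minimal sketch of such a schema (the field names are hypothetical and should describe what you want pulled out of the text):
from pydantic import BaseModel
from fenic.api.functions import semantic

# Hypothetical extraction schema
class MyPydanticModel(BaseModel):
    person: str
    company: str
    sentiment: str

df.with_column(
    "extracted",
    semantic.extract("text", response_format=MyPydanticModel)
)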

Best Practices

Apply filters as early as possible to reduce data volume:
# Good: Filter first
df.filter(col("date") >= "2024-01-01").select("name", "amount")

# Less efficient: Select first
df.select("name", "amount", "date").filter(col("date") >= "2024-01-01")
Prefer the col() function for clarity:
from fenic.api.functions import col

# Clear and explicit
df.filter(col("age") > 25)

# Less clear
df.filter("age > 25")
Use limit() or show() to preview data instead of collecting everything:
# Preview data
df.limit(100).to_pandas()

# Don't do this with huge datasets
# df.collect()  # May cause OOM
Use method chaining for clear data transformation pipelines:
result = (
    df
    .filter(col("status") == "active")
    .with_column("total", col("price") * col("quantity"))
    .group_by("category")
    .agg(sum("total").alias("revenue"))
    .sort(col("revenue").desc())
    .limit(10)
)

Performance Tips

  1. Partition large datasets: When reading large files, use partitioned data formats like Parquet
  2. Cache intermediate results: Use df.cache() for DataFrames used multiple times (see the sketch after this list)
  3. Push down predicates: Filters are automatically pushed down to data sources when possible
  4. Use semantic operations efficiently: Batch LLM calls happen automatically in semantic operations
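A short sketch of tip 2: cache a cleaned intermediate DataFrame that several downstream queries reuse (df.cache() comes from the tip above; the rest of the pipeline is illustrative):
from fenic.api.functions import col, sum

# Cache the cleaned DataFrame so later queries reuse the work
cleaned = (
    df
    .filter(col("status") == "active")
    .with_column("total", col("price") * col("quantity"))
    .cache()
)

# Both queries below start from the cached result instead of recomputing it
by_category = cleaned.group_by("category").agg(sum("total").alias("revenue"))
top_rows = cleaned.sort(col("total").desc()).limit(10)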

Next Steps

  • Semantic Operators: Learn about LLM-powered transformations
  • Data Types: Explore Fenic's type system
