A DataFrame is a distributed collection of data organized into named columns. It provides a PySpark-inspired API for data manipulation with lazy evaluation and automatic optimization.
DataFrames use lazy evaluation - operations build a logical plan without executing immediately:
```python
# These operations are lazy - no execution yet
df = session.read.csv("data.csv")
filtered = df.filter(col("age") > 25)
selected = filtered.select("name", "age")

# Execution happens here
result = selected.collect()  # Action triggers execution
```
Lazy evaluation allows Fenic to optimize your entire query plan before execution, potentially reordering operations, pushing down filters, and eliminating redundant computations.
```python
from fenic.api.functions import col

# Method 1: Using col() function
df.select(col("name"))

# Method 2: Using string name
df.select("name")

# Method 3: Using bracket notation
df["name"]
```
```python
# Select specific columns
df.select("name", "age")

# Select with expressions
df.select(col("name"), col("age") + 1)

# Select all columns plus new ones
df.select("*", (col("age") + 1).alias("age_next_year"))
```
Join DataFrames using natural language predicates:
```python
from textwrap import dedent

from fenic.api.functions import semantic

jobs = session.read.csv("jobs.csv")
resumes = session.read.csv("resumes.csv")

# Semantic join based on qualification
matches = jobs.semantic.join(
    resumes,
    predicate=dedent('''
        Job: {{left_on}}
        Experience: {{right_on}}
        The candidate is qualified for this job.'''),
    left_on=col("job_description"),
    right_on=col("work_experience"),
)
```
```python
from fenic.api.functions import semantic

# Find top 3 most similar documents
queries.semantic.sim_join(
    documents,
    left_on=semantic.embed(col("query_text")),
    right_on=semantic.embed(col("doc_text")),
    k=3,
    similarity_metric="cosine",
    similarity_score_column="score",
)
```
```python
from fenic.api.functions import col, semantic

# Summarize documents by category
df.group_by("category").agg(
    semantic.reduce(
        "Summarize these documents",
        col("document_text"),
    )
)

# With ordering
df.group_by("conversation_id").agg(
    semantic.reduce(
        "Summarize this conversation",
        col("message_text"),
        order_by=[col("timestamp")],
    )
)
```
```python
df1 = session.read.csv("2023_data.csv")
df2 = session.read.csv("2024_data.csv")

# Union two DataFrames
combined = df1.union(df2)

# Union multiple DataFrames
all_data = df1.union(df2).union(df3)
```
```python
# Collect as list of rows
rows = df.collect()

# Convert to Pandas
pdf = df.to_pandas()

# Convert to Polars
pldf = df.to_polars()

# Convert to PyArrow
table = df.to_arrow()

# Convert to dictionary of column lists
data = df.to_pydict()

# Convert to list of row dicts
data = df.to_pylist()
```
```python
# Write to Parquet
df.write.parquet("output.parquet")

# Write to CSV
df.write.csv("output.csv")

# Write to JSON
df.write.json("output.json")

# Save as table
df.write.save_as_table("my_table")
```