Skip to main content
User-defined functions (UDFs) let you extend Flink SQL and the Table API with custom logic that cannot be expressed using built-in functions. You implement them in Java, Scala, or Python and register them with the TableEnvironment. Flink supports the following UDF kinds:
KindBase classInputOutput
Scalar functionScalarFunctionOne or more values per rowOne value per row
Async scalar functionAsyncScalarFunctionOne or more values per rowOne value per row (async)
Table functionTableFunction<T>One or more values per rowZero or more rows
Async table functionAsyncTableFunction<T>One or more values per rowZero or more rows (async)
Aggregate functionAggregateFunction<T, ACC>Values from multiple rowsOne value per group
Table aggregate functionTableAggregateFunction<T, ACC>Values from multiple rowsZero or more rows per group

Registering functions

// register by class (recommended)
tableEnv.createTemporarySystemFunction("my_func", MyScalarFunc.class);

// register by instance (use when the function has constructor parameters)
tableEnv.createTemporarySystemFunction("my_func", new MyScalarFunc(true));

// register as a catalog function (persisted in current catalog/database)
tableEnv.createFunction("my_catalog_func", MyScalarFunc.class);
A function registered with createTemporarySystemFunction is available everywhere in the session. A function registered with createFunction is stored in the current catalog and database and survives session restarts (when using a persistent catalog).

Scalar functions

A scalar function maps zero, one, or multiple input values to a single output value. Implement one or more public eval(...) methods. The method signature determines the input and output types.

Basic example

import org.apache.flink.table.functions.ScalarFunction;

public static class HashFunction extends ScalarFunction {

    // takes any type, returns INT
    public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
        return o.hashCode();
    }
}

TableEnvironment env = TableEnvironment.create(...);

// inline call (no registration needed)
env.from("MyTable").select(call(HashFunction.class, $("myField")));

// register then call in SQL
env.createTemporarySystemFunction("HashFunction", HashFunction.class);
env.sqlQuery("SELECT HashFunction(myField) FROM MyTable").execute().print();

Overloaded eval methods

Flink dispatches to the correct overload based on argument types:
public static class SumFunction extends ScalarFunction {

    public Integer eval(Integer a, Integer b) {
        return a + b;
    }

    public Integer eval(String a, String b) {
        return Integer.valueOf(a) + Integer.valueOf(b);
    }

    public Integer eval(Double... d) {
        double result = 0;
        for (double v : d) result += v;
        return (int) result;
    }
}

Parameterized functions

Pass constructor parameters when registering by instance:
public static class SubstringFunction extends ScalarFunction {
    private final boolean endInclusive;

    public SubstringFunction(boolean endInclusive) {
        this.endInclusive = endInclusive;
    }

    public String eval(String s, Integer begin, Integer end) {
        return s.substring(begin, endInclusive ? end + 1 : end);
    }
}

env.createTemporarySystemFunction("substring_incl", new SubstringFunction(true));

Python scalar function

from pyflink.table.udf import udf
from pyflink.table.types import DataTypes

@udf(result_type=DataTypes.STRING())
def to_upper(s: str) -> str:
    return s.upper() if s else None

t_env.create_temporary_system_function("to_upper", to_upper)
t_env.sql_query("SELECT to_upper(name) FROM users").execute().print()

Async scalar functions

Use AsyncScalarFunction when the function needs to make network calls (database lookups, REST requests). Async execution overlaps waiting time across multiple concurrent requests, dramatically increasing throughput.
import org.apache.flink.table.functions.AsyncScalarFunction;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public static class EnrichmentFunction extends AsyncScalarFunction {
    private transient Executor executor;

    @Override
    public void open(FunctionContext context) {
        executor = Executors.newFixedThreadPool(10);
    }

    // first arg is always CompletableFuture<ReturnType>
    public void eval(CompletableFuture<String> future, Integer id) {
        executor.execute(() -> {
            // simulate an external lookup
            try { Thread.sleep(50); } catch (InterruptedException e) {}
            switch (id) {
                case 1: future.complete("Alice"); break;
                case 2: future.complete("Bob");   break;
                default: future.completeExceptionally(
                    new IllegalArgumentException("Unknown id: " + id));
            }
        });
    }
}

env.getConfig().set("table.exec.async-scalar.max-concurrent-operations", "50");
env.getConfig().set("table.exec.async-scalar.timeout", "30s");
env.createTemporarySystemFunction("enrich", EnrichmentFunction.class);
env.sqlQuery("SELECT enrich(user_id) AS user_name FROM events").execute().print();

Table functions

A table function (UDTF) maps one input row to zero, one, or many output rows. Implement eval(...) and call collect(...) to emit each output row.
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {

    public void eval(String str) {
        for (String s : str.split(" ")) {
            collect(Row.of(s, s.length()));
        }
    }
}

TableEnvironment env = TableEnvironment.create(...);
env.createTemporarySystemFunction("SplitFunction", SplitFunction.class);

// inner join: rows with no split results are dropped
env.sqlQuery(
    "SELECT myField, word, length " +
    "FROM MyTable, LATERAL TABLE(SplitFunction(myField))"
).execute().print();

// left join: unmatched outer rows are preserved with NULL for UDTF columns
env.sqlQuery(
    "SELECT myField, word, length " +
    "FROM MyTable " +
    "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE"
).execute().print();

Python table function

from pyflink.table.udf import udtf
from pyflink.table.types import DataTypes
from pyflink.common import Row

@udtf(result_types=[DataTypes.STRING(), DataTypes.INT()])
def split(s: str):
    for word in s.split(" "):
        yield Row(word, len(word))

t_env.create_temporary_system_function("split", split)
t_env.sql_query(
    "SELECT myField, word, length "
    "FROM MyTable, LATERAL TABLE(split(myField))"
).execute().print()

Aggregate functions

Aggregate functions (UDAGGs) map values from multiple rows to a single scalar result. The function maintains an accumulator that is updated row by row:
  1. createAccumulator() — creates the initial (empty) accumulator.
  2. accumulate(acc, value...) — called for each input row.
  3. getValue(acc) — returns the final result after all rows are processed.
  4. retract(acc, value...) — optional; required for streaming queries that retract rows.
  5. merge(acc, Iterable<ACC>) — optional; required for session windows and bounded OVER aggregates.
import org.apache.flink.table.functions.AggregateFunction;
import static org.apache.flink.table.api.Expressions.*;

public static class WeightedAvgAccumulator {
    public long sum = 0;
    public int count = 0;
}

public static class WeightedAvg
        extends AggregateFunction<Long, WeightedAvgAccumulator> {

    @Override
    public WeightedAvgAccumulator createAccumulator() {
        return new WeightedAvgAccumulator();
    }

    @Override
    public Long getValue(WeightedAvgAccumulator acc) {
        return acc.count == 0 ? null : acc.sum / acc.count;
    }

    public void accumulate(WeightedAvgAccumulator acc, Long value, Integer weight) {
        acc.sum   += value * weight;
        acc.count += weight;
    }

    public void retract(WeightedAvgAccumulator acc, Long value, Integer weight) {
        acc.sum   -= value * weight;
        acc.count -= weight;
    }

    public void merge(
            WeightedAvgAccumulator acc,
            Iterable<WeightedAvgAccumulator> it) {
        for (WeightedAvgAccumulator a : it) {
            acc.sum   += a.sum;
            acc.count += a.count;
        }
    }
}

TableEnvironment env = TableEnvironment.create(...);
env.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);

env.sqlQuery(
    "SELECT category, WeightedAvg(price, quantity) AS avg_price " +
    "FROM Products GROUP BY category"
).execute().print();

Type inference with annotations

Flink infers input and output types from the Java method signature using reflection. Use annotations when the default inference is insufficient.

@DataTypeHint

import org.apache.flink.table.annotation.DataTypeHint;

public static class OverloadedFunction extends ScalarFunction {

    // simple case: no hint needed for Long
    public Long eval(long a, long b) {
        return a + b;
    }

    // specify precision and scale for DECIMAL
    public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
        return BigDecimal.valueOf(a + b);
    }

    // return a complex ROW type
    @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
    public Row eval(int i) {
        return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
    }
}

@FunctionHint

import org.apache.flink.table.annotation.FunctionHint;

// globally declare the output type for all eval() overloads
@FunctionHint(output = @DataTypeHint("ROW<word STRING, count INT>"))
public static class WordCountFunction extends TableFunction<Row> {

    public void eval(String sentence) {
        for (String w : sentence.split(" ")) {
            collect(Row.of(w, 1));
        }
    }

    public void eval() {
        collect(Row.of("<empty>", 0));
    }
}

Named parameters with @ArgumentHint

Annotate parameters with @ArgumentHint to support named-parameter call syntax in SQL:
public static class FormatterFunction extends ScalarFunction {

    public String eval(
        @ArgumentHint(name = "input",   isOptional = false, type = @DataTypeHint("STRING")) String s,
        @ArgumentHint(name = "padding", isOptional = true,  type = @DataTypeHint("INT"))    Integer pad
    ) {
        return pad == null ? s : String.format("%" + pad + "s", s);
    }
}
Calling with named parameters in SQL:
SELECT FormatterFunction(input => myField, padding => 10) FROM MyTable;
-- optional parameter omitted:
SELECT FormatterFunction(input => myField) FROM MyTable;

Runtime context and open/close lifecycle

Access job parameters, metrics, and distributed cache files via FunctionContext:
public static class HashCodeFunction extends ScalarFunction {
    private int factor;

    @Override
    public void open(FunctionContext context) throws Exception {
        // read a job parameter set via env.getConfig().addJobParameter(...)
        factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "31"));
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }

    @Override
    public void close() {
        // release resources (connections, thread pools, etc.)
    }
}

env.getConfig().addJobParameter("hashcode_factor", "31");
env.createTemporarySystemFunction("hashCode", HashCodeFunction.class);
env.sqlQuery("SELECT hashCode(name) FROM users").execute().print();

Function resolution order

When multiple functions share the same name, Flink resolves them in this order:
  1. Temporary system functions
  2. System (built-in) functions
  3. Temporary catalog functions in the current catalog and database
  4. Catalog functions in the current catalog and database
Use a fully-qualified name (catalog.database.function) for precise reference.

Build docs developers (and LLMs) love