The TableEnvironment is the central concept of the Table API and SQL. It is the entry point for:
  • Creating, registering, and querying tables and views
  • Executing SQL statements (DDL, DML, DQL)
  • Registering and managing user-defined functions
  • Working with catalogs and databases
  • Configuring job-level options

Creating a TableEnvironment

Streaming mode

For most use cases, create a TableEnvironment from EnvironmentSettings:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

Batch mode

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

Interoperating with the DataStream API

If you need to convert between DataStream and Table, create a StreamTableEnvironment from an existing StreamExecutionEnvironment:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Use StreamTableEnvironment only when you need to mix Table API code with DataStream API code in the same program. For pure table programs, use TableEnvironment.create() instead.

Executing SQL statements

executeSql

executeSql() is the primary method for running SQL. It supports DDL (CREATE TABLE, DROP TABLE, ALTER TABLE), DML (INSERT INTO), DQL (SELECT), and utility statements (SHOW TABLES, DESCRIBE, EXPLAIN, USE).
// DDL: create a source table
tableEnv.executeSql(
    "CREATE TABLE kafka_source (" +
    "  user_id BIGINT," +
    "  event_time TIMESTAMP(3)," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'user-events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// DDL: create a sink table
tableEnv.executeSql(
    "CREATE TABLE print_sink (" +
    "  user_id BIGINT," +
    "  event_time TIMESTAMP(3)" +
    ") WITH ('connector' = 'print')"
);

// DML: submit a streaming job
tableEnv.executeSql("INSERT INTO print_sink SELECT user_id, event_time FROM kafka_source");
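Utility statements also go through executeSql(). Each call returns a TableResult, which can be printed or iterated. A minimal sketch against the tables created above:

```java
import org.apache.flink.table.api.TableResult;

// list the tables registered in the current catalog/database
TableResult tables = tableEnv.executeSql("SHOW TABLES");
tables.print();

// inspect the schema of a table
tableEnv.executeSql("DESCRIBE kafka_source").print();

// show the planned execution without running the query
tableEnv.executeSql("EXPLAIN SELECT user_id FROM kafka_source").print();
```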

sqlQuery

sqlQuery() evaluates a SELECT statement and returns the result as a Table object. The query is not executed until you call a terminal operation such as execute() or executeInsert().
Table result = tableEnv.sqlQuery(
    "SELECT region, COUNT(*) AS cnt " +
    "FROM Orders " +
    "GROUP BY region"
);

// print results locally (triggers execution)
result.execute().print();
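Alternatively, executeInsert() is the terminal operation that writes a Table into a registered sink table. A sketch, assuming a sink named region_counts_sink (hypothetical) with a matching schema has been created:

```java
// write the aggregation result into a registered sink table;
// like INSERT INTO, this submits a Flink job
// ("region_counts_sink" is an assumed sink name for illustration)
result.executeInsert("region_counts_sink");
```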

Registering tables and views

Creating tables from connectors

Use a TableDescriptor to register a table programmatically without writing a DDL string:
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.DataTypes;

Schema schema = Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("score", DataTypes.DOUBLE())
    .build();

tableEnv.createTemporaryTable("users", TableDescriptor.forConnector("filesystem")
    .schema(schema)
    .option("path", "/data/users")
    .option("format", "csv")
    .build());

Registering a view from a Table object

Table aggregated = tableEnv.sqlQuery(
    "SELECT region, SUM(amount) AS total FROM Orders GROUP BY region"
);

// register as a named view for reuse
tableEnv.createTemporaryView("RegionTotals", aggregated);

// now use it in another query
tableEnv.sqlQuery("SELECT * FROM RegionTotals WHERE total > 1000").execute().print();

Scanning a registered table

// returns a Table object backed by the registered table
Table orders = tableEnv.from("Orders");
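The returned Table can then be transformed with the fluent Table API instead of SQL. A sketch, assuming the Orders table has region, user_id, and amount columns:

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.Table;

// equivalent to: SELECT user_id, amount FROM Orders WHERE region = 'EU'
Table euOrders = tableEnv.from("Orders")
    .filter($("region").isEqual("EU"))
    .select($("user_id"), $("amount"));

euOrders.execute().print();
```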

Table and SQL operations reference

The following table lists commonly used TableEnvironment methods:
Java/Scala                             | Python                                    | Description
from(path)                             | from_path(path)                           | Scans a registered table.
fromValues(values...)                  | from_elements(elements, schema)           | Creates a table from in-memory values.
createTemporaryView(path, table)       | create_temporary_view(path, table)        | Registers a Table as a temporary view.
createTemporaryTable(path, descriptor) | create_temporary_table(path, descriptor)  | Creates a temporary connector table.
createTable(path, descriptor)          | create_table(path, descriptor)            | Creates a catalog table from a descriptor.
executeSql(stmt)                       | execute_sql(stmt)                         | Executes a DDL/DML/DQL/utility statement.
sqlQuery(query)                        | sql_query(query)                          | Returns query results as a Table.
dropTemporaryView(path)                | drop_temporary_view(path)                 | Drops a temporary view.
dropTemporaryTable(path)               | drop_temporary_table(path)                | Drops a temporary table.
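Of these, fromValues() is particularly handy for quick local experiments because it needs no connector. A minimal sketch:

```java
import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.api.Table;

// build a small in-memory table and name its columns
Table users = tableEnv.fromValues(
        row(1L, "alice"),
        row(2L, "bob"))
    .as("id", "name");

users.execute().print();
```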

Using a StatementSet for multiple INSERT jobs

By default, each INSERT INTO statement runs as a separate Flink job. Use StatementSet to submit multiple INSERT INTO statements as a single, optimized job:
StatementSet stmtSet = tableEnv.createStatementSet();

stmtSet.addInsertSql(
    "INSERT INTO sink_a SELECT * FROM source WHERE region = 'EU'"
);
stmtSet.addInsertSql(
    "INSERT INTO sink_b SELECT * FROM source WHERE region = 'US'"
);

// both inserts run in a single Flink job, sharing the source scan
stmtSet.execute();
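A StatementSet can also accept Table objects via addInsert(), and explain() shows the combined plan before anything is submitted. A sketch, reusing the source and sink names from above plus an assumed sink_c:

```java
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;

StatementSet set = tableEnv.createStatementSet();

// Table-based equivalent of addInsertSql
Table asiaOrders = tableEnv.sqlQuery("SELECT * FROM source WHERE region = 'ASIA'");
set.addInsert("sink_c", asiaOrders); // "sink_c" is an assumed sink name

// inspect the merged plan before submitting the job
System.out.println(set.explain());

set.execute();
```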

Catalog API

The TableEnvironment exposes methods for navigating and modifying the catalog hierarchy:
// register a catalog
tableEnv.registerCatalog("myhive", new HiveCatalog("myhive", null, "/path/to/hive-conf"));

// switch the current catalog and database
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("default");

// inspect what is available
String[] catalogs = tableEnv.listCatalogs();
String[] databases = tableEnv.listDatabases();
String[] tables = tableEnv.listTables();
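Beyond the list* methods, getCatalog() returns the underlying Catalog object for deeper metadata access. A sketch, assuming the "myhive" catalog registered above:

```java
import java.util.Optional;

import org.apache.flink.table.catalog.Catalog;

// the catalog may not be registered, hence the Optional
Optional<Catalog> catalog = tableEnv.getCatalog("myhive");
catalog.ifPresent(c -> {
    // the Catalog interface exposes metadata operations beyond
    // what TableEnvironment wraps directly
    System.out.println(c.listDatabases());
});
```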

Configuration

Access TableConfig to set Flink configuration options at the job level:
TableConfig config = tableEnv.getConfig();

// parallelism
config.set("parallelism.default", "8");

// job name shown in the Flink UI
config.set("pipeline.name", "my-analytics-job");

// exactly-once checkpointing every 3 minutes
config.set("execution.checkpointing.mode", "EXACTLY_ONCE");
config.set("execution.checkpointing.interval", "3min");

// RocksDB state backend
config.set("state.backend.type", "rocksdb");
config.set("execution.checkpointing.dir", "s3://my-bucket/checkpoints/");

Restart strategy

TableConfig config = tableEnv.getConfig();

config.set("restart-strategy.type", "fixed-delay");
config.set("restart-strategy.fixed-delay.attempts", "3");
config.set("restart-strategy.fixed-delay.delay", "30s");

Registering user-defined functions

import org.apache.flink.table.functions.ScalarFunction;

public static class UpperCase extends ScalarFunction {
    public String eval(String s) {
        return s.toUpperCase();
    }
}

// register as a temporary system function (available across catalogs/databases)
tableEnv.createTemporarySystemFunction("upper_case", UpperCase.class);

// use in SQL
tableEnv.sqlQuery("SELECT upper_case(name) FROM users").execute().print();
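The same function can also be used in the Table API without prior registration, via an inline call(). A sketch, reusing the users table and UpperCase class from above:

```java
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

// inline call; no createTemporarySystemFunction needed
tableEnv.from("users")
    .select(call(UpperCase.class, $("name")))
    .execute()
    .print();
```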
For full details on implementing and registering UDFs, see User-Defined Functions.
