Flink provides a command-line interface (CLI) at bin/flink for submitting packaged jobs, monitoring executions, and managing the job lifecycle. The CLI connects to the JobManager specified in your conf/config.yaml.

Submitting a job

Use flink run to upload a JAR and start a job:
./bin/flink run \
    --detached \
    ./examples/streaming/StateMachineExample.jar
With --detached, the client returns immediately after submission instead of waiting for the job to finish. The output includes the assigned job ID:
Job has been submitted with JobID cca7bc1061d61cf15238e92312c2fc20
Store the job ID in a variable for subsequent commands:
export JOB_ID="cca7bc1061d61cf15238e92312c2fc20"
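
Copying the job ID by hand is error-prone in scripts. The following is a minimal sketch, assuming the submission output contains the "Job has been submitted with JobID" line shown above followed by a 32-character hexadecimal ID. The helper name extract_job_id is hypothetical and not part of Flink.

```shell
# Print the job ID found in `flink run --detached` output read from stdin.
# Assumption: the output contains the line
#   "Job has been submitted with JobID <32 hex characters>"
extract_job_id() {
    sed -n 's/^.*Job has been submitted with JobID \([0-9a-f]\{32\}\).*$/\1/p' | head -n 1
}

# Hypothetical usage against a live cluster (not executed here):
#   JOB_ID="$(./bin/flink run --detached ./examples/streaming/StateMachineExample.jar | extract_job_id)"
#   export JOB_ID

# Self-contained demonstration using the sample output from this page.
sample='Job has been submitted with JobID cca7bc1061d61cf15238e92312c2fc20'
JOB_ID="$(printf '%s\n' "$sample" | extract_job_id)"
echo "$JOB_ID"
```

If no matching line is present, the helper prints nothing, so a script can check for an empty JOB_ID before continuing.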

Passing configuration at submission

Pass any Flink configuration option with -D:
./bin/flink run \
    -Dparallelism.default=8 \
    -Dtaskmanager.memory.process.size=4096m \
    -Dpipeline.max-parallelism=128 \
    ./my-job.jar
On session clusters, only execution configuration parameters (for example parallelism.default or pipeline.max-parallelism) take effect at submission time. Cluster-level options such as taskmanager.memory.process.size are fixed at cluster startup.
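
When the same overrides are reused across submissions, it can help to keep them in a small key=value file and turn each entry into a -D flag. This is only a sketch: the helper name to_d_flags and the file name overrides.properties are hypothetical, and the file format (one key=value per line, with # comments) is an assumption rather than a Flink convention.

```shell
# Convert "key=value" lines on stdin into "-Dkey=value" flags, one per line.
# Blank lines and lines starting with "#" are skipped; surrounding
# whitespace is trimmed.
to_d_flags() {
    sed -e '/^[[:space:]]*$/d' \
        -e '/^[[:space:]]*#/d' \
        -e 's/^[[:space:]]*//' \
        -e 's/[[:space:]]*$//' \
        -e 's/^/-D/'
}

# Hypothetical usage (not executed here; relies on word splitting, so
# values must not contain spaces):
#   ./bin/flink run $(to_d_flags < overrides.properties) ./my-job.jar

# Self-contained demonstration.
printf '%s\n' '# job overrides' '' 'parallelism.default=8' '  pipeline.max-parallelism=128' | to_d_flags
```

The demonstration prints one flag per remaining line, here -Dparallelism.default=8 and -Dpipeline.max-parallelism=128.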

Monitoring jobs

List all running and scheduled jobs:
./bin/flink list
------------------ Running/Restarting Jobs -------------------
30.11.2020 16:02:29 : cca7bc1061d61cf15238e92312c2fc20 : State machine job (RUNNING)
--------------------------------------------------------------
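
For scripting, the listing can be reduced to bare job IDs. The sketch below assumes the "date time : job ID : job name (STATUS)" layout shown above; the helper name running_job_ids is hypothetical, and the layout may differ between Flink versions.

```shell
# Print the IDs of jobs reported as RUNNING in `flink list` output (stdin).
# Assumption: each job line looks like
#   "<date> <time> : <job id> : <job name> (<STATUS>)"
running_job_ids() {
    awk -F' : ' '/\(RUNNING\)[[:space:]]*$/ { print $2 }'
}

# Hypothetical usage against a live cluster (not executed here):
#   ./bin/flink list | running_job_ids

# Self-contained demonstration using the sample listing from this page.
running_job_ids <<'EOF'
------------------ Running/Restarting Jobs -------------------
30.11.2020 16:02:29 : cca7bc1061d61cf15238e92312c2fc20 : State machine job (RUNNING)
--------------------------------------------------------------
EOF
```

Separator lines and jobs in other states are ignored because they do not end in (RUNNING).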

Creating savepoints

A savepoint captures the full state of a running job so you can stop and resume it later:
./bin/flink savepoint \
    $JOB_ID \
    /tmp/flink-savepoints
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
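For large state, add -detached so the client returns once the savepoint has been triggered instead of waiting for it to complete, which avoids a client timeout: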
./bin/flink savepoint \
    $JOB_ID \
    /tmp/flink-savepoints \
    -detached
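
When the savepoint is created without -detached, the client prints the "Savepoint completed. Path:" line shown earlier, and a script can capture that path for later use with --fromSavepoint. This is a minimal sketch that assumes exactly that output format; the helper name extract_savepoint_path is hypothetical.

```shell
# Print the savepoint path from `flink savepoint` or `flink stop` output (stdin).
# Assumption: the output contains a line starting with
#   "Savepoint completed. Path: "
extract_savepoint_path() {
    sed -n 's/^Savepoint completed\. Path: //p' | head -n 1
}

# Hypothetical usage against a live cluster (not executed here):
#   SAVEPOINT_PATH="$(./bin/flink savepoint "$JOB_ID" /tmp/flink-savepoints | extract_savepoint_path)"

# Self-contained demonstration using the sample output from this page.
sample='Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab'
SAVEPOINT_PATH="$(printf '%s\n' "$sample" | extract_savepoint_path)"
echo "$SAVEPOINT_PATH"
```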

Disposing a savepoint

Remove savepoint data and clean up metadata:
./bin/flink savepoint \
    --dispose \
    /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
    $JOB_ID
If your job uses custom state classes, pass the JAR to avoid ClassNotFoundException:
./bin/flink savepoint \
    --dispose /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
    --jarfile my-job.jar

Creating checkpoints manually

You can trigger a checkpoint on demand:
./bin/flink checkpoint $JOB_ID
To force a full checkpoint even if the job uses incremental checkpointing:
./bin/flink checkpoint $JOB_ID --full

Stopping a job gracefully

The stop command sends a final checkpoint barrier from sources to sinks, creates a savepoint, then calls cancel() on each source. This is the recommended way to stop a streaming job you intend to resume:
./bin/flink stop \
    --savepointPath /tmp/flink-savepoints \
    $JOB_ID
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
Use --drain to emit MAX_WATERMARK before stopping, flushing all event-time timers and windows:
./bin/flink stop \
    --savepointPath /tmp/flink-savepoints \
    --drain \
    $JOB_ID
Only use --drain if you intend to terminate the job permanently. Draining modifies event-time processing state and can produce incorrect results if the job is resumed afterward.

Cancelling a job

Cancel a job immediately without creating a savepoint:
./bin/flink cancel $JOB_ID
The job transitions from RUNNING to CANCELED and all computation stops.

Resuming from a savepoint

Restart a job from a previously created savepoint:
./bin/flink run \
    --detached \
    --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
    ./examples/streaming/StateMachineExample.jar
A new job ID is assigned. If your updated job removed an operator that existed when the savepoint was taken, use --allowNonRestoredState:
./bin/flink run \
    --fromSavepoint <savepointPath> \
    --allowNonRestoredState \
    ./my-updated-job.jar

Selecting a deployment target

Use --target to specify where and how to submit the job:
Target                  Description
remote                  Submit to an already-running standalone cluster
local                   Run in a local MiniCluster (development only)
yarn-session            Submit to a running Flink on YARN session
yarn-application        Start a new Flink cluster on YARN in Application mode
kubernetes-session      Submit to a running Flink on Kubernetes session
kubernetes-application  Start a new Flink cluster on Kubernetes in Application mode
# YARN Application mode
./bin/flink run --target yarn-application \
    ./examples/streaming/TopSpeedWindowing.jar

# Kubernetes session (targeting an existing session cluster)
./bin/flink run --target kubernetes-session \
    -Dkubernetes.cluster-id=my-session-cluster \
    ./examples/streaming/TopSpeedWindowing.jar
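
A wrapper script can catch a mistyped target name before calling the CLI. The sketch below checks a value against the six targets listed in the table above; the helper name is_known_target is hypothetical, and other Flink versions may accept a different set of targets.

```shell
# Succeed only if the argument is one of the targets listed on this page.
is_known_target() {
    case "$1" in
        remote|local|yarn-session|yarn-application|kubernetes-session|kubernetes-application)
            return 0 ;;
        *)
            return 1 ;;
    esac
}

# Self-contained demonstration: one valid name and one typo.
for t in yarn-application yarn-aplication; do
    if is_known_target "$t"; then
        echo "$t: known target"
    else
        echo "$t: not in the target table"
    fi
done
```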

CLI actions reference

Action           Purpose
run              Submit and optionally execute a job
run-application  Submit a job in Application mode
info             Print the optimized execution graph without running the job
list             List running and scheduled jobs
savepoint        Create or dispose a savepoint
checkpoint       Trigger a manual checkpoint
cancel           Cancel a running job immediately
stop             Gracefully stop a job with a final savepoint
For a complete list of options for any action:
./bin/flink <action> --help

Submitting PyFlink jobs

PyFlink jobs are submitted without a JAR file path. Python 3.9 or higher is required:
# Run a Python script directly
./bin/flink run --python examples/python/table/word_count.py

# Include additional Python files on PYTHONPATH
./bin/flink run \
    --python examples/python/table/word_count.py \
    --pyFiles file:///utils.py,hdfs:///shared/helpers.py

# Run using a module name (useful in Application mode)
./bin/flink run \
    --pyModule word_count \
    --pyFiles examples/python/table

# Submit to YARN Application mode with a virtual environment
./bin/flink run -t yarn-application \
    -Djobmanager.memory.process.size=1024m \
    -Dtaskmanager.memory.process.size=1024m \
    -Dyarn.ship-files=/path/to/shipfiles \
    -pyarch shipfiles/venv.zip \
    -pyclientexec venv.zip/venv/bin/python3 \
    -pyexec venv.zip/venv/bin/python3 \
    -pyfs shipfiles \
    -pym word_count
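
Because PyFlink submission requires Python 3.9 or higher, a script can check the interpreter version before calling the CLI. This is a minimal sketch; the helper name python_version_ok is hypothetical, and it only compares the major and minor parts of a "major.minor.patch" string.

```shell
# Succeed when a "major.minor[.patch]" version string is at least 3.9.
python_version_ok() {
    version="$1"
    major="${version%%.*}"     # text before the first dot
    rest="${version#*.}"       # text after the first dot
    minor="${rest%%.*}"        # text before the next dot
    [ "$major" -gt 3 ] || { [ "$major" -eq 3 ] && [ "$minor" -ge 9 ]; }
}

# Hypothetical usage with a real interpreter (not executed here):
#   python_version_ok "$(python3 -c 'import platform; print(platform.python_version())')"

# Self-contained demonstration with fixed version strings.
for v in 3.8.18 3.9.1 3.12.0; do
    if python_version_ok "$v"; then
        echo "$v: ok"
    else
        echo "$v: too old"
    fi
done
```

Comparing the parts numerically avoids the common mistake of treating 3.10 as older than 3.9 in a string comparison.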

Python CLI options

Option                               Description
-py, --python                        Python script containing the job entry point
-pym, --pyModule                     Python module name containing the entry point (requires --pyFiles)
-pyfs, --pyFiles                     Additional files or directories to add to PYTHONPATH
-pyarch, --pyArchives                Archive files (e.g., virtual environments) distributed to workers
-pyexec, --pyExecutable              Path to the Python interpreter used on workers
-pyclientexec, --pyClientExecutable  Path to the Python interpreter used on the client
-pyreq, --pyRequirements             requirements.txt file for third-party dependencies
