Flink provides a command-line interface (CLI) at bin/flink for submitting packaged jobs, monitoring executions, and managing the job lifecycle. The CLI connects to the JobManager specified in your conf/config.yaml.
Submitting a job
Use flink run to upload a JAR and start a job:
./bin/flink run \
--detached \
./examples/streaming/StateMachineExample.jar
With --detached, the client returns immediately after submission instead of waiting for the job to finish. The output includes the assigned job ID:
Job has been submitted with JobID cca7bc1061d61cf15238e92312c2fc20
Store the job ID in a variable for subsequent commands:
export JOB_ID="cca7bc1061d61cf15238e92312c2fc20"
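Because the JobID always appears in that fixed submission message, it can be captured in a script rather than copied by hand. A minimal sketch, assuming the output format shown above:

```shell
# Extract the 32-hex-digit JobID from the submission message.
# The message format is an assumption based on the sample output above;
# in practice you would pipe the real `./bin/flink run` output in here.
SUBMIT_OUTPUT="Job has been submitted with JobID cca7bc1061d61cf15238e92312c2fc20"
JOB_ID=$(echo "$SUBMIT_OUTPUT" | sed -n 's/.*JobID \([0-9a-f]\{32\}\).*/\1/p')
echo "$JOB_ID"
```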
Passing configuration at submission
Pass any Flink configuration option with -D:
./bin/flink run \
-Dparallelism.default=8 \
-Dtaskmanager.memory.process.size=4096m \
-Dpipeline.max-parallelism=128 \
./my-job.jar
On session clusters, only job-level execution options (for example, parallelism.default) take effect at submission time. Cluster-level options such as memory sizes are fixed when the cluster starts and cannot be changed per job.
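Cluster-level options therefore belong in conf/config.yaml, which is read once at cluster startup. A hedged example of such startup-only settings in the nested YAML layout (the specific values here are illustrative):

```yaml
# conf/config.yaml -- read when the (session) cluster starts.
# On a session cluster, -D at submission time cannot override these.
jobmanager:
  memory:
    process:
      size: 1600m
taskmanager:
  numberOfTaskSlots: 2
  memory:
    process:
      size: 4096m
```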
Monitoring jobs
List all running and scheduled jobs:
./bin/flink list
------------------ Running/Restarting Jobs -------------------
30.11.2020 16:02:29 : cca7bc1061d61cf15238e92312c2fc20 : State machine job (RUNNING)
--------------------------------------------------------------
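The output is line-oriented, so job IDs can be pulled out with standard tools for scripting. A sketch, assuming the line format shown above:

```shell
# Pull the 32-hex-digit job IDs out of `flink list` output.
# The line format is an assumption based on the sample above;
# in practice, pipe the real `./bin/flink list` output in here.
LIST_OUTPUT="30.11.2020 16:02:29 : cca7bc1061d61cf15238e92312c2fc20 : State machine job (RUNNING)"
echo "$LIST_OUTPUT" | grep -oE '[0-9a-f]{32}'
```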
Creating savepoints
A savepoint captures the full state of a running job so you can stop and resume it later:
./bin/flink savepoint \
$JOB_ID \
/tmp/flink-savepoints
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
For large state, use the -detached option to avoid a client-side timeout:
./bin/flink savepoint \
$JOB_ID \
/tmp/flink-savepoints \
-detached
Disposing a savepoint
Remove savepoint data and clean up metadata:
./bin/flink savepoint \
--dispose \
/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
$JOB_ID
If your job uses custom state classes, pass the JAR to avoid ClassNotFoundException:
./bin/flink savepoint \
--dispose /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
--jarfile my-job.jar
Creating checkpoints manually
You can trigger a checkpoint on demand:
./bin/flink checkpoint $JOB_ID
To force a full checkpoint even if the job uses incremental checkpointing:
./bin/flink checkpoint $JOB_ID --full
Stopping a job gracefully
The stop command sends a final checkpoint barrier from sources to sinks, creates a savepoint, then calls cancel() on each source. This is the recommended way to stop a streaming job you intend to resume:
./bin/flink stop \
--savepointPath /tmp/flink-savepoints \
$JOB_ID
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
Use --drain to emit MAX_WATERMARK before stopping, flushing all event-time timers and windows:
./bin/flink stop \
--savepointPath /tmp/flink-savepoints \
--drain \
$JOB_ID
Only use --drain if you intend to terminate the job permanently. Draining modifies event-time processing state and can produce incorrect results if the job is resumed afterward.
Cancelling a job
Cancel a job immediately without creating a savepoint:
./bin/flink cancel $JOB_ID
The job transitions through CANCELLING to CANCELED and all computation stops.
Resuming from a savepoint
Restart a job from a previously created savepoint:
./bin/flink run \
--detached \
--fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
./examples/streaming/StateMachineExample.jar
A new job ID is assigned. If your updated job removed an operator that existed when the savepoint was taken, use --allowNonRestoredState:
./bin/flink run \
--fromSavepoint <savepointPath> \
--allowNonRestoredState \
./my-updated-job.jar
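The stop-then-resume cycle can be scripted end to end. In this sketch, flink is a stub function that replays the sample outputs from earlier sections so the control flow is runnable without a cluster; in practice you would call ./bin/flink with your real job JAR, and the JobID shown is a placeholder:

```shell
# Stub standing in for ./bin/flink; it echoes the sample outputs shown
# earlier so the flow can be exercised without a running cluster.
flink() {
  case "$1" in
    stop) echo "Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab" ;;
    run)  echo "Job has been submitted with JobID 0123456789abcdef0123456789abcdef" ;;
  esac
}

JOB_ID="cca7bc1061d61cf15238e92312c2fc20"

# 1. Stop the old job with a final savepoint and capture its path.
SAVEPOINT_PATH=$(flink stop --savepointPath /tmp/flink-savepoints "$JOB_ID" \
  | sed -n 's/.*Path: \(.*\)$/\1/p')

# 2. Resume the updated JAR from that savepoint; a new job ID is assigned.
NEW_JOB_ID=$(flink run --detached --fromSavepoint "$SAVEPOINT_PATH" ./my-updated-job.jar \
  | sed -n 's/.*JobID \([0-9a-f]\{32\}\).*/\1/p')

echo "resumed as $NEW_JOB_ID from $SAVEPOINT_PATH"
```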
Selecting a deployment target
Use --target to specify where and how to submit the job:
| Target | Description |
|---|---|
| remote | Submit to an already-running standalone cluster |
| local | Run in a local MiniCluster (development only) |
| yarn-session | Submit to a running Flink on YARN session |
| yarn-application | Start a new Flink cluster on YARN in Application mode |
| kubernetes-session | Submit to a running Flink on Kubernetes session |
| kubernetes-application | Start a new Flink cluster on Kubernetes in Application mode |
# YARN Application mode
./bin/flink run --target yarn-application \
./examples/streaming/TopSpeedWindowing.jar
# Kubernetes session (targeting an existing session cluster)
./bin/flink run --target kubernetes-session \
-Dkubernetes.cluster-id=my-session-cluster \
./examples/streaming/TopSpeedWindowing.jar
CLI actions reference
| Action | Purpose |
|---|---|
| run | Submit and execute a job |
| run-application | Submit a job in Application mode |
| info | Print the optimized execution graph without running the job |
| list | List running and scheduled jobs |
| savepoint | Create or dispose a savepoint |
| checkpoint | Trigger a manual checkpoint |
| cancel | Cancel a running job immediately |
| stop | Gracefully stop a job with a final savepoint |
For a complete list of options for any action:
./bin/flink <action> --help
Submitting PyFlink jobs
PyFlink jobs are submitted without a JAR file path. Python 3.9 or higher is required:
# Run a Python script directly
./bin/flink run --python examples/python/table/word_count.py
# Include additional Python files on PYTHONPATH
./bin/flink run \
--python examples/python/table/word_count.py \
--pyFiles file:///utils.py,hdfs:///shared/helpers.py
# Run using a module name (useful in Application mode)
./bin/flink run \
--pyModule word_count \
--pyFiles examples/python/table
# Submit to YARN Application mode with a virtual environment
./bin/flink run -t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dyarn.ship-files=/path/to/shipfiles \
-pyarch shipfiles/venv.zip \
-pyclientexec venv.zip/venv/bin/python3 \
-pyexec venv.zip/venv/bin/python3 \
-pyfs shipfiles \
-pym word_count
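The venv.zip referenced above must unpack so that its layout matches the -pyexec path (venv.zip/venv/bin/python3). One way to build such an archive; the directory names here are assumptions tied to the flags in the example, and --without-pip only keeps the sketch self-contained:

```shell
# Build a virtual environment and zip it so the archive unpacks to venv/,
# matching -pyexec venv.zip/venv/bin/python3 in the example above.
mkdir -p shipfiles
python3 -m venv --without-pip venv
# In practice, install the job's dependencies into the venv first, e.g.:
#   venv/bin/pip install apache-flink
python3 -c "import shutil; shutil.make_archive('shipfiles/venv', 'zip', '.', 'venv')"
ls shipfiles/venv.zip
```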
Python CLI options
| Option | Description |
|---|---|
| -py, --python | Python script containing the job entry point |
| -pym, --pyModule | Python module name containing the entry point (requires --pyFiles) |
| -pyfs, --pyFiles | Additional files or directories to add to PYTHONPATH |
| -pyarch, --pyArchives | Archive files (e.g., virtual environments) distributed to workers |
| -pyexec, --pyExecutable | Path to the Python interpreter used on workers |
| -pyclientexec, --pyClientExecutable | Path to the Python interpreter used on the client |
| -pyreq, --pyRequirements | requirements.txt file for third-party dependencies |