The Flyte agent framework lets you create new task backends without modifying the core FlytePropeller engine. You implement an agent as a Python class, test it independently, and deploy it as a separate service.
If you build a useful agent, consider contributing it back to the Flyte community. Follow the Flytekit contribution guide to add your agent to Flytekit plugins and create an example for the Integrations docs.

Agent types

Choose the agent type based on whether the external service has an async API:
| Type  | Base class     | Methods to implement | Use when |
|-------|----------------|----------------------|----------|
| Async | AsyncAgentBase | create, get, delete  | The external service has non-blocking job submission and status polling APIs |
| Sync  | SyncAgentBase  | do                   | The external service returns a result immediately in a single blocking call |

Developing an async agent

Async agents handle long-running jobs. FlytePropeller calls create to submit the job, then periodically calls get to check its status, and calls delete if the user aborts the workflow.

Required methods

  • create: Submit a new job. Return a ResourceMeta object containing the identifiers needed to check status (e.g., a job ID).
  • get: Check the status of a running job. Return a Resource with the current phase and, if finished, the output literals.
  • delete: Cancel a running job. This method is called when a user interrupts a task. Must be idempotent.

ResourceMeta

ResourceMeta is a dataclass that holds job identifiers. Define one for each agent:
from dataclasses import dataclass
from flytekit.extend.backend.base_agent import ResourceMeta

@dataclass
class BigQueryMetadata(ResourceMeta):
    """
    Holds the job ID returned by the BigQuery API after job creation.
    """
    job_id: str
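FlytePropeller stores the ResourceMeta returned by create and hands it back to later get and delete calls, so its fields must survive a serialization round trip. A minimal sketch of JSON round-tripping such a dataclass (standalone, without flytekit; the encode/decode helpers here are illustrative, not flytekit's API):

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class BigQueryMetadata:
    """Stand-in for the ResourceMeta subclass above."""
    job_id: str


def encode(meta: BigQueryMetadata) -> bytes:
    # Serialize the dataclass fields to JSON bytes
    return json.dumps(asdict(meta)).encode("utf-8")


def decode(data: bytes) -> BigQueryMetadata:
    # Rebuild the dataclass from JSON bytes
    return BigQueryMetadata(**json.loads(data.decode("utf-8")))


meta = BigQueryMetadata(job_id="job-123")
restored = decode(encode(meta))
```

Keeping ResourceMeta to simple, JSON-friendly fields (strings, numbers) avoids surprises here.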

Full async agent example

The following is based on the BigQuery agent implementation:
from typing import Optional
from dataclasses import dataclass
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate
from flytekit.extend.backend.base_agent import (
    AsyncAgentBase,
    AgentRegistry,
    Resource,
    ResourceMeta,
)


@dataclass
class BigQueryMetadata(ResourceMeta):
    """Metadata holding the BigQuery job ID."""
    job_id: str


class BigQueryAgent(AsyncAgentBase):
    def __init__(self):
        super().__init__(
            task_type_name="bigquery",
            metadata_type=BigQueryMetadata,
        )

    def create(
        self,
        task_template: TaskTemplate,
        inputs: Optional[LiteralMap] = None,
        **kwargs,
    ) -> BigQueryMetadata:
        # submit_bigquery_job is a placeholder for a call to the BigQuery client
        job_id = submit_bigquery_job(inputs)
        return BigQueryMetadata(job_id=job_id)

    def get(self, resource_meta: BigQueryMetadata, **kwargs) -> Resource:
        # get_job_status is a placeholder; map the service's state to a Flyte phase
        phase, outputs = get_job_status(resource_meta.job_id)
        return Resource(phase=phase, outputs=outputs)

    def delete(self, resource_meta: BigQueryMetadata, **kwargs):
        # cancel_bigquery_job is a placeholder; cancellation must be idempotent
        cancel_bigquery_job(resource_meta.job_id)


# Register the agent so FlytePropeller can discover it
AgentRegistry.register(BigQueryAgent())
After create submits the job and returns its ID wrapped in BigQueryMetadata, FlytePropeller passes that metadata back to get to check whether the execution has succeeded. If a user interrupts the task while it is running, FlytePropeller calls delete with the same ResourceMeta.
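The create/get/delete lifecycle amounts to a polling loop. A standalone sketch against a stub agent (pure Python, no flytekit; the in-memory job store and phase strings are illustrative):

```python
class StubAgent:
    """Mimics the async agent contract with an in-memory job store."""

    def __init__(self):
        self._jobs = {}

    def create(self, inputs) -> str:
        # A real agent would submit to the external service here
        job_id = f"job-{len(self._jobs)}"
        self._jobs[job_id] = {"polls": 0, "inputs": inputs}
        return job_id

    def get(self, job_id: str) -> str:
        # Pretend the job finishes after two status checks
        self._jobs[job_id]["polls"] += 1
        return "SUCCEEDED" if self._jobs[job_id]["polls"] >= 2 else "RUNNING"

    def delete(self, job_id: str) -> None:
        # Idempotent: cancelling an unknown or finished job is a no-op
        self._jobs.pop(job_id, None)


agent = StubAgent()
job_id = agent.create({"version": 10})
phase = "RUNNING"
while phase == "RUNNING":  # FlytePropeller polls get() on an interval
    phase = agent.get(job_id)
```

Note that delete can safely be called more than once, matching the idempotency requirement above.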

Developing a sync agent

Sync agents handle request/response services. FlytePropeller blocks on the do method until it returns.

Required methods

  • do: Execute the synchronous task and return a Resource with the result. The worker in Flyte blocks until this method returns.

Full sync agent example

from typing import Optional
from flytekit import FlyteContextManager
from flytekit.core.type_engine import TypeEngine
from flyteidl.core.execution_pb2 import TaskExecution
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate
from flytekit.extend.backend.base_agent import SyncAgentBase, AgentRegistry, Resource


class OpenAIAgent(SyncAgentBase):
    def __init__(self):
        super().__init__(task_type_name="openai")

    def do(
        self,
        task_template: TaskTemplate,
        inputs: Optional[LiteralMap],
        **kwargs,
    ) -> Resource:
        # Convert the LiteralMap to Python values
        ctx = FlyteContextManager.current_context()
        python_inputs = TypeEngine.literal_map_to_kwargs(
            ctx, inputs, literal_types=task_template.interface.inputs
        )
        # ask_chatgpt_question is a placeholder for the real OpenAI API call
        response = ask_chatgpt_question(python_inputs)
        return Resource(
            phase=TaskExecution.SUCCEEDED,
            outputs={"o0": response},
        )


AgentRegistry.register(OpenAIAgent())
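In contrast to the polling flow of an async agent, a sync agent is a single blocking call: the caller invokes do once and receives the finished result. A minimal stand-in (pure Python, no flytekit; the Resource tuple and the uppercase "service call" are illustrative):

```python
from typing import NamedTuple


class Resource(NamedTuple):
    """Stand-in for flytekit's Resource: a phase plus output literals."""
    phase: str
    outputs: dict


class EchoAgent:
    def do(self, inputs: dict) -> Resource:
        # The entire request/response happens inside this one blocking call
        answer = inputs["question"].upper()  # placeholder for a real API call
        return Resource(phase="SUCCEEDED", outputs={"o0": answer})


result = EchoAgent().do({"question": "hi"})
```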

Developing a sensor

With the agent framework you can build a custom sensor that monitors an external condition and blocks until it is met. Extend BaseSensor and implement the poke method:
from flytekit.sensor.base_sensor import BaseSensor
import s3fs

class FileSensor(BaseSensor):
    def __init__(self, name: str):
        super().__init__(name=name, task_type="file_sensor")

    async def poke(self, path: str) -> bool:
        """Return True when the file exists, False otherwise."""
        fs = s3fs.S3FileSystem()
        return fs.exists(path)
Flyte calls poke on a schedule until it returns True, then proceeds with the rest of the workflow.
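The poke contract can be exercised without S3 by polling any boolean condition. Here a stand-in sensor waits for a key to appear in a dict (class and names are illustrative, not flytekit's API):

```python
class DictKeySensor:
    """Stand-in for a sensor: poke returns True once the condition holds."""

    def __init__(self, store: dict):
        self.store = store

    def poke(self, key: str) -> bool:
        # Must be cheap and side-effect free: it may be called many times
        return key in self.store


store = {}
sensor = DictKeySensor(store)
results = [sensor.poke("ready")]      # condition not met yet
store["ready"] = True                 # external condition becomes true
results.append(sensor.poke("ready"))  # now the workflow can proceed
```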

Testing your agent

You can test agents without running a full Flyte cluster by adding AsyncAgentExecutorMixin or SyncAgentExecutorMixin to your task class:
from flytekit.extend.backend.base_agent import AsyncAgentExecutorMixin
from flytekitplugins.bigquery import BigQueryTask, BigQueryConfig
from flytekit.types.structured import StructuredDataset
from flytekit import kwtypes

# Subclassing with the mixin makes the task execute through the agent locally
class LocalBigQueryTask(AsyncAgentExecutorMixin, BigQueryTask):
    pass

bigquery_doge_coin = LocalBigQueryTask(
    name="bigquery.doge_coin",
    inputs=kwtypes(version=int),
    query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
    output_structured_dataset_type=StructuredDataset,
    task_config=BigQueryConfig(ProjectID="flyte-test-340607"),
)
Then run it locally:
pyflyte run bigquery_task.py bigquery_doge_coin --version 10
When testing locally, you may need to configure credentials in your environment. For example, BigQuery tasks require the GOOGLE_APPLICATION_CREDENTIALS environment variable to be set.
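For example, pointing BigQuery clients at a service-account key file before running the task (the key path is a placeholder):

```shell
export GOOGLE_APPLICATION_CREDENTIALS="$HOME/keys/service-account.json"
pyflyte run bigquery_task.py bigquery_doge_coin --version 10
```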

Building the agent Docker image

After implementing and testing your agent, build a Docker image that includes the agent plugin package. The image runs pyflyte serve agent to start the gRPC server:
FROM python:3.10-slim-bookworm

LABEL maintainer="Flyte Team <[email protected]>"
LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit

# Dependencies for running in Kubernetes
RUN pip install prometheus-client grpcio-health-checking

# flytekit autoloads the agent when the plugin package is installed
RUN pip install flytekitplugins-bigquery

CMD pyflyte serve agent --port 8000
For flytekit <=v1.10.2, use pyflyte serve. For flytekit >v1.10.2, use pyflyte serve agent.
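Assuming the Dockerfile above is saved in the current directory, a local build-and-run looks like this (the image tag is arbitrary; requires a local Docker daemon):

```shell
docker build -t my-flyte-agent .
docker run --rm -p 8000:8000 my-flyte-agent
```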

Canary deployments

Agents can be deployed independently in separate environments. This decouples agent development from the production cluster: a bug in a custom agent won’t affect other agents. To route specific task types to a non-default agent service, update the FlytePropeller configmap:
plugins:
  agent-service:
    # All task types not explicitly listed below go to the default agent
    defaultAgent:
      endpoint: "k8s://flyteagent.flyte:8000"
      insecure: true
      timeouts:
        CreateTask: 5s
        GetTask: 5s
        DeleteTask: 5s
        ExecuteTaskSync: 10s
      defaultTimeout: 10s
    agents:
      custom_agent:
        endpoint: "dns:///custom-flyteagent.flyte.svc.cluster.local:8000"
        insecure: false
        defaultServiceConfig: '{"loadBalancingConfig": [{"round_robin":{}}]}'
        timeouts:
          GetTask: 5s
        defaultTimeout: 10s
    agentForTaskTypes:
      # Route custom_task requests to custom_agent instead of the default
      - custom_task: custom_agent
Deploy a canary agent service with your new agent before routing production traffic to it. This lets you validate behavior in isolation before any user workflows depend on it.
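The agentForTaskTypes rule is a lookup with a fallback; its effective behavior can be mimicked in a few lines (endpoints copied from the config sketch above):

```python
# Endpoints mirror the FlytePropeller configmap example
DEFAULT_AGENT = "k8s://flyteagent.flyte:8000"
AGENTS = {"custom_agent": "dns:///custom-flyteagent.flyte.svc.cluster.local:8000"}
AGENT_FOR_TASK_TYPE = {"custom_task": "custom_agent"}


def endpoint_for(task_type: str) -> str:
    # Task types without an explicit mapping fall through to the default agent
    agent_name = AGENT_FOR_TASK_TYPE.get(task_type)
    return AGENTS.get(agent_name, DEFAULT_AGENT)
```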
