The Flyte agent framework lets you create new task backends without modifying the core FlytePropeller engine. You implement an agent as a Python class, test it independently, and deploy it as a separate service.
Agent types
Choose the agent type based on whether the external service has an async API:
| Type | Base class | Methods to implement | Use when |
|---|---|---|---|
| Async | AsyncAgentBase | create, get, delete | The external service has non-blocking job submission and status polling APIs |
| Sync | SyncAgentBase | do | The external service returns a result immediately in a single blocking call |
Developing an async agent
Async agents handle long-running jobs. FlytePropeller calls create to submit the job, then periodically calls get to check its status, and calls delete if the user aborts the workflow.
Required methods
- create: Submit a new job. Return a ResourceMeta object containing the identifiers needed to check status (e.g., a job ID).
- get: Check the status of a running job. Return a Resource with the current phase and, if finished, the output literals.
- delete: Cancel a running job. This method is called when a user interrupts a task. Must be idempotent.
ResourceMeta is a dataclass that holds job identifiers. Define one for each agent:
```python
from dataclasses import dataclass

from flytekit.extend.backend.base_agent import ResourceMeta


@dataclass
class BigQueryMetadata(ResourceMeta):
    """Holds the job ID returned by the BigQuery API after job creation."""

    job_id: str
```
Full async agent example
The following is based on the BigQuery agent implementation:
```python
from dataclasses import dataclass
from typing import Optional

from flytekit.extend.backend.base_agent import (
    AgentRegistry,
    AsyncAgentBase,
    Resource,
    ResourceMeta,
)
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate


@dataclass
class BigQueryMetadata(ResourceMeta):
    """Metadata holding the BigQuery job ID."""

    job_id: str


class BigQueryAgent(AsyncAgentBase):
    def __init__(self):
        super().__init__(
            task_type_name="bigquery",
            metadata_type=BigQueryMetadata,
        )

    def create(
        self,
        task_template: TaskTemplate,
        inputs: Optional[LiteralMap] = None,
        **kwargs,
    ) -> BigQueryMetadata:
        # submit_bigquery_job is a placeholder for your call to the BigQuery API
        job_id = submit_bigquery_job(inputs)
        return BigQueryMetadata(job_id=job_id)

    def get(self, resource_meta: BigQueryMetadata, **kwargs) -> Resource:
        # get_job_status is a placeholder that returns (phase, outputs)
        phase, outputs = get_job_status(resource_meta.job_id)
        return Resource(phase=phase, outputs=outputs)

    def delete(self, resource_meta: BigQueryMetadata, **kwargs):
        # cancel_bigquery_job is a placeholder; make sure it is idempotent
        cancel_bigquery_job(resource_meta.job_id)


# Register the agent so FlytePropeller can discover it
AgentRegistry.register(BigQueryAgent())
```
Once create returns a ResourceMeta with the job ID, FlytePropeller periodically passes it to get to check whether the execution has succeeded. If a user interrupts the task while it is running, FlytePropeller calls delete with the same ResourceMeta.
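The get method must translate the external service's job states into Flyte task phases. As a simplified sketch of that translation (using plain strings in place of the TaskExecution enum from flyteidl, and a hypothetical set of BigQuery job states), the mapping might look like:

```python
# Hypothetical mapping from external job states to Flyte task phases.
# Real agent code would use flyteidl's TaskExecution enum values instead of strings.
STATE_TO_PHASE = {
    "PENDING": "QUEUED",
    "RUNNING": "RUNNING",
    "DONE": "SUCCEEDED",
}


def map_job_state(state: str) -> str:
    """Translate an external job state into a Flyte task phase.

    Unknown states are treated as still running, so FlytePropeller keeps polling.
    """
    return STATE_TO_PHASE.get(state, "RUNNING")
```

Treating unknown states as RUNNING is a deliberately conservative choice: it keeps the task polling rather than failing it on a state the agent does not recognize.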
Developing a sync agent
Sync agents handle request/response services. FlytePropeller blocks on the do method until it returns.
Required methods
do: Execute the synchronous task and return a Resource with the result. The worker in Flyte blocks until this method returns.
Full sync agent example
```python
from typing import Optional

from flyteidl.core.execution_pb2 import TaskExecution
from flytekit import FlyteContextManager
from flytekit.core.type_engine import TypeEngine
from flytekit.extend.backend.base_agent import AgentRegistry, Resource, SyncAgentBase
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate


class OpenAIAgent(SyncAgentBase):
    def __init__(self):
        super().__init__(task_type_name="openai")

    def do(
        self,
        task_template: TaskTemplate,
        inputs: Optional[LiteralMap],
        **kwargs,
    ) -> Resource:
        # Convert the LiteralMap to Python values
        ctx = FlyteContextManager.current_context()
        python_inputs = TypeEngine.literal_map_to_kwargs(
            ctx, inputs, literal_types=task_template.interface.inputs
        )
        # ask_chatgpt_question is a placeholder for your call to the OpenAI API
        response = ask_chatgpt_question(python_inputs)
        return Resource(
            phase=TaskExecution.SUCCEEDED,
            outputs={"o0": response},
        )


AgentRegistry.register(OpenAIAgent())
```
Developing a sensor
With the agent framework you can build a custom sensor that monitors an external condition and blocks until it is met. Extend BaseSensor and implement the poke method:
```python
import s3fs

from flytekit.sensor.base_sensor import BaseSensor


class FileSensor(BaseSensor):
    def __init__(self):
        super().__init__(task_type="file_sensor")

    def poke(self, path: str) -> bool:
        """Return True when the file exists, False otherwise."""
        fs = s3fs.S3FileSystem()
        return fs.exists(path)
```
Flyte calls poke on a schedule until it returns True, then proceeds with the rest of the workflow.
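Conceptually, that polling behavior resembles a simple retry loop. The sketch below is only an illustration (the interval and attempt limit are hypothetical parameters; the real scheduling is handled by Flyte's sensor engine, not user code):

```python
import time
from typing import Callable


def wait_until(poke: Callable[[], bool],
               interval_seconds: float = 1.0,
               max_attempts: int = 60) -> bool:
    """Conceptual sketch of sensor polling: call poke() until it returns True.

    Returns True once the condition is met, or False if max_attempts is
    exhausted first.
    """
    for _ in range(max_attempts):
        if poke():
            return True
        time.sleep(interval_seconds)
    return False
```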
Testing your agent
You can test agents without running a full Flyte cluster by adding AsyncAgentExecutorMixin or SyncAgentExecutorMixin to your task class:
```python
from flytekit import kwtypes
from flytekit.extend.backend.base_agent import AsyncAgentExecutorMixin
from flytekit.types.structured import StructuredDataset
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask


# Mixing in AsyncAgentExecutorMixin routes local executions through the agent
class BigQueryAgentTask(AsyncAgentExecutorMixin, BigQueryTask):
    pass


bigquery_doge_coin = BigQueryAgentTask(
    name="bigquery.doge_coin",
    inputs=kwtypes(version=int),
    query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
    output_structured_dataset_type=StructuredDataset,
    task_config=BigQueryConfig(ProjectID="flyte-test-340607"),
)
```
Then run it locally:
```shell
pyflyte run bigquery_task.py bigquery_doge_coin --version 10
```
When testing locally, you may need to configure credentials in your environment. For example, BigQuery tasks require the GOOGLE_APPLICATION_CREDENTIALS environment variable to be set.
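For example, before running the task you would point the Google client libraries at a service-account key file (the path below is a placeholder; substitute the location of your own key):

```shell
# Point Google client libraries at a service-account key file.
# The path is a placeholder; use the location of your own key.
export GOOGLE_APPLICATION_CREDENTIALS="$HOME/keys/bigquery-agent.json"
```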
Building the agent Docker image
After implementing and testing your agent, build a Docker image that includes the agent plugin package. The image runs pyflyte serve agent to start the gRPC server:
```dockerfile
FROM python:3.10-slim-bookworm

LABEL org.opencontainers.image.authors="Flyte Team <[email protected]>"
LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit

# Dependencies for running in Kubernetes
RUN pip install prometheus-client grpcio-health-checking
# flytekit autoloads the agent when the plugin package is installed
RUN pip install flytekitplugins-bigquery

CMD pyflyte serve agent --port 8000
```
For flytekit <=v1.10.2, use pyflyte serve. For flytekit >v1.10.2, use pyflyte serve agent.
Canary deployments
Agents can be deployed independently in separate environments. This decouples agent development from the production cluster: a bug in a custom agent won’t affect other agents.
To route specific task types to a non-default agent service, update the FlytePropeller configmap:
```yaml
plugins:
  agent-service:
    # All task types not explicitly listed below go to the default agent
    defaultAgent:
      endpoint: "k8s://flyteagent.flyte:8000"
      insecure: true
      timeouts:
        CreateTask: 5s
        GetTask: 5s
        DeleteTask: 5s
        ExecuteTaskSync: 10s
      defaultTimeout: 10s
    agents:
      custom_agent:
        endpoint: "dns:///custom-flyteagent.flyte.svc.cluster.local:8000"
        insecure: false
        defaultServiceConfig: '{"loadBalancingConfig": [{"round_robin":{}}]}'
        timeouts:
          GetTask: 5s
        defaultTimeout: 10s
    agentForTaskTypes:
      # Route custom_task requests to custom_agent instead of the default
      - custom_task: custom_agent
Deploy a canary agent service with your new agent before routing production traffic to it. This lets you validate behavior in isolation before any user workflows depend on it.
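For example, the canary agent could run as its own Kubernetes Deployment and Service alongside the default one. The manifest below is a sketch: the names, namespace, and image are assumptions to adapt to your cluster, with the Service name chosen to match the dns:/// endpoint in the configmap.

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: custom-flyteagent      # hypothetical name; matches the endpoint host
  namespace: flyte
spec:
  replicas: 1
  selector:
    matchLabels:
      app: custom-flyteagent
  template:
    metadata:
      labels:
        app: custom-flyteagent
    spec:
      containers:
        - name: agent
          image: ghcr.io/example/custom-flyteagent:latest  # your agent image
          command: ["pyflyte", "serve", "agent", "--port", "8000"]
          ports:
            - containerPort: 8000
---
apiVersion: v1
kind: Service
metadata:
  name: custom-flyteagent
  namespace: flyte
spec:
  selector:
    app: custom-flyteagent
  ports:
    - port: 8000
      targetPort: 8000
```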