
govern()

The govern() function is the universal one-line entry point. Pass any supported framework object and it returns a compliance-wrapped version. It never raises: if no config is found or the framework is unrecognised, it emits a warning and returns the original object unchanged.
from drako import govern

crew  = govern(crew)        # CrewAI
graph = govern(graph)       # LangGraph
chat  = govern(group_chat)  # AutoGen

Signature

govern(obj, *, config_path: str | None = None, framework: str | None = None)

Parameters

obj
Any
required
A CrewAI Crew, LangGraph compiled graph, or AutoGen GroupChat instance.
config_path
str | None
Explicit path to a .drako.yaml file. When omitted, govern() walks up from the current working directory looking for the file.
framework
str | None
Override automatic framework detection. Accepted values: "crewai", "langgraph", "autogen".
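The upward search described for config_path can be pictured roughly as follows. This is a minimal sketch rather than Drako's actual lookup code: find_config is a hypothetical helper, and continuing all the way to the filesystem root is an assumption.

```python
from __future__ import annotations

from pathlib import Path


def find_config(start: Path, filename: str = ".drako.yaml") -> Path | None:
    """Return the nearest `filename` at or above `start`, or None (hypothetical helper)."""
    start = start.resolve()
    # Check the starting directory first, then each parent up to the root.
    for directory in (start, *start.parents):
        candidate = directory / filename
        if candidate.is_file():
            return candidate
    return None
```

Calling find_config(Path.cwd()) would mimic the behaviour described for an omitted config_path.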

Returns

The compliance-wrapped object. For CrewAI this is a CrewAIComplianceMiddleware that proxies all attributes to the underlying crew. For LangGraph it is a _LangGraphProxy. For AutoGen it is a _AutoGenGroupChatProxy. In all cases the public interface of the original object is preserved. If no .drako.yaml is found and DRAKO_API_KEY is not set in the environment, the original obj is returned unchanged.

Framework auto-detection

govern() inspects the object to determine the framework:
Condition                                           Detected framework
hasattr(obj, "kickoff")                             crewai
hasattr(obj, "invoke") or hasattr(obj, "stream")    langgraph
hasattr(obj, "groupchat")                           autogen
If none of these conditions match and framework is not supplied, the object is returned ungoverned.
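The detection rules above amount to a short chain of hasattr checks. The sketch below is illustrative only: detect_framework and FakeCrew are hypothetical names, and evaluating the conditions in the order listed is an assumption.

```python
from __future__ import annotations


def detect_framework(obj: object, framework: str | None = None) -> str | None:
    """Apply the documented checks; an explicit `framework` wins (hypothetical helper)."""
    if framework is not None:
        return framework
    if hasattr(obj, "kickoff"):
        return "crewai"
    if hasattr(obj, "invoke") or hasattr(obj, "stream"):
        return "langgraph"
    if hasattr(obj, "groupchat"):
        return "autogen"
    return None  # no match: the caller would return the object ungoverned


class FakeCrew:
    """Stand-in object exposing only the attribute the check looks for."""

    def kickoff(self):
        return "done"
```

Because the checks are attribute-based, any object that happens to expose invoke or kickoff would be classified the same way, which is when the framework override becomes useful.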
govern() is safe to call multiple times on the same object. Each tool’s _run method carries a _drako_wrapped flag that prevents double-wrapping.
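A flag-based guard of the kind described can be sketched as follows. Only the _run method and the _drako_wrapped flag come from the text above; wrap_run, EchoTool, and the audit list are hypothetical stand-ins, and the real wrapper presumably does more than record a call.

```python
import functools

audit = []  # stand-in for real policy checks or audit entries


def wrap_run(tool):
    """Wrap tool._run once; calling this again on the same tool is a no-op."""
    if getattr(tool._run, "_drako_wrapped", False):
        return tool  # already wrapped, leave untouched
    original = tool._run

    @functools.wraps(original)
    def governed(*args, **kwargs):
        audit.append(original.__name__)
        return original(*args, **kwargs)

    governed._drako_wrapped = True  # marker checked on the next call
    tool._run = governed
    return tool


class EchoTool:
    def _run(self, text):
        return text
```

Wrapping the same EchoTool twice still records a single audit entry per call, which is the property the flag is meant to guarantee.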

Examples

from crewai import Crew, Agent, Task
from drako import govern

crew = Crew(agents=[...], tasks=[...])
crew = govern(crew)

result = crew.kickoff()

DrakoClient

DrakoClient is the async-first HTTP client that communicates with the Drako backend. All middleware wrappers create one internally, but you can instantiate it directly for programmatic access.

Constructor

from drako import DrakoClient

client = DrakoClient(
    api_key="am_live_...",
    endpoint="https://api.getdrako.com",  # optional
    tenant_id="acme",                     # optional
)
api_key
str
required
Your Drako API key. Keys follow the format am_live_<tenant>_<secret> or am_test_<tenant>_<secret>. The tenant ID is extracted automatically from the key prefix when tenant_id is not provided.
endpoint
str
default:"https://api.getdrako.com"
Base URL of the Drako API.
tenant_id
str | None
Tenant identifier. Derived from the API key prefix when omitted.
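Deriving the tenant from the key prefix could look roughly like this. It is a sketch under one stated assumption: the tenant segment contains no underscore (the secret may). tenant_from_key is a hypothetical helper, not part of the library.

```python
from __future__ import annotations


def tenant_from_key(api_key: str) -> str | None:
    """Extract <tenant> from am_live_<tenant>_<secret> or am_test_<tenant>_<secret>."""
    # Split into at most four parts so underscores inside the secret survive.
    parts = api_key.split("_", 3)
    well_formed = (
        len(parts) == 4
        and parts[0] == "am"
        and parts[1] in ("live", "test")
        and parts[2] != ""
    )
    return parts[2] if well_formed else None
```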

Class methods

from_config
Instantiate from a .drako.yaml file.
client = DrakoClient.from_config(".drako.yaml")
config_path
str
default:".drako.yaml"
Path to the config file.
from_env
Instantiate from environment variables. Reads DRAKO_API_KEY, DRAKO_ENDPOINT, and DRAKO_TENANT_ID.
client = DrakoClient.from_env()
Raises AuthenticationError if DRAKO_API_KEY is not set.

Key methods

All async methods have _sync counterparts for use in synchronous code.
verify_agent_identity
Register or verify an agent and return its decentralised identifier (DID) and trust score.
# async
result = await client.verify_agent_identity(
    agent_name="researcher",
    agent_role="analyst",
    metadata={"team": "finance"},
)

# sync
result = client.verify_agent_identity_sync(
    agent_name="researcher",
    agent_role="analyst",
)
agent_name
str
required
Display name of the agent.
agent_role
str
required
Role or job description of the agent.
metadata
dict | None
Optional key/value metadata attached to the registration.
Returns a dict with at minimum did (string) and trust_score (float).
evaluate_policy
Evaluate whether an action is permitted by the tenant’s policies.
result = await client.evaluate_policy(
    action="tool:web_search",
    agent_did="did:mesh:ag_abc",
    context={"tool_name": "web_search", "scope": "default"},
)
action
str
required
Human-readable action label, e.g. "tool:web_search".
agent_did
str
required
DID returned by verify_agent_identity.
context
dict | None
Optional context dict. Recognised keys: task_id, tool_name, scope, payload_preview.
Returns a dict containing decision ("allowed", "rejected", "PENDING_APPROVAL", or "escalated").
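A caller typically branches on the decision value. The sketch below uses only the four values listed above; next_step is a hypothetical helper, and treating "escalated" and any unexpected value as a stop is an assumption about sensible defaults, not documented behaviour.

```python
def next_step(result: dict) -> str:
    """Map an evaluate_policy result to 'run', 'wait', or 'stop' (illustrative)."""
    decision = result.get("decision")
    if decision == "allowed":
        return "run"
    if decision == "PENDING_APPROVAL":
        return "wait"  # a human approval is outstanding
    return "stop"  # "rejected", "escalated", or anything unexpected
```

Defaulting to "stop" keeps the caller fail-closed if a new decision value is introduced later.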
audit_log
Append an entry to the tamper-proof audit hash chain.
await client.audit_log(
    action="tool:file_write",
    agent_did="did:mesh:ag_abc",
    result={"bytes_written": 1024},
    metadata={"task_id": "t_001"},
)
action
str
required
Action label for the log entry.
agent_did
str
required
DID of the acting agent.
result
dict | None
Output data to store with the entry.
metadata
dict | None
Additional metadata.
verify_chain
Verify the integrity of the audit hash chain.
status = await client.verify_chain(last_n=100)
last_n
int | None
Verify only the last n entries. Verifies the entire chain when omitted.
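For readers unfamiliar with hash chains, the idea behind audit_log and verify_chain can be illustrated locally. This is a generic SHA-256 chain written for illustration; the entry layout, the GENESIS value, and the function names are assumptions and say nothing about how the Drako backend stores or hashes entries.

```python
from __future__ import annotations

import hashlib
import json

GENESIS = "0" * 64  # assumed placeholder hash preceding the first entry


def entry_hash(prev_hash: str, payload: dict) -> str:
    """Hash the previous entry's hash together with this entry's payload."""
    body = json.dumps(payload, sort_keys=True).encode()
    return hashlib.sha256(prev_hash.encode() + body).hexdigest()


def append(chain: list[dict], payload: dict) -> None:
    prev = chain[-1]["hash"] if chain else GENESIS
    chain.append({"prev": prev, "payload": payload, "hash": entry_hash(prev, payload)})


def verify(chain: list[dict], last_n: int | None = None) -> bool:
    """Check links and hashes for the whole chain, or only the last `last_n` entries."""
    start = 0 if last_n is None else max(len(chain) - last_n, 0)
    for i in range(start, len(chain)):
        entry = chain[i]
        expected_prev = chain[i - 1]["hash"] if i > 0 else GENESIS
        if entry["prev"] != expected_prev:
            return False
        if entry["hash"] != entry_hash(entry["prev"], entry["payload"]):
            return False
    return True
```

In this sketch a partial check only covers the entries it visits, so tampering with an older entry is caught by a full verification but not by a check limited to the newest entries.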
check_quota
Return the tenant’s current usage quota and subscription status.
quota = await client.check_quota()
validate_key
Validate the API key and return tenant info.
info = await client.validate_key()

Context manager (async)

DrakoClient implements the async context manager protocol and closes underlying HTTP connections on exit.
async with DrakoClient.from_env() as client:
    result = await client.validate_key()

HTTP behaviour

  • Timeout: 30 seconds per request.
  • Retries: up to 3 attempts with exponential back-off (1 s, 2 s, 4 s) on ConnectError, ReadTimeout, and WriteTimeout.
  • Separate connection pools for async (httpx.AsyncClient) and sync (httpx.Client) code paths.
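The retry schedule can be sketched with the standard library alone. Several things here are assumptions: the built-in ConnectionError and TimeoutError stand in for the httpx errors named above, call_with_retries is a hypothetical helper, and the schedule is read as an initial attempt followed by up to three retries after waits of 1 s, 2 s, and 4 s.

```python
import time

RETRYABLE = (ConnectionError, TimeoutError)  # stand-ins for the httpx exceptions


def call_with_retries(func, delays=(1.0, 2.0, 4.0), sleep=time.sleep):
    """Call func(); on a retryable error, wait the next delay and try again."""
    for delay in delays:
        try:
            return func()
        except RETRYABLE:
            sleep(delay)
    return func()  # final attempt; any exception now propagates to the caller
```

Passing sleep as a parameter keeps the helper testable without real waiting.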

DrakoConfig

DrakoConfig is the Pydantic model that represents a .drako.yaml file. Load it with DrakoConfig.load().

Loading

from drako import DrakoConfig

config = DrakoConfig.load(".drako.yaml")
api_key = config.resolve_api_key()
DrakoConfig.load() supports template inheritance via the extends field (e.g. extends: fintech).

Key fields

version
str
default:"1.0"
Config schema version.
tenant_id
str
required
Your Drako tenant identifier.
api_key_env
str
default:"DRAKO_API_KEY"
Name of the environment variable that holds the API key.
api_key
str | None
API key stored directly in the YAML file. If the environment variable named by api_key_env is also set, the environment variable takes precedence.
endpoint
str
default:"https://api.getdrako.com"
Drako API endpoint URL.
governance_level
str
default:"custom"
Preset governance profile: autopilot, balanced, strict, or custom.
framework
str
default:"generic"
Framework hint stored in the config file.
governance.on_backend_unreachable
str
default:"allow"
Action when the Drako backend cannot be reached: "allow" (fail-open) or "block" (fail-closed).
hitl.mode
str
default:"off"
Human-in-the-loop mode: "off", "audit", or "enforce". Requires Pro plan.
hitl.approval_timeout_minutes
int
default:30
Minutes to wait for a HITL approval decision before applying timeout_action.
hitl.timeout_action
str
default:"reject"
Action taken when the HITL approval window expires: "reject" or "allow".
dlp.mode
str
default:"audit"
Data loss prevention mode: "audit", "enforce", or "off".
audit.enabled
bool
default:true
Whether to write audit log entries.
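The precedence between api_key_env and api_key can be expressed in a few lines. This sketch works on a plain dict rather than a DrakoConfig instance; pick_api_key is a hypothetical helper, and treating an empty environment variable as unset is an assumption.

```python
from __future__ import annotations

import os
from collections.abc import Mapping


def pick_api_key(config: Mapping, env: Mapping[str, str] = os.environ) -> str | None:
    """Prefer the environment variable named by api_key_env, then the YAML api_key."""
    env_name = config.get("api_key_env", "DRAKO_API_KEY")
    return env.get(env_name) or config.get("api_key")
```

Keeping the key in the environment and leaving api_key out of the YAML file avoids committing secrets alongside the config.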

Exceptions

All exceptions live in drako.exceptions and are re-exported from drako.
from drako import (
    DrakoAPIError,
    AuthenticationError,
    PolicyViolationError,
    QuotaExceededError,
)

DrakoAPIError

Base class for all errors returned by the Drako API.
class DrakoAPIError(DrakoError):
    def __init__(self, status_code: int, detail: str, request_id: str | None = None) -> None: ...
status_code
int
HTTP status code from the API response.
detail
str
Human-readable error message.
request_id
str | None
Value of the X-Request-ID response header, useful for support.

AuthenticationError

Raised on HTTP 401. Indicates an invalid or expired API key.
class AuthenticationError(DrakoAPIError):
    # status_code is always 401
    def __init__(self, detail: str = "Authentication failed", request_id: str | None = None) -> None: ...

PolicyViolationError

Raised on HTTP 403. The requested action was blocked by a governance policy.
class PolicyViolationError(DrakoAPIError):
    # status_code is always 403
    def __init__(
        self,
        detail: str = "Action blocked by policy",
        policy_id: str | None = None,
        request_id: str | None = None,
    ) -> None: ...
policy_id
str | None
Identifier of the policy that blocked the action, when available.

QuotaExceededError

Raised on HTTP 429. The tenant has exceeded its usage quota.
class QuotaExceededError(DrakoAPIError):
    # status_code is always 429
    def __init__(self, detail: str = "Quota exceeded", request_id: str | None = None) -> None: ...
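The relationship between HTTP status codes and the exception classes can be made concrete with local stand-ins. The classes below only mirror the signatures documented above so that the example runs without the library; error_for and _BY_STATUS are hypothetical, and the real client's dispatch logic may differ.

```python
from __future__ import annotations


class DrakoError(Exception):
    """Local stand-in for the library's base exception."""


class DrakoAPIError(DrakoError):
    def __init__(self, status_code: int, detail: str, request_id: str | None = None) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail
        self.request_id = request_id


class AuthenticationError(DrakoAPIError):
    def __init__(self, detail: str = "Authentication failed", request_id: str | None = None) -> None:
        super().__init__(401, detail, request_id)


class PolicyViolationError(DrakoAPIError):
    def __init__(self, detail: str = "Action blocked by policy",
                 policy_id: str | None = None, request_id: str | None = None) -> None:
        super().__init__(403, detail, request_id)
        self.policy_id = policy_id


class QuotaExceededError(DrakoAPIError):
    def __init__(self, detail: str = "Quota exceeded", request_id: str | None = None) -> None:
        super().__init__(429, detail, request_id)


_BY_STATUS = {401: AuthenticationError, 403: PolicyViolationError, 429: QuotaExceededError}


def error_for(status_code: int, detail: str, request_id: str | None = None) -> DrakoAPIError:
    """Pick the most specific exception for a status code (hypothetical helper)."""
    cls = _BY_STATUS.get(status_code)
    if cls is None:
        return DrakoAPIError(status_code, detail, request_id)
    return cls(detail=detail, request_id=request_id)
```

Because every specific error subclasses DrakoAPIError, an except DrakoAPIError clause placed last catches anything the more specific clauses did not.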

Handling exceptions

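import asyncio

from drako import (
    AuthenticationError,
    DrakoAPIError,
    DrakoClient,
    PolicyViolationError,
    QuotaExceededError,
)


async def main() -> None:
    try:
        async with DrakoClient.from_env() as client:
            # Register the agent first to obtain the DID that evaluate_policy expects.
            identity = await client.verify_agent_identity(
                agent_name="researcher",
                agent_role="analyst",
            )
            result = await client.evaluate_policy(
                action="tool:delete_records",
                agent_did=identity["did"],
            )
            print(f"Decision: {result['decision']}")
    # Catch the most specific errors first; DrakoAPIError is the shared base class.
    except PolicyViolationError as exc:
        print(f"Blocked by policy {exc.policy_id}: {exc.detail}")
    except QuotaExceededError:
        print("Monthly quota reached; upgrade your plan.")
    except AuthenticationError:
        print("Invalid API key.")
    except DrakoAPIError as exc:
        print(f"API error {exc.status_code}: {exc.detail} (request_id={exc.request_id})")


asyncio.run(main())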
govern() catches all exceptions internally and returns the original object ungoverned rather than propagating them. If you need structured error handling, use DrakoClient directly.
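The fail-safe behaviour described for govern() follows a common pattern: attempt the wrapping, and on any failure warn and hand back the original. The sketch below shows that pattern in isolation; govern_safely and its wrap argument are hypothetical, and the warning text is invented for illustration.

```python
import warnings


def govern_safely(obj, wrap):
    """Return wrap(obj), or obj unchanged with a warning if wrapping fails."""
    try:
        return wrap(obj)
    except Exception as exc:  # deliberately broad: governance must not break the app
        warnings.warn(f"governance skipped: {exc}", stacklevel=2)
        return obj
```

The trade-off is that a misconfiguration silently leaves the object ungoverned, so it is worth surfacing these warnings in logs or tests.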
