Documentation index for AI agents: see /llms.txt. Markdown versions of every page are available at <path>.md or via Accept: text/markdown.
Guides

Approvals

The Flow AI runtime can pause mid-stream to ask the host application to approve a proposed plan or a pending tool call. Python supplies the policy and the response; the actual...

The Flow AI runtime can pause mid-stream to ask the host application to approve a proposed plan or a pending tool call. Python supplies the policy and the response; the actual gate lives in the Rust runtime.

When define_coordinator(..., approval={"plans": "always", "tools": "never"}) is set, that policy becomes the runtime approval floor unless define_runtime(..., approval_policies=...) supplies one explicitly. Individual agents can override the floor with approval={...}, and individual tools under an agent can override it with tool_approvals={...} or define_tool(..., approval=...). If the resolved policy requires approval, the runtime emits an approval-required event, suspends the stream, and waits for runtime.respond_to_approval(...) before continuing.

Policy values

Runtime and agent approval policies accept the same shape for both channels:

ValueEffect
"never"Calls run without a gate.
"always"Every call emits approval-required and waits.
"default"Per-agent approval={...} only. Leave this channel unchanged from the runtime floor.
{"kind": "dynamic", "value": "<predicate-id>"}Tool-only. The runtime calls a registered Python predicate per call.

Plan approval supports "never" and "always". Tool approval also supports dynamic predicates, registered through define_tool(..., approval=fn) or by passing the approval_predicates mapping to create_runtime(...).

"default" is a per-agent sentinel, not a runtime-level value. In define_coordinator(..., approval={...}) (and the other define_* agent helpers) a "default" channel is stripped before the spec is built, so the runtime floor applies unchanged. Passing it to define_runtime(approval_policies={"plans": "default"}) raises a validation error: at the runtime level an approval rule must be "never", "always", or a dynamic rule.

coordinator = define_coordinator(
    name="scenario_coordinator",
    model="claude-sonnet-4-6",
    routes=["scenario_planner", "scenario_executor"],
    approval={"plans": "always", "tools": "never"},
    prompt=coordinator_prompt,
)

executor = define_executor(
    name="scenario_executor",
    model="claude-sonnet-4-6",
    plan=scenario_plan,
    approval={"plans": "never", "tools": "never"},
    tool_approvals={"execute_query": "always"},
    prompt=executor_prompt,
)

Precedence is runtime floor, then agent override, then tool override. Tool overrides are scoped by agent, so the same tool name can be approval-gated for one specialist and ungated for another.

The approval-required event

The stream emits a wire-shape dict. The data field carries the Rust ApprovalRequest payload in camelCase:

{
    "type": "approval-required",
    "data": {
        "id": "apr-1234",
        "kind": "plan",                # or "tool"
        "target": "demo-plan-1",       # plan id, or the tool name
        "payload": {...},              # plan body, or tool args
        "glimpse": {...},              # optional preview, may be absent
        "resourceId": "acme",
        "threadId": "thread-1",
        "correlationId": "..."         # optional, ties retries to the originating call
    }
}

For tool approvals, target is the tool name and correlationId is the LLM's tool_use_id. For plan approvals, target and correlationId are both the plan id.

Responding to an approval

runtime.respond_to_approval(approval_id, outcome, feedback=None, partial=None) is an async method:

await runtime.respond_to_approval(
    event["data"]["id"],
    "approve",
    feedback="approved by smoke test",
)

Outcomes:

  • "approve" — runs the gated action.
  • "reject" — drops the action. Accept an optional feedback string.
  • "revise" — plan only. The runtime re-invokes the planner with partial, which is forwarded as the partial payload the executor's executePlan tool returns. Tool approvals collapse revise to reject.

After the response lands, the runtime emits approval-decision and then the gated event sequence — for a plan, the executor's tool calls; for a tool, the tool-invocation result event.

End-to-end example

The script below is a complete, runnable version of the same coordinator → planner → executor flow the runtime's integration tests exercise. It uses the scripted interpreter (interpreter="scripted"), so each call_agent prompt is replayed deterministically instead of being sent to a live model: the planner prompt is the storePlan tool call as JSON, and the executor prompt is the matching executePlan call. The coordinator gates the plan; the host approves it; only then does the executor's action dispatch run.

import asyncio
import json

from flowai_harness import (
    create_runtime,
    define_coordinator,
    define_executor,
    define_tenant,
    define_plan,
    define_planner,
    define_runtime,
)

scenario_plan = define_plan(
    "DemoPlan",
    {
        "type": "object",
        "required": ["rationale", "actions"],
        "properties": {
            "rationale": {"type": "string"},
            "actions": {
                "type": "array",
                "minItems": 1,
                "items": {
                    "type": "object",
                    "required": ["kind"],
                    "properties": {
                        "kind": {"type": "string"},
                        "message": {"type": "string"},
                    },
                },
            },
        },
    },
)

coordinator = define_coordinator(
    "coordinator",
    model="claude-sonnet-4-6",
    prompt="Coordinate by delegating to the planner and executor.",
    routes=["planner", "executor"],
    approval={"plans": "always", "tools": "never"},
)
planner = define_planner(
    "planner",
    model="claude-sonnet-4-6",
    prompt="Store exactly one typed plan.",
    plan=scenario_plan,
)
executor = define_executor(
    "executor",
    model="claude-sonnet-4-6",
    prompt="Execute the requested plan.",
    plan=scenario_plan,
)

def dispatch_actions(actions, ctx):
    print(f"dispatching {len(actions)} approved action(s)")
    return {
        "entitiesAffected": len(actions),
        "summary": f"executed {len(actions)} action(s)",
        "details": None,
    }

runtime = create_runtime(
    define_runtime(
        tenant=define_tenant("acme", "v1"),
        agents=[coordinator, planner, executor],
        providers={"anthropic": {"apiKey": "unused"}},
    ),
    action_dispatcher=dispatch_actions,
    interpreter="scripted",
)

# The scripted interpreter replays these prompts instead of calling a model.
# The planner prompt is the storePlan tool call; the executor prompt is the
# executePlan call for the same plan id.
planner_prompt = json.dumps({
    "tool": "storePlan",
    "args": {
        "specName": "DemoPlan",
        "planId": "demo-plan-1",
        "body": {
            "rationale": "deterministic demo plan",
            "actions": [
                {"kind": "record_counter", "message": "record the approved action"},
            ],
        },
    },
})
executor_prompt = json.dumps({
    "tool": "executePlan",
    "args": {"planId": "demo-plan-1"},
})

async def main():
    stream = runtime.query(
        json.dumps({"script": [
            {"tool": "call_agent", "args": {"agent": "planner", "prompt": planner_prompt}},
            {"tool": "call_agent", "args": {"agent": "executor", "prompt": executor_prompt}},
        ]}),
        thread_id="thread-1",
    )
    async for event in stream:
        if event["type"] == "approval-required":
            data = event["data"]
            print(f"approval-required: kind={data['kind']} target={data['target']}")
            await runtime.respond_to_approval(
                data["id"],
                "approve",
                feedback="approved by host",
            )
        elif event["type"] == "approval-decision":
            print(f"approval-decision: {event['data']['outcome']}")

asyncio.run(main())

Running the script prints:

approval-required: kind=plan target=demo-plan-1
dispatching 1 approved action(s)
approval-decision: {'outcome': 'approve'}

The gate fires when the executor calls executePlan: the stream suspends on approval-required, dispatch_actions runs only after the "approve" response lands, and the approval-decision event then records the outcome before the executor's result events complete the stream.

Note

The coordinator must not dispatch any side effect before the gate resolves. The runtime guarantees this — action_dispatcher is only invoked after an "approve" response lands. Use the interpreter="scripted" mode in tests to drive the exact sequence above without a live model.

Action dispatcher contract

action_dispatcher is the host adapter that applies approved plan actions to your platform. It receives the full action list and a runtime-built context:

def dispatch_actions(actions, ctx):
    ...

actions is a JSON-serializable list of normalized harness actions. Flat plan actions are converted to the canonical shape before dispatch:

[
    {
        "kind": "price_change",
        "payload": {"product_id": "sku-1", "new_price": 12.5},
        "references": [],
    }
]

ctx["resolved_refs"] contains any action references hydrated by the runtime, grouped by reference kind and id:

{
    "ProductSet": {
        "ref-123": {"product_ids": ["sku-1", "sku-2"]}
    }
}

The dispatcher may return None, which is treated as an empty execution result. Otherwise it must return an object matching the execution-result envelope:

return {
    "entitiesAffected": len(actions),  # required non-negative integer
    "summary": "Updated 3 products",   # optional string or None
    "details": {                       # optional JSON value
        "platformJobId": "job-123",
    },
}

Only entitiesAffected, summary, and details are accepted at the top level. Put domain-specific metadata under details. Invalid dispatcher returns fail executePlan, and the plan is marked failed rather than silently discarding malformed data.

Using platform API tools and an action dispatcher together

Most production agents use custom tools and an action dispatcher against the same external platform API. The split is:

  • Custom tools let the planner inspect, search, simulate, or preview platform state while it is building a plan.
  • The action dispatcher performs the approved writes by translating plan actions into platform API calls.

For example, a commerce platform might expose a generic action API:

class MockCommerceApi:
    def preview_price_change(self, product_id, new_price):
        return {"productId": product_id, "newPrice": new_price, "marginOk": True}

    def preview_availability_change(self, product_id, available):
        return {"productId": product_id, "available": available, "safeToApply": True}

    def create_action(self, *, action_type, payload):
        return {
            "id": f"action-{action_type.lower()}-{payload['productId']}",
            "type": action_type,
            "payload": payload,
        }

The plan still uses typed action variants. The action kind is the stable harness-side discriminator; the dispatcher maps it to the platform's API type.

from typing import Literal

from pydantic import BaseModel

from flowai_harness import TaggedUnion, define_plan, define_tool

class PriceChange(BaseModel):
    kind: Literal["price_change"]
    product_id: str
    new_price: float

class AvailabilityChange(BaseModel):
    kind: Literal["availability_change"]
    product_id: str
    available: bool

CommercialAction = TaggedUnion(PriceChange, AvailabilityChange)

class CommercialPlan(BaseModel):
    rationale: str
    actions: list[CommercialAction]

commercial_plan = define_plan("CommercialPlan", CommercialPlan)

Tools can use the platform client for non-final operations. Pass the service to create_runtime(..., services=...), then read it from the tool context:

api = MockCommerceApi()

@define_tool(
    "preview_price_change",
    {"product_id": str, "new_price": float},
    approval="never",
)
async def preview_price_change(args, ctx):
    return ctx.platform.preview_price_change(
        args["product_id"],
        args["new_price"],
    )

@define_tool(
    "preview_availability_change",
    {"product_id": str, "available": bool},
    approval="never",
)
async def preview_availability_change(args, ctx):
    return ctx.platform.preview_availability_change(
        args["product_id"],
        args["available"],
    )

The dispatcher uses the same platform client, but it is not model-callable. It runs only after the plan approval gate resolves to "approve". Unlike tool handlers, the dispatcher does not receive services through ctx; create it in the same runtime factory and close over the API client:

def dispatch_actions(actions, ctx):
    created_actions = []

    for action in actions:
        payload = action["payload"]

        if action["kind"] == "price_change":
            created_actions.append(
                api.create_action(
                    action_type="PRICE_CHANGE",
                    payload={
                        "productId": payload["product_id"],
                        "newPrice": payload["new_price"],
                    },
                )
            )
        elif action["kind"] == "availability_change":
            created_actions.append(
                api.create_action(
                    action_type="AVAILABILITY_CHANGE",
                    payload={
                        "productId": payload["product_id"],
                        "available": payload["available"],
                    },
                )
            )
        else:
            raise ValueError(f"unsupported action kind: {action['kind']}")

    return {
        "entitiesAffected": len(created_actions),
        "summary": f"Created {len(created_actions)} platform action(s)",
        "details": {
            "createdActions": created_actions,
        },
    }

Then wire both pieces into the runtime:

runtime = create_runtime(
    define_runtime(
        tenant=define_tenant("acme", "v1"),
        agents=[coordinator, planner, executor],
        providers={"anthropic": {"apiKey": "unused"}},
    ),
    services={"platform": api},
    action_dispatcher=dispatch_actions,
)

In this setup, the model can call preview tools while planning, but it cannot directly call api.create_action(...). It can only store a typed plan and ask the executor to run executePlan. The runtime validates the plan, asks for approval when required, hydrates references, and only then invokes dispatch_actions.

Dependency flow: tools vs dispatchers

Tool handlers and action dispatchers both receive a ctx argument, but it is not the same context.

Tool handler context is host-service context. If you pass a service through create_runtime(..., services={"platform": api}), custom Python tools can read it through ctx.platform, ctx["platform"], or ctx.services["platform"]:

@define_tool("preview_price_change", {"product_id": str, "new_price": float})
async def preview_price_change(args, ctx):
    # This works for tools.
    return ctx.platform.preview_price_change(
        args["product_id"],
        args["new_price"],
    )

Action dispatcher context is plan-execution context. It carries runtime-built execution data, such as hydrated references, not the services mapping:

def dispatch_actions(actions, ctx):
    # This works for dispatchers.
    resolved_refs = ctx["resolved_refs"]

So do not write dispatchers that expect service injection through ctx:

def dispatch_actions(actions, ctx):
    # Wrong today: action dispatcher ctx is not the tool service context.
    return ctx.platform.create_action(...)

Instead, build the runtime in a factory, create the platform client once, and close over it from the dispatcher:

from flowai_harness import (
    create_runtime,
    define_runtime,
    define_specialist,
    define_tenant,
    define_tool,
)

def build_runtime(tenant_id: str):
    api = MockCommerceApi()  # in production: a per-tenant API client

    @define_tool("preview_price_change", {"product_id": str, "new_price": float})
    async def preview_price_change(args, ctx):
        return ctx.platform.preview_price_change(
            args["product_id"],
            args["new_price"],
        )

    def dispatch_actions(actions, ctx):
        created = []
        for action in actions:
            if action["kind"] == "price_change":
                created.append(
                    api.create_action(
                        action_type="PRICE_CHANGE",
                        payload=action["payload"],
                    )
                )

        return {
            "entitiesAffected": len(created),
            "summary": f"Created {len(created)} platform action(s)",
            "details": {"createdActions": created},
        }

    planner = define_specialist(
        "commercial_planner",
        model="claude-sonnet-4-6",
        prompt="Preview price changes before proposing a plan.",
        tools=[preview_price_change],
    )

    return create_runtime(
        define_runtime(
            tenant=define_tenant(tenant_id, "v1"),
            agents=[planner],
            providers={"anthropic": {"apiKey": "unused"}},
        ),
        services={"platform": api},
        action_dispatcher=dispatch_actions,
    )

Both callbacks use the same platform client, but the dependency path is different:

tool callback       -> ctx.platform
action dispatcher  -> closure / runtime factory scope

Tool approvals

A tool can opt in through define_tool(..., approval="always") or a dynamic predicate. The approval event shape is identical except kind is "tool" and target carries the tool name. Approving runs the Python handler; rejecting records a decision event and returns a tool result that signals the rejection to the LLM.

@define_tool("increment_counter", {"amount": int}, approval="always")
async def increment_counter(args, ctx):
    return {"count": args["amount"]}

For dynamic policies, the predicate receives (args, ctx) and returns a bool. When it returns True the runtime gates the call; when it returns False the call runs immediately.

def needs_approval(args, ctx):
    return ctx["target"] == "guarded_echo"

@define_tool("guarded_echo", {"value": str}, approval=needs_approval)
async def guarded_echo(args, ctx):
    return {"echo": args["value"]}

Common errors

ErrorFix
No approval-required event appearsCheck the resolved runtime, agent, and tool policy. "never" at a narrower scope can override a broader floor.
A plan revise response acts like a rejectionrevise is plan-only. Tool approvals treat unsupported revise requests as rejection.
The stream appears pausedRespond with runtime.respond_to_approval(...); gated work does not continue until the host sends a decision.
The wrong tool is gatedTool overrides are scoped by agent, so check the agent that owns the current tool call.

See also