Approvals
The Flow AI runtime can pause mid-stream to ask the host application to approve a proposed plan or a pending tool call. Python supplies the policy and the response; the actual gate lives in the Rust runtime.
When define_coordinator(..., approval={"plans": "always", "tools": "never"})
is set, that policy becomes the runtime approval floor unless
define_runtime(..., approval_policies=...) supplies one explicitly. Individual
agents can override the floor with approval={...}, and individual tools under
an agent can override it with tool_approvals={...} or define_tool(..., approval=...). If the resolved policy requires approval, the runtime emits an
approval-required event, suspends the stream, and waits for
runtime.respond_to_approval(...) before continuing.
Policy values
Runtime and agent approval policies accept the same shape for both channels:
| Value | Effect |
|---|---|
| "never" | Calls run without a gate. |
| "always" | Every call emits approval-required and waits. |
| "default" | Per-agent approval={...} only. Leave this channel unchanged from the runtime floor. |
| {"kind": "dynamic", "value": "<predicate-id>"} | Tool-only. The runtime calls a registered Python predicate per call. |
Plan approval supports "never" and "always". Tool approval also supports
dynamic predicates, registered through define_tool(..., approval=fn) or by
passing the approval_predicates mapping to create_runtime(...).
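The rule shapes in the table can be illustrated with a small, self-contained sketch. It is not the runtime's implementation (the real gate lives in Rust). `PREDICATES`, `requires_approval`, and the `large_amount` predicate id are hypothetical names, and the `(args, ctx)` predicate signature follows the Tool approvals section.

```python
from collections.abc import Callable, Mapping
from typing import Any, Union

Rule = Union[str, Mapping[str, str]]
Predicate = Callable[[Mapping[str, Any], Mapping[str, Any]], bool]

# Hypothetical registry: predicate id -> predicate(args, ctx) -> bool.
PREDICATES: dict[str, Predicate] = {
    "large_amount": lambda args, ctx: args.get("amount", 0) > 100,
}


def requires_approval(rule: Rule, args: Mapping[str, Any], ctx: Mapping[str, Any]) -> bool:
    """Return True when a call governed by `rule` should be gated."""
    if rule == "never":
        return False
    if rule == "always":
        return True
    if isinstance(rule, Mapping) and rule.get("kind") == "dynamic":
        predicate = PREDICATES[rule["value"]]  # KeyError for an unregistered id
        return bool(predicate(args, ctx))
    # "default" is an agent-level sentinel and never reaches a resolved rule.
    raise ValueError(f"unsupported approval rule: {rule!r}")
```

A dynamic rule defers the decision to call time, so the same tool can run ungated for small arguments and pause for large ones.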
"default" is a per-agent sentinel, not a runtime-level value. In
define_coordinator(..., approval={...}) (and the other define_* agent
helpers) a "default" channel is stripped before the spec is built, so the
runtime floor applies unchanged. Passing it to
define_runtime(approval_policies={"plans": "default"}) raises a validation
error: at the runtime level an approval rule must be "never", "always", or
a dynamic rule.
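The two behaviors described above (stripping "default" at the agent level, rejecting it at the runtime level) can be sketched in plain Python. This is an illustration only: `strip_default_channels` and `validate_runtime_rule` are hypothetical helpers, not harness functions, and restricting dynamic rules to the tools channel is an assumption drawn from the "Tool-only" note in the table.

```python
from collections.abc import Mapping


def strip_default_channels(approval: Mapping[str, object]) -> dict[str, object]:
    """Drop channels set to "default" so the runtime floor applies to them."""
    return {channel: rule for channel, rule in approval.items() if rule != "default"}


def validate_runtime_rule(channel: str, rule: object) -> None:
    """Raise ValueError for values that are only meaningful at the agent level."""
    if rule in ("never", "always"):
        return
    is_dynamic = isinstance(rule, Mapping) and rule.get("kind") == "dynamic"
    # Assumption: dynamic rules are accepted only on the tools channel.
    if is_dynamic and channel == "tools":
        return
    raise ValueError(f"invalid runtime approval rule for {channel!r}: {rule!r}")
```

With these helpers, `strip_default_channels({"plans": "default", "tools": "always"})` keeps only the tools channel, while `validate_runtime_rule("plans", "default")` raises.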
coordinator = define_coordinator(
    name="scenario_coordinator",
    model="claude-sonnet-4-6",
    routes=["scenario_planner", "scenario_executor"],
    approval={"plans": "always", "tools": "never"},
    prompt=coordinator_prompt,
)

executor = define_executor(
    name="scenario_executor",
    model="claude-sonnet-4-6",
    plan=scenario_plan,
    approval={"plans": "never", "tools": "never"},
    tool_approvals={"execute_query": "always"},
    prompt=executor_prompt,
)

Precedence is runtime floor, then agent override, then tool override. Tool overrides are scoped by agent, so the same tool name can be approval-gated for one specialist and ungated for another.
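The precedence order can be modeled as a short lookup chain. The sketch below is a hypothetical illustration rather than the runtime's resolver: `resolve_tool_policy` is an invented name, and falling back to "never" when the floor has no tools entry is an assumption.

```python
from typing import Optional


def resolve_tool_policy(
    runtime_floor: dict,
    agent_approval: Optional[dict],
    tool_approvals: Optional[dict],
    tool_name: str,
):
    """Resolve the tools-channel rule: runtime floor, then agent, then tool."""
    rule = runtime_floor.get("tools", "never")  # assumed fallback
    agent_rule = (agent_approval or {}).get("tools", "default")
    if agent_rule != "default":
        rule = agent_rule
    # Tool overrides are stored per agent, so the lookup is scoped to that agent.
    return (tool_approvals or {}).get(tool_name, rule)


floor = {"plans": "always", "tools": "never"}
executor_approval = {"plans": "never", "tools": "never"}
executor_tool_approvals = {"execute_query": "always"}

gated = resolve_tool_policy(floor, executor_approval, executor_tool_approvals, "execute_query")
ungated = resolve_tool_policy(floor, executor_approval, executor_tool_approvals, "other_tool")
```

Here `gated` resolves to "always" because the tool override wins, and `ungated` resolves to "never" from the agent-level policy.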
The approval-required event
The stream emits a wire-shape dict. The data field carries the Rust
ApprovalRequest payload in camelCase:
{
    "type": "approval-required",
    "data": {
        "id": "apr-1234",
        "kind": "plan",            # or "tool"
        "target": "demo-plan-1",   # plan id, or the tool name
        "payload": {...},          # plan body, or tool args
        "glimpse": {...},          # optional preview, may be absent
        "resourceId": "acme",
        "threadId": "thread-1",
        "correlationId": "..."     # optional, ties retries to the originating call
    }
}

For tool approvals, target is the tool name and correlationId is the LLM's
tool_use_id. For plan approvals, target and correlationId are both the
plan id.
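Host code often prefers a typed view over the raw wire dict. The following sketch converts the camelCase payload into a snake_case dataclass and tolerates the optional fields. `ApprovalRequestView` and `parse_approval_required` are hypothetical names introduced for illustration; only the field names come from the event shape above.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class ApprovalRequestView:
    id: str
    kind: str
    target: str
    payload: Any
    resource_id: str
    thread_id: str
    glimpse: Optional[Any] = None
    correlation_id: Optional[str] = None


def parse_approval_required(event: dict) -> ApprovalRequestView:
    """Convert the wire-shape dict into a snake_case view for host code."""
    if event.get("type") != "approval-required":
        raise ValueError(f"not an approval-required event: {event.get('type')!r}")
    data = event["data"]
    return ApprovalRequestView(
        id=data["id"],
        kind=data["kind"],
        target=data["target"],
        payload=data["payload"],
        resource_id=data["resourceId"],
        thread_id=data["threadId"],
        glimpse=data.get("glimpse"),               # may be absent
        correlation_id=data.get("correlationId"),  # optional
    )
```

Reading the optional fields with `.get(...)` keeps the parser from failing when the runtime omits them.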
Responding to an approval
runtime.respond_to_approval(approval_id, outcome, feedback=None, partial=None)
is an async method:
await runtime.respond_to_approval(
    event["data"]["id"],
    "approve",
    feedback="approved by smoke test",
)

Outcomes:
- "approve": runs the gated action.
- "reject": drops the action. Accepts an optional feedback string.
- "revise": plan only. The runtime re-invokes the planner with partial, which is forwarded as the partial payload the executor's executePlan tool returns. Tool approvals collapse revise to reject.
After the response lands, the runtime emits approval-decision and then the
gated event sequence — for a plan, the executor's tool calls; for a tool, the
tool-invocation result event.
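Because revise only applies to plans, a host UI may want to know in advance what a given response will actually do. The helper below is a hypothetical sketch of that rule (`effective_outcome` is not part of the harness API); it mirrors the outcomes listed above and nothing more.

```python
VALID_OUTCOMES = ("approve", "reject", "revise")


def effective_outcome(kind: str, outcome: str) -> str:
    """Return the outcome the gate acts on for an approval of the given kind."""
    if outcome not in VALID_OUTCOMES:
        raise ValueError(f"unknown outcome: {outcome!r}")
    if kind == "tool" and outcome == "revise":
        return "reject"  # revise is plan-only; tool approvals collapse it
    return outcome
```

A host could use this to hide a "Revise" button when `kind` is "tool", since choosing it would behave like a rejection.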
End-to-end example
The script below is a complete, runnable version of the same coordinator →
planner → executor flow the runtime's integration tests exercise. It uses the
scripted interpreter (interpreter="scripted"), so each call_agent prompt is
replayed deterministically instead of being sent to a live model: the planner
prompt is the storePlan tool call as JSON, and the executor prompt is the
matching executePlan call. The coordinator gates the plan; the host approves
it; only then does the executor's action dispatch run.
import asyncio
import json

from flowai_harness import (
    create_runtime,
    define_coordinator,
    define_executor,
    define_tenant,
    define_plan,
    define_planner,
    define_runtime,
)

scenario_plan = define_plan(
    "DemoPlan",
    {
        "type": "object",
        "required": ["rationale", "actions"],
        "properties": {
            "rationale": {"type": "string"},
            "actions": {
                "type": "array",
                "minItems": 1,
                "items": {
                    "type": "object",
                    "required": ["kind"],
                    "properties": {
                        "kind": {"type": "string"},
                        "message": {"type": "string"},
                    },
                },
            },
        },
    },
)

coordinator = define_coordinator(
    "coordinator",
    model="claude-sonnet-4-6",
    prompt="Coordinate by delegating to the planner and executor.",
    routes=["planner", "executor"],
    approval={"plans": "always", "tools": "never"},
)

planner = define_planner(
    "planner",
    model="claude-sonnet-4-6",
    prompt="Store exactly one typed plan.",
    plan=scenario_plan,
)

executor = define_executor(
    "executor",
    model="claude-sonnet-4-6",
    prompt="Execute the requested plan.",
    plan=scenario_plan,
)


def dispatch_actions(actions, ctx):
    print(f"dispatching {len(actions)} approved action(s)")
    return {
        "entitiesAffected": len(actions),
        "summary": f"executed {len(actions)} action(s)",
        "details": None,
    }


runtime = create_runtime(
    define_runtime(
        tenant=define_tenant("acme", "v1"),
        agents=[coordinator, planner, executor],
        providers={"anthropic": {"apiKey": "unused"}},
    ),
    action_dispatcher=dispatch_actions,
    interpreter="scripted",
)

# The scripted interpreter replays these prompts instead of calling a model.
# The planner prompt is the storePlan tool call; the executor prompt is the
# executePlan call for the same plan id.
planner_prompt = json.dumps({
    "tool": "storePlan",
    "args": {
        "specName": "DemoPlan",
        "planId": "demo-plan-1",
        "body": {
            "rationale": "deterministic demo plan",
            "actions": [
                {"kind": "record_counter", "message": "record the approved action"},
            ],
        },
    },
})

executor_prompt = json.dumps({
    "tool": "executePlan",
    "args": {"planId": "demo-plan-1"},
})


async def main():
    stream = runtime.query(
        json.dumps({"script": [
            {"tool": "call_agent", "args": {"agent": "planner", "prompt": planner_prompt}},
            {"tool": "call_agent", "args": {"agent": "executor", "prompt": executor_prompt}},
        ]}),
        thread_id="thread-1",
    )
    async for event in stream:
        if event["type"] == "approval-required":
            data = event["data"]
            print(f"approval-required: kind={data['kind']} target={data['target']}")
            await runtime.respond_to_approval(
                data["id"],
                "approve",
                feedback="approved by host",
            )
        elif event["type"] == "approval-decision":
            print(f"approval-decision: {event['data']['outcome']}")


asyncio.run(main())

Running the script prints:

approval-required: kind=plan target=demo-plan-1
dispatching 1 approved action(s)
approval-decision: {'outcome': 'approve'}

The gate fires when the executor calls executePlan: the stream suspends on
approval-required, dispatch_actions runs only after the "approve"
response lands, and the approval-decision event then records the outcome
before the executor's result events complete the stream.
Note
The coordinator must not dispatch any side effect before the gate resolves.
The runtime guarantees this — action_dispatcher is only invoked after an
"approve" response lands. Use the interpreter="scripted" mode in tests
to drive the exact sequence above without a live model.
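For unit tests that should not depend on the harness at all, the approve-on-gate loop can be factored out and exercised against a stand-in stream. In the sketch below, `drive_with_auto_approve`, `fake_stream`, and `run_demo` are hypothetical names; the fake stream only imitates the event shapes documented above and is not the scripted interpreter.

```python
import asyncio


async def drive_with_auto_approve(stream, respond):
    """Consume a stream, approve every gate, and return the event types seen."""
    seen = []
    async for event in stream:
        seen.append(event["type"])
        if event["type"] == "approval-required":
            await respond(event["data"]["id"], "approve")
    return seen


async def fake_stream(decisions):
    """Stand-in stream: emits a gate, then a decision once the host has answered."""
    yield {"type": "approval-required", "data": {"id": "apr-1", "kind": "plan"}}
    # Mimic the gate: nothing further is emitted until a decision exists.
    assert decisions, "stream resumed before the host responded"
    yield {"type": "approval-decision", "data": {"outcome": decisions[-1][1]}}


async def run_demo():
    decisions = []

    async def respond(approval_id, outcome, feedback=None, partial=None):
        # Same parameters as runtime.respond_to_approval, recorded for assertions.
        decisions.append((approval_id, outcome))

    seen = await drive_with_auto_approve(fake_stream(decisions), respond)
    return seen, decisions


seen, decisions = asyncio.run(run_demo())
```

Against a real runtime, the same loop would be called with the stream returned by `runtime.query(...)` and `runtime.respond_to_approval` as the `respond` argument.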
Action dispatcher contract
action_dispatcher is the host adapter that applies approved plan actions to
your platform. It receives the full action list and a runtime-built context:
def dispatch_actions(actions, ctx):
    ...

actions is a JSON-serializable list of normalized harness actions. Flat plan
actions are converted to the canonical shape before dispatch:

[
    {
        "kind": "price_change",
        "payload": {"product_id": "sku-1", "new_price": 12.5},
        "references": [],
    }
]

ctx["resolved_refs"] contains any action references hydrated by the runtime,
grouped by reference kind and id:

{
    "ProductSet": {
        "ref-123": {"product_ids": ["sku-1", "sku-2"]}
    }
}

The dispatcher may return None, which is treated as an empty execution result.
Otherwise it must return an object matching the execution-result envelope:

return {
    "entitiesAffected": len(actions),   # required non-negative integer
    "summary": "Updated 3 products",    # optional string or None
    "details": {                        # optional JSON value
        "platformJobId": "job-123",
    },
}

Only entitiesAffected, summary, and details are accepted at the top level.
Put domain-specific metadata under details. Invalid dispatcher returns fail
executePlan, and the plan is marked failed rather than silently discarding
malformed data.
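Checking the envelope in your own tests catches malformed returns before they fail executePlan. The validator below is a sketch of the documented rules, not the runtime's validation code; `check_execution_result` is a hypothetical helper, and the zero-count result returned for None is an assumed shape for the "empty execution result".

```python
ALLOWED_KEYS = {"entitiesAffected", "summary", "details"}


def check_execution_result(result):
    """Validate a dispatcher return value against the documented envelope."""
    if result is None:
        # Assumed shape of the "empty execution result".
        return {"entitiesAffected": 0, "summary": None, "details": None}
    if not isinstance(result, dict):
        raise TypeError("dispatcher must return a dict or None")
    unknown = set(result) - ALLOWED_KEYS
    if unknown:
        raise ValueError(f"unexpected top-level keys: {sorted(unknown)}")
    count = result.get("entitiesAffected")
    # bool is a subclass of int, so exclude it explicitly.
    if not isinstance(count, int) or isinstance(count, bool) or count < 0:
        raise ValueError("entitiesAffected must be a non-negative integer")
    summary = result.get("summary")
    if summary is not None and not isinstance(summary, str):
        raise ValueError("summary must be a string or None")
    return {
        "entitiesAffected": count,
        "summary": summary,
        "details": result.get("details"),
    }
```

A dispatcher test can then assert that `check_execution_result(dispatch_actions(actions, ctx))` does not raise for representative action lists.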
Using platform API tools and an action dispatcher together
Most production agents use custom tools and an action dispatcher against the same external platform API. The split is:
- Custom tools let the planner inspect, search, simulate, or preview platform state while it is building a plan.
- The action dispatcher performs the approved writes by translating plan actions into platform API calls.
For example, a commerce platform might expose a generic action API:
class MockCommerceApi:
    def preview_price_change(self, product_id, new_price):
        return {"productId": product_id, "newPrice": new_price, "marginOk": True}

    def preview_availability_change(self, product_id, available):
        return {"productId": product_id, "available": available, "safeToApply": True}

    def create_action(self, *, action_type, payload):
        return {
            "id": f"action-{action_type.lower()}-{payload['productId']}",
            "type": action_type,
            "payload": payload,
        }

The plan still uses typed action variants. The action kind is the stable
harness-side discriminator; the dispatcher maps it to the platform's API type.
from typing import Literal

from pydantic import BaseModel

from flowai_harness import TaggedUnion, define_plan, define_tool


class PriceChange(BaseModel):
    kind: Literal["price_change"]
    product_id: str
    new_price: float


class AvailabilityChange(BaseModel):
    kind: Literal["availability_change"]
    product_id: str
    available: bool


CommercialAction = TaggedUnion(PriceChange, AvailabilityChange)


class CommercialPlan(BaseModel):
    rationale: str
    actions: list[CommercialAction]


commercial_plan = define_plan("CommercialPlan", CommercialPlan)

Tools can use the platform client for non-final operations. Pass the service to
create_runtime(..., services=...), then read it from the tool context:
api = MockCommerceApi()


@define_tool(
    "preview_price_change",
    {"product_id": str, "new_price": float},
    approval="never",
)
async def preview_price_change(args, ctx):
    return ctx.platform.preview_price_change(
        args["product_id"],
        args["new_price"],
    )


@define_tool(
    "preview_availability_change",
    {"product_id": str, "available": bool},
    approval="never",
)
async def preview_availability_change(args, ctx):
    return ctx.platform.preview_availability_change(
        args["product_id"],
        args["available"],
    )

The dispatcher uses the same platform client, but it is not model-callable. It
runs only after the plan approval gate resolves to "approve". Unlike tool
handlers, the dispatcher does not receive services through ctx; create it in
the same runtime factory and close over the API client:
def dispatch_actions(actions, ctx):
    created_actions = []
    for action in actions:
        payload = action["payload"]
        if action["kind"] == "price_change":
            created_actions.append(
                api.create_action(
                    action_type="PRICE_CHANGE",
                    payload={
                        "productId": payload["product_id"],
                        "newPrice": payload["new_price"],
                    },
                )
            )
        elif action["kind"] == "availability_change":
            created_actions.append(
                api.create_action(
                    action_type="AVAILABILITY_CHANGE",
                    payload={
                        "productId": payload["product_id"],
                        "available": payload["available"],
                    },
                )
            )
        else:
            raise ValueError(f"unsupported action kind: {action['kind']}")
    return {
        "entitiesAffected": len(created_actions),
        "summary": f"Created {len(created_actions)} platform action(s)",
        "details": {
            "createdActions": created_actions,
        },
    }

Then wire both pieces into the runtime:
runtime = create_runtime(
    define_runtime(
        tenant=define_tenant("acme", "v1"),
        agents=[coordinator, planner, executor],
        providers={"anthropic": {"apiKey": "unused"}},
    ),
    services={"platform": api},
    action_dispatcher=dispatch_actions,
)

In this setup, the model can call preview tools while planning, but it cannot
directly call api.create_action(...). It can only store a typed plan and ask
the executor to run executePlan. The runtime validates the plan, asks for
approval when required, hydrates references, and only then invokes
dispatch_actions.
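As the number of action kinds grows, the kind-to-API-type mapping can be kept in a lookup table instead of an if/elif chain. The sketch below is one possible arrangement, not a harness feature: `ACTION_TABLE` and `make_dispatcher` are hypothetical names, and `api` is assumed to expose the `create_action(*, action_type, payload)` method of the mock client. Unlike a loop that raises partway through, it checks every kind before making any write.

```python
# kind -> (platform action type, function that builds the platform payload)
ACTION_TABLE = {
    "price_change": (
        "PRICE_CHANGE",
        lambda p: {"productId": p["product_id"], "newPrice": p["new_price"]},
    ),
    "availability_change": (
        "AVAILABILITY_CHANGE",
        lambda p: {"productId": p["product_id"], "available": p["available"]},
    ),
}


def make_dispatcher(api):
    """Build a dispatcher that closes over `api` and maps kinds via ACTION_TABLE."""

    def dispatch_actions(actions, ctx):
        # Validate every kind first so an unsupported action fails before any write.
        for action in actions:
            if action["kind"] not in ACTION_TABLE:
                raise ValueError(f"unsupported action kind: {action['kind']}")
        created = []
        for action in actions:
            action_type, to_payload = ACTION_TABLE[action["kind"]]
            created.append(
                api.create_action(
                    action_type=action_type,
                    payload=to_payload(action["payload"]),
                )
            )
        return {
            "entitiesAffected": len(created),
            "summary": f"Created {len(created)} platform action(s)",
            "details": {"createdActions": created},
        }

    return dispatch_actions
```

The factory form keeps the closure-based dependency path: the dispatcher receives the client from its enclosing scope rather than from ctx.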
Dependency flow: tools vs dispatchers
Tool handlers and action dispatchers both receive a ctx argument, but it is
not the same context.
Tool handler context is host-service context. If you pass a service through
create_runtime(..., services={"platform": api}), custom Python tools can read
it through ctx.platform, ctx["platform"], or ctx.services["platform"]:
@define_tool("preview_price_change", {"product_id": str, "new_price": float})
async def preview_price_change(args, ctx):
    # This works for tools.
    return ctx.platform.preview_price_change(
        args["product_id"],
        args["new_price"],
    )

Action dispatcher context is plan-execution context. It carries runtime-built
execution data, such as hydrated references, not the services mapping:

def dispatch_actions(actions, ctx):
    # This works for dispatchers.
    resolved_refs = ctx["resolved_refs"]

So do not write dispatchers that expect service injection through ctx:

def dispatch_actions(actions, ctx):
    # Wrong today: action dispatcher ctx is not the tool service context.
    return ctx.platform.create_action(...)

Instead, build the runtime in a factory, create the platform client once, and close over it from the dispatcher:
from flowai_harness import (
    create_runtime,
    define_runtime,
    define_specialist,
    define_tenant,
    define_tool,
)


def build_runtime(tenant_id: str):
    api = MockCommerceApi()  # in production: a per-tenant API client

    @define_tool("preview_price_change", {"product_id": str, "new_price": float})
    async def preview_price_change(args, ctx):
        return ctx.platform.preview_price_change(
            args["product_id"],
            args["new_price"],
        )

    def dispatch_actions(actions, ctx):
        created = []
        for action in actions:
            if action["kind"] == "price_change":
                created.append(
                    api.create_action(
                        action_type="PRICE_CHANGE",
                        payload=action["payload"],
                    )
                )
        return {
            "entitiesAffected": len(created),
            "summary": f"Created {len(created)} platform action(s)",
            "details": {"createdActions": created},
        }

    planner = define_specialist(
        "commercial_planner",
        model="claude-sonnet-4-6",
        prompt="Preview price changes before proposing a plan.",
        tools=[preview_price_change],
    )

    return create_runtime(
        define_runtime(
            tenant=define_tenant(tenant_id, "v1"),
            agents=[planner],
            providers={"anthropic": {"apiKey": "unused"}},
        ),
        services={"platform": api},
        action_dispatcher=dispatch_actions,
    )

Both callbacks use the same platform client, but the dependency path is different:

tool callback -> ctx.platform
action dispatcher -> closure / runtime factory scope

Tool approvals
A tool can opt in through define_tool(..., approval="always") or a dynamic
predicate. The approval event shape is identical except kind is "tool" and
target carries the tool name. Approving runs the Python handler; rejecting
records a decision event and returns a tool result that signals the rejection
to the LLM.
@define_tool("increment_counter", {"amount": int}, approval="always")
async def increment_counter(args, ctx):
    return {"count": args["amount"]}

For dynamic policies, the predicate receives (args, ctx) and returns a bool.
When it returns True the runtime gates the call; when it returns False the
call runs immediately.

def needs_approval(args, ctx):
    return ctx["target"] == "guarded_echo"


@define_tool("guarded_echo", {"value": str}, approval=needs_approval)
async def guarded_echo(args, ctx):
    return {"echo": args["value"]}

Common errors
| Error | Fix |
|---|---|
| No approval-required event appears | Check the resolved runtime, agent, and tool policy. "never" at a narrower scope can override a broader floor. |
| A revise response acts like a rejection | revise is plan-only. Tool approvals treat unsupported revise requests as rejection. |
| The stream appears paused | Respond with runtime.respond_to_approval(...); gated work does not continue until the host sends a decision. |
| The wrong tool is gated | Tool overrides are scoped by agent, so check the agent that owns the current tool call. |
