Tutorial: build the Acme scenario-planning agent
Build, run, and inspect a scenario-planning agent in fifteen minutes.
This walkthrough constructs an Acme scenario-planning runtime: a coordinator routes between a planner and an executor, both using shared domain knowledge and a typed plan whose actions are a tagged union. A specialist handles ad-hoc product questions. The final runtime runs against the deterministic testing interpreter, so you can complete the tutorial without provider credentials.
1. Install
Private preview access
flowai-harness is not currently available on the public PyPI registry. To get access to the preview release, contact aaro@flow-ai.com or karolus@flow-ai.com before running the install command below.
pip install flowai-harness

Python 3.11+ is required. The wheel embeds the native runtime, so no extra Rust toolchain is needed at install time. The model ids used below are illustrative: the final step runs under the deterministic testing interpreter, which never calls a provider, so any model string validates.
2. Define tenant identity and domain knowledge
Tenant identity scopes runtime storage, telemetry, references, plans, and approvals. Domain knowledge is separate prompt content that your agents can read.
from flowai_harness import define_tenant
tenant = define_tenant("acme", "v1")
domain_knowledge = {
    "entities": [{"id": "product", "description": "SKU-level product catalog"}],
    "dimensions": ["segment", "channel"],
    "action_types": ["price_change", "promotion_launch"],
    "data_model": {
        "segments": ["enterprise", "smb"],
        "channels": ["retail", "online"],
    },
}

To see the camelCase wire shape the runtime consumes:

print(tenant.model_dump(by_alias=True, mode="json"))

{'resourceId': 'acme', 'version': 'v1'}

If you see this dict, the tenant is wired correctly and every later primitive will scope itself to acme.
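The camelCase keys in that output come from dumping by alias. As a rough, standalone illustration of the renaming involved (not the library's code), the sketch below converts snake_case names to camelCase with the standard library only. The helper names and the snake_case field name `resource_id` are assumptions chosen to reproduce the printed dict.

```python
def to_camel(name: str) -> str:
    """Convert a snake_case field name to camelCase."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def dump_by_alias(fields: dict) -> dict:
    """Re-key a flat dict of snake_case fields with camelCase aliases."""
    return {to_camel(key): value for key, value in fields.items()}


# Hypothetical snake_case field names, picked to match the tenant output.
wire = dump_by_alias({"resource_id": "acme", "version": "v1"})
print(wire)  # {'resourceId': 'acme', 'version': 'v1'}
```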
3. Define a reference
References are TTL-bounded, content-addressed handles to customer-owned values
that agents pass between turns without dragging the full payload through every
prompt. The glimpse callable distils the value into a small, prompt-friendly
dict.
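To make the three properties concrete (content addressing, a TTL, and a glimpse), here is a minimal standard-library sketch. It is an illustration under stated assumptions, not how flowai-harness implements references: `Handle`, `make_handle`, and `is_live` are hypothetical names, and hashing canonical JSON with SHA-256 is just one plausible way to derive a content address.

```python
import hashlib
import json
import time
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Handle:
    """Hypothetical stand-in for a reference handle."""
    digest: str        # content address: SHA-256 of the canonical JSON payload
    expires_at: float  # absolute expiry time, in seconds
    glimpse: dict      # small summary that is cheap to place in a prompt


def make_handle(payload: dict, ttl_ms: int, now: Optional[float] = None) -> Handle:
    now = time.time() if now is None else now
    # Sorting keys makes the digest independent of dict insertion order.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    ids = payload["product_ids"]
    return Handle(
        digest=digest,
        expires_at=now + ttl_ms / 1000,
        glimpse={"productCount": len(ids), "preview": ids[:3]},
    )


def is_live(handle: Handle, now: float) -> bool:
    """A handle is usable only strictly before its expiry time."""
    return now < handle.expires_at


handle = make_handle({"product_ids": ["a", "b", "c", "d"]}, ttl_ms=60 * 60 * 1000, now=0.0)
print(handle.glimpse)  # {'productCount': 4, 'preview': ['a', 'b', 'c']}
```

Only the glimpse needs to travel through prompts; the full payload stays behind the digest. The tutorial's own reference definition follows.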
from pydantic import BaseModel
from flowai_harness import define_reference

class ProductSetPayload(BaseModel):
    product_ids: list[str]

ProductSet = define_reference(
    name="ProductSet",
    schema=ProductSetPayload,
    ttl_ms=60 * 60 * 1000,
    glimpse=lambda value: {
        "productCount": len(value.product_ids),
        "preview": value.product_ids[:3],
    },
)

4. Define a typed plan with a tagged action union
define_plan accepts any Pydantic schema. When the action list is polymorphic,
wrap the variants in TaggedUnion.
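For readers new to tagged unions, the idea is that one field (here `kind`) names the variant, and a parser dispatches on it. The sketch below shows that dispatch with plain dataclasses. It is a standalone illustration, not part of the tutorial script and not a description of how TaggedUnion works internally; `VARIANTS` and `parse_action` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class PriceChange:
    product_id: str
    new_price: float
    kind: str = "price_change"


@dataclass
class PromotionLaunch:
    product_ids: list
    discount_pct: float
    kind: str = "promotion_launch"


Action = Union[PriceChange, PromotionLaunch]

# Map each tag value to the variant that owns it.
VARIANTS = {"price_change": PriceChange, "promotion_launch": PromotionLaunch}


def parse_action(raw: dict) -> Action:
    """Pick the variant from the 'kind' tag, then build it from the other fields."""
    fields = dict(raw)  # copy so the caller's dict is left untouched
    kind = fields.pop("kind", None)
    if kind not in VARIANTS:
        raise ValueError(f"unknown action kind: {kind!r}")
    return VARIANTS[kind](**fields)


action = parse_action({"kind": "price_change", "product_id": "sku-1", "new_price": 9.5})
print(type(action).__name__)  # PriceChange
```

The tutorial's Pydantic-based plan definition follows.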
from pydantic import BaseModel
from flowai_harness import TaggedUnion, define_plan

class PriceChange(BaseModel):
    kind: str = "price_change"
    product_id: str
    new_price: float

class PromotionLaunch(BaseModel):
    kind: str = "promotion_launch"
    product_ids: list[str]
    discount_pct: float

ScenarioAction = TaggedUnion(PriceChange, PromotionLaunch)

class ScenarioPlanPayload(BaseModel):
    scope_ref: str
    actions: list[ScenarioAction]
    rationale: str

scenario_plan = define_plan(name="ScenarioPlan", schema=ScenarioPlanPayload)

5. Define a tool
@define_tool produces a ToolSpec; calling it as a decorator binds an async
handler. The handler receives (args, ctx), where ctx carries metadata such as
tool_use_id and any Python services attached with
create_runtime(..., services=...).
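The (args, ctx) calling convention can be exercised without the harness at all. The sketch below calls a plain async function with a stand-in context. It assumes that ctx exposes each attached service as an attribute named after its services key; `FakeCatalog`, `search_handler`, and the `tool_use_id` value are hypothetical.

```python
import asyncio
from types import SimpleNamespace


class FakeCatalog:
    """Hypothetical service; stands in for whatever is passed via services=..."""

    async def search(self, query: str, *, limit: int) -> list:
        return [{"id": f"{query}-{i}"} for i in range(limit)]


async def search_handler(args: dict, ctx) -> dict:
    # args holds the tool input; ctx gives access to services by name.
    products = await ctx.acme.search(args["query"], limit=args["limit"])
    return {"products": products, "resultCount": len(products)}


# Assumed ctx shape: metadata plus one attribute per attached service.
ctx = SimpleNamespace(tool_use_id="tool-use-1", acme=FakeCatalog())
result = asyncio.run(search_handler({"query": "widget", "limit": 2}, ctx))
print(result)
```

Calling a handler this way can serve as a quick unit test of its logic before it is bound to a ToolSpec. The tutorial's decorated tool follows.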
from flowai_harness import define_tool, glimpse

@define_tool(
    name="search_products",
    description="Search products by query.",
    input_schema={"query": str, "limit": int},
    approval="never",
)
async def search_products(args, ctx):
    products = await ctx.acme.search(args["query"], limit=args["limit"])
    return {
        "products": products,
        "glimpse": glimpse({
            "resultCount": len(products),
            "preview": [product["id"] for product in products[:3]],
        }),
    }

6. Compose the prompt
layered_prompt renders deterministic text from a fixed section order:
identity, communication, operational rules, tools, domain knowledge, safety,
output format, examples.
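A fixed section order is what makes the rendered text deterministic: the output depends on which sections are present, not on the order in which they are passed. The sketch below demonstrates that property with a hypothetical `render_layers` helper. The `## name` header format and bullet rendering are invented for illustration and are not the text that layered_prompt produces.

```python
# Section order as listed in this step.
SECTION_ORDER = (
    "identity",
    "communication",
    "operational_rules",
    "tools",
    "domain_knowledge",
    "safety",
    "output_format",
    "examples",
)


def render_layers(**sections) -> str:
    """Render the supplied sections in SECTION_ORDER, skipping absent ones."""
    unknown = set(sections) - set(SECTION_ORDER)
    if unknown:
        raise ValueError(f"unknown sections: {sorted(unknown)}")
    blocks = []
    for name in SECTION_ORDER:
        if name not in sections:
            continue
        value = sections[name]
        # Lists become bullet lines; anything else is rendered with str().
        if isinstance(value, list):
            body = "\n".join(f"- {item}" for item in value)
        else:
            body = str(value)
        blocks.append(f"## {name}\n{body}")
    return "\n\n".join(blocks)


a = render_layers(safety=["Ask before side effects."], identity="You coordinate.")
b = render_layers(identity="You coordinate.", safety=["Ask before side effects."])
print(a == b)  # True: keyword order does not affect the rendered text
```

The tutorial's coordinator prompt follows.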
from flowai_harness import layered_prompt

coordinator_prompt = layered_prompt(
    identity="You coordinate scenario planning for Acme customers.",
    communication="Be concise and surface approval points explicitly.",
    operational_rules=[
        "Route plan-building work to the planner.",
        "Route approved materialization work to the executor.",
    ],
    tools=[search_products],
    domain_knowledge=domain_knowledge,
    safety=["Never execute side-effecting tools without approval."],
)

7. Define the agents
Four agent roles cover the topology:
from flowai_harness import (
    define_coordinator,
    define_executor,
    define_planner,
    define_specialist,
)

coordinator = define_coordinator(
    name="scenario_coordinator",
    model="claude-sonnet-4-6",
    routes=["scenario_planner", "scenario_executor"],
    approval={"plans": "always", "tools": "never"},
    prompt=coordinator_prompt,
)

planner = define_planner(
    name="scenario_planner",
    model="claude-sonnet-4-6",
    plan=scenario_plan,
    prompt=layered_prompt(
        identity="You produce typed scenario plans.",
        domain_knowledge=domain_knowledge,
    ),
)

executor = define_executor(
    name="scenario_executor",
    model="claude-sonnet-4-6",
    plan=scenario_plan,
    tools=[search_products],
    prompt=layered_prompt(
        identity="You execute approved scenario plans action by action.",
        domain_knowledge=domain_knowledge,
    ),
)

specialist = define_specialist(
    name="product_insights",
    model="claude-haiku-4-5",
    tools=[search_products],
    prompt="You answer focused product questions.",
)

8. Build the runtime spec
define_runtime collects everything into a single validated RuntimeSpec.
Plans, toolkits, and tool bindings declared via agents are auto-attached.
from flowai_harness import define_runtime

runtime_spec = define_runtime(
    tenant=tenant,
    agents=[coordinator, planner, executor, specialist],
    references=[ProductSet],
    providers={"anthropic": {"apiKeyEnv": "ANTHROPIC_API_KEY"}},
)

9. Run with the deterministic testing interpreter
create_runtime returns a native-backed Runtime handle. The testing option
swaps in a deterministic no-network interpreter.
import asyncio
from flowai_harness import Runtime, TestingConfig, create_runtime

class MockProducts:
    async def search(self, query: str, *, limit: int):
        return [{"id": f"{query}-{index}"} for index in range(limit)]

runtime: Runtime = create_runtime(
    runtime_spec,
    services={"acme": MockProducts()},
    testing=TestingConfig(mock_response="mocked runtime response"),
)

async def main() -> list[dict]:
    events = []
    async for event in runtime.query(
        "Draft a tiny pricing scenario.",
        thread_id="thread-1",
    ):
        events.append(event)
        print(event)
    return events

if __name__ == "__main__":
    asyncio.run(main())

Expect seven events: the coordinator tool-agent call/result pair wrapping a step-start, two text events, a data-latency-summary, and a finish. The toolInvocationId values are fresh UUIDs on every run:
{'agentName': 'scenario_coordinator', 'state': 'call', 'toolInvocationId': 'inv-...', 'type': 'tool-agent'}
{'type': 'step-start'}
{'text': 'Received: Draft a tiny pricing scenario.\n\n', 'type': 'text'}
{'text': 'mocked runtime response', 'type': 'text'}
{'data': {'hadTimeout': False, 'phases': {'llmCalls': 1, 'llmTimeMs': 0, 'subAgentTimeMs': 0, 'toolTimeMs': 0}, 'retryCount': 0, 'toolTimings': [], 'totalDurationMs': 0}, 'type': 'data-latency-summary'}
{'finishReason': 'stop', 'type': 'finish', 'usage': {'cacheCreationInputTokens': 0, 'cacheReadInputTokens': 0, 'completionTokens': 25, 'promptTokens': 50, 'totalTokens': 75}}
{'agentName': 'scenario_coordinator', 'state': 'result', 'toolInvocationId': 'inv-...', 'type': 'tool-agent'}

If the second text event carries mocked runtime response and the stream ends with finish followed by the coordinator's tool-agent result, the topology is assembled correctly.
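The same check can be automated. The sketch below compares a collected event list against the seven-event shape described in this step, using only the fields shown in the sample output. `EXPECTED_TYPES` and `stream_problems` are hypothetical names, and the sample list is a trimmed copy of the printed events rather than real runtime output.

```python
EXPECTED_TYPES = [
    "tool-agent",
    "step-start",
    "text",
    "text",
    "data-latency-summary",
    "finish",
    "tool-agent",
]


def stream_problems(events: list, mock_response: str) -> list:
    """Return human-readable mismatches; an empty list means the shape matches."""
    types = [event.get("type") for event in events]
    if types != EXPECTED_TYPES:
        return [f"unexpected event types: {types}"]
    problems = []
    if events[0].get("state") != "call":
        problems.append("first event is not a tool-agent call")
    if events[-1].get("state") != "result":
        problems.append("last event is not a tool-agent result")
    if events[3].get("text") != mock_response:
        problems.append("second text event does not carry the mock response")
    return problems


# Trimmed copies of the printed events; ids, latency data, and usage are omitted.
sample = [
    {"type": "tool-agent", "state": "call"},
    {"type": "step-start"},
    {"type": "text", "text": "Received: Draft a tiny pricing scenario.\n\n"},
    {"type": "text", "text": "mocked runtime response"},
    {"type": "data-latency-summary"},
    {"type": "finish", "finishReason": "stop"},
    {"type": "tool-agent", "state": "result"},
]
print(stream_problems(sample, "mocked runtime response"))  # []
```

In the tutorial script, the list returned by main() could be passed to such a helper in place of the sample.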
10. Approvals
The coordinator above configures approval={"plans": "always"}, but the
testing interpreter returns a mocked text response without storing a plan, so
no gate fires in this run. When a plan or tool gate does fire, the stream emits
an approval-required event and pauses until the host answers with
runtime.respond_to_approval(event["data"]["id"], "approve"). See
Approvals for the full gated
coordinator-planner-executor flow, driven end-to-end with the scripted
interpreter.
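As a rough picture of the host side of that handshake, the sketch below drives a stand-in runtime that gates once. `FakeRuntime` is hypothetical and exists only so the loop can run offline. The event shape follows the call quoted above, and treating respond_to_approval as a plain synchronous method is an assumption; the real method may need to be awaited.

```python
import asyncio


class FakeRuntime:
    """Hypothetical stand-in that gates one plan; not the flowai-harness Runtime."""

    def __init__(self):
        self._answered = asyncio.Event()
        self.decisions = {}

    def respond_to_approval(self, approval_id: str, decision: str) -> None:
        self.decisions[approval_id] = decision
        self._answered.set()

    async def query(self, prompt: str, thread_id: str):
        yield {"type": "approval-required", "data": {"id": "appr-1"}}
        await self._answered.wait()  # the stream pauses until the host answers
        yield {"type": "text", "text": f"decision: {self.decisions['appr-1']}"}
        yield {"type": "finish", "finishReason": "stop"}


async def drive(runtime) -> list:
    """Consume the stream, approving any gate as soon as it appears."""
    seen = []
    async for event in runtime.query("Draft a tiny pricing scenario.", thread_id="thread-1"):
        seen.append(event["type"])
        if event["type"] == "approval-required":
            runtime.respond_to_approval(event["data"]["id"], "approve")
    return seen


async def main() -> list:
    # Build the fake inside the running loop so its Event binds to that loop.
    return await drive(FakeRuntime())


seen = asyncio.run(main())
print(seen)  # ['approval-required', 'text', 'finish']
```

A production host would typically surface the gate to a person instead of approving inline.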
11. Save and run
Save the assembled sections as acme_scenario_agent.py and run it:
python acme_scenario_agent.py

The script prints the seven events from step 9 and exits; no credentials are required. The same file ships in the repository as examples/acme_scenario_agent.py, backed by a smoke test, so it cannot drift from this tutorial.
Next steps
Read the Concepts section to learn how each primitive composes. Approvals, Testing, and Streaming Events cover the production workflows that usually follow this tutorial.
