
Tutorial: build the Acme scenario-planning agent

Build, run, and inspect a scenario-planning agent in fifteen minutes.

This walkthrough constructs an Acme scenario-planning runtime: a coordinator routes between a planner and an executor, both using shared domain knowledge and a typed plan whose actions are a tagged union. A specialist handles ad-hoc product questions. The final runtime runs against the deterministic testing interpreter, so you can complete the tutorial without provider credentials.

1. Install

Private preview access

flowai-harness is not currently available on the public PyPI registry. To get access to the preview release, contact aaro@flow-ai.com or karolus@flow-ai.com before running the install command below.

pip install flowai-harness

Python 3.11 or later is required. The wheel embeds the native runtime, so no Rust toolchain is needed at install time. The model ids used below are illustrative: the final step runs under the deterministic testing interpreter, which never calls a provider, so any model string validates.

2. Define tenant identity and domain knowledge

Tenant identity scopes runtime storage, telemetry, references, plans, and approvals. Domain knowledge is separate prompt content that your agents can read.

from flowai_harness import define_tenant

tenant = define_tenant("acme", "v1")
domain_knowledge = {
    "entities": [{"id": "product", "description": "SKU-level product catalog"}],
    "dimensions": ["segment", "channel"],
    "action_types": ["price_change", "promotion_launch"],
    "data_model": {
        "segments": ["enterprise", "smb"],
        "channels": ["retail", "online"],
    },
}

To see the camelCase wire shape the runtime consumes:

print(tenant.model_dump(by_alias=True, mode="json"))
{'resourceId': 'acme', 'version': 'v1'}

If you see this dict, the tenant is wired correctly and every later primitive will scope itself to acme.

3. Define a reference

References are TTL-bounded, content-addressed handles to customer-owned values that agents pass between turns without dragging the full payload through every prompt. The glimpse callable distils the value into a small, prompt-friendly dict.

from pydantic import BaseModel

from flowai_harness import define_reference

class ProductSetPayload(BaseModel):
    product_ids: list[str]

ProductSet = define_reference(
    name="ProductSet",
    schema=ProductSetPayload,
    ttl_ms=60 * 60 * 1000,
    glimpse=lambda value: {
        "productCount": len(value.product_ids),
        "preview": value.product_ids[:3],
    },
)
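To make "TTL-bounded, content-addressed" concrete, here is a minimal standard-library sketch of the idea. It is not how flowai-harness implements references: `Handle`, `make_handle`, and `is_live` are hypothetical names, and hashing canonical JSON with SHA-256 is an assumption chosen for illustration.

```python
import hashlib
import json
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class Handle:
    """Hypothetical stand-in for a reference handle (not a flowai_harness type)."""
    name: str
    digest: str        # content address: hash of the canonical payload
    expires_at: float  # absolute expiry, in seconds
    glimpse: dict      # small summary that is cheap to place in a prompt


def make_handle(name: str, payload: dict, ttl_ms: int,
                glimpse: Callable[[dict], dict],
                now: Optional[float] = None) -> Handle:
    now = time.time() if now is None else now
    # Sorting keys makes the serialization, and so the digest, deterministic.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return Handle(name, digest, now + ttl_ms / 1000, glimpse(payload))


def is_live(handle: Handle, now: Optional[float] = None) -> bool:
    now = time.time() if now is None else now
    return now < handle.expires_at


payload = {"product_ids": ["sku-1", "sku-2", "sku-3", "sku-4"]}
handle = make_handle(
    "ProductSet",
    payload,
    ttl_ms=60 * 60 * 1000,
    glimpse=lambda value: {
        "productCount": len(value["product_ids"]),
        "preview": value["product_ids"][:3],
    },
    now=0.0,  # fixed clock so the example is reproducible
)
print(handle.glimpse)
print(is_live(handle, now=3599.0), is_live(handle, now=3600.0))
```

The point of the sketch is the split of responsibilities: the digest identifies the value, the expiry bounds its lifetime, and only the glimpse travels through prompts.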

4. Define a typed plan with a tagged action union

define_plan accepts any Pydantic schema. When the action list is polymorphic, wrap the variants in TaggedUnion.

from pydantic import BaseModel

from flowai_harness import TaggedUnion, define_plan

class PriceChange(BaseModel):
    kind: str = "price_change"
    product_id: str
    new_price: float

class PromotionLaunch(BaseModel):
    kind: str = "promotion_launch"
    product_ids: list[str]
    discount_pct: float

ScenarioAction = TaggedUnion(PriceChange, PromotionLaunch)

class ScenarioPlanPayload(BaseModel):
    scope_ref: str
    actions: list[ScenarioAction]
    rationale: str

scenario_plan = define_plan(name="ScenarioPlan", schema=ScenarioPlanPayload)
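For readers new to tagged unions, the following standard-library sketch shows the general technique: a discriminator field (here `kind`) selects which variant a raw action is parsed into. It uses dataclasses instead of Pydantic, and `VARIANTS` and `parse_action` are hypothetical helpers; how `TaggedUnion` resolves variants internally is not covered here.

```python
from dataclasses import dataclass


@dataclass
class PriceChange:
    product_id: str
    new_price: float
    kind: str = "price_change"


@dataclass
class PromotionLaunch:
    product_ids: list[str]
    discount_pct: float
    kind: str = "promotion_launch"


# Hypothetical lookup table: discriminator value -> variant class.
VARIANTS = {
    "price_change": PriceChange,
    "promotion_launch": PromotionLaunch,
}


def parse_action(raw: dict):
    """Pick the variant from raw["kind"] and build it from the remaining fields."""
    kind = raw.get("kind")
    if kind not in VARIANTS:
        raise ValueError(f"unknown action kind: {kind!r}")
    return VARIANTS[kind](**raw)


actions = [
    parse_action({"kind": "price_change", "product_id": "sku-1", "new_price": 9.5}),
    parse_action({"kind": "promotion_launch", "product_ids": ["sku-1"], "discount_pct": 10.0}),
]
print([type(action).__name__ for action in actions])
```

An unknown `kind` fails fast with a `ValueError`, which is the behavior a typed plan relies on: an action either matches a declared variant or is rejected.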

5. Define a tool

define_tool(...) produces a ToolSpec; applying it as a decorator binds an async handler to that spec. The handler receives (args, ctx), where ctx carries metadata such as tool_use_id and any Python services attached with create_runtime(..., services=...). In the example below, ctx.acme is the service registered under the "acme" key in step 9.

from flowai_harness import define_tool, glimpse

@define_tool(
    name="search_products",
    description="Search products by query.",
    input_schema={"query": str, "limit": int},
    approval="never",
)
async def search_products(args, ctx):
    products = await ctx.acme.search(args["query"], limit=args["limit"])
    return {
        "products": products,
        "glimpse": glimpse({
            "resultCount": len(products),
            "preview": [product["id"] for product in products[:3]],
        }),
    }

6. Compose the prompt

layered_prompt renders deterministic text from a fixed section order: identity, communication, operational rules, tools, domain knowledge, safety, output format, examples.

from flowai_harness import layered_prompt

coordinator_prompt = layered_prompt(
    identity="You coordinate scenario planning for Acme customers.",
    communication="Be concise and surface approval points explicitly.",
    operational_rules=[
        "Route plan-building work to the planner.",
        "Route approved materialization work to the executor.",
    ],
    tools=[search_products],
    domain_knowledge=domain_knowledge,
    safety=["Never execute side-effecting tools without approval."],
)
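The property that matters in this step is determinism: the same sections always render to the same text, regardless of the order in which keyword arguments are passed. The sketch below illustrates that idea only. `render_layers` is a hypothetical function, and the `## section` output format is an assumption, not the text that `layered_prompt` actually emits.

```python
import json

# Fixed section order, as listed in the tutorial text.
SECTION_ORDER = [
    "identity",
    "communication",
    "operational_rules",
    "tools",
    "domain_knowledge",
    "safety",
    "output_format",
    "examples",
]


def render_layers(**sections) -> str:
    """Render the provided sections in SECTION_ORDER, skipping absent ones."""
    unknown = sorted(set(sections) - set(SECTION_ORDER))
    if unknown:
        raise ValueError(f"unknown sections: {unknown}")
    parts = []
    for key in SECTION_ORDER:
        value = sections.get(key)
        if value is None:
            continue
        if isinstance(value, (list, tuple)):
            body = "\n".join(f"- {item}" for item in value)
        elif isinstance(value, dict):
            # sort_keys keeps dict rendering stable across runs.
            body = json.dumps(value, sort_keys=True, indent=2)
        else:
            body = str(value)
        parts.append(f"## {key}\n{body}")
    return "\n\n".join(parts)


first = render_layers(
    safety=["Never execute side-effecting tools without approval."],
    identity="You coordinate scenario planning for Acme customers.",
)
second = render_layers(
    identity="You coordinate scenario planning for Acme customers.",
    safety=["Never execute side-effecting tools without approval."],
)
print(first == second)
```

Stable prompt text is what makes prompt diffs reviewable and keeps provider-side caching effective, which is a plausible reason for fixing the section order.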

7. Define the agents

Four agent roles cover the topology:

from flowai_harness import (
    define_coordinator,
    define_executor,
    define_planner,
    define_specialist,
)

coordinator = define_coordinator(
    name="scenario_coordinator",
    model="claude-sonnet-4-6",
    routes=["scenario_planner", "scenario_executor"],
    approval={"plans": "always", "tools": "never"},
    prompt=coordinator_prompt,
)

planner = define_planner(
    name="scenario_planner",
    model="claude-sonnet-4-6",
    plan=scenario_plan,
    prompt=layered_prompt(
        identity="You produce typed scenario plans.",
        domain_knowledge=domain_knowledge,
    ),
)

executor = define_executor(
    name="scenario_executor",
    model="claude-sonnet-4-6",
    plan=scenario_plan,
    tools=[search_products],
    prompt=layered_prompt(
        identity="You execute approved scenario plans action by action.",
        domain_knowledge=domain_knowledge,
    ),
)

specialist = define_specialist(
    name="product_insights",
    model="claude-haiku-4-5",
    tools=[search_products],
    prompt="You answer focused product questions.",
)

8. Build the runtime spec

define_runtime collects everything into a single validated RuntimeSpec. Plans, toolkits, and tool bindings declared via agents are auto-attached, which is why the call below passes only the tenant, agents, references, and providers.

from flowai_harness import define_runtime

runtime_spec = define_runtime(
    tenant=tenant,
    agents=[coordinator, planner, executor, specialist],
    references=[ProductSet],
    providers={"anthropic": {"apiKeyEnv": "ANTHROPIC_API_KEY"}},
)

9. Run with the deterministic testing interpreter

create_runtime returns a native-backed Runtime handle. The testing option swaps in a deterministic no-network interpreter.

import asyncio

from flowai_harness import Runtime, TestingConfig, create_runtime

class MockProducts:
    async def search(self, query: str, *, limit: int):
        return [{"id": f"{query}-{index}"} for index in range(limit)]

runtime: Runtime = create_runtime(
    runtime_spec,
    services={"acme": MockProducts()},
    testing=TestingConfig(mock_response="mocked runtime response"),
)

async def main() -> list[dict]:
    events = []
    async for event in runtime.query(
        "Draft a tiny pricing scenario.",
        thread_id="thread-1",
    ):
        events.append(event)
        print(event)
    return events

if __name__ == "__main__":
    asyncio.run(main())

Expect seven events: the coordinator tool-agent call/result pair wrapping a step-start, two text events, a data-latency-summary, and a finish. The toolInvocationId values are fresh UUIDs on every run:

{'agentName': 'scenario_coordinator', 'state': 'call', 'toolInvocationId': 'inv-...', 'type': 'tool-agent'}
{'type': 'step-start'}
{'text': 'Received: Draft a tiny pricing scenario.\n\n', 'type': 'text'}
{'text': 'mocked runtime response', 'type': 'text'}
{'data': {'hadTimeout': False, 'phases': {'llmCalls': 1, 'llmTimeMs': 0, 'subAgentTimeMs': 0, 'toolTimeMs': 0}, 'retryCount': 0, 'toolTimings': [], 'totalDurationMs': 0}, 'type': 'data-latency-summary'}
{'finishReason': 'stop', 'type': 'finish', 'usage': {'cacheCreationInputTokens': 0, 'cacheReadInputTokens': 0, 'completionTokens': 25, 'promptTokens': 50, 'totalTokens': 75}}
{'agentName': 'scenario_coordinator', 'state': 'result', 'toolInvocationId': 'inv-...', 'type': 'tool-agent'}

If the second text event carries mocked runtime response and the stream ends with finish followed by the coordinator's tool-agent result, the topology is assembled correctly.
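If you would rather check the stream programmatically than by eye, a small helper along these lines can work on the list returned by main(). It is a sketch: `stream_problems` is a hypothetical name, the checks cover only the fields shown in the output above, and the sample events are trimmed copies of that output.

```python
EXPECTED_TYPES = [
    "tool-agent",
    "step-start",
    "text",
    "text",
    "data-latency-summary",
    "finish",
    "tool-agent",
]


def stream_problems(events: list[dict], agent: str, mock_text: str) -> list[str]:
    """Return human-readable problems; an empty list means the stream looks right."""
    types = [event.get("type") for event in events]
    if types != EXPECTED_TYPES:
        return [f"unexpected event types: {types}"]
    problems = []
    call, result = events[0], events[-1]
    if call.get("state") != "call" or result.get("state") != "result":
        problems.append("tool-agent events are not a call/result pair")
    if call.get("agentName") != agent or result.get("agentName") != agent:
        problems.append(f"tool-agent events do not name {agent}")
    if events[3].get("text") != mock_text:
        problems.append("second text event does not carry the mock response")
    if events[5].get("finishReason") != "stop":
        problems.append("finish event did not stop normally")
    return problems


# Trimmed sample of the documented output; ids and latency data are elided.
sample = [
    {"agentName": "scenario_coordinator", "state": "call", "toolInvocationId": "inv-...", "type": "tool-agent"},
    {"type": "step-start"},
    {"text": "Received: Draft a tiny pricing scenario.\n\n", "type": "text"},
    {"text": "mocked runtime response", "type": "text"},
    {"data": {}, "type": "data-latency-summary"},
    {"finishReason": "stop", "type": "finish"},
    {"agentName": "scenario_coordinator", "state": "result", "toolInvocationId": "inv-...", "type": "tool-agent"},
]
print(stream_problems(sample, "scenario_coordinator", "mocked runtime response"))
```

Because the toolInvocationId values change on every run, the helper deliberately ignores them.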

10. Approvals

The coordinator above configures approval={"plans": "always"}, but the testing interpreter returns a mocked text response without storing a plan, so no gate fires in this run. When a plan or tool gate does fire, the stream emits an approval-required event and pauses until the host answers with runtime.respond_to_approval(event["data"]["id"], "approve"). See Approvals for the full gated coordinator-planner-executor flow, driven end-to-end with the scripted interpreter.
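A host-side loop that answers gates as they appear might look like the sketch below. Several details are assumptions: that the event's type field is the string "approval-required", and whether respond_to_approval is synchronous or awaitable (the helper accepts both). `answer_approvals`, `StubRuntime`, and `stub_stream` are hypothetical and exist only so the example runs without the library.

```python
import asyncio
import inspect


async def answer_approvals(runtime, stream, decide):
    """Consume a stream, answering each approval gate with decide(event)."""
    events = []
    async for event in stream:
        events.append(event)
        # Assumption: gates are identified by this type string.
        if event.get("type") == "approval-required":
            outcome = runtime.respond_to_approval(event["data"]["id"], decide(event))
            if inspect.isawaitable(outcome):
                await outcome
    return events


class StubRuntime:
    """Stand-in that records answers instead of resuming a real run."""

    def __init__(self):
        self.answers = []

    def respond_to_approval(self, approval_id, decision):
        self.answers.append((approval_id, decision))


async def stub_stream():
    yield {"type": "text", "text": "drafting plan"}
    yield {"type": "approval-required", "data": {"id": "appr-1"}}
    yield {"type": "finish", "finishReason": "stop"}


stub = StubRuntime()
seen = asyncio.run(answer_approvals(stub, stub_stream(), lambda event: "approve"))
print(stub.answers)
```

With a real runtime you would pass runtime.query(...) as the stream and replace the lambda with whatever review step your host application uses.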

11. Save and run

Save the assembled sections as acme_scenario_agent.py and run it:

python acme_scenario_agent.py

The script prints the seven events from step 9 and exits; no credentials are required. The same file ships in the repository as examples/acme_scenario_agent.py, backed by a smoke test, so it cannot drift from this tutorial.

Next steps

Read the Concepts section to learn how each primitive composes. Approvals, Testing, and Streaming Events cover the production workflows that usually follow this tutorial.