pip install agentlead-flow-core · v0.1.1 · MIT
Declare agents in YAML.
Run them as a graph.
A tiny framework for declaring agents — each a system prompt plus a set of tools — and running them as a graph over shared data state. Gemini by default, Claude built in, and a pluggable provider interface for the rest.
provider: gemini # default provider for all agents
model: gemini-3.5-flash # default model for all agents
agents:
analyst:
system: "You are a data analyst..."
tools: [fetch_sales_data, calculator]
writer:
system: "You are an executive writer..."
tools: [save_report]
pipeline:
- name: analyze
agent: analyst
input: "Analyze this year's sales..."
output: analysis # writes result to state["analysis"]
- name: write
agent: writer
input: |
Write a summary of:
{analysis} # pulled from state
output: summaryThe idea
You declare the graph.
The engine owns how it runs.
Nodes wrap an agent or a tool; edges decide what runs next. A linear pipeline: is just sugar the loader lowers to the same graph — so everything runs on one engine.
A scheduler walks the frontier
It runs every ready node, writes each result to a versioned channel, and drives the tool-use loop inside each agent.
Checkpoints at every boundary
State is snapshotted after each node, so a crashed or paused run resumes exactly where it left off.
Authority is opt-in
The model never owns flow by default. You grant it routing or replanning — only from a vocabulary you declare.
Any typed function is a tool
Add @tool to a function with type hints — the JSON Schema is derived for you. The docstring summary becomes the description; the Args: lines document each parameter.
from agentic_flow import tool
@tool
def save_report(filename: str, content: str) -> str:
"""Write a report to disk and return the path.
Args:
filename: File name to write, e.g. "report.md".
content: The full text of the report.
"""
...Mix providers in one graph
Set a pipeline default and override it per agent. Agents on different providers share one Store and can delegate to each other.
provider: gemini # pipeline default
model: gemini-3.5-flash
agents:
analyst: # uses the defaults above
system: "..."
auditor:
provider: anthropic # this agent runs on Claude
model: claude-opus-4-8 # optional — omit for the provider default
system: "..."Everything you need to run agents in production
Control flow, durability, delegation, secrets, reliability and observability — each configured in the same YAML, covered in depth in the docs.
Pipelines, loops & branches
A linear pipeline: lowers to a graph. Add a loop: with an until: condition and a max_iterations: cap, or a when:/then:/else: branch — no expression language, just state-template truthiness.
Author the graph directly
When a readable sequence can't express the flow — any-to-any handoff, cycles, fan-in, fan-out — declare an entry, nodes, and edges (static, conditional, or model-routed).
JSON-schema results
Give an agent an output_schema and its answer comes back as a dict. Later steps address fields directly with {report[title]}.
Agents as tools
List other agents under an agent's agents: key and they become callable tools. The sub-agent runs its own loop and shares the same Store.
One Store per run
Versioned channels + reducers + a message log + a todo board, threaded through every agent, tool, and sub-agent. Messages and todos use exactly-once delivery.
Checkpoint at every node
Every run checkpoints at each node boundary, so a crash resumes at the last completed node — inside loops and branches too. In-memory and JSON-file backends ship.
Pause for a person
A human: node pauses the run, checkpoints a marker, and raises NodePaused. The host collects an answer and calls resume() to continue — durable by definition.
Named, never embedded
Credentials are named in YAML and fetched late at point-of-use, wrapped in a masked Secret type, and never written to the Store — so they can't reach a checkpoint.
Retries & limits
Retry policies with exponential backoff at tool and agent scope, plus limits on steps, output tokens, wall-clock, and per-node visits — per agent and for the whole run.
Mix Gemini & Claude
Gemini by default, Anthropic built in, pluggable interface for the rest. Agents on different providers share one Store and can delegate to each other in a single graph.
Let the model drive flow
Opt into the authority spectrum: route (an agent picks the next node from your candidates) or replan (a planner rewrites the not-yet-run subgraph from your declared vocabulary).
One event seam
on_event(event, data) sees every node, tool call, retry, checkpoint, route and replan. Route it to the console, standard logging, or an EventBus you can stream() live.
Durable runs & human-in-the-loop
Pause for a person. Resume after a crash.
The engine checkpoints at every node boundary, so a crash resumes at the last completed node — inside loops and branches too. A human: node pauses the run and raises NodePaused; your host collects the answer and calls resume(...) to continue.
Channel values are JSON-serializable and exactly-once delivery survives a resume, so messages and assigned todos are never lost or doubled.
from agentic_flow import (
Orchestrator, InMemoryCheckpointer, NodePaused, registry,
)
orch = Orchestrator.from_yaml("pipeline.yaml", registry)
cp = InMemoryCheckpointer() # or JsonFileCheckpointer("./runs")
try:
orch.run({"topic": "Q3"}, run_id="R1", checkpointer=cp)
except NodePaused as paused:
print(paused.prompt) # show the human what to answer
answer = "approve" # ...collected out of band...
store = orch.resume("R1", cp, human_input=answer)
print(store["result"])The vocabulary
A small, precise glossary — the same terms the docs and the code use.
- Agent
- A system prompt + a set of tools + the model's tool-use loop.
- Node
- A graph vertex wrapping an agent, a tool call, or a human step.
- Edge
- A connection between nodes — static, conditional (when), or model-driven (route).
- Store
- Versioned channels + reducers + a message log + a todo board for one run.
- Reducer
- A function that merges channel updates across versions.
- Checkpoint
- A state snapshot taken at every node boundary, for durability and resume.
- Provider
- The LLM backend interface — Gemini and Anthropic ship; add your own.
- Authority
- The spectrum from a static flow to model-driven routing and replanning.
- Exactly-once delivery
- A message/todo guarantee for recipients that survives crashes and resume.
Install & run
pip install gives you the importable library and the agentic-flow CLI. Requires Python 3.10+.
Examples and docs live in the GitHub repo (not the wheel) — clone ai-agent-lead/agentic-flow and read the examples/ ladder in order.
# run a pipeline against its tools module
agentic-flow pipeline.yaml --tools my.tools
# print the plan as a graph — no model called
agentic-flow pipeline.yaml --tools my.tools --dry-run
# seed initial state
agentic-flow pipeline.yaml --tools my.tools --set topic="Q3 sales"
# durable run with a checkpoint backend
agentic-flow pipeline.yaml --tools my.tools --checkpoint json:./runs --run-id R1Build your agents with the same rigor.
Open source, MIT-licensed, and one pip install away.