AgentLead.dev

pip install agentlead-flow-core · v0.1.1 · MIT

Declare agents in YAML.
Run them as a graph.

A tiny framework for declaring agents — each a system prompt plus a set of tools — and running them as a graph over shared data state. Gemini by default, Claude built in, and a pluggable provider interface for the rest.

pipeline.yaml
provider: gemini                 # default provider for all agents
model: gemini-3.5-flash          # default model for all agents

agents:
  analyst:
    system: "You are a data analyst..."
    tools: [fetch_sales_data, calculator]
  writer:
    system: "You are an executive writer..."
    tools: [save_report]

pipeline:
  - name: analyze
    agent: analyst
    input: "Analyze this year's sales..."
    output: analysis             # writes result to state["analysis"]
  - name: write
    agent: writer
    input: |                     # {analysis} below is filled in from state
      Write a summary of:
      {analysis}
    output: summary

The idea

You declare the graph. The engine owns how it runs.

Nodes wrap an agent, a tool, or a human step; edges decide what runs next. A linear pipeline: block is just sugar that the loader lowers to the same graph, so everything runs on one engine.
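
To make the lowering concrete, here is a minimal sketch of turning an ordered list of steps into nodes and static edges. The function name lower_pipeline and the dict layout are hypothetical, chosen for illustration; the library's real IR may look different.

```python
# Hypothetical sketch: lower a linear `pipeline:` list to a node/edge graph.
# `lower_pipeline` and the returned dict layout are illustrative assumptions.
END = "__end__"

def lower_pipeline(steps: list[dict]) -> dict:
    """Turn an ordered list of steps into nodes plus static edges."""
    if not steps:
        raise ValueError("pipeline needs at least one step")
    names = [step["name"] for step in steps]
    if len(set(names)) != len(names):
        raise ValueError("step names must be unique")
    nodes = {step["name"]: step for step in steps}
    # Each step points at the next one; the last step points at the end marker.
    edges = list(zip(names, names[1:] + [END]))
    return {"entry": names[0], "nodes": nodes, "edges": edges}

graph = lower_pipeline([
    {"name": "analyze", "agent": "analyst", "output": "analysis"},
    {"name": "write", "agent": "writer", "output": "summary"},
])
```

Once a sequence is expressed this way, a loop or a branch is only a different set of edges over the same nodes, which is why one engine can run both forms.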

[Diagram. Build: YAML (agents · tools · edges) → build_program (validate & lower) → Graph (nodes + edges, IR) → scheduler (walk the frontier). Run, one node at a time and checkpointed at every boundary: entry (start) → gather (agent) → analyze (agent) → write (tool) → __end__ (done). Channels: sources, claims, draft, each holding a value + version + reducer.]

A scheduler walks the frontier

It runs every ready node, writes each result to a versioned channel, and drives the tool-use loop inside each agent.
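
As a rough picture of what walking the frontier means, the sketch below runs a queue of ready nodes and bumps a version counter on every channel write. It is a simplification under stated assumptions: run_graph and its argument shapes are hypothetical, and it leaves out fan-in joins, cycle limits, and the tool-use loop inside each agent.

```python
from collections import deque

def run_graph(entry, nodes, edges, state):
    """Walk the graph from `entry`, running one ready node at a time.

    nodes: name -> (callable taking state, channel name to write)
    edges: name -> list of successor names
    """
    versions = {}                      # channel name -> number of writes
    frontier = deque([entry])
    order = []
    while frontier:
        name = frontier.popleft()
        if name == "__end__":
            continue
        fn, channel = nodes[name]
        state[channel] = fn(state)     # write the node's result to its channel
        versions[channel] = versions.get(channel, 0) + 1
        order.append(name)
        frontier.extend(edges.get(name, []))
    return order, versions

nodes = {
    "gather": (lambda s: ["a", "b"], "sources"),
    "analyze": (lambda s: len(s["sources"]), "claims"),
}
edges = {"gather": ["analyze"], "analyze": ["__end__"]}
state = {}
order, versions = run_graph("gather", nodes, edges, state)
```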

Checkpoints at every boundary

State is snapshotted after each node, so a crashed or paused run resumes exactly where it left off.
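
The resume behavior can be sketched in a few lines: save a snapshot after each completed node, and skip completed nodes on the next attempt. DictCheckpointer and run below are hypothetical stand-ins written for this example, not the library's checkpointer classes.

```python
import json

class DictCheckpointer:
    """Hypothetical in-memory backend: one JSON snapshot per run id."""
    def __init__(self):
        self.snapshots = {}

    def save(self, run_id, state, done):
        self.snapshots[run_id] = json.dumps({"state": state, "done": done})

    def load(self, run_id):
        raw = self.snapshots.get(run_id)
        return json.loads(raw) if raw else {"state": {}, "done": []}

def run(steps, run_id, cp):
    """Run steps in order, checkpointing after each and skipping finished ones."""
    snap = cp.load(run_id)
    state, done = snap["state"], snap["done"]
    for name, fn in steps:
        if name in done:
            continue                   # completed before the crash
        state[name] = fn(state)
        done.append(name)
        cp.save(run_id, state, done)   # snapshot at the node boundary
    return state

calls = []

def analyze(state):
    calls.append("analyze")
    return 42

def flaky_write(state):
    calls.append("write")
    if calls.count("write") == 1:
        raise RuntimeError("simulated crash")
    return "report"

steps = [("analyze", analyze), ("write", flaky_write)]
cp = DictCheckpointer()
try:
    run(steps, "R1", cp)
except RuntimeError:
    pass                               # the first attempt dies inside "write"
final = run(steps, "R1", cp)           # the second attempt skips "analyze"
```

The second call re-runs only the node that failed, because the snapshot taken after analyze already records it as done.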

Authority is opt-in

The model never owns flow by default. You grant it routing or replanning — only from a vocabulary you declare.

Any typed function is a tool

Add @tool to a function with type hints — the JSON Schema is derived for you. The docstring summary becomes the description; the Args: lines document each parameter.

tools.py
from agentic_flow import tool

@tool
def save_report(filename: str, content: str) -> str:
    """Write a report to disk and return the path.

    Args:
        filename: File name to write, e.g. "report.md".
        content: The full text of the report.
    """
    ...
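
For intuition about how a schema can be derived from a typed function, here is a standalone sketch built on the standard library's inspect and typing modules. derive_schema is a hypothetical helper, not the library's @tool implementation, and it maps only a handful of primitive types and treats every line after Args: as a parameter line.

```python
import inspect
from typing import get_type_hints

_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}

def derive_schema(fn):
    """Build a JSON-Schema-style tool description from hints and docstring."""
    hints = get_type_hints(fn)
    hints.pop("return", None)
    doc = inspect.getdoc(fn) or ""
    summary = doc.split("\n", 1)[0]
    # Collect "name: text" lines that follow an "Args:" header.
    arg_docs, in_args = {}, False
    for line in doc.splitlines():
        stripped = line.strip()
        if stripped == "Args:":
            in_args = True
        elif in_args and ":" in stripped:
            name, text = stripped.split(":", 1)
            arg_docs[name.strip()] = text.strip()
    props = {
        name: {"type": _JSON_TYPES.get(tp, "string"),
               "description": arg_docs.get(name, "")}
        for name, tp in hints.items()
    }
    # Parameters without a default value are required.
    required = [
        name for name, p in inspect.signature(fn).parameters.items()
        if p.default is inspect.Parameter.empty
    ]
    return {"name": fn.__name__, "description": summary,
            "parameters": {"type": "object", "properties": props,
                           "required": required}}

def save_report(filename: str, content: str, overwrite: bool = False) -> str:
    """Write a report to disk and return the path.

    Args:
        filename: File name to write, e.g. "report.md".
        content: The full text of the report.
    """
    return filename

schema = derive_schema(save_report)
```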

Mix providers in one graph

Set a pipeline default and override it per agent. Agents on different providers share one Store and can delegate to each other.

providers.yaml
provider: gemini             # pipeline default
model: gemini-3.5-flash

agents:
  analyst:                   # uses the defaults above
    system: "..."
  auditor:
    provider: anthropic      # this agent runs on Claude
    model: claude-opus-4-8   # optional — omit for the provider default
    system: "..."
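
One plausible reading of the override rules, sketched in plain Python: an agent inherits the pipeline provider unless it names its own, and a pipeline-level model only applies to agents that stay on the pipeline provider. resolve_backend is a hypothetical helper, and the fallback rule is an assumption drawn from the comments in the YAML above.

```python
def resolve_backend(config: dict, agent_name: str) -> tuple:
    """Return (provider, model) for one agent; model None means provider default."""
    agent = config["agents"][agent_name]
    provider = agent.get("provider", config.get("provider"))
    if "model" in agent:
        model = agent["model"]
    elif provider == config.get("provider"):
        model = config.get("model")    # inherit the pipeline default model
    else:
        model = None                   # assumed: use the provider's own default
    return provider, model

config = {
    "provider": "gemini",
    "model": "gemini-3.5-flash",
    "agents": {
        "analyst": {"system": "..."},
        "auditor": {"provider": "anthropic", "system": "..."},
    },
}
analyst_backend = resolve_backend(config, "analyst")
auditor_backend = resolve_backend(config, "auditor")
```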

Everything you need to run agents in production

Control flow, durability, delegation, secrets, reliability and observability — each configured in the same YAML, covered in depth in the docs.

control flow

Pipelines, loops & branches

A linear pipeline: lowers to a graph. Add a loop: with an until: condition and a max_iterations: cap, or a when:/then:/else: branch — no expression language, just state-template truthiness.
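
The loop semantics can be pictured as follows. This is a sketch only: run_loop is a hypothetical function, and it assumes until: names a state key whose truthiness ends the loop, which may not match the library's template syntax exactly.

```python
def run_loop(body, state, until, max_iterations):
    """Repeat `body` until state[until] is truthy or the cap is reached.

    Returns the number of iterations that actually ran.
    """
    for iteration in range(1, max_iterations + 1):
        body(state)
        if state.get(until):           # plain truthiness, no expression language
            return iteration
    return max_iterations

def draft(state):
    state["attempts"] = state.get("attempts", 0) + 1
    state["approved"] = state["attempts"] >= 3

state = {}
iterations = run_loop(draft, state, until="approved", max_iterations=5)

capped_state = {}
capped = run_loop(draft, capped_state, until="approved", max_iterations=2)
```

The cap matters in the second call: the condition never becomes truthy, so the loop stops after two iterations instead of running forever.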

graph

Author the graph directly

When a readable sequence can't express the flow — any-to-any handoff, cycles, fan-in, fan-out — declare an entry, nodes, and edges (static, conditional, or model-routed).

structured output

JSON-schema results

Give an agent an output_schema and its answer comes back as a dict. Later steps address fields directly with {report[title]}.
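
The {report[title]} form is the same field syntax Python's built-in str.format accepts, so you can try it without the library. Whether the engine uses str.format internally is an assumption; the snippet only shows how the syntax resolves against a dict.

```python
# State as it might look after a step with an output_schema has run.
state = {"report": {"title": "Q3 sales", "score": 0.9}}

# `{report[title]}` looks up state["report"]["title"]; note the key is unquoted.
prompt = "Summarize the report titled {report[title]}.".format(**state)
```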

delegation

Agents as tools

List other agents under an agent's agents: key and they become callable tools. The sub-agent runs its own loop and shares the same Store.

shared state

One Store per run

Versioned channels + reducers + a message log + a todo board, threaded through every agent, tool, and sub-agent. Messages and todos use exactly-once delivery.
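
A versioned channel with a reducer can be sketched as a small class. Channel here is a hypothetical illustration, not the Store's real type: without a reducer the latest write wins, and with one each update is merged into the previous value.

```python
import operator

class Channel:
    """Hypothetical channel: a value, a version counter, an optional reducer."""
    def __init__(self, reducer=None):
        self.reducer = reducer
        self.value = None
        self.version = 0

    def update(self, new):
        if self.reducer is not None and self.version > 0:
            self.value = self.reducer(self.value, new)   # merge with old value
        else:
            self.value = new                             # first or plain write
        self.version += 1

sources = Channel(reducer=operator.add)   # list updates are concatenated
sources.update(["a"])
sources.update(["b"])

draft = Channel()                         # no reducer: last write wins
draft.update("v1")
draft.update("v2")
```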

durability

Checkpoint at every node

Every run checkpoints at each node boundary, so a crash resumes at the last completed node — inside loops and branches too. In-memory and JSON-file backends ship.

human-in-the-loop

Pause for a person

A human: node pauses the run, checkpoints a marker, and raises NodePaused. The host collects an answer and calls resume() to continue. Because the pause itself is checkpointed, the wait is as durable as any other node boundary.

secrets

Named, never embedded

Credentials are named in YAML and fetched late at point-of-use, wrapped in a masked Secret type, and never written to the Store — so they can't reach a checkpoint.
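
The masking idea is easy to picture with a tiny wrapper whose repr and str never show the value. Secret, reveal, and fetch_secret below are hypothetical names for this sketch; the library's own Secret type may expose a different interface.

```python
import os

class Secret:
    """Hypothetical masked wrapper: the value never shows in repr or str."""
    def __init__(self, value: str):
        self._value = value

    def reveal(self) -> str:
        """Return the raw value; call this only at the point of use."""
        return self._value

    def __repr__(self) -> str:
        return "Secret(****)"

    __str__ = __repr__

def fetch_secret(name: str, env=os.environ) -> Secret:
    """Look the credential up by name at the moment it is needed."""
    return Secret(env[name])

token = fetch_secret("API_TOKEN", env={"API_TOKEN": "s3cr3t"})
logged = f"calling API with {token}"
```

Because only the name travels through configuration and state, an accidental log line or snapshot contains the mask rather than the credential.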

reliability

Retries & limits

Retry policies with exponential backoff at tool and agent scope, plus limits on steps, output tokens, wall-clock, and per-node visits — per agent and for the whole run.
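
Exponential backoff itself is a small loop. The sketch below is a generic version, not the library's retry policy; retry and its parameter names are assumptions, and the sleep function is injectable so the delays can be inspected without waiting.

```python
import time

def retry(fn, attempts=3, base_delay=0.5, factor=2.0, sleep=time.sleep):
    """Call `fn` up to `attempts` times, multiplying the delay after each failure."""
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise                  # out of attempts: surface the error
            sleep(delay)
            delay *= factor

delays = []
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

# Record the delays instead of sleeping, to show the schedule.
result = retry(flaky, attempts=4, sleep=delays.append)
```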

providers

Mix Gemini & Claude

Gemini by default, Anthropic built in, pluggable interface for the rest. Agents on different providers share one Store and can delegate to each other in a single graph.

authority

Let the model drive flow

Opt into the authority spectrum: route (an agent picks the next node from your candidates) or replan (a planner rewrites the not-yet-run subgraph from your declared vocabulary).
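
The key property of route is that the model's choice is checked against candidates you declared. A minimal sketch of that check follows; pick_next is a hypothetical function, and rejecting an unknown choice with an error (rather than falling back) is an assumption.

```python
def pick_next(model_choice: str, candidates: list[str]) -> str:
    """Accept the model's choice only if it is one of the declared candidates."""
    choice = model_choice.strip()
    if choice not in candidates:
        raise ValueError(f"{choice!r} is not a declared candidate: {candidates}")
    return choice

next_node = pick_next(" write ", ["write", "revise"])
```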

observability

One event seam

on_event(event, data) sees every node, tool call, retry, checkpoint, route and replan. Route it to the console, standard logging, or an EventBus you can stream() live.
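
A handler with the on_event(event, data) shape can be as small as the sketch below, which counts events by name and forwards them to standard logging. The event names in the sample calls are made up for illustration, and how a handler is registered with the engine is not shown here.

```python
import logging
from collections import Counter

logger = logging.getLogger("flow.events")   # logger name is an arbitrary choice
counts = Counter()

def on_event(event: str, data: dict) -> None:
    """Count each event by name and forward it to standard logging."""
    counts[event] += 1
    logger.info("%s %s", event, data)

# Sample calls with hypothetical event names, as an engine might emit them.
on_event("node_start", {"node": "analyze"})
on_event("tool_call", {"tool": "calculator"})
on_event("tool_call", {"tool": "fetch_sales_data"})
```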

Durable runs & human-in-the-loop

Pause for a person. Resume after a crash.

The engine checkpoints at every node boundary, so a crash resumes at the last completed node — inside loops and branches too. A human: node pauses the run and raises NodePaused; your host collects the answer and calls resume(...) to continue.

Channel values are JSON-serializable and exactly-once delivery survives a resume, so messages and assigned todos are never lost or doubled.
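
One way to get that guarantee is to record which messages have already been handed out and persist that record in the same snapshot as the messages. The sketch below shows the idea; MessageLog is a hypothetical class, not the Store's message log.

```python
import json

class MessageLog:
    """Hypothetical log: each message is handed to its recipient at most once."""
    def __init__(self, messages=None, delivered=None):
        self.messages = list(messages or [])    # [id, recipient, body]
        self.delivered = set(delivered or [])   # ids already handed out

    def send(self, recipient, body):
        self.messages.append([len(self.messages), recipient, body])

    def receive(self, recipient):
        fresh = [m for m in self.messages
                 if m[1] == recipient and m[0] not in self.delivered]
        self.delivered.update(m[0] for m in fresh)
        return [m[2] for m in fresh]

    def dump(self) -> str:
        """Serialize messages and delivery marks together, as a checkpoint would."""
        return json.dumps({"messages": self.messages,
                           "delivered": sorted(self.delivered)})

    @classmethod
    def load(cls, raw: str) -> "MessageLog":
        data = json.loads(raw)
        return cls(data["messages"], data["delivered"])

log = MessageLog()
log.send("writer", "draft ready")
first = log.receive("writer")              # delivered once
restored = MessageLog.load(log.dump())     # simulate a resume from a snapshot
second = restored.receive("writer")        # nothing is delivered twice
```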

approval.py
from agentic_flow import (
    Orchestrator, InMemoryCheckpointer, NodePaused, registry,
)

orch = Orchestrator.from_yaml("pipeline.yaml", registry)
cp = InMemoryCheckpointer()              # or JsonFileCheckpointer("./runs")
try:
    orch.run({"topic": "Q3"}, run_id="R1", checkpointer=cp)
except NodePaused as paused:
    print(paused.prompt)                 # show the human what to answer
    answer = "approve"                   # ...collected out of band...
    # Resume inside the handler: `answer` only exists if the run paused.
    store = orch.resume("R1", cp, human_input=answer)
    print(store["result"])

The vocabulary

A small, precise glossary — the same terms the docs and the code use.

Agent
A system prompt + a set of tools + the model's tool-use loop.
Node
A graph vertex wrapping an agent, a tool call, or a human step.
Edge
A connection between nodes — static, conditional (when), or model-driven (route).
Store
Versioned channels + reducers + a message log + a todo board for one run.
Reducer
A function that merges channel updates across versions.
Checkpoint
A state snapshot taken at every node boundary, for durability and resume.
Provider
The LLM backend interface — Gemini and Anthropic ship; add your own.
Authority
The spectrum from a static flow to model-driven routing and replanning.
Exactly-once delivery
A message/todo guarantee for recipients that survives crashes and resume.

Install & run

pip install gives you the importable library (imported as agentic_flow) and the agentic-flow CLI. Requires Python 3.10+.

# from PyPI
$ pip install agentlead-flow-core
# run the examples on Gemini
$ export GEMINI_API_KEY=...
# or run an agent on Claude
$ export ANTHROPIC_API_KEY=sk-ant-...

Examples and docs live in the GitHub repo (not the wheel) — clone ai-agent-lead/agentic-flow and read the examples/ ladder in order.

agentic-flow — CLI
# run a pipeline against its tools module
agentic-flow pipeline.yaml --tools my.tools

# print the plan as a graph — no model called
agentic-flow pipeline.yaml --tools my.tools --dry-run

# seed initial state
agentic-flow pipeline.yaml --tools my.tools --set topic="Q3 sales"

# durable run with a checkpoint backend
agentic-flow pipeline.yaml --tools my.tools --checkpoint json:./runs --run-id R1

Build your agents with the same rigor.

Open source, MIT-licensed, and one pip install away.