LangGraph Stateful Agents
Build agents that remember state across steps using LangGraph, with Redis for checkpoint storage and Postgres for conversation history — all running locally on kindling.
Why LangGraph on kindling
LangGraph agents need persistent state — checkpoints between graph nodes, conversation memory, tool call history. In production you'd use managed Redis and Postgres. On kindling, those same services spin up automatically in your local cluster, so you iterate on graph logic without deploying anywhere.
The key benefit: your agent's state survives kindling sync restarts.
Edit a node's logic, sync, and pick up the same conversation mid-flow.
What you'll build
A research agent that:
- Takes a research question
- Plans sub-questions (planner node)
- Searches for answers (researcher node, loops until satisfied)
- Synthesizes a final report (writer node)
- Persists state in Redis so you can resume interrupted research
┌──────────┐
│ Planner  │
└────┬─────┘
     │
┌────▼─────┐
│Researcher│◀────┐
└────┬─────┘     │
     │ enough?   │
     ├─ no ──────┘
     │ yes
┌────▼─────┐
│  Writer  │
└──────────┘
State checkpointed in Redis at every transition
Conversation history stored in Postgres
Project structure
research-agent/
├── Dockerfile
├── requirements.txt
├── main.py
├── graph.py
└── models.py
requirements.txt
fastapi==0.115.0
uvicorn[standard]==0.30.0
langgraph==0.2.0
langchain-openai==0.2.0
langchain-core==0.3.0
redis==5.1.0
asyncpg==0.30.0
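Note that the pins above do not include a LangGraph checkpoint backend. If you want graph checkpoints to actually live in Redis (rather than only using Redis as a client), LangGraph's Redis saver ships as a separate package. Assuming you adopt it, add it here and pin a version compatible with your langgraph release (check the langgraph-checkpoint-redis project for current versions):

```
langgraph-checkpoint-redis
```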
models.py
from __future__ import annotations
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
class ResearchState(TypedDict):
    """State passed between graph nodes."""

    messages: Annotated[list, add_messages]
    question: str
    sub_questions: list[str]
    findings: list[dict]
    iteration: int
    max_iterations: int
    report: str
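Only `messages` carries a reducer (`add_messages`); every other key is replaced wholesale by a node's return value. A minimal sketch of that merge behavior, using a plain append function in place of LangGraph's `add_messages` (the helper names here are illustrative, not LangGraph internals):

```python
# Hypothetical mini-reducer mirroring how a channel annotated with a
# reducer accumulates updates, while plain channels are overwritten.
def append_reducer(existing: list, update: list) -> list:
    return existing + update

def merge_update(state: dict, update: dict, reducers: dict) -> dict:
    merged = dict(state)
    for key, value in update.items():
        if key in reducers:
            # reduced channel: combine old value with the node's update
            merged[key] = reducers[key](state.get(key, []), value)
        else:
            # plain channel: last write wins
            merged[key] = value
    return merged

state = {"messages": [{"role": "user", "content": "hi"}], "iteration": 0}
update = {"messages": [{"role": "assistant", "content": "planning"}], "iteration": 1}
state = merge_update(state, update, {"messages": append_reducer})
# state["messages"] now holds both entries; state["iteration"] is 1
```

This is why each node below returns only the keys it changed: the reducer takes care of accumulating `messages`, and everything else is a straight replacement.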
graph.py
import json
import os
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from models import ResearchState
llm = ChatOpenAI(model="gpt-4o-mini", api_key=os.environ["OPENAI_API_KEY"])
async def planner(state: ResearchState) -> dict:
    """Break the research question into sub-questions."""
    response = await llm.ainvoke([
        {"role": "system", "content": (
            "You are a research planner. Given a question, produce 3-5 "
            "focused sub-questions that together would answer the main question. "
            "Return a JSON array of strings."
        )},
        {"role": "user", "content": state["question"]},
    ])
    # Models sometimes wrap JSON in a markdown fence; strip it before parsing.
    raw = response.content.strip().removeprefix("```json").removeprefix("```").removesuffix("```")
    sub_questions = json.loads(raw)
    return {
        "sub_questions": sub_questions,
        "messages": [{"role": "assistant", "content": f"Planning: {len(sub_questions)} sub-questions"}],
        "iteration": 0,
    }


async def researcher(state: ResearchState) -> dict:
    """Research the next sub-question."""
    idx = state["iteration"]
    sub_questions = state["sub_questions"]
    if idx >= len(sub_questions):
        return {"iteration": idx}
    sq = sub_questions[idx]
    response = await llm.ainvoke([
        {"role": "system", "content": (
            "You are a thorough researcher. Answer this specific sub-question "
            "with concrete facts and details. Be concise but comprehensive."
        )},
        {"role": "user", "content": sq},
    ])
    finding = {"question": sq, "answer": response.content}
    return {
        "findings": state.get("findings", []) + [finding],
        "iteration": idx + 1,
        "messages": [{"role": "assistant", "content": f"Researched: {sq}"}],
    }


def should_continue(state: ResearchState) -> str:
    """Decide whether to keep researching or write the report."""
    # Stop when every sub-question is answered or the iteration cap is hit.
    if state["iteration"] < min(len(state["sub_questions"]), state["max_iterations"]):
        return "researcher"
    return "writer"


async def writer(state: ResearchState) -> dict:
    """Synthesize findings into a final report."""
    findings_text = "\n\n".join(
        f"**{f['question']}**\n{f['answer']}" for f in state.get("findings", [])
    )
    response = await llm.ainvoke([
        {"role": "system", "content": (
            "You are a report writer. Synthesize the research findings into "
            "a clear, well-structured report that answers the original question."
        )},
        {"role": "user", "content": (
            f"Original question: {state['question']}\n\n"
            f"Research findings:\n{findings_text}"
        )},
    ])
    return {
        "report": response.content,
        "messages": [{"role": "assistant", "content": "Report complete."}],
    }


def build_graph(checkpointer=None):
    """Compile the graph; pass a LangGraph checkpointer to persist state between runs."""
    graph = StateGraph(ResearchState)
    graph.add_node("planner", planner)
    graph.add_node("researcher", researcher)
    graph.add_node("writer", writer)
    graph.set_entry_point("planner")
    graph.add_edge("planner", "researcher")
    graph.add_conditional_edges("researcher", should_continue, {"researcher": "researcher", "writer": "writer"})
    graph.add_edge("writer", END)
    return graph.compile(checkpointer=checkpointer)
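Before wiring in real LLM calls, it can help to dry-run the planner → researcher → writer flow with stubbed nodes. This sketch (stub functions, no LangGraph, no network; all names and return values are placeholders) exercises only the state plumbing and the loop condition that the conditional edge encodes:

```python
# Stub nodes: same state shape as ResearchState, canned outputs.
def plan(state):
    return {**state, "sub_questions": ["q1", "q2"], "iteration": 0}

def research(state):
    sq = state["sub_questions"][state["iteration"]]
    return {
        **state,
        "findings": state["findings"] + [{"question": sq, "answer": f"stub answer to {sq}"}],
        "iteration": state["iteration"] + 1,
    }

def write(state):
    return {**state, "report": f"{len(state['findings'])} findings on: {state['question']}"}

state = {"question": "What is kindling?", "sub_questions": [], "findings": [], "iteration": 0}
state = plan(state)
# Mirrors the researcher loop: keep going until sub-questions run out.
while state["iteration"] < len(state["sub_questions"]):
    state = research(state)
state = write(state)
# state["report"] → "2 findings on: What is kindling?"
```

If the loop condition or the state keys are wrong, this fails immediately, long before an API key or a cluster is involved.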
main.py
import os
import uuid
from contextlib import asynccontextmanager
import asyncpg
import redis.asyncio as aioredis
from fastapi import FastAPI
from graph import build_graph
REDIS_URL = os.environ["RESEARCH_CACHE_URL"]
DATABASE_URL = os.environ["RESEARCH_DB_URL"]
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Redis client (available for LangGraph checkpointing)
    app.state.redis = aioredis.from_url(REDIS_URL)
    # Postgres pool for conversation history
    app.state.pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=5)
    # Create the history table on startup
    async with app.state.pool.acquire() as conn:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS research_sessions (
                id TEXT PRIMARY KEY,
                question TEXT NOT NULL,
                report TEXT,
                status TEXT DEFAULT 'running',
                created_at TIMESTAMPTZ DEFAULT now(),
                completed_at TIMESTAMPTZ
            )
        """)
    app.state.graph = build_graph()
    yield
    await app.state.pool.close()
    # redis>=5 prefers aclose() over the deprecated close()
    await app.state.redis.aclose()
app = FastAPI(title="Research Agent", lifespan=lifespan)
@app.post("/research")
async def start_research(question: str, max_iterations: int = 5):
    """Start a new research session."""
    session_id = str(uuid.uuid4())[:8]
    # Store the session in Postgres
    async with app.state.pool.acquire() as conn:
        await conn.execute(
            "INSERT INTO research_sessions (id, question) VALUES ($1, $2)",
            session_id, question,
        )
    # Run the graph; the thread_id lets a checkpointer (if one is
    # configured) resume this session after an interruption.
    result = await app.state.graph.ainvoke(
        {
            "question": question,
            "messages": [],
            "sub_questions": [],
            "findings": [],
            "iteration": 0,
            "max_iterations": max_iterations,
            "report": "",
        },
        config={"configurable": {"thread_id": session_id}},
    )
    # Record the results
    async with app.state.pool.acquire() as conn:
        await conn.execute(
            "UPDATE research_sessions SET report = $1, status = 'complete', "
            "completed_at = now() WHERE id = $2",
            result["report"], session_id,
        )
    return {
        "session_id": session_id,
        "question": question,
        "sub_questions": result["sub_questions"],
        "report": result["report"],
    }
@app.get("/research/{session_id}")
async def get_session(session_id: str):
    """Retrieve a past research session."""
    async with app.state.pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT * FROM research_sessions WHERE id = $1", session_id
        )
    if not row:
        return {"error": "Session not found"}
    return dict(row)


@app.get("/research")
async def list_sessions():
    """List recent research sessions."""
    async with app.state.pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id, question, status, created_at FROM research_sessions "
            "ORDER BY created_at DESC LIMIT 20"
        )
    return [dict(r) for r in rows]


@app.get("/health")
async def health():
    return {"status": "ok"}
Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
kindling setup
1. Store your API key
kindling secrets set OPENAI_API_KEY sk-your-key
2. Workflow
Both Redis and Postgres are local kindling dependencies — no cloud services needed.
name: dev-deploy

on:
  push:
    branches: [main]
  workflow_dispatch:

env:
  REGISTRY: registry:5000
  TAG: ${{ github.actor }}-${{ github.sha }}

jobs:
  deploy:
    runs-on: [self-hosted, "${{ github.actor }}"]
    steps:
      - uses: actions/checkout@v4
      - run: rm -rf /builds/*
      - name: Build research agent
        uses: kindling-sh/kindling/.github/actions/kindling-build@main
        with:
          name: research-agent
          context: ${{ github.workspace }}
          image: "${{ env.REGISTRY }}/research-agent:${{ env.TAG }}"
      - name: Deploy research agent
        uses: kindling-sh/kindling/.github/actions/kindling-deploy@main
        with:
          name: ${{ github.actor }}-research-agent
          image: "${{ env.REGISTRY }}/research-agent:${{ env.TAG }}"
          port: "8000"
          ingress-host: "${{ github.actor }}-research.localhost"
          health-check-path: "/health"
          dependencies: |
            - type: postgres
              name: research-db
            - type: redis
              name: research-cache
          env: |
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: kindling-secret-openai-api-key
                  key: value
kindling auto-injects RESEARCH_DB_URL and RESEARCH_CACHE_URL (derived from the dependency names research-db and research-cache) as env vars.
3. Iterate on graph logic
kindling sync -n <you>-research-agent -d .
# Edit graph.py — add a new node, change the routing logic,
# tweak prompts — state persists across syncs
Tips
- Add nodes incrementally: start with planner → writer, then insert researcher in between. Sync after each change to test.
- Inspect state: hit GET /research/{session_id} to see the full report and verify your graph produced the right output.
- Redis persistence: data survives kindling sync but not kindling reset. Use Postgres for anything you want to keep.
- Swap to LangGraph Cloud later: the graph definition is the same — only the checkpoint backend changes from local Redis to LangGraph's managed persistence.
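The checkpointing contract behind all of this is small: a store keyed by thread_id that lets an interrupted run resume where it left off. LangGraph's Redis and Postgres savers implement that contract with real persistence; this in-memory toy (all names hypothetical) just makes the resume semantics concrete:

```python
class MemoryCheckpointer:
    """Toy in-memory stand-in for a Redis/Postgres checkpoint store."""

    def __init__(self):
        self._store = {}

    def save(self, thread_id: str, state: dict) -> None:
        # Snapshot the state under the session's thread id.
        self._store[thread_id] = dict(state)

    def load(self, thread_id: str) -> dict:
        # Return the last snapshot, or an empty state for unknown ids.
        return dict(self._store.get(thread_id, {}))


cp = MemoryCheckpointer()
cp.save("session-1", {"iteration": 2, "findings": ["f1", "f2"]})
# ...the worker process restarts here...
resumed = cp.load("session-1")
# resumed carries iteration 2, so the run picks up at sub-question 3
```

Swapping the backend means swapping this object; the graph and the thread_id scheme stay the same.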