
How to Build Lean Agent Workflows Without Killing Functionality
The promise: AI agents that can handle complex workflows autonomously.
The reality: Agents that burn through tokens, get stuck in loops, make unpredictable decisions, and fail in production.
The problem isn't agents. The problem is building agents with no constraints, letting them do whatever they want, whenever they want.
The solution: Structured workflows with strategic autonomy. Give agents just enough freedom to be useful, but not enough rope to hang themselves.
LangGraph is the tool for this. Let me show you how to build lean, performant agent workflows that actually work in production.
What Is LangGraph?
LangGraph is a framework for building stateful, multi-step agent workflows.
Key concept: Workflows as graphs
- Nodes = Steps (LLM calls, tool executions, logic)
- Edges = Transitions (what happens next)
- State = Data flowing through the graph
Why it matters: You control the structure, the agent handles the reasoning.
The Lean Workflow Pattern
Core principle: Deterministic scaffolding + strategic LLM reasoning.
Not this (fully agentic):
Goal: Process refund
Agent decides:
→ Step 1: Look up order? Or check eligibility first? Or validate user?
→ Step 2: Based on step 1 result... what now?
→ Step 3: Hmm, maybe I should... [gets confused]
This (structured with autonomy):
Step 1: [Deterministic] Validate user
Step 2: [Deterministic] Look up order
Step 3: [LLM] Analyze refund eligibility (requires judgment)
Step 4: [Deterministic] If approved → Process refund
Step 5: [LLM] Generate personalized response
Benefit: Agent can't get lost, but still applies intelligence where needed.
LangGraph Architecture
Basic Structure
from typing import TypedDict

from langgraph.graph import StateGraph, END

# Define state (data passed between nodes)
class WorkflowState(TypedDict):
    user_id: str
    order_id: str
    refund_eligibility: dict
    refund_processed: bool
    response_message: str

# Create graph
workflow = StateGraph(WorkflowState)

# Add nodes (steps)
workflow.add_node("validate_user", validate_user_node)
workflow.add_node("lookup_order", lookup_order_node)
workflow.add_node("check_eligibility", check_eligibility_node)
workflow.add_node("process_refund", process_refund_node)
workflow.add_node("generate_response", generate_response_node)

# Add edges (flow)
workflow.add_edge("validate_user", "lookup_order")
workflow.add_edge("lookup_order", "check_eligibility")
workflow.add_conditional_edges(
    "check_eligibility",
    lambda state: "process" if state["refund_eligibility"]["approved"] else "reject",
    {
        "process": "process_refund",
        "reject": "generate_response"
    }
)
workflow.add_edge("process_refund", "generate_response")
workflow.add_edge("generate_response", END)

# Set entry point
workflow.set_entry_point("validate_user")

# Compile
app = workflow.compile()
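Running the compiled graph is a single invoke call with the initial state. A minimal sketch, assuming the node functions above are defined and using placeholder IDs:

result = app.invoke({
    "user_id": "u_123",    # placeholder values for illustration
    "order_id": "o_456",
})
print(result["response_message"])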
Node Types
1. Deterministic Nodes (no LLM):

def validate_user_node(state: WorkflowState) -> WorkflowState:
    # Pure logic, no LLM call
    user = db.get_user(state["user_id"])
    if not user:
        raise ValueError("User not found")
    return state
2. LLM-Enhanced Nodes (strategic reasoning):

def check_eligibility_node(state: WorkflowState) -> WorkflowState:
    order = state.get("order")
    # LLM analyzes edge cases
    prompt = f"""
    Analyze refund eligibility:
    Order date: {order.date}
    Return window: 30 days
    Today: {datetime.now()}
    Customer tier: {order.customer_tier}
    Reason: {state.get("refund_reason")}
    Product condition: {order.product_condition}

    Is this eligible? Consider special circumstances.
    Return JSON: {{"approved": bool, "reason": str}}
    """
    result = llm.generate(prompt)
    state["refund_eligibility"] = json.loads(result)
    return state
3. Tool-Calling Nodes:

def lookup_order_node(state: WorkflowState) -> WorkflowState:
    order_id = state["order_id"]
    # Call external system
    order = order_api.get_order(order_id)
    state["order"] = order
    return state
Pattern #1: Linear Workflow with Checkpoints
Use case: Multi-step process with validation at each step.
from typing import TypedDict

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

class OnboardingState(TypedDict):
    email: str
    email_valid: bool
    account_id: str
    account_created: bool
    welcome_sent: bool
    error: str

# Create graph with checkpointing
checkpointer = MemorySaver()
workflow = StateGraph(OnboardingState)

# Nodes
def validate_email(state):
    if "@" not in state["email"]:
        state["error"] = "Invalid email"
        return state
    state["email_valid"] = True
    return state

def create_account(state):
    if not state.get("email_valid"):
        return state
    # Create account
    account_id = db.create_account(state["email"])
    state["account_id"] = account_id
    state["account_created"] = True
    return state

def send_welcome(state):
    if not state.get("account_created"):
        return state
    # LLM generates personalized welcome
    welcome_message = llm.generate(f"Write a welcome email for {state['email']}")
    email_service.send(state["email"], welcome_message)
    state["welcome_sent"] = True
    return state

# Build graph
workflow.add_node("validate_email", validate_email)
workflow.add_node("create_account", create_account)
workflow.add_node("send_welcome", send_welcome)
workflow.add_edge("validate_email", "create_account")
workflow.add_edge("create_account", "send_welcome")
workflow.add_edge("send_welcome", END)
workflow.set_entry_point("validate_email")

app = workflow.compile(checkpointer=checkpointer)

# Run with checkpointing (can resume if it fails)
result = app.invoke(
    {"email": "user@example.com"},
    config={"configurable": {"thread_id": "user_123"}},
)
Benefit: If step 2 fails, you can resume from step 2 (not start over).
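Resuming works because every step is checkpointed under the thread_id. A rough sketch of the retry path, assuming a LangGraph version where invoking with None as the input resumes the thread from its last checkpoint:

try:
    result = app.invoke(
        {"email": "user@example.com"},
        config={"configurable": {"thread_id": "user_123"}},
    )
except Exception:
    # Retry later: passing None as input resumes this thread
    # from the last saved checkpoint instead of starting over.
    result = app.invoke(
        None,
        config={"configurable": {"thread_id": "user_123"}},
    )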
Pattern #2: Conditional Branching
Use case: Different paths based on data or LLM decision.
from typing import TypedDict

class SupportTicketState(TypedDict):
    ticket_id: str
    ticket_text: str
    category: str
    priority: str
    assigned_to: str
    resolved: bool

workflow = StateGraph(SupportTicketState)

def classify_ticket(state):
    # LLM classifies
    ticket_text = state["ticket_text"]
    classification = llm.generate(f"""
    Classify this support ticket:
    {ticket_text}

    Category: [billing, technical, account, other]
    Priority: [low, medium, high, urgent]
    Return JSON.
    """)
    result = json.loads(classification)
    state["category"] = result["category"]
    state["priority"] = result["priority"]
    return state

def route_ticket(state) -> str:
    # Deterministic routing based on classification
    category = state["category"]
    priority = state["priority"]
    if priority == "urgent":
        return "escalate"
    elif category == "billing":
        return "billing_team"
    elif category == "technical":
        return "tech_team"
    else:
        return "general_queue"

# Add nodes
workflow.add_node("classify", classify_ticket)
workflow.add_node("escalate", escalate_node)
workflow.add_node("billing_team", assign_to_billing)
workflow.add_node("tech_team", assign_to_tech)
workflow.add_node("general_queue", assign_to_general)

# Conditional routing
workflow.add_conditional_edges(
    "classify",
    route_ticket,
    {
        "escalate": "escalate",
        "billing_team": "billing_team",
        "tech_team": "tech_team",
        "general_queue": "general_queue"
    }
)

# All paths end
workflow.add_edge("escalate", END)
workflow.add_edge("billing_team", END)
workflow.add_edge("tech_team", END)
workflow.add_edge("general_queue", END)

workflow.set_entry_point("classify")
app = workflow.compile()
Benefit: Clear branching logic, predictable paths.
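The assignment nodes above are deliberately thin wrappers around your ticketing system. A minimal sketch of two of them; the ticketing client and its methods are assumptions, not a real API:

def escalate_node(state: SupportTicketState) -> SupportTicketState:
    # Page the on-call team and record the assignment
    ticketing.page_oncall(state["ticket_id"])  # hypothetical client
    state["assigned_to"] = "oncall"
    return state

def assign_to_billing(state: SupportTicketState) -> SupportTicketState:
    ticketing.assign(state["ticket_id"], queue="billing")
    state["assigned_to"] = "billing"
    return state

# assign_to_tech and assign_to_general follow the same shape.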
Pattern #3: Loops with Exit Conditions
Use case: Iterative processes that need to loop until done.
from typing import List, TypedDict

class ResearchState(TypedDict):
    query: str
    findings: List[str]
    confidence: float
    max_iterations: int
    current_iteration: int
    answer: str

def search_step(state):
    # Perform search
    results = search_engine.search(state["query"])
    state["findings"].extend(results)
    state["current_iteration"] += 1
    return state

def evaluate_confidence(state):
    # LLM evaluates if we have enough info
    prompt = f"""
    Query: {state["query"]}
    Findings so far: {state["findings"]}

    Do we have enough information to answer confidently?
    Return JSON: {{"confidence": float, "reasoning": str}}
    """
    result = llm.generate(prompt)
    evaluation = json.loads(result)
    state["confidence"] = evaluation["confidence"]
    return state

def synthesize_findings(state):
    # LLM merges accumulated findings into a final answer
    state["answer"] = llm.generate(
        f"Answer '{state['query']}' using these findings: {state['findings']}"
    )
    return state

def should_continue(state) -> str:
    # Exit conditions
    if state["confidence"] > 0.85:
        return "done"
    if state["current_iteration"] >= state["max_iterations"]:
        return "max_reached"
    return "continue"

# Build graph
workflow = StateGraph(ResearchState)
workflow.add_node("search", search_step)
workflow.add_node("evaluate", evaluate_confidence)
workflow.add_node("synthesize", synthesize_findings)

# Loop until exit condition
workflow.add_edge("search", "evaluate")
workflow.add_conditional_edges(
    "evaluate",
    should_continue,
    {
        "continue": "search",  # Loop back
        "done": "synthesize",
        "max_reached": "synthesize"
    }
)
workflow.add_edge("synthesize", END)

workflow.set_entry_point("search")
app = workflow.compile()

# Run with max iterations cap
result = app.invoke({
    "query": "What are the latest AI trends?",
    "findings": [],
    "confidence": 0.0,
    "max_iterations": 5,
    "current_iteration": 0
})
Benefit: Controlled loops (won't run forever), clear exit logic.
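As a second safety net on top of max_iterations, LangGraph enforces a per-run recursion limit (25 steps by default) that you can set explicitly in the run config; here initial_state stands for the dict shown above:

# Hard cap on total graph steps for this run
result = app.invoke(initial_state, config={"recursion_limit": 20})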
Pattern #4: Parallel Execution
Use case: Independent tasks that can run concurrently.
import asyncio
import json
from typing import TypedDict

from langgraph.graph import StateGraph, START, END

class ParallelAnalysisState(TypedDict):
    document: str
    sentiment_analysis: dict
    key_topics: list
    summary: str
    final_report: str

# Parallel nodes return only the keys they update, so the
# branches don't write conflicting values into shared state.
async def analyze_sentiment(state):
    result = await llm_async.generate(f"Analyze sentiment: {state['document']}")
    return {"sentiment_analysis": json.loads(result)}

async def extract_topics(state):
    result = await llm_async.generate(f"Extract key topics: {state['document']}")
    return {"key_topics": json.loads(result)}

async def summarize(state):
    result = await llm_async.generate(f"Summarize: {state['document']}")
    return {"summary": result}

async def combine_results(state):
    # Combine all parallel results
    final_report = f"""
    Sentiment: {state['sentiment_analysis']}
    Topics: {state['key_topics']}
    Summary: {state['summary']}
    """
    return {"final_report": final_report}

# Build graph
workflow = StateGraph(ParallelAnalysisState)
workflow.add_node("sentiment", analyze_sentiment)
workflow.add_node("topics", extract_topics)
workflow.add_node("summarize", summarize)
workflow.add_node("combine", combine_results)

# Fan out from START so the three analyses run in parallel
workflow.add_edge(START, "sentiment")
workflow.add_edge(START, "topics")
workflow.add_edge(START, "summarize")

# Fan back in: combine waits for all three branches
workflow.add_edge("sentiment", "combine")
workflow.add_edge("topics", "combine")
workflow.add_edge("summarize", "combine")
workflow.add_edge("combine", END)

app = workflow.compile()

# Run async
result = asyncio.run(app.ainvoke({"document": "Long document text..."}))
Benefit: roughly 3x faster than running the three calls sequentially (when they take similar time), and each LLM call sees only the lean context it needs.
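One caveat: the fan-in above works cleanly because each branch writes a different state key. If parallel branches must write the same key, annotate it with a reducer so LangGraph merges concurrent updates instead of raising a conflict. A minimal sketch (MergingState is an illustrative name):

import operator
from typing import Annotated, TypedDict

class MergingState(TypedDict):
    document: str
    # Concurrent writes to this key are concatenated, not overwritten
    findings: Annotated[list, operator.add]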
Keeping Workflows Lean: Best Practices
Practice #1: Minimize State
Bad (bloated state):
class State(TypedDict):
    # Storing everything
    full_user_profile: dict        # 5,000 tokens
    complete_order_history: list   # 10,000 tokens
    all_product_details: list      # 8,000 tokens
    # ... more bloat
Good (lean state):
class State(TypedDict):
    user_id: str             # Reference, not full data
    order_id: str            # Reference, not full data
    refund_amount: float     # Just what's needed
    eligibility_check: dict  # Minimal result
Principle: Store references and minimal results, not full data dumps.
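In practice, nodes re-fetch what they need from the reference. A minimal sketch, assuming a db client; the fields parameter is hypothetical:

def check_refund_amount_node(state: State) -> State:
    # Fetch only the fields this node needs, using the stored reference
    order = db.get_order(state["order_id"], fields=["total"])
    state["refund_amount"] = order["total"]
    return state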
Practice #2: Clear Node Boundaries
Each node should do ONE thing:
# Bad: Node does too much
def mega_node(state):
    user = validate_user(state)
    order = lookup_order(state)
    eligibility = check_eligibility(order)
    refund = process_refund(eligibility)
    email = send_email(user, refund)
    # ... hard to debug, hard to optimize

# Good: Each node has one job
def validate_user_node(state): ...
def lookup_order_node(state): ...
def check_eligibility_node(state): ...
def process_refund_node(state): ...
def send_email_node(state): ...
Practice #3: Fail Fast
Add validation nodes that exit early:
def validate_input(state):
    if not state.get("email"):
        state["error"] = "Email required"
        state["exit_early"] = True
        return state
    if not state.get("order_id"):
        state["error"] = "Order ID required"
        state["exit_early"] = True
        return state
    return state

def should_continue(state) -> str:
    if state.get("exit_early"):
        return "error_handler"
    return "continue"

# Add to graph
workflow.add_conditional_edges(
    "validate_input",
    should_continue,
    {
        "continue": "next_step",
        "error_handler": "error_handler"
    }
)
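The error_handler node in the mapping can be as simple as logging and surfacing the message. A sketch, with the response field name chosen for illustration:

import logging

logger = logging.getLogger(__name__)

def error_handler(state):
    # Log and surface the validation error instead of failing silently
    logger.warning("Workflow exited early: %s", state["error"])
    state["response_message"] = f"Request failed: {state['error']}"
    return state

workflow.add_node("error_handler", error_handler)
workflow.add_edge("error_handler", END)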
Practice #4: Use Streaming
For long-running workflows, stream progress:
app = workflow.compile()

# Stream results: each event maps a finished node's name to its state update
for event in app.stream({"query": "user question"}):
    for node_name, update in event.items():
        print(f"Completed: {node_name}, Update: {update}")
Benefit: User sees progress, not a black box.
Practice #5: Budget Token Usage Per Node
def llm_node_with_budget(state):
    # Enforce max context size
    max_context_tokens = 2000
    context = build_minimal_context(state, max_tokens=max_context_tokens)
    state["response"] = llm.generate(context)
    return state
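build_minimal_context is yours to define. A minimal sketch using tiktoken for counting; the fields included and the hard truncation are illustrative choices, not a one-size-fits-all recipe:

import tiktoken

enc = tiktoken.get_encoding("cl100k_base")

def build_minimal_context(state, max_tokens: int) -> str:
    # Assemble only the fields the node needs, then hard-truncate
    parts = [
        f"order_id: {state['order_id']}",
        f"reason: {state.get('refund_reason', '')}",
    ]
    tokens = enc.encode("\n".join(parts))
    return enc.decode(tokens[:max_tokens])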
Measuring Workflow Performance
Key metrics:
1. Workflow Duration
import time

start_time = time.time()
result = app.invoke(initial_state)
duration = time.time() - start_time
print(f"Workflow took {duration:.1f}s")
Goal: < 10 seconds for most workflows
2. Token Usage Per Node
# Track in each node
def tracked_node(state):
    tokens_before = count_tokens(state)
    # Do work
    result = process(state)
    tokens_after = count_tokens(result)
    log_metric("node_tokens", tokens_after - tokens_before)
    return result
Goal: Minimize token delta per node
3. Success Rate
successes = 0
failures = 0
for test_case in test_cases:
    try:
        result = app.invoke(test_case)
        if validate_result(result):
            successes += 1
        else:
            failures += 1
    except Exception:
        failures += 1

success_rate = successes / (successes + failures)
Goal: > 95% success rate
Common Mistakes
Mistake #1: No Structure
Wrong: Fully autonomous agent with no graph structure
Right: LangGraph with clear nodes and edges
Mistake #2: Too Much LLM
Wrong: Every node calls LLM
Right: LLM only where reasoning is needed
Mistake #3: No Exit Conditions
Wrong: Loops with no max iterations
Right: Always have exit conditions (max iterations, confidence threshold, timeout)
Mistake #4: Passing Full Objects
Wrong: Passing entire database records through state
Right: Pass IDs, fetch data in nodes as needed
The Bottom Line
Lean agent workflows = Structured graphs + strategic LLM usage
LangGraph provides the structure: nodes, edges, state, checkpoints
Keys to staying lean:
- Minimize state (references, not data dumps)
- Clear node boundaries (one job per node)
- Mix deterministic + LLM nodes strategically
- Add exit conditions to loops
- Fail fast with validation
- Track and optimize token usage
Expected results:
- Predictable execution paths
- 50-70% token reduction vs fully agentic
- 10-30% faster execution
- 95%+ reliability
Getting Started
Build your first lean workflow:
- Map your process: What are the steps?
- Identify decision points: Where is judgment needed?
- Create nodes: Deterministic for structure, LLM for reasoning
- Add edges: Define flow and branches
- Test and measure: Duration, tokens, success rate
Start simple: 3-5 nodes, linear flow, then add complexity.
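As a starter template, a three-node linear flow is enough to exercise state, nodes, and edges end to end; the node bodies are placeholders to fill in:

from typing import TypedDict
from langgraph.graph import StateGraph, END

class StarterState(TypedDict):
    query: str
    answer: str

def fetch(state): ...    # deterministic: load data
def reason(state): ...   # LLM: the one judgment call
def respond(state): ...  # deterministic: format output

workflow = StateGraph(StarterState)
workflow.add_node("fetch", fetch)
workflow.add_node("reason", reason)
workflow.add_node("respond", respond)
workflow.add_edge("fetch", "reason")
workflow.add_edge("reason", "respond")
workflow.add_edge("respond", END)
workflow.set_entry_point("fetch")
app = workflow.compile()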
Need help building performant LangGraph workflows? We've built dozens of production agent systems.
Get workflow architecture help →
Related reading:
- LangGraph documentation: https://www.langgraph.dev
- Deterministic vs agentic workflows (this blog)
About the Author
DomAIn Labs Team
The DomAIn Labs team consists of AI engineers, strategists, and educators passionate about demystifying AI for small businesses.