StateGraph
Build complex workflows with typed state, interrupts for human-in-the-loop review, and checkpoint persistence.
Why StateGraph?
StateGraph is an advanced graph workflow system inspired by LangGraph. Unlike basic GraphWorkflow, StateGraph provides:
- Typed State Engine (Security): Robust schema enforcement using TypeScript. Ensures data integrity across asynchronous node transitions.
- Human-in-the-Loop (Control): Strategic interrupts for validation. Pause execution to allow manual review before critical LLM actions.
- Checkpoints & Persistence (Resilience): Fault-tolerant memory. Automatically saves state to persistent storage so a run can resume after failure.
- Real-time Event Streaming (UX / Feedback): Reactive execution logs. Stream tokens and node status updates directly to your frontend.
Basic Usage
```typescript
import { StateGraph, createStateAnnotation, END } from '@orka-js/graph';

// 1. Define your state type
interface AgentState {
  messages: string[];
  currentStep: string;
  result: string;
}

// 2. Create state annotation with defaults
const stateAnnotation = createStateAnnotation<AgentState>(() => ({
  messages: [],
  currentStep: '',
  result: '',
}));

// 3. Build the graph
const graph = new StateGraph<AgentState>({ stateAnnotation })
  .addNode('analyze', async (state) => ({
    currentStep: 'analyzing',
    messages: [...state.messages, 'Analyzing input...'],
  }))
  .addNode('process', async (state) => ({
    currentStep: 'processing',
    result: 'Processed: ' + state.messages.join(', '),
  }))
  .setEntryPoint('analyze')
  .addEdge('analyze', 'process')
  .setFinishPoint('process');

// 4. Compile and run
const compiled = graph.compile();
const result = await compiled.invoke({
  messages: ['Hello world'],
});

console.log(result.state.result);
// "Processed: Hello world, Analyzing input..."
```

State Reducers
By default, state updates replace values. Use reducers to accumulate or transform state:
```typescript
import { createStateAnnotation, Reducers } from '@orka-js/graph';

interface ChatState {
  messages: string[];
  tokenCount: number;
  metadata: Record<string, unknown>;
}

const stateAnnotation = createStateAnnotation<ChatState>(
  // Default values
  () => ({
    messages: [],
    tokenCount: 0,
    metadata: {},
  }),
  // Reducers (optional)
  {
    messages: Reducers.appendList,   // Append new messages to array
    tokenCount: Reducers.increment,  // Add to existing count
    metadata: Reducers.mergeObject,  // Merge objects
  }
);

// Now when nodes return partial state, reducers are applied:
// Node returns: { messages: ['New message'], tokenCount: 50 }
// Result: messages = [...oldMessages, 'New message'], tokenCount = oldCount + 50
```

Built-in Reducers
| Reducer Primitive | Operational Logic | Data Compatibility |
|---|---|---|
| `appendList` (Collection) | `[...current, ...update]` | Array |
| `mergeObject` (Structure) | `{...current, ...update}` | Object |
| `addToSet` (Unique) | `Array.from(new Set())` | Array |
| `keepLastN(n)` (Window) | `slice(-n)` | Array |
| `increment` (Counter) | `current + update` | Number |
| `max` / `min` (Limit) | `Math.max` / `Math.min` | Number |
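
To make the table's logic concrete, here is a minimal, self-contained sketch of how such reducers could combine a node's partial update with the current state. It does not use `@orka-js/graph` and is not the library's source: the function signatures, the `DemoState` type, and the `applyUpdate` helper are hypothetical, and the exact behavior of `addToSet` and `keepLastN` (merge first, then deduplicate or slice) is an assumption.

```typescript
// Illustrative re-implementations of the reducer logic in the table above.
// These are NOT the @orka-js/graph sources; all signatures are assumptions.

function appendList<T>(current: T[], update: T[]): T[] {
  return [...current, ...update];
}

function mergeObject(
  current: Record<string, unknown>,
  update: Record<string, unknown>
): Record<string, unknown> {
  // Keys in `update` win over keys in `current` (shallow merge).
  return { ...current, ...update };
}

// Assumption: the set is built from the existing items plus the incoming ones.
function addToSet<T>(current: T[], update: T[]): T[] {
  return Array.from(new Set(current.concat(update)));
}

// Assumption: the update is appended first, then the last n items are kept.
// n is expected to be a positive integer.
function keepLastN<T>(n: number): (current: T[], update: T[]) => T[] {
  return (current, update) => current.concat(update).slice(-n);
}

function increment(current: number, update: number): number {
  return current + update;
}

// Hypothetical state shape used only for this demonstration.
interface DemoState {
  messages: string[];   // reduced with appendList
  tokenCount: number;   // reduced with increment
  currentStep: string;  // no reducer: the update replaces the old value
}

// Applies a partial update field by field; fields absent from the update are kept.
function applyUpdate(state: DemoState, update: Partial<DemoState>): DemoState {
  return {
    messages:
      update.messages !== undefined
        ? appendList(state.messages, update.messages)
        : state.messages,
    tokenCount:
      update.tokenCount !== undefined
        ? increment(state.tokenCount, update.tokenCount)
        : state.tokenCount,
    currentStep:
      update.currentStep !== undefined ? update.currentStep : state.currentStep,
  };
}
```

With this sketch, applying `{ messages: ['New message'], tokenCount: 50 }` to a state holding one message and a count of 10 yields two messages and a count of 60, while `currentStep` is left unchanged.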
Conditional Edges
Route execution based on state values:
```typescript
import { StateGraph, END } from '@orka-js/graph';

const graph = new StateGraph<AgentState>({ stateAnnotation })
  .addNode('classifier', async (state) => ({
    category: classifyInput(state.input),
  }))
  .addNode('technical_support', async (state) => ({
    result: await handleTechnical(state),
  }))
  .addNode('general_support', async (state) => ({
    result: await handleGeneral(state),
  }))
  .setEntryPoint('classifier')
  .addConditionalEdges(
    'classifier',
    // Condition function - returns route key
    (state) => state.category === 'technical' ? 'tech' : 'general',
    // Path map - route key -> node name
    {
      tech: 'technical_support',
      general: 'general_support',
    }
  )
  .setFinishPoint('technical_support')
  .setFinishPoint('general_support');

// Can also route to END directly:
.addConditionalEdges(
  'check_done',
  (state) => state.isDone ? END : 'continue',
  { continue: 'next_step' }
)
```

Interrupts (Human-in-the-Loop)
Pause execution for human review or input. Perfect for approval workflows, content moderation, or interactive agents.
```typescript
import { StateGraph, GraphCheckpointStore } from '@orka-js/graph';

const graph = new StateGraph<AgentState>({ stateAnnotation })
  .addNode('draft_email', async (state) => ({
    draft: await generateEmail(state.request),
  }))
  .addNode('send_email', async (state) => ({
    sent: true,
    result: 'Email sent successfully',
  }))
  .setEntryPoint('draft_email')
  .addEdge('draft_email', 'send_email')
  .setFinishPoint('send_email');

const compiled = graph.compile();
const checkpointer = new GraphCheckpointStore<AgentState>();

// First run - interrupt BEFORE send_email for human approval
const result1 = await compiled.invoke(
  { request: 'Send meeting invite' },
  {
    checkpointer,
    threadId: 'email-workflow-123',
    interrupt: { before: ['send_email'] },
  }
);

console.log(result1.interrupted);    // true
console.log(result1.state.draft);    // Generated email draft
console.log(result1.checkpoint?.id); // Checkpoint ID for resume

// Human reviews the draft...
// Then resume execution:
const result2 = await compiled.resume(
  result1.checkpoint!.id,
  { checkpointer, threadId: 'email-workflow-123' }
);

console.log(result2.state.sent); // true
```

Resume with State Updates
Modify state when resuming - perfect for human corrections:
```typescript
// Human edited the draft before approving
const result = await compiled.resumeWithState(
  checkpoint.id,
  { draft: 'Human-edited email content...' }, // State update
  { checkpointer, threadId: 'email-workflow-123' }
);
```

Checkpoint Persistence
Checkpoints save the complete execution state, allowing you to resume, replay, or branch workflows.
```typescript
import { GraphCheckpointStore } from '@orka-js/graph';

const checkpointer = new GraphCheckpointStore<AgentState>();

// Run with checkpointing
const result = await compiled.invoke(initialState, {
  checkpointer,
  threadId: 'my-workflow-session',
});

// List all checkpoints for a thread
const checkpoints = await checkpointer.list('my-workflow-session');

// Load a specific checkpoint
const checkpoint = await checkpointer.load(checkpointId);

// Load the latest checkpoint
const latest = await checkpointer.loadLatest('my-workflow-session');

// Delete checkpoints
await checkpointer.delete(checkpointId);
await checkpointer.deleteThread('my-workflow-session');
```

Note: GraphCheckpointStore is an in-memory store for development. For production, implement the CheckpointStore interface backed by Redis, PostgreSQL, or another persistent store.
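
As a rough illustration of what a persistent store might look like, the sketch below implements the operations used in this section (`list`, `load`, `loadLatest`, `delete`, `deleteThread`) on top of a generic key-value backend. The real `CheckpointStore` interface is not shown on this page, so everything here is an assumption: the `Checkpoint` shape, the `CheckpointStoreLike` interface, the `save` method, and the `KeyValueBackend` abstraction are hypothetical names, and `MapBackend` merely stands in for a Redis or PostgreSQL client.

```typescript
// Hypothetical shapes: check the library's actual CheckpointStore interface
// before adapting this. Nothing below is imported from @orka-js/graph.
interface Checkpoint<S> {
  id: string;
  threadId: string;
  state: S;
  createdAt: number; // epoch milliseconds, used for ordering
}

interface CheckpointStoreLike<S> {
  save(checkpoint: Checkpoint<S>): Promise<void>; // assumed write method
  load(id: string): Promise<Checkpoint<S> | undefined>;
  loadLatest(threadId: string): Promise<Checkpoint<S> | undefined>;
  list(threadId: string): Promise<Checkpoint<S>[]>;
  delete(id: string): Promise<void>;
  deleteThread(threadId: string): Promise<void>;
}

// Minimal string key-value contract that a Redis or SQL adapter could satisfy.
interface KeyValueBackend {
  get(key: string): Promise<string | undefined>;
  set(key: string, value: string): Promise<void>;
  del(key: string): Promise<void>;
  keys(prefix: string): Promise<string[]>;
}

// In-memory stand-in for a real backend, so the sketch runs on its own.
class MapBackend implements KeyValueBackend {
  private data = new Map<string, string>();
  async get(key: string): Promise<string | undefined> {
    return this.data.get(key);
  }
  async set(key: string, value: string): Promise<void> {
    this.data.set(key, value);
  }
  async del(key: string): Promise<void> {
    this.data.delete(key);
  }
  async keys(prefix: string): Promise<string[]> {
    return Array.from(this.data.keys()).filter((k) => k.indexOf(prefix) === 0);
  }
}

class KeyValueCheckpointStore<S> implements CheckpointStoreLike<S> {
  private backend: KeyValueBackend;

  constructor(backend: KeyValueBackend) {
    this.backend = backend;
  }

  async save(checkpoint: Checkpoint<S>): Promise<void> {
    // Checkpoints are serialized as JSON, so state must be JSON-compatible.
    await this.backend.set('cp:' + checkpoint.id, JSON.stringify(checkpoint));
  }

  async load(id: string): Promise<Checkpoint<S> | undefined> {
    const raw = await this.backend.get('cp:' + id);
    return raw === undefined ? undefined : (JSON.parse(raw) as Checkpoint<S>);
  }

  // Scans every checkpoint key; a real store would keep a per-thread index.
  async list(threadId: string): Promise<Checkpoint<S>[]> {
    const found: Checkpoint<S>[] = [];
    for (const key of await this.backend.keys('cp:')) {
      const raw = await this.backend.get(key);
      if (raw === undefined) continue;
      const checkpoint = JSON.parse(raw) as Checkpoint<S>;
      if (checkpoint.threadId === threadId) found.push(checkpoint);
    }
    return found.sort((a, b) => a.createdAt - b.createdAt); // oldest first
  }

  async loadLatest(threadId: string): Promise<Checkpoint<S> | undefined> {
    const all = await this.list(threadId);
    return all.length > 0 ? all[all.length - 1] : undefined;
  }

  async delete(id: string): Promise<void> {
    await this.backend.del('cp:' + id);
  }

  async deleteThread(threadId: string): Promise<void> {
    for (const checkpoint of await this.list(threadId)) {
      await this.delete(checkpoint.id);
    }
  }
}
```

Keeping the storage contract down to four string operations is a design choice for the sketch: swapping `MapBackend` for a real client only requires implementing `get`, `set`, `del`, and `keys`, while the checkpoint logic stays unchanged.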
Streaming Execution
Stream execution events for real-time UI updates:
```typescript
const compiled = graph.compile();

for await (const event of compiled.stream(initialState)) {
  switch (event.type) {
    case 'node_start':
      console.log(`Starting node: ${event.nodeId}`);
      break;
    case 'node_end':
      console.log(`Finished node: ${event.nodeId}`);
      break;
    case 'state_update':
      console.log('State updated:', event.stateUpdate);
      break;
    case 'interrupt':
      console.log('Interrupted at:', event.nodeId);
      console.log('Checkpoint:', event.checkpoint?.id);
      break;
    case 'checkpoint':
      console.log('Checkpoint saved:', event.checkpoint?.id);
      break;
    case 'error':
      console.error('Error:', event.error);
      break;
    case 'done':
      console.log('Execution complete');
      break;
  }
}
```

Automatic Observability with Tracer
StateGraph can automatically emit trace events for every node execution when configured with a Tracer. Perfect for debugging and monitoring production workflows.
```typescript
import { StateGraph, createStateAnnotation } from '@orka-js/graph';
import { Tracer } from '@orka-js/observability';
import { createDevToolsHook } from '@orka-js/devtools';

// Create a tracer with DevTools integration
const tracer = new Tracer({
  logLevel: 'info',
  hooks: [createDevToolsHook()],
});

// Pass tracer to StateGraph config
const graph = new StateGraph<AgentState>({
  stateAnnotation,
  name: 'MyWorkflow',
  tracer, // Automatic tracing enabled
})
  .addNode('step1', async (state) => ({ result: 'Step 1 done' }))
  .addNode('step2', async (state) => ({ result: 'Step 2 done' }))
  .setEntryPoint('step1')
  .addEdge('step1', 'step2')
  .setFinishPoint('step2');

// All node executions are automatically traced
const compiled = graph.compile();
const result = await compiled.invoke({ messages: [] });

// View traces in DevTools dashboard at http://localhost:3001
```

Automatic Tracing: When a tracer is configured, StateGraph automatically emits trace events for node start, completion, errors, and graph lifecycle. Each event includes timing, metadata, and state updates.
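
For intuition about what per-node tracing involves, the following self-contained sketch wraps a node function so that it reports start, completion, and error events with timing. It is only a conceptual illustration and does not reflect how StateGraph or `Tracer` are implemented: the `TraceEvent` shape, its event names, and the `withTracing` helper are hypothetical.

```typescript
// Conceptual sketch only: the event shape and names below are assumptions,
// not the schema emitted by @orka-js/observability.
type NodeFn<S> = (state: S) => Promise<Partial<S>>;

interface TraceEvent {
  type: 'node_start' | 'node_end' | 'node_error';
  nodeId: string;
  durationMs?: number;   // set on node_end and node_error
  stateUpdate?: unknown; // set on node_end
  error?: unknown;       // set on node_error
}

// Returns a node function that behaves like `fn` but reports events to `emit`.
function withTracing<S>(
  nodeId: string,
  fn: NodeFn<S>,
  emit: (event: TraceEvent) => void
): NodeFn<S> {
  return async (state) => {
    const startedAt = Date.now();
    emit({ type: 'node_start', nodeId });
    try {
      const stateUpdate = await fn(state);
      emit({ type: 'node_end', nodeId, durationMs: Date.now() - startedAt, stateUpdate });
      return stateUpdate;
    } catch (error) {
      emit({ type: 'node_error', nodeId, durationMs: Date.now() - startedAt, error });
      throw error; // tracing must not swallow the failure
    }
  };
}
```

A wrapper like this could be applied by hand to individual nodes in a setup without a tracer; when a tracer is configured, the note above indicates that equivalent events are emitted automatically.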
Mermaid Diagram Export
```typescript
const compiled = graph.compile();
const mermaid = compiled.toMermaid();

console.log(mermaid);
// graph TD
//   __start__((START))
//   __start__ --> analyze
//   analyze[analyze]
//   process[process]
//   __end__((END))
//   analyze --> process
//   process --> __end__
```

Complete Example: Approval Workflow
```typescript
import { StateGraph, createStateAnnotation, GraphCheckpointStore, END } from '@orka-js/graph';
import { OpenAIAdapter } from '@orka-js/openai';

// State definition
interface ApprovalState {
  request: string;
  analysis: string;
  decision: 'approved' | 'rejected' | 'pending';
  reason: string;
}

const stateAnnotation = createStateAnnotation<ApprovalState>(() => ({
  request: '',
  analysis: '',
  decision: 'pending',
  reason: '',
}));

const llm = new OpenAIAdapter({ apiKey: process.env.OPENAI_API_KEY! });

// Build the graph
const graph = new StateGraph<ApprovalState>({ stateAnnotation })
  .addNode('analyze', async (state) => {
    const result = await llm.generate(
      `Analyze this request and provide risk assessment: ${state.request}`
    );
    return { analysis: result.content };
  })
  .addNode('auto_approve', async (state) => ({
    decision: 'approved',
    reason: 'Auto-approved: Low risk request',
  }))
  .addNode('human_review', async (state) => ({
    // This node is a placeholder - human will update state on resume
  }))
  .addNode('finalize', async (state) => ({
    reason: state.decision === 'approved'
      ? 'Request approved and processed'
      : 'Request rejected: ' + state.reason,
  }))
  .setEntryPoint('analyze')
  .addConditionalEdges(
    'analyze',
    (state) => state.analysis.includes('low risk') ? 'auto' : 'review',
    { auto: 'auto_approve', review: 'human_review' }
  )
  .addEdge('auto_approve', 'finalize')
  .addEdge('human_review', 'finalize')
  .setFinishPoint('finalize');

// Run with human-in-the-loop
const compiled = graph.compile();
const checkpointer = new GraphCheckpointStore<ApprovalState>();

const result = await compiled.invoke(
  { request: 'Access to production database' },
  {
    checkpointer,
    threadId: 'approval-123',
    interrupt: { after: ['human_review'] },
  }
);

if (result.interrupted) {
  // Human reviews and makes decision
  const finalResult = await compiled.resumeWithState(
    result.checkpoint!.id,
    { decision: 'approved', reason: 'Verified by security team' },
    { checkpointer, threadId: 'approval-123' }
  );
  console.log(finalResult.state.reason);
}
```

API Reference
| Interface Method | Execution Phase | Description |
|---|---|---|
| `addNode(name, fn)` | Structure | Add a node with async function |
| `addEdge(from, to)` | Structure | Add edge between nodes |
| `addConditionalEdges(from, fn, map)` | Structure | Add conditional routing |
| `setEntryPoint(name)` | Structure | Set starting node |
| `setFinishPoint(name)` | Structure | Add edge to END |
| `compile()` | Build | Compile graph for execution |
| `invoke(state, config)` | Runtime | Run the graph |
| `resume(checkpointId, config)` | Recovery | Resume from checkpoint |
| `resumeWithState(id, update, config)` | Recovery | Resume with state update |
| `stream(state, config)` | Runtime | Stream execution events |
| `toMermaid()` | Audit | Export Mermaid diagram |