OrkaJS

StateGraph

Build complex workflows with typed state, interrupts for human-in-the-loop, and checkpoint persistence.

Why StateGraph?

StateGraph is an advanced graph workflow system inspired by LangGraph. Unlike the basic GraphWorkflow, StateGraph provides:

Typed State Engine (Security): Robust schema enforcement using TypeScript. Ensures data integrity across asynchronous node transitions.

Human-in-the-Loop (Control): Strategic interrupts for validation. Pause execution to allow manual review before critical LLM actions.

Checkpoints & Persistence (Resilience): Fault-tolerant memory. Automatically saves state to persistent storage to resume after failure.

Real-time Event Streaming (UX / Feedback): Reactive execution logs. Stream tokens and node status updates directly to your frontend.

Basic Usage

import { StateGraph, createStateAnnotation, END } from '@orka-js/graph';

// 1. Define your state type
interface AgentState {
  messages: string[];
  currentStep: string;
  result: string;
}

// 2. Create state annotation with defaults
const stateAnnotation = createStateAnnotation<AgentState>(() => ({
  messages: [],
  currentStep: '',
  result: '',
}));

// 3. Build the graph
const graph = new StateGraph<AgentState>({ stateAnnotation })
  .addNode('analyze', async (state) => ({
    currentStep: 'analyzing',
    messages: [...state.messages, 'Analyzing input...'],
  }))
  .addNode('process', async (state) => ({
    currentStep: 'processing',
    result: 'Processed: ' + state.messages.join(', '),
  }))
  .setEntryPoint('analyze')
  .addEdge('analyze', 'process')
  .setFinishPoint('process');

// 4. Compile and run
const compiled = graph.compile();
const result = await compiled.invoke({
  messages: ['Hello world'],
});

console.log(result.state.result);
// "Processed: Hello world, Analyzing input..."

State Reducers

By default, state updates replace values. Use reducers to accumulate or transform state:

import { createStateAnnotation, Reducers } from '@orka-js/graph';

interface ChatState {
  messages: string[];
  tokenCount: number;
  metadata: Record<string, unknown>;
}

const stateAnnotation = createStateAnnotation<ChatState>(
  // Default values
  () => ({
    messages: [],
    tokenCount: 0,
    metadata: {},
  }),
  // Reducers (optional)
  {
    messages: Reducers.appendList,   // Append new messages to array
    tokenCount: Reducers.increment,  // Add to existing count
    metadata: Reducers.mergeObject,  // Merge objects
  }
);

// Now when nodes return partial state, reducers are applied:
// Node returns: { messages: ['New message'], tokenCount: 50 }
// Result: messages = [...oldMessages, 'New message'], tokenCount = oldCount + 50
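
To make the merge rule concrete, the following is a minimal sketch of how a partial update could be folded into state: keys with a reducer are combined, keys without one are replaced. It is not the library's implementation; `DemoState`, `applyUpdate`, and the two local reducer functions are hypothetical names written only for this illustration.

```typescript
// Hypothetical state for illustration: two reduced keys and one plain key.
interface DemoState {
  messages: string[];
  tokenCount: number;
  status: string; // no reducer: a new value simply replaces the old one
}

// Hand-written stand-ins for Reducers.appendList and Reducers.increment.
function appendList<T>(current: T[], update: T[]): T[] {
  return [...current, ...update];
}

function increment(current: number, update: number): number {
  return current + update;
}

// Fold a node's partial return value into the current state.
// Keys missing from the update keep their current value.
function applyUpdate(state: DemoState, update: Partial<DemoState>): DemoState {
  return {
    messages:
      update.messages === undefined
        ? state.messages
        : appendList(state.messages, update.messages),
    tokenCount:
      update.tokenCount === undefined
        ? state.tokenCount
        : increment(state.tokenCount, update.tokenCount),
    status: update.status === undefined ? state.status : update.status,
  };
}

const before: DemoState = { messages: ['Hi'], tokenCount: 10, status: 'idle' };
const after = applyUpdate(before, {
  messages: ['New message'],
  tokenCount: 50,
  status: 'busy',
});

console.log(after.messages);   // [ 'Hi', 'New message' ]
console.log(after.tokenCount); // 60
console.log(after.status);     // busy
```

The sketch spells out each key by hand to keep the types simple; a generic version would look up the reducer per key instead.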

Built-in Reducers

| Reducer | Kind | Logic | Data type |
|---|---|---|---|
| appendList | Collection | [...current, ...update] | Array |
| mergeObject | Structure | {...current, ...update} | Object |
| addToSet | Unique | Array.from(new Set()) | Array |
| keepLastN(n) | Window | slice(-n) | Array |
| increment | Counter | current + update | Number |
| max / min | Limit | Math.max / Math.min | Number |
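
The logic column above can be read as plain functions. The sketch below re-implements a few of them by hand to show the expected behaviour; it is an illustration, not the library's source. In particular, it assumes that `addToSet` and `keepLastN` first combine the current and incoming arrays, and `maxOf` is a local name chosen for this example.

```typescript
// Illustrative re-implementations; the library's own reducers may differ in detail.

// Union of both arrays with duplicates removed (first occurrence wins).
function addToSet<T>(current: T[], update: T[]): T[] {
  return Array.from(new Set([...current, ...update]));
}

// Factory: returns a reducer that keeps only the last n items.
// Assumes n >= 1, since slice(-0) would return the whole array.
function keepLastN<T>(n: number): (current: T[], update: T[]) => T[] {
  return (current, update) => [...current, ...update].slice(-n);
}

// Shallow merge: keys in the update overwrite keys in the current object.
function mergeObject<T extends object>(current: T, update: Partial<T>): T {
  return { ...current, ...update };
}

// Keep the larger of the two numbers.
function maxOf(current: number, update: number): number {
  return Math.max(current, update);
}

console.log(addToSet(['a', 'b'], ['b', 'c']));          // [ 'a', 'b', 'c' ]
console.log(keepLastN<number>(3)([1, 2, 3], [4, 5]));   // [ 3, 4, 5 ]
console.log(mergeObject({ a: 1, b: 2 }, { b: 3 }));     // { a: 1, b: 3 }
console.log(maxOf(7, 4));                               // 7
```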

Conditional Edges

Route execution based on state values:

import { StateGraph, END } from '@orka-js/graph';

// Assumes AgentState here also has `input`, `category`, and `result` fields,
// and that classifyInput, handleTechnical, and handleGeneral are defined elsewhere.
const graph = new StateGraph<AgentState>({ stateAnnotation })
  .addNode('classifier', async (state) => ({
    category: classifyInput(state.input),
  }))
  .addNode('technical_support', async (state) => ({
    result: await handleTechnical(state),
  }))
  .addNode('general_support', async (state) => ({
    result: await handleGeneral(state),
  }))
  .setEntryPoint('classifier')
  .addConditionalEdges(
    'classifier',
    // Condition function - returns route key
    (state) => state.category === 'technical' ? 'tech' : 'general',
    // Path map - route key -> node name
    {
      tech: 'technical_support',
      general: 'general_support',
    }
  )
  .setFinishPoint('technical_support')
  .setFinishPoint('general_support');

// Can also route to END directly (fragment of a builder chain):
//   .addConditionalEdges(
//     'check_done',
//     (state) => state.isDone ? END : 'continue',
//     { continue: 'next_step' }
//   )
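
One way to picture how a route key becomes a node name is sketched below. It is a simplified model under stated assumptions, not the library's router: `END_MARKER` is a local stand-in for the exported `END` constant, and `resolveNext`, `Router`, and `TicketState` are hypothetical names.

```typescript
// Local stand-in for the library's END constant (assumption for this sketch).
const END_MARKER = '__end__';

type Router<S> = (state: S) => string;

// Resolve the next node: END passes straight through,
// any other key is looked up in the path map.
function resolveNext<S>(
  state: S,
  router: Router<S>,
  pathMap: Record<string, string>,
): string {
  const key = router(state);
  if (key === END_MARKER) return END_MARKER;
  const target = pathMap[key];
  if (target === undefined) {
    throw new Error(`No route registered for key "${key}"`);
  }
  return target;
}

interface TicketState {
  category: 'technical' | 'general';
  isDone: boolean;
}

const routeTicket: Router<TicketState> = (s) =>
  s.category === 'technical' ? 'tech' : 'general';
const ticketPaths: Record<string, string> = {
  tech: 'technical_support',
  general: 'general_support',
};

const routeDone: Router<TicketState> = (s) => (s.isDone ? END_MARKER : 'continue');

const techTicket: TicketState = { category: 'technical', isDone: false };
const doneTicket: TicketState = { category: 'general', isDone: true };

console.log(resolveNext(techTicket, routeTicket, ticketPaths));            // technical_support
console.log(resolveNext(doneTicket, routeDone, { continue: 'next_step' })); // __end__
```

Throwing on an unknown key is a design choice of the sketch; it surfaces a typo in the path map at the moment the route is taken rather than silently stopping.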

Interrupts (Human-in-the-Loop)

Pause execution for human review or input. Perfect for approval workflows, content moderation, or interactive agents.

import { StateGraph, GraphCheckpointStore } from '@orka-js/graph';
 
// Assumes AgentState here has `request`, `draft`, `sent`, and `result` fields,
// and that generateEmail is defined elsewhere.
const graph = new StateGraph<AgentState>({ stateAnnotation })
  .addNode('draft_email', async (state) => ({
    draft: await generateEmail(state.request),
  }))
  .addNode('send_email', async (state) => ({
    sent: true,
    result: 'Email sent successfully',
  }))
  .setEntryPoint('draft_email')
  .addEdge('draft_email', 'send_email')
  .setFinishPoint('send_email');

const compiled = graph.compile();
const checkpointer = new GraphCheckpointStore<AgentState>();

// First run - interrupt BEFORE send_email for human approval
const result1 = await compiled.invoke(
  { request: 'Send meeting invite' },
  {
    checkpointer,
    threadId: 'email-workflow-123',
    interrupt: { before: ['send_email'] },
  }
);

console.log(result1.interrupted);    // true
console.log(result1.state.draft);    // Generated email draft
console.log(result1.checkpoint?.id); // Checkpoint ID for resume

// Human reviews the draft...
// Then resume execution:
const result2 = await compiled.resume(
  result1.checkpoint!.id,
  { checkpointer, threadId: 'email-workflow-123' }
);
 
console.log(result2.state.sent); // true

Resume with State Updates

Modify state when resuming - perfect for human corrections:

// Human edited the draft before approving
const result = await compiled.resumeWithState(
  result1.checkpoint!.id, // Checkpoint from the interrupted run
  { draft: 'Human-edited email content...' }, // State update
  { checkpointer, threadId: 'email-workflow-123' }
);
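
The interrupt-and-resume cycle can be modelled with a toy, synchronous executor, sketched below. This is only a mental model under simplifying assumptions (a linear pipeline, no reducers, no storage); `run`, `resumeWithState`, `pipeline`, and the checkpoint shape here are hypothetical and do not mirror the library's internals.

```typescript
interface EmailState {
  request: string;
  draft: string;
  sent: boolean;
}

type NodeFn = (state: EmailState) => Partial<EmailState>;

// Toy checkpoint: the saved state plus the index of the node to run next.
interface Checkpoint {
  id: string;
  nextIndex: number;
  state: EmailState;
}

interface RunResult {
  state: EmailState;
  interrupted: boolean;
  checkpoint?: Checkpoint;
}

const pipeline: Array<[string, NodeFn]> = [
  ['draft_email', (s) => ({ draft: `Draft for: ${s.request}` })],
  ['send_email', () => ({ sent: true })],
];

// Run nodes in order, pausing BEFORE any node listed in interruptBefore.
function run(state: EmailState, startIndex: number, interruptBefore: string[]): RunResult {
  let current = state;
  for (let i = startIndex; i < pipeline.length; i++) {
    const [nodeName, fn] = pipeline[i];
    if (interruptBefore.indexOf(nodeName) !== -1) {
      const checkpoint: Checkpoint = { id: `cp-${i}`, nextIndex: i, state: current };
      return { state: current, interrupted: true, checkpoint };
    }
    current = { ...current, ...fn(current) };
  }
  return { state: current, interrupted: false };
}

// Merge the human's edits into the saved state, then continue from the
// paused node without re-triggering the interrupt.
function resumeWithState(cp: Checkpoint, update: Partial<EmailState>): RunResult {
  return run({ ...cp.state, ...update }, cp.nextIndex, []);
}

const first = run({ request: 'Send meeting invite', draft: '', sent: false }, 0, ['send_email']);
console.log(first.interrupted); // true
console.log(first.state.draft); // Draft for: Send meeting invite

const second = resumeWithState(first.checkpoint!, { draft: 'Human-edited email content...' });
console.log(second.state.sent);  // true
console.log(second.state.draft); // Human-edited email content...
```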

Checkpoint Persistence

Checkpoints save the complete execution state, allowing you to resume, replay, or branch workflows.

import { GraphCheckpointStore } from '@orka-js/graph';
 
const checkpointer = new GraphCheckpointStore<AgentState>();
 
// Run with checkpointing
const result = await compiled.invoke(initialState, {
  checkpointer,
  threadId: 'my-workflow-session',
});
 
// List all checkpoints for a thread
const checkpoints = await checkpointer.list('my-workflow-session');
 
// Load a specific checkpoint
const checkpoint = await checkpointer.load(checkpointId);
 
// Load the latest checkpoint
const latest = await checkpointer.loadLatest('my-workflow-session');
 
// Delete checkpoints
await checkpointer.delete(checkpointId);
await checkpointer.deleteThread('my-workflow-session');

Note: GraphCheckpointStore is an in-memory store intended for development. For production, implement the CheckpointStore interface on top of Redis, PostgreSQL, or another persistent store.
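
As a rough sketch of what a custom store might look like, the class below persists checkpoints as JSON strings, which is what a Redis- or SQL-backed store would typically do. The `CheckpointStoreLike` interface and `StoredCheckpoint` shape are assumptions inferred from the calls shown above (plus an assumed `save` method); check the package's actual `CheckpointStore` typings before implementing against it. A `Map` stands in for the database.

```typescript
// Assumed shapes, inferred from the documented calls; not the library's typings.
interface StoredCheckpoint<S> {
  id: string;
  threadId: string;
  state: S;
  createdAt: number;
}

interface CheckpointStoreLike<S> {
  save(cp: StoredCheckpoint<S>): Promise<void>;
  load(id: string): Promise<StoredCheckpoint<S> | undefined>;
  loadLatest(threadId: string): Promise<StoredCheckpoint<S> | undefined>;
  list(threadId: string): Promise<StoredCheckpoint<S>[]>;
  delete(id: string): Promise<void>;
  deleteThread(threadId: string): Promise<void>;
}

class JsonCheckpointStore<S> implements CheckpointStoreLike<S> {
  // Stand-in for a real database table or key space.
  private rows = new Map<string, string>();

  async save(cp: StoredCheckpoint<S>): Promise<void> {
    this.rows.set(cp.id, JSON.stringify(cp));
  }

  async load(id: string): Promise<StoredCheckpoint<S> | undefined> {
    const raw = this.rows.get(id);
    return raw === undefined ? undefined : (JSON.parse(raw) as StoredCheckpoint<S>);
  }

  // Returns the thread's checkpoints, oldest first.
  async list(threadId: string): Promise<StoredCheckpoint<S>[]> {
    const found: StoredCheckpoint<S>[] = [];
    this.rows.forEach((raw) => {
      const cp = JSON.parse(raw) as StoredCheckpoint<S>;
      if (cp.threadId === threadId) found.push(cp);
    });
    return found.sort((a, b) => a.createdAt - b.createdAt);
  }

  async loadLatest(threadId: string): Promise<StoredCheckpoint<S> | undefined> {
    const all = await this.list(threadId);
    return all.length > 0 ? all[all.length - 1] : undefined;
  }

  async delete(id: string): Promise<void> {
    this.rows.delete(id);
  }

  async deleteThread(threadId: string): Promise<void> {
    for (const cp of await this.list(threadId)) {
      this.rows.delete(cp.id);
    }
  }
}

async function checkpointDemo(): Promise<string | undefined> {
  const store = new JsonCheckpointStore<{ step: string }>();
  await store.save({ id: 'a', threadId: 'thread-1', state: { step: 'one' }, createdAt: 1 });
  await store.save({ id: 'b', threadId: 'thread-1', state: { step: 'two' }, createdAt: 2 });
  const latest = await store.loadLatest('thread-1');
  return latest ? latest.state.step : undefined;
}

checkpointDemo().then((step) => console.log(step)); // two
```

Because state goes through `JSON.stringify`, anything that does not survive a JSON round trip (functions, `Date` objects, class instances) would need custom serialization in a real store.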

Streaming Execution

Stream execution events for real-time UI updates:

const compiled = graph.compile();
 
for await (const event of compiled.stream(initialState)) {
  switch (event.type) {
    case 'node_start':
      console.log(`Starting node: ${event.nodeId}`);
      break;
    case 'node_end':
      console.log(`Finished node: ${event.nodeId}`);
      break;
    case 'state_update':
      console.log('State updated:', event.stateUpdate);
      break;
    case 'interrupt':
      console.log('Interrupted at:', event.nodeId);
      console.log('Checkpoint:', event.checkpoint?.id);
      break;
    case 'checkpoint':
      console.log('Checkpoint saved:', event.checkpoint?.id);
      break;
    case 'error':
      console.error('Error:', event.error);
      break;
    case 'done':
      console.log('Execution complete');
      break;
  }
}
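
For a frontend, it is often handier to fold the event stream into a small view model than to log each event. The sketch below shows one way to do that. The `GraphEvent` union is an assumption inferred from the fields used in the loop above, not the library's exported type, and `ProgressView` and `reduceProgress` are hypothetical names.

```typescript
// Assumed event shapes, inferred from the streaming example.
type GraphEvent =
  | { type: 'node_start'; nodeId: string }
  | { type: 'node_end'; nodeId: string }
  | { type: 'state_update'; stateUpdate: Record<string, unknown> }
  | { type: 'interrupt'; nodeId: string }
  | { type: 'error'; error: Error }
  | { type: 'done' };

// What a progress widget might need to render.
interface ProgressView {
  running: string | null;
  finished: string[];
  outcome: 'running' | 'paused' | 'failed' | 'done';
}

// Pure reducer: previous view + one event -> next view.
function reduceProgress(view: ProgressView, ev: GraphEvent): ProgressView {
  switch (ev.type) {
    case 'node_start':
      return { ...view, running: ev.nodeId };
    case 'node_end':
      return { ...view, running: null, finished: [...view.finished, ev.nodeId] };
    case 'interrupt':
      return { ...view, running: null, outcome: 'paused' };
    case 'error':
      return { ...view, running: null, outcome: 'failed' };
    case 'done':
      return { ...view, outcome: 'done' };
    default:
      return view; // state_update does not change the progress view
  }
}

const sampleEvents: GraphEvent[] = [
  { type: 'node_start', nodeId: 'analyze' },
  { type: 'state_update', stateUpdate: { currentStep: 'analyzing' } },
  { type: 'node_end', nodeId: 'analyze' },
  { type: 'node_start', nodeId: 'process' },
  { type: 'node_end', nodeId: 'process' },
  { type: 'done' },
];

const initialView: ProgressView = { running: null, finished: [], outcome: 'running' };
const finalView = sampleEvents.reduce(reduceProgress, initialView);

console.log(finalView.finished); // [ 'analyze', 'process' ]
console.log(finalView.outcome);  // done
```

In a real UI the same reducer could be called once per event inside the `for await` loop, with the result pushed to component state.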

Automatic Observability with Tracer

StateGraph can automatically emit trace events for every node execution when configured with a Tracer. Perfect for debugging and monitoring production workflows.

import { StateGraph, createStateAnnotation } from '@orka-js/graph';
import { Tracer } from '@orka-js/observability';
import { createDevToolsHook } from '@orka-js/devtools';
 
// Create a tracer with DevTools integration
const tracer = new Tracer({
  logLevel: 'info',
  hooks: [createDevToolsHook()],
});

// Pass tracer to StateGraph config
const graph = new StateGraph<AgentState>({
  stateAnnotation,
  name: 'MyWorkflow',
  tracer, // Automatic tracing enabled
})
  .addNode('step1', async (state) => ({ result: 'Step 1 done' }))
  .addNode('step2', async (state) => ({ result: 'Step 2 done' }))
  .setEntryPoint('step1')
  .addEdge('step1', 'step2')
  .setFinishPoint('step2');
 
// All node executions are automatically traced
const compiled = graph.compile();
const result = await compiled.invoke({ messages: [] });
 
// View traces in DevTools dashboard at http://localhost:3001

Automatic Tracing: When a tracer is configured, StateGraph automatically emits trace events for node start, completion, errors, and graph lifecycle. Each event includes timing, metadata, and state updates.
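
To give a feel for what such events carry, here is a hand-rolled wrapper that records start, end, and error records with timing around a node function. The Tracer does this for you; the sketch only illustrates the idea, and `TraceRecord`, `AsyncNode`, and `withTrace` are hypothetical names rather than part of `@orka-js/observability`.

```typescript
// Hypothetical record shape; the real Tracer's event schema may differ.
interface TraceRecord {
  nodeId: string;
  phase: 'start' | 'end' | 'error';
  durationMs?: number;
}

type AsyncNode<S> = (state: S) => Promise<Partial<S>>;

// Wrap a node so every call appends records to the sink.
function withTrace<S>(nodeId: string, fn: AsyncNode<S>, sink: TraceRecord[]): AsyncNode<S> {
  return async (state: S) => {
    const startedAt = Date.now();
    sink.push({ nodeId, phase: 'start' });
    try {
      const update = await fn(state);
      sink.push({ nodeId, phase: 'end', durationMs: Date.now() - startedAt });
      return update;
    } catch (err) {
      sink.push({ nodeId, phase: 'error', durationMs: Date.now() - startedAt });
      throw err; // rethrow so the caller still sees the failure
    }
  };
}

interface StepState {
  result: string;
}

async function traceDemo(): Promise<string[]> {
  const sink: TraceRecord[] = [];
  const step1 = withTrace<StepState>('step1', async () => ({ result: 'Step 1 done' }), sink);
  const step2 = withTrace<StepState>('step2', async () => {
    throw new Error('boom');
  }, sink);

  await step1({ result: '' });
  await step2({ result: '' }).catch(() => undefined); // swallow the demo error
  return sink.map((r) => `${r.nodeId}:${r.phase}`);
}

traceDemo().then((phases) => console.log(phases));
// [ 'step1:start', 'step1:end', 'step2:start', 'step2:error' ]
```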

Mermaid Diagram Export

const compiled = graph.compile();
const mermaid = compiled.toMermaid();
 
console.log(mermaid);
// graph TD
// __start__((START))
// __start__ --> analyze
// analyze[analyze]
// process[process]
// __end__((END))
// analyze --> process
// process --> __end__
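
For readers curious how such a diagram is assembled, the sketch below builds the same text from a plain description of the graph. It assumes the line ordering seen in the output above (start node, entry edge, node declarations, end node, then edges); the real exporter may order or format lines differently. `GraphShape` and `toMermaidSketch` are hypothetical names.

```typescript
// Minimal description of a graph for this illustration.
interface GraphShape {
  entry: string;
  nodes: string[];
  edges: Array<[string, string]>;
  finish: string[];
}

function toMermaidSketch(g: GraphShape): string {
  const lines: string[] = [
    'graph TD',
    '__start__((START))',
    `__start__ --> ${g.entry}`,
  ];
  // One rectangular box per node, labelled with its own name.
  for (const nodeName of g.nodes) {
    lines.push(`${nodeName}[${nodeName}]`);
  }
  lines.push('__end__((END))');
  // Explicit edges first, then an edge to END for each finish point.
  for (const [from, to] of g.edges) {
    lines.push(`${from} --> ${to}`);
  }
  for (const nodeName of g.finish) {
    lines.push(`${nodeName} --> __end__`);
  }
  return lines.join('\n');
}

const diagram = toMermaidSketch({
  entry: 'analyze',
  nodes: ['analyze', 'process'],
  edges: [['analyze', 'process']],
  finish: ['process'],
});

console.log(diagram);
```

Pasting the printed text into any Mermaid renderer draws the two-node pipeline from the Basic Usage example.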

Complete Example: Approval Workflow

import {
  StateGraph,
  createStateAnnotation,
  GraphCheckpointStore,
  END
} from '@orka-js/graph';
import { OpenAIAdapter } from '@orka-js/openai';

// State definition
interface ApprovalState {
  request: string;
  analysis: string;
  decision: 'approved' | 'rejected' | 'pending';
  reason: string;
}

const stateAnnotation = createStateAnnotation<ApprovalState>(() => ({
  request: '',
  analysis: '',
  decision: 'pending',
  reason: '',
}));

const llm = new OpenAIAdapter({ apiKey: process.env.OPENAI_API_KEY! });

// Build the graph
const graph = new StateGraph<ApprovalState>({ stateAnnotation })
  .addNode('analyze', async (state) => {
    const result = await llm.generate(
      `Analyze this request and provide risk assessment: ${state.request}`
    );
    return { analysis: result.content };
  })
  .addNode('auto_approve', async (state) => ({
    decision: 'approved',
    reason: 'Auto-approved: Low risk request',
  }))
  .addNode('human_review', async (state) => ({
    // This node is a placeholder - human will update state on resume
  }))
  .addNode('finalize', async (state) => ({
    reason: state.decision === 'approved'
      ? 'Request approved and processed'
      : 'Request rejected: ' + state.reason,
  }))
  .setEntryPoint('analyze')
  .addConditionalEdges(
    'analyze',
    (state) => state.analysis.includes('low risk') ? 'auto' : 'review',
    { auto: 'auto_approve', review: 'human_review' }
  )
  .addEdge('auto_approve', 'finalize')
  .addEdge('human_review', 'finalize')
  .setFinishPoint('finalize');

// Run with human-in-the-loop
const compiled = graph.compile();
const checkpointer = new GraphCheckpointStore<ApprovalState>();

const result = await compiled.invoke(
  { request: 'Access to production database' },
  {
    checkpointer,
    threadId: 'approval-123',
    interrupt: { after: ['human_review'] },
  }
);

if (result.interrupted) {
  // Human reviews and makes decision
  const finalResult = await compiled.resumeWithState(
    result.checkpoint!.id,
    { decision: 'approved', reason: 'Verified by security team' },
    { checkpointer, threadId: 'approval-123' }
  );
  console.log(finalResult.state.reason);
}

API Reference

| Method | Phase | Description |
|---|---|---|
| addNode(name, fn) | Structure | Add a node with async function |
| addEdge(from, to) | Structure | Add edge between nodes |
| addConditionalEdges(from, fn, map) | Structure | Add conditional routing |
| setEntryPoint(name) | Structure | Set starting node |
| setFinishPoint(name) | Structure | Add edge to END |
| compile() | Build | Compile graph for execution |
| invoke(state, config) | Runtime | Run the graph |
| resume(checkpointId, config) | Recovery | Resume from checkpoint |
| resumeWithState(id, update, config) | Recovery | Resume with state update |
| stream(state, config) | Runtime | Stream execution events |
| toMermaid() | Audit | Export Mermaid diagram |