Streaming
Real-time token streaming for responsive AI applications.
OrkaJS supports real-time streaming of LLM responses, enabling you to display tokens as they are generated. This dramatically improves user experience by reducing perceived latency.
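To make the latency claim concrete, here is a self-contained sketch (no OrkaJS required) that simulates a token stream with a mock async generator: the first token arrives after a single per-token delay, while the full response takes the sum of all delays. The `mockStream` and `main` names are illustrative, not part of the OrkaJS API.

```typescript
// Self-contained sketch: a mock token stream shows why time to first
// token (TTFT) is much lower than total response latency.
async function* mockStream(tokens: string[], delayMs: number): AsyncGenerator<string> {
  for (const t of tokens) {
    await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    yield t;
  }
}

async function main(): Promise<{ content: string; ttft: number; total: number }> {
  const start = Date.now();
  let ttft = -1;
  let content = '';
  for await (const token of mockStream(['Hello', ' ', 'world'], 20)) {
    if (ttft < 0) ttft = Date.now() - start; // First token after ~20 ms
    content += token;
  }
  return { content, ttft, total: Date.now() - start }; // Total latency ~60 ms
}

main().then(({ content, ttft, total }) => {
  console.log(`"${content}": TTFT ${ttft} ms of ${total} ms total`);
});
```

With streaming, the user starts reading after the TTFT delay instead of waiting for the full duration.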
Key Features
- Token-by-token streaming with callbacks
- Event-based architecture for fine-grained control
- Time to First Token (TTFT) tracking
- Support for all LLM providers (OpenAI, Anthropic, Mistral, Ollama)
- Cancellation support via AbortController
- Extended thinking support for Claude models
Quick Start
The simplest way to use streaming is with the onToken callback:
```typescript
import { OpenAIAdapter } from '@orka-js/openai';

const llm = new OpenAIAdapter({ apiKey: process.env.OPENAI_API_KEY! });

// Stream with onToken callback
const result = await llm.streamGenerate('Explain quantum computing', {
  onToken: (token, index) => {
    process.stdout.write(token); // Print each token as it arrives
  },
  onEvent: (event) => {
    if (event.type === 'done') {
      console.log('\nStream complete!');
    }
  },
});

console.log('Total tokens:', result.usage.totalTokens);
console.log('Time to first token:', result.ttft, 'ms');
```

Using the stream() Method
For more control, use the async iterator pattern:
```typescript
import { OpenAIAdapter } from '@orka-js/openai';

const llm = new OpenAIAdapter({ apiKey: process.env.OPENAI_API_KEY! });

// Use async iterator for full control
for await (const event of llm.stream('Write a poem about AI')) {
  switch (event.type) {
    case 'token':
      process.stdout.write(event.token);
      break;
    case 'thinking':
      console.log('[Thinking]:', event.delta);
      break;
    case 'usage':
      console.log('Usage:', event.usage);
      break;
    case 'done':
      console.log('\nFinished:', event.finishReason);
      break;
    case 'error':
      console.error('Error:', event.message);
      break;
  }
}
```

Event Types
The streaming system emits the following event types:
| Event | Description |
|---|---|
| token | Individual token received |
| content | Accumulated content with delta |
| thinking | Model reasoning (Claude extended thinking) |
| tool_call | Tool/function call started |
| usage | Token usage statistics |
| done | Stream completed |
| error | Error occurred |
```typescript
import type { LLMStreamEvent } from '@orka-js/core';

// Event type definitions
type StreamEventType =
  | 'token'     // Individual token received
  | 'content'   // Content chunk with accumulated text
  | 'tool_call' // Tool/function call started
  | 'thinking'  // Model reasoning (Claude)
  | 'usage'     // Token usage update
  | 'done'      // Stream completed
  | 'error';    // Error occurred

// Token event structure
interface TokenEvent {
  type: 'token';
  token: string;     // The token text
  index: number;     // Token position
  timestamp: number; // Event timestamp
}

// Done event structure
interface DoneEvent {
  type: 'done';
  content: string; // Full response content
  finishReason: 'stop' | 'length' | 'tool_calls' | 'error';
  usage?: {
    promptTokens: number;
    completionTokens: number;
    totalTokens: number;
  };
}
```

Stream Result
The streamGenerate() method returns a StreamResult with additional metrics:
```typescript
interface StreamResult {
  content: string; // Full response content
  usage: {
    promptTokens: number;
    completionTokens: number;
    totalTokens: number;
  };
  model: string; // Model used
  finishReason: 'stop' | 'length' | 'tool_calls' | 'error';
  ttft?: number;      // Time to first token (ms)
  durationMs: number; // Total stream duration (ms)
}

// Example usage
const result = await llm.streamGenerate('Hello');
console.log('TTFT:', result.ttft, 'ms');
console.log('Duration:', result.durationMs, 'ms');
console.log('Tokens/sec:', result.usage.completionTokens / (result.durationMs / 1000));
```

Checking Streaming Support
You can check if an adapter supports streaming:
```typescript
import { isStreamingAdapter } from '@orka-js/core';
import { OpenAIAdapter } from '@orka-js/openai';

const llm = new OpenAIAdapter({ apiKey: process.env.OPENAI_API_KEY! });

if (isStreamingAdapter(llm)) {
  // TypeScript knows llm has stream() and streamGenerate()
  const result = await llm.streamGenerate('Hello');
  console.log(result.content);
} else {
  // Fallback to regular generate
  const result = await llm.generate('Hello');
  console.log(result.content);
}

// Check property directly
console.log('Supports streaming:', llm.supportsStreaming); // true
```

Cancellation
Use AbortController to cancel a stream:
```typescript
import { OpenAIAdapter } from '@orka-js/openai';

const llm = new OpenAIAdapter({ apiKey: process.env.OPENAI_API_KEY! });

// Create abort controller
const controller = new AbortController();

// Cancel after 5 seconds
setTimeout(() => controller.abort(), 5000);

try {
  const result = await llm.streamGenerate('Write a very long essay...', {
    signal: controller.signal,
    onToken: (token) => process.stdout.write(token),
  });
} catch (error) {
  if (error instanceof Error && error.name === 'AbortError') {
    console.log('Stream was cancelled');
  }
}
```

Provider Support
All LLM adapters support streaming:
| Provider | Models | Streaming Notes |
|---|---|---|
| OpenAI | GPT-4o / GPT-4 Turbo | Full streaming with usage stats |
| Anthropic | Claude 3.5 Sonnet / Opus | Streaming with extended thinking support |
| Mistral | Mistral Large / Codestral | OpenAI-compatible streaming |
| Ollama | Local Llama 3 / Mistral / Phi | NDJSON streaming format |
Best Practices
- Always handle errors in your onEvent callback
- Use TTFT metrics to monitor performance
- Implement cancellation for long-running streams
- Buffer tokens for smoother UI updates
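The buffering advice above can be sketched as follows. TokenBuffer is an illustrative helper, not part of the OrkaJS API: it batches incoming tokens and flushes them to the UI once a threshold is reached, so the interface repaints per chunk rather than per token.

```typescript
// Illustrative helper (not an OrkaJS class): batch incoming tokens and
// flush them at most once per `maxTokens` tokens.
class TokenBuffer {
  private pending: string[] = [];

  constructor(
    private flushFn: (chunk: string) => void,
    private maxTokens = 5,
  ) {}

  push(token: string): void {
    this.pending.push(token);
    if (this.pending.length >= this.maxTokens) this.flush();
  }

  flush(): void {
    if (this.pending.length === 0) return;
    this.flushFn(this.pending.join(''));
    this.pending = [];
  }
}

// Usage: feed the buffer from an onToken callback, flush once at the end
const chunks: string[] = [];
const buffer = new TokenBuffer((chunk) => chunks.push(chunk), 3);
for (const t of ['Hel', 'lo', ', ', 'wor', 'ld', '!']) buffer.push(t);
buffer.flush(); // Flush any remainder once the stream completes
console.log(chunks); // Batches of up to 3 tokens each
```

A time-based flush (e.g. every animation frame) works equally well; the key point is decoupling token arrival from UI updates.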
Integration with RAG
Streaming works seamlessly with RAG pipelines:
```typescript
import { createOrka, OpenAIAdapter, MemoryVectorAdapter } from 'orkajs';
import { isStreamingAdapter } from '@orka-js/core';

const orka = createOrka({
  llm: new OpenAIAdapter({ apiKey: process.env.OPENAI_API_KEY! }),
  vectorDB: new MemoryVectorAdapter(),
});

// Create knowledge base
await orka.knowledge.create({
  name: 'docs',
  source: ['OrkaJS is a TypeScript framework for LLM systems.'],
});

// Retrieve context first
const context = await orka.knowledge.search('docs', 'What is OrkaJS?', { topK: 3 });

// Then stream the response with context
if (isStreamingAdapter(orka.llm)) {
  const result = await orka.llm.streamGenerate(
    `Context: ${context.map(c => c.content).join('\n')}\n\nQuestion: What is OrkaJS?`,
    {
      systemPrompt: 'Answer based on the provided context.',
      onToken: (token) => process.stdout.write(token),
    }
  );
  console.log('\nAnswer generated with', result.usage.totalTokens, 'tokens');
}
```

Streaming Tool Calls
StreamingToolAgent streams tokens in real time while executing tools in parallel. Users see the model "thinking" as tools are invoked, tool_result events are emitted mid-stream, and conversational memory is preserved automatically across requests.
```typescript
import { StreamingToolAgent } from '@orka-js/agent';
import { OpenAIAdapter } from '@orka-js/openai';

const llm = new OpenAIAdapter({ apiKey: process.env.OPENAI_API_KEY! });

const agent = new StreamingToolAgent({
  goal: 'Answer questions using available tools',
  tools: [
    {
      name: 'get_weather',
      description: 'Get current weather for a location',
      parameters: [
        { name: 'location', type: 'string', description: 'City name', required: true },
      ],
      execute: async ({ location }) => ({ output: `Weather in ${location}: Sunny, 22°C` }),
    },
  ],
}, llm);

// Stream tokens + tool execution in real time
for await (const event of agent.runStream('What is the weather in Paris?')) {
  switch (event.type) {
    case 'token':
      process.stdout.write(event.token); // LLM "thinking" as tokens arrive
      break;
    case 'tool_result':
      console.log('\n[Tool result]:', event.result); // Tool output mid-stream
      break;
    case 'done':
      console.log('\n[Final answer]:', event.content);
      break;
  }
}

// Or use run() for a simple non-streaming result
const result = await agent.run('What is the weather in Lyon?');
console.log(result.output); // Final answer
console.log(result.steps);  // Tool execution steps with observations
```
Pass a Memory instance to the constructor to maintain conversation context. History is loaded before each runStream() call and saved after completion — the agent never loses context between turns.
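The load-then-save cycle described above can be sketched with a stand-in. ConversationMemory and runWithMemory below are illustrative only, not the actual OrkaJS Memory API: history is read before the run and both sides of the exchange are persisted afterwards, so each run sees all prior turns.

```typescript
// Illustrative stand-in for a conversation memory (not the OrkaJS Memory class)
interface Turn {
  role: 'user' | 'assistant';
  content: string;
}

class ConversationMemory {
  private history: Turn[] = [];
  async load(): Promise<Turn[]> { return [...this.history]; }
  async save(turns: Turn[]): Promise<void> { this.history.push(...turns); }
}

// Each run loads prior turns, generates an answer (mocked here), then saves
// the new user/assistant pair after completion.
async function runWithMemory(memory: ConversationMemory, input: string): Promise<string> {
  const prior = await memory.load(); // History loaded before the run
  const answer = `(${prior.length} prior turns) echo: ${input}`; // Mock LLM call
  await memory.save([
    { role: 'user', content: input },
    { role: 'assistant', content: answer },
  ]); // Saved after completion
  return answer;
}

const memory = new ConversationMemory();
runWithMemory(memory, 'hello')
  .then(() => runWithMemory(memory, 'hello again'))
  .then((second) => console.log(second)); // Second run sees the first turn pair
```

The same shape applies to persistent backends: only load() and save() change, while the agent's run loop stays identical.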