XUMI Streaming Mode Developer Guide
This document describes how streaming mode works in the XUMI framework when it is started with the following command:
Overview
Streaming mode lets XUMI models process requests and return results incrementally, in real time. It is especially useful for text generation models, where you want to see output as it is generated rather than wait for the complete response.
How Streaming Mode Works
Command Execution
When you run:
The following occurs:
- Environment Variable Set: the runtime sets `SPI_STREAMING_ENV_VAR` to `"true"`.
- Server Startup: both the HTTP and WebSocket servers start automatically.
- Model Import: the runtime imports and initializes your model's entrypoint script.
- API Availability: RESTful and WebSocket APIs become available for streaming communication.
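Model code may occasionally need to know whether the runtime was started in streaming mode. The following is a minimal sketch, assuming the flag is exposed as an ordinary environment variable. `streaming_enabled` is a hypothetical helper, not part of XUMI. The guide does not say whether `SPI_STREAMING_ENV_VAR` is the literal variable name or a constant that holds it, so the name is a parameter:

```python
import os
from typing import Mapping


def streaming_enabled(
    environ: Mapping[str, str] = os.environ,
    var_name: str = "SPI_STREAMING_ENV_VAR",  # assumption: literal variable name
) -> bool:
    """Return True when the streaming flag is set to "true" (case-insensitive)."""
    # The runtime is described as setting the flag to the string "true";
    # any other value, or a missing variable, is treated as non-streaming.
    return environ.get(var_name, "").strip().lower() == "true"
```

Passing a plain dict instead of `os.environ` keeps the check easy to unit-test.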
Runtime Architecture
Streaming mode uses two complementary servers:
- HTTP Server (`src/xumi/runtime/http_server.py`): aiohttp-based REST API
- WebSocket Server (`src/xumi/runtime/ws_server.py`): aiohttp-based WebSocket API
Both servers support:
- V1 and V2 runtime protocols
- Streaming and non-streaming modes
- Command-specific inputs (V2)
- Workflow execution (V2)
REST API Endpoints
V1 Runtime Endpoints
POST /predict or /task
Execute model predictions with optional streaming. Set `is_stream` to `true` to receive an NDJSON stream; otherwise the server returns a single JSON object.
Request Body:
```json
{
  "input": {
    "prompt": "Your text prompt here",
    "temperature": 0.7,
    "max_tokens": 1000
  },
  "output_config": {},
  "is_stream": true,
  "task_id": "optional_task_identifier"
}
```
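The `input` object is passed to the model, and the top-level fields control delivery. Below is a small client-side helper that assembles this body. `build_v1_request` is a hypothetical name, and it only includes `task_id` when you supply one, since the field is optional:

```python
from typing import Any, Dict, Optional


def build_v1_request(
    prompt: str,
    temperature: float = 0.7,
    max_tokens: int = 1000,
    is_stream: bool = True,
    task_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble a V1 /predict (or /task) request body with the fields shown above."""
    body: Dict[str, Any] = {
        # Model inputs
        "input": {"prompt": prompt, "temperature": temperature, "max_tokens": max_tokens},
        "output_config": {},
        # Delivery mode: NDJSON stream when True, single JSON object otherwise
        "is_stream": is_stream,
    }
    if task_id is not None:
        body["task_id"] = task_id  # optional identifier
    return body
```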
Non-Streaming Response:
```json
{
  "type": "task",
  "status": "completed",
  "data": {
    "text": "Complete generated response",
    "finished": true
  },
  "timestamp": 1704067200.0
}
```
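A client reading a non-streaming response only needs to check `status` and pull out `data.text`. The following is a minimal sketch (the `extract_text` helper is hypothetical) that also handles the `error` message shape described under Error Handling:

```python
from typing import Any, Dict


def extract_text(response: Dict[str, Any]) -> str:
    """Return the generated text from a non-streaming V1 response."""
    status = response.get("status")
    if status == "error":
        # Error messages carry a human-readable "error" field.
        raise RuntimeError(response.get("error", "unknown error"))
    if status != "completed":
        raise ValueError(f"unexpected status: {status!r}")
    return response["data"]["text"]
```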
Streaming Response (NDJSON format):
```
{"type":"task","status":"accepted","timestamp":1704067200.0}
{"type":"task","status":"streaming","data":{"text":"Hello","chunk":"Hello","finished":false},"timestamp":1704067200.1}
{"type":"task","status":"streaming","data":{"text":"Hello world","chunk":" world","finished":false},"timestamp":1704067200.2}
{"type":"task","status":"completed","timestamp":1704067200.3}
```
Each line is a complete JSON object. In `streaming` messages, `data.text` holds the text accumulated so far and `data.chunk` holds only the newly generated piece.
GET /health
Check server health and get basic information.
Response:
V2 Runtime Endpoints
POST /v2/execute
Execute a named command (for example, `predict`) with optional streaming.
Request Body:
```json
{
  "command": "predict",
  "input": {
    "prompt": "Your text prompt here",
    "system_prompt": "You are a helpful AI assistant.",
    "temperature": 0.7,
    "max_tokens": 1000,
    "streaming": true
  },
  "output_config": {},
  "is_stream": true,
  "task_id": "predict_123"
}
```
Here `input.streaming` corresponds to the `streaming` parameter of the model's `predict` function, while the top-level `is_stream` selects a streamed HTTP response.
Streaming Response (NDJSON format):
```
{"type":"task","status":"accepted","timestamp":1704067200.0,"version":"v2","command":"predict"}
{"type":"task","status":"streaming","data":{"text":"Hello","chunk":"Hello","finished":false},"timestamp":1704067200.1,"version":"v2","command":"predict"}
{"type":"task","status":"completed","timestamp":1704067200.2,"version":"v2","command":"predict"}
```
V2 messages carry the same fields as V1, plus `version` and `command`.
GET /v2/commands
List the commands the model exposes.
Response:
```json
{
  "commands": {
    "initialize": {...},
    "predict": {...},
    "cleanup": {...}
  },
  "version": "v2",
  "timestamp": 1704067200.0
}
```
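A client can use this endpoint to confirm that a model provides the commands it is about to call. The following is a minimal sketch with hypothetical helper names. It relies only on the keys of `commands`, because the per-command schema is elided (`{...}`) in this guide:

```python
from typing import Any, Dict, Iterable, List


def command_names(commands_response: Dict[str, Any]) -> List[str]:
    """Return the command names advertised by GET /v2/commands, sorted."""
    return sorted(commands_response.get("commands", {}))


def missing_commands(commands_response: Dict[str, Any], required: Iterable[str]) -> List[str]:
    """Return the required command names that the model does not advertise."""
    available = set(commands_response.get("commands", {}))
    return [name for name in required if name not in available]
```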
POST /v2/workflow
Execute a workflow consisting of multiple commands.
Request Body:
```json
{
  "workflow_id": "text_generation_workflow",
  "input": {
    "prompt": "Generate a story"
  },
  "is_stream": true
}
```
WebSocket API
Connection URLs
- V1: `ws://localhost:8765/ws`
- V2: `ws://localhost:8765/v2/ws`
Message Protocol
Basic Message Structure
Task Execution (V1)
```json
{
  "type": "task",
  "task_id": "generate_text_001",
  "input": {
    "prompt": "Write a haiku about technology"
  },
  "is_stream": true
}
```
Command Execution (V2)
```json
{
  "type": "command",
  "command": "predict",
  "task_id": "predict_001",
  "input": {
    "prompt": "Explain quantum computing",
    "max_tokens": 2000,
    "streaming": true
  },
  "is_stream": true
}
```
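The two message shapes above differ only in `type` and the presence of `command`. The following is a minimal sketch of hypothetical builder functions for them. In the V2 builder, `task_id` and `is_stream` are only included when given, because the client examples later in this guide send `initialize` without them:

```python
from typing import Any, Dict, Optional


def ws_task_message(task_id: str, input_: Dict[str, Any], is_stream: bool = True) -> Dict[str, Any]:
    """V1 task message for ws://localhost:8765/ws."""
    return {"type": "task", "task_id": task_id, "input": input_, "is_stream": is_stream}


def ws_command_message(
    command: str,
    input_: Dict[str, Any],
    task_id: Optional[str] = None,
    is_stream: Optional[bool] = None,
) -> Dict[str, Any]:
    """V2 command message for ws://localhost:8765/v2/ws."""
    msg: Dict[str, Any] = {"type": "command", "command": command, "input": input_}
    if task_id is not None:
        msg["task_id"] = task_id
    if is_stream is not None:
        msg["is_stream"] = is_stream
    return msg
```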
Text Generation Model Example
Based on the `resources/examples/text-2-text-claude/` example:
Model Implementation (run.py)
```python
from typing import Any, AsyncGenerator, Dict, Optional

from xumi.runtime.runtime_v2 import XUMIRuntimeV2

# ClaudeWrapper comes from the text-2-text-claude example; its import is omitted here.

spi = XUMIRuntimeV2("manifest.yml")


@spi.command("initialize")
def initialize(model_name: str = "claude-3-5-sonnet-20240620", api_key: Optional[str] = None):
    # Create the client once; the returned dict holds state for later commands.
    claude = ClaudeWrapper(model_name=model_name, api_key=api_key)
    return {"model": claude}


@spi.streaming_command("predict")
async def predict(
    prompt: str,
    system_prompt: str = "You are a helpful AI assistant.",
    max_tokens: int = 1000,
    temperature: float = 0.7,
    streaming: bool = True,
    model: ClaudeWrapper = None,  # presumably supplied from initialize()'s "model" entry
) -> AsyncGenerator[Dict[str, Any], None]:
    if streaming:
        full_text = ""
        async for chunk in model.predict_streaming(prompt, system_prompt, max_tokens, temperature):
            full_text += chunk
            # Accumulated text so far, plus only the newly generated piece
            yield {"text": full_text, "chunk": chunk, "finished": False}
        # Final marker: complete text, empty chunk
        yield {"text": full_text, "chunk": "", "finished": True}
    else:
        # Non-streaming path: one call, one final result
        text = model.predict(prompt, system_prompt, max_tokens, temperature, streaming=False)
        yield {"text": text, "chunk": text, "finished": True}
```
Client Usage Examples
HTTP Client (curl)
Initialize Model:
```shell
curl -X POST http://localhost:8000/v2/execute \
  -H "Content-Type: application/json" \
  -d '{
    "command": "initialize",
    "input": {"model_name": "claude-3-5-sonnet-20240620"}
  }'
```
Streaming Text Generation (`-N` disables curl's output buffering, so chunks appear as they arrive):
```shell
curl -N -X POST http://localhost:8000/v2/execute \
  -H "Content-Type: application/json" \
  -d '{
    "command": "predict",
    "input": {
      "prompt": "Write a short story about AI",
      "max_tokens": 2000,
      "streaming": true
    },
    "is_stream": true
  }'
```
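The same streaming request can be sent with only the Python standard library. The following is a minimal sketch using `urllib.request`. `build_execute_request` and `stream_execute` are hypothetical helper names, and the loop assumes the server writes one JSON object per line, as described above:

```python
import json
import urllib.request
from typing import Any, Dict, Iterator


def build_execute_request(payload: Dict[str, Any], base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Build the same POST /v2/execute request that the curl commands above send."""
    return urllib.request.Request(
        f"{base_url}/v2/execute",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def stream_execute(payload: Dict[str, Any], base_url: str = "http://localhost:8000") -> Iterator[Dict[str, Any]]:
    """Send the request and yield each NDJSON message as it arrives."""
    with urllib.request.urlopen(build_execute_request(payload, base_url)) as resp:
        for raw in resp:  # the response object iterates line by line
            line = raw.decode("utf-8").strip()
            if line:
                yield json.loads(line)


if __name__ == "__main__":
    request_body = {
        "command": "predict",
        "input": {"prompt": "Write a short story about AI", "max_tokens": 2000, "streaming": True},
        "is_stream": True,
    }
    for msg in stream_execute(request_body):
        if msg.get("status") == "streaming":
            print(msg.get("data", {}).get("chunk", ""), end="", flush=True)
        elif msg.get("status") in ("completed", "error"):
            print()
            break
```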
Python WebSocket Client
```python
import asyncio
import json

import aiohttp


async def streaming_chat():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect("ws://localhost:8765/v2/ws") as ws:
            # Initialize the model
            await ws.send_json({
                "type": "command",
                "command": "initialize",
                "input": {"model_name": "claude-3-5-sonnet-20240620"},
            })

            # Wait for the initialization response
            async for msg in ws:
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue  # ignore non-text frames
                data = json.loads(msg.data)
                if data.get("status") == "error":
                    raise RuntimeError(data.get("error", "initialization failed"))
                if data.get("status") == "completed":
                    break

            # Send the streaming prediction request
            await ws.send_json({
                "type": "command",
                "command": "predict",
                "input": {
                    "prompt": "Explain quantum computing simply",
                    "streaming": True,
                },
                "is_stream": True,
            })

            # Print chunks as they arrive
            async for msg in ws:
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue
                data = json.loads(msg.data)
                status = data.get("status")
                if status == "streaming":
                    chunk = data.get("data", {}).get("chunk", "")
                    if chunk:
                        print(chunk, end="", flush=True)
                elif status == "error":
                    raise RuntimeError(data.get("error", "prediction failed"))
                elif status == "completed":
                    print("\n--- Generation Complete ---")
                    break


asyncio.run(streaming_chat())
```
JavaScript WebSocket Client
```javascript
const ws = new WebSocket('ws://localhost:8765/v2/ws');

ws.onopen = function () {
  // Initialize the model
  ws.send(JSON.stringify({
    type: 'command',
    command: 'initialize',
    input: {model_name: 'claude-3-5-sonnet-20240620'}
  }));
};

ws.onmessage = function (event) {
  const data = JSON.parse(event.data);

  if (data.status === 'error') {
    console.error('Task failed:', data.error);
    return;
  }

  if (data.command === 'initialize' && data.status === 'completed') {
    // Model is ready: send the streaming prediction
    ws.send(JSON.stringify({
      type: 'command',
      command: 'predict',
      input: {
        prompt: 'Write a creative story about the future',
        streaming: true
      },
      is_stream: true
    }));
    return;
  }

  if (data.status === 'streaming' && data.data) {
    // Append each chunk to an element with id="output" on the page
    const chunk = data.data.chunk || '';
    document.getElementById('output').textContent += chunk;
  } else if (data.command === 'predict' && data.status === 'completed') {
    console.log('Generation completed');
  }
};
```
Default Server Configuration
When streaming mode starts, the servers use these default settings:
- HTTP Server: `localhost:8000`
- WebSocket Server: `localhost:8765`
- Message Size Limit: 16MB
- Heartbeat Interval: 30 seconds (WebSocket)
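Client code that targets these defaults can keep them in one place. The following is a minimal sketch. `StreamingDefaults` is a hypothetical class, and reading "16MB" as 16 × 1024 × 1024 bytes is an assumption:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamingDefaults:
    """Default host, ports, and limits listed above (hypothetical client-side helper)."""

    host: str = "localhost"
    http_port: int = 8000
    ws_port: int = 8765
    max_message_bytes: int = 16 * 1024 * 1024  # "16MB", assumed to mean MiB
    heartbeat_seconds: float = 30.0

    def http_url(self, path: str = "/") -> str:
        return f"http://{self.host}:{self.http_port}{path}"

    def ws_url(self, version: str = "v2") -> str:
        # V1 uses /ws and V2 uses /v2/ws (see "Connection URLs")
        paths = {"v1": "/ws", "v2": "/v2/ws"}
        return f"ws://{self.host}:{self.ws_port}{paths[version]}"
```

On the client side, aiohttp's `ClientSession.ws_connect` accepts `max_msg_size` and `heartbeat` arguments. Its own default message limit is smaller than 16MB, so raising it to match the server may be necessary for large messages.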
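Implementation Notes
Model Requirements
To support streaming, your model must:
- Use the `@spi.streaming_command` decorator for streaming commands.
- Return an `AsyncGenerator[Dict[str, Any], None]` for streaming responses, that is, define the command with `async def` and `yield` each result.
- Yield incremental results with a consistent structure: every yielded dict uses the same keys (the Claude example above uses `text`, `chunk`, and `finished`).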
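The consistent-structure rule can be enforced in a unit test. The following hypothetical checker assumes the `text`/`chunk`/`finished` convention from the Claude example. The guide does not say whether the runtime itself validates chunks:

```python
from typing import Any, Dict, Iterable, List


def check_chunks(chunks: Iterable[Dict[str, Any]]) -> List[str]:
    """Return the problems found in a sequence of yielded chunks (empty list if none)."""
    items = list(chunks)
    if not items:
        return ["no chunks yielded"]
    problems: List[str] = []
    expected_keys = set(items[0])
    for i, item in enumerate(items):
        # Every chunk should use the same keys as the first one
        if set(item) != expected_keys:
            problems.append(f"chunk {i} keys {sorted(item)} differ from {sorted(expected_keys)}")
    # Only the last chunk should be marked finished
    for i, item in enumerate(items[:-1]):
        if item.get("finished") is True:
            problems.append(f"chunk {i} is marked finished before the end")
    if items[-1].get("finished") is not True:
        problems.append("last chunk is not marked finished")
    return problems
```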
Error Handling
If a task fails, the server sends a message with `"status": "error"` and a description in the `error` field:
```json
{
  "type": "task",
  "status": "error",
  "error": "Error message description",
  "timestamp": 1704067200.0
}
```
Content Type Headers
- Non-streaming HTTP: `application/json`
- Streaming HTTP: `application/x-ndjson`
- WebSocket: JSON messages over the WebSocket protocol
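The two HTTP modes use different media types, so a client can dispatch on the `Content-Type` header. The following is a minimal sketch with a hypothetical `parse_body` helper. It buffers the whole body, which suits tests and short responses rather than live display:

```python
import json
from typing import Any, Dict, List


def parse_body(content_type: str, body: bytes) -> List[Dict[str, Any]]:
    """Decode an HTTP body into a list of messages based on its Content-Type."""
    media_type = content_type.split(";", 1)[0].strip().lower()  # drop parameters such as charset
    text = body.decode("utf-8")
    if media_type == "application/x-ndjson":
        # One JSON object per non-empty line
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    if media_type == "application/json":
        # A single JSON object
        return [json.loads(text)]
    raise ValueError(f"unexpected content type: {content_type!r}")
```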
This streaming architecture enables real-time, interactive AI model experiences while maintaining compatibility with both simple request-response and complex workflow patterns.