
XUMI Streaming Mode Developer Guide

This document describes how streaming mode works in the XUMI framework when a model is run with the following command:

xumi model run --streaming

Overview

Streaming mode lets XUMI models send partial results to the client while they are still running, instead of returning a single response at the end. This is particularly useful for text generation models, where you want to see output as it is generated rather than waiting for the complete response.

How Streaming Mode Works

Command Execution

When you run:

xumi model run --streaming .

The following occurs:

  1. Environment Variable Set: The runtime sets SPI_STREAMING_ENV_VAR to "true"
  2. Server Startup: Both HTTP and WebSocket servers start automatically
  3. Model Import: The runtime imports and initializes your model's entrypoint script
  4. API Availability: RESTful and WebSocket APIs become available for streaming communication
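Model code that needs to know whether it was started in streaming mode can read the variable from step 1. The sketch below relies only on what is stated above, that the value is the string "true". This guide does not give the literal variable name behind SPI_STREAMING_ENV_VAR, so the helper streaming_enabled (a hypothetical name) takes it as a parameter.

```python
import os
from typing import Mapping, Optional


def streaming_enabled(var_name: str, environ: Optional[Mapping[str, str]] = None) -> bool:
    """Return True if var_name is set to "true" (case-insensitive).

    var_name is an assumption supplied by the caller: the guide refers to
    the runtime's variable as SPI_STREAMING_ENV_VAR without giving its value.
    """
    env = os.environ if environ is None else environ
    # Unset, or any value other than "true", means streaming is off
    return env.get(var_name, "").strip().lower() == "true"
```

For example, streaming_enabled("SOME_FLAG", {"SOME_FLAG": "true"}) returns True, where SOME_FLAG stands in for the real variable name.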

Runtime Architecture

Streaming mode runs two servers side by side:

  • HTTP Server (src/xumi/runtime/http_server.py): aiohttp-based REST API
  • WebSocket Server (src/xumi/runtime/ws_server.py): aiohttp-based WebSocket API

Both servers support:

  • V1 and V2 runtime protocols
  • Streaming and non-streaming modes
  • Command-specific inputs (V2)
  • Workflow execution (V2)

REST API Endpoints

V1 Runtime Endpoints

POST /predict or /task

Run a model prediction. With is_stream set to true the server returns an NDJSON stream; otherwise it returns a single JSON response.

Request Body:

{
  "input": {
    "prompt": "Your text prompt here",
    "temperature": 0.7,
    "max_tokens": 1000
  },
  "output_config": {},
  "is_stream": true,
  "task_id": "optional_task_identifier"
}

Non-Streaming Response:

{
  "type": "task",
  "status": "completed",
  "data": {
    "text": "Complete generated response",
    "finished": true
  },
  "timestamp": 1704067200.0
}

Streaming Response (NDJSON format):

{"type":"task","status":"accepted","timestamp":1704067200.0}
{"type":"task","status":"streaming","data":{"text":"Hello","chunk":"Hello","finished":false},"timestamp":1704067200.1}
{"type":"task","status":"streaming","data":{"text":"Hello world","chunk":" world","finished":false},"timestamp":1704067200.2}
{"type":"task","status":"completed","timestamp":1704067200.3}
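Each line of a streaming response is a standalone JSON object, so a client can split on newlines and decode line by line. In the sample above, data.text carries the cumulative output and data.chunk only the newly generated piece, so concatenating the chunks reproduces the final text. A minimal sketch using only the standard library; parse_ndjson and collect_text are illustrative names, not part of XUMI:

```python
import json
from typing import Any, Dict, Iterable, Iterator, List


def parse_ndjson(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Decode one JSON object per non-empty line."""
    for line in lines:
        line = line.strip()
        if line:  # skip blank lines such as a trailing newline
            yield json.loads(line)


def collect_text(lines: Iterable[str]) -> str:
    """Rebuild the full output by concatenating the streamed chunks."""
    parts: List[str] = []
    for msg in parse_ndjson(lines):
        if msg.get("status") == "streaming":
            parts.append(msg.get("data", {}).get("chunk", ""))
        elif msg.get("status") == "error":
            raise RuntimeError(msg.get("error", "unknown error"))
    return "".join(parts)


sample = """\
{"type":"task","status":"accepted","timestamp":1704067200.0}
{"type":"task","status":"streaming","data":{"text":"Hello","chunk":"Hello","finished":false},"timestamp":1704067200.1}
{"type":"task","status":"streaming","data":{"text":"Hello world","chunk":" world","finished":false},"timestamp":1704067200.2}
{"type":"task","status":"completed","timestamp":1704067200.3}
"""

print(collect_text(sample.splitlines()))  # prints: Hello world
```

Summing chunks rather than keeping the last text value also works when a client only cares about deltas, for example when appending to a terminal or UI element.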

GET /health

Check server health and get basic information.

Response:

{
  "status": "healthy",
  "server": "xumi-http",
  "timestamp": 1704067200.0,
  "version": "v1"
}
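A client script that starts the runtime and then immediately sends requests can poll this endpoint first. A minimal sketch with the standard library, assuming the default HTTP address listed under Default Server Configuration; wait_until_healthy is a hypothetical helper name:

```python
import json
import time
import urllib.request


def wait_until_healthy(url: str = "http://localhost:8000/health",
                       timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Poll the health endpoint until it reports "healthy" or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=interval) as resp:
                body = json.loads(resp.read().decode("utf-8"))
                if body.get("status") == "healthy":
                    return True
        except (OSError, ValueError):
            # OSError covers refused connections, timeouts and HTTP errors;
            # ValueError covers a body that is not valid JSON. Retry either way.
            pass
        time.sleep(interval)
    return False
```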

V2 Runtime Endpoints

POST /v2/execute

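Execute a single named command (such as initialize or predict) with optional streaming. Two flags are involved: the top-level is_stream requests an NDJSON streaming response, while input.streaming is passed to the command like any other input field (the predict function in the example below declares it as a parameter).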

Request Body:

{
  "command": "predict",
  "input": {
    "prompt": "Your text prompt here",
    "system_prompt": "You are a helpful AI assistant.",
    "temperature": 0.7,
    "max_tokens": 1000,
    "streaming": true
  },
  "output_config": {},
  "is_stream": true,
  "task_id": "predict_123"
}

Streaming Response (NDJSON format):

{"type":"task","status":"accepted","timestamp":1704067200.0,"version":"v2","command":"predict"}
{"type":"task","status":"streaming","data":{"text":"Hello","chunk":"Hello","finished":false},"timestamp":1704067200.1,"version":"v2","command":"predict"}
{"type":"task","status":"completed","timestamp":1704067200.2,"version":"v2","command":"predict"}

GET /v2/commands

List the commands registered by the model.

Response:

{
  "commands": {
    "initialize": {...},
    "predict": {...},
    "cleanup": {...}
  },
  "version": "v2",
  "timestamp": 1704067200.0
}
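A client can use this endpoint to confirm that the commands it plans to call exist before sending them. A minimal sketch with the standard library; fetch_commands and missing_commands are hypothetical helpers, and only the top-level "commands" mapping shown above is relied on, since the per-command details are elided here:

```python
import json
import urllib.request
from typing import Any, Dict, Iterable, List


def fetch_commands(base_url: str = "http://localhost:8000") -> Dict[str, Any]:
    """GET /v2/commands and return the decoded JSON body."""
    with urllib.request.urlopen(f"{base_url}/v2/commands", timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


def missing_commands(response: Dict[str, Any], required: Iterable[str]) -> List[str]:
    """Return the names in `required` that the model does not register."""
    available = response.get("commands", {})
    return [name for name in required if name not in available]
```

Calling missing_commands(fetch_commands(), ["initialize", "predict"]) against a running server returns an empty list when both commands are available.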

POST /v2/workflow

Execute a workflow that runs multiple commands.

Request Body:

{
  "workflow_id": "text_generation_workflow",
  "input": {
    "prompt": "Generate a story"
  },
  "is_stream": true
}

WebSocket API

Connection URLs

  • V1: ws://localhost:8765/ws
  • V2: ws://localhost:8765/v2/ws

Message Protocol

Basic Message Structure

{
  "type": "ping|pong|task|command|workflow",
  "timestamp": 1704067200.0,
  "data": {...}
}
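Because every message carries a type field, a client can route incoming messages with a single lookup. The sketch below assumes that a ping should be answered with a pong carrying a fresh timestamp; the guide lists both types but does not say which side initiates them or spell out the reply shape. dispatch and make_pong are hypothetical names.

```python
import time
from typing import Any, Callable, Dict, Optional

Message = Dict[str, Any]
Handler = Callable[[Message], Optional[Message]]


def make_pong() -> Message:
    # Assumed reply shape: the basic envelope with type "pong" and a timestamp
    return {"type": "pong", "timestamp": time.time()}


def dispatch(msg: Message, handlers: Dict[str, Handler]) -> Optional[Message]:
    """Route a decoded message by its "type" field.

    Returns a message to send back, or None if no reply is needed.
    """
    msg_type = msg.get("type", "")
    if msg_type == "ping":
        return make_pong()
    handler = handlers.get(msg_type)
    return handler(msg) if handler is not None else None
```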

Task Execution (V1)

{
  "type": "task",
  "task_id": "generate_text_001",
  "input": {
    "prompt": "Write a haiku about technology"
  },
  "is_stream": true
}

Command Execution (V2)

{
  "type": "command",
  "command": "predict",
  "task_id": "predict_001",
  "input": {
    "prompt": "Explain quantum computing",
    "max_tokens": 2000,
    "streaming": true
  },
  "is_stream": true
}
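The V1 task and V2 command messages differ only in their type and the extra command field, so both can come from small builder functions. A minimal sketch; v1_task and v2_command are hypothetical helpers, and generating a random task_id when none is given is an assumption of this sketch, not documented XUMI behavior:

```python
import uuid
from typing import Any, Dict, Optional


def v1_task(input_: Dict[str, Any], is_stream: bool = True,
            task_id: Optional[str] = None) -> Dict[str, Any]:
    """Build a V1 task message (for ws://localhost:8765/ws)."""
    return {
        "type": "task",
        # Assumption: a client-generated id is acceptable when none is supplied
        "task_id": task_id or f"task_{uuid.uuid4().hex[:8]}",
        "input": input_,
        "is_stream": is_stream,
    }


def v2_command(command: str, input_: Dict[str, Any], is_stream: bool = True,
               task_id: Optional[str] = None) -> Dict[str, Any]:
    """Build a V2 command message (for ws://localhost:8765/v2/ws)."""
    msg = v1_task(input_, is_stream, task_id)
    msg["type"] = "command"
    msg["command"] = command
    return msg
```

With task_id="predict_001", v2_command("predict", {...}) produces the same message as the Command Execution (V2) example above.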

Text Generation Model Example

Based on the resources/examples/text-2-text-claude/ example:

Model Implementation (run.py)

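from typing import Any, AsyncGenerator, Dict, Optional

from xumi.runtime.runtime_v2 import XUMIRuntimeV2

# ClaudeWrapper comes from the example project (its import is not shown in this excerpt)

spi = XUMIRuntimeV2("manifest.yml")

@spi.command("initialize")
def initialize(model_name: str = "claude-3-5-sonnet-20240620", api_key: Optional[str] = None):
    # The returned state appears to be what later commands receive as keyword
    # arguments (hence the `model` parameter of predict below)
    claude = ClaudeWrapper(model_name=model_name, api_key=api_key)
    return {"model": claude}

@spi.streaming_command("predict")
async def predict(
    prompt: str,
    system_prompt: str = "You are a helpful AI assistant.",
    max_tokens: int = 1000,
    temperature: float = 0.7,
    streaming: bool = True,
    model: Optional[ClaudeWrapper] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
    if streaming:
        # One item per chunk: cumulative text plus the newly generated piece
        full_text = ""
        async for chunk in model.predict_streaming(prompt, system_prompt, max_tokens, temperature):
            full_text += chunk
            yield {"text": full_text, "chunk": chunk, "finished": False}
        # Final item marks the end of the stream
        yield {"text": full_text, "chunk": "", "finished": True}
    else:
        # Non-streaming: a single item containing the whole response
        text = model.predict(prompt, system_prompt, max_tokens, temperature, streaming=False)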
        yield {"text": text, "chunk": text, "finished": True}
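The predict function above follows a simple contract: every item repeats the cumulative text, adds the new chunk, and the last item sets finished to True. The sketch below exercises that contract without XUMI or Claude by swapping in a hypothetical FakeModel whose predict_streaming yields fixed chunks. It is a local test harness, not part of the XUMI API, and it drops the decorator so it runs on its own.

```python
import asyncio
from typing import Any, AsyncGenerator, AsyncIterator, Dict, List


class FakeModel:
    """Hypothetical stand-in for ClaudeWrapper that streams fixed chunks."""

    def __init__(self, chunks: List[str]):
        self.chunks = chunks

    async def predict_streaming(self, prompt, system_prompt, max_tokens, temperature) -> AsyncIterator[str]:
        for chunk in self.chunks:
            await asyncio.sleep(0)  # hand control back to the loop, like a network stream
            yield chunk


async def predict(prompt: str, model: FakeModel) -> AsyncGenerator[Dict[str, Any], None]:
    # Same streaming branch as run.py, without the @spi.streaming_command decorator
    full_text = ""
    async for chunk in model.predict_streaming(prompt, "You are a helpful AI assistant.", 1000, 0.7):
        full_text += chunk
        yield {"text": full_text, "chunk": chunk, "finished": False}
    yield {"text": full_text, "chunk": "", "finished": True}


async def run_demo() -> List[Dict[str, Any]]:
    model = FakeModel(["Hello", " world"])
    return [item async for item in predict("Say hello", model)]


if __name__ == "__main__":
    for item in asyncio.run(run_demo()):
        print(item)
```

Running it prints three items: two with finished set to False ("Hello", then "Hello world") and a final one with an empty chunk and finished set to True.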

Client Usage Examples

HTTP Client (curl)

Initialize Model:

curl -X POST http://localhost:8000/v2/execute \
  -H "Content-Type: application/json" \
  -d '{
    "command": "initialize",
    "input": {"model_name": "claude-3-5-sonnet-20240620"}
  }'

Streaming Text Generation (the -N flag turns off curl's output buffering, so each NDJSON line is printed as soon as it arrives):

curl -N -X POST http://localhost:8000/v2/execute \
  -H "Content-Type: application/json" \
  -d '{
    "command": "predict",
    "input": {
      "prompt": "Write a short story about AI",
      "max_tokens": 2000,
      "streaming": true
    },
    "is_stream": true
  }'
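The same streaming request can be made from Python with only the standard library: iterating over the response object returned by urllib yields the body one line at a time, which matches NDJSON's one-message-per-line framing. A minimal sketch, assuming the default HTTP address and a model that has already been initialized (for example with the first curl command above); build_request and execute_stream are hypothetical names:

```python
import json
import urllib.request
from typing import Any, Dict, Iterator


def build_request(command: str, input_: Dict[str, Any], is_stream: bool = True,
                  base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Build a POST /v2/execute request with a JSON body."""
    body = {"command": command, "input": input_, "is_stream": is_stream}
    return urllib.request.Request(
        f"{base_url}/v2/execute",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def execute_stream(command: str, input_: Dict[str, Any],
                   base_url: str = "http://localhost:8000") -> Iterator[Dict[str, Any]]:
    """POST to /v2/execute and yield each NDJSON message as it is read."""
    req = build_request(command, input_, True, base_url)
    with urllib.request.urlopen(req, timeout=300) as resp:
        for raw in resp:  # one NDJSON message per line
            line = raw.decode("utf-8").strip()
            if line:
                yield json.loads(line)


if __name__ == "__main__":
    for msg in execute_stream("predict", {"prompt": "Write a short story about AI",
                                          "max_tokens": 2000, "streaming": True}):
        if msg.get("status") == "streaming":
            print(msg["data"]["chunk"], end="", flush=True)
        elif msg.get("status") == "error":
            print(f"\nError: {msg.get('error')}")
            break
        elif msg.get("status") == "completed":
            print()
            break
```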

Python WebSocket Client

import asyncio
import json

import aiohttp


async def streaming_chat():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect("ws://localhost:8765/v2/ws") as ws:
            # Initialize the model before sending any predictions
            await ws.send_json({
                "type": "command",
                "command": "initialize",
                "input": {"model_name": "claude-3-5-sonnet-20240620"}
            })

            # Wait until initialization completes or fails
            async for msg in ws:
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue  # ignore non-text frames
                data = json.loads(msg.data)
                if data.get("status") == "completed":
                    break
                if data.get("status") == "error":
                    raise RuntimeError(data.get("error", "initialization failed"))

            # Send the streaming prediction request
            await ws.send_json({
                "type": "command",
                "command": "predict",
                "input": {
                    "prompt": "Explain quantum computing simply",
                    "streaming": True
                },
                "is_stream": True
            })

            # Print each chunk as it arrives until the task completes or fails
            async for msg in ws:
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue
                data = json.loads(msg.data)
                status = data.get("status")
                if status == "streaming":
                    chunk = data.get("data", {}).get("chunk", "")
                    if chunk:
                        print(chunk, end="", flush=True)
                elif status == "completed":
                    print("\n--- Generation Complete ---")
                    break
                elif status == "error":
                    print(f"\nError: {data.get('error')}")
                    break


if __name__ == "__main__":
    asyncio.run(streaming_chat())

JavaScript WebSocket Client

const ws = new WebSocket('ws://localhost:8765/v2/ws');

ws.onopen = function() {
    // Initialize the model as soon as the connection is open
    ws.send(JSON.stringify({
        type: 'command',
        command: 'initialize',
        input: {model_name: 'claude-3-5-sonnet-20240620'}
    }));
};

ws.onmessage = function(event) {
    const data = JSON.parse(event.data);

    if (data.status === 'error') {
        console.error('Error:', data.error);
        return;
    }

    if (data.command === 'initialize' && data.status === 'completed') {
        // Model is ready: send the streaming prediction
        ws.send(JSON.stringify({
            type: 'command',
            command: 'predict',
            input: {
                prompt: 'Write a creative story about the future',
                streaming: true
            },
            is_stream: true
        }));
        return;
    }

    if (data.status === 'streaming' && data.data) {
        // Append each chunk to the page element with id="output"
        const chunk = data.data.chunk || '';
        document.getElementById('output').textContent += chunk;
    }

    // Only the predict command's completion ends the generation
    if (data.command === 'predict' && data.status === 'completed') {
        console.log('Generation completed');
    }
};

Default Server Configuration

When streaming mode starts, servers use these default settings:

  • HTTP Server: localhost:8000
  • WebSocket Server: localhost:8765
  • Message Size Limit: 16MB
  • Heartbeat Interval: 30 seconds (WebSocket)

Implementation Notes

Model Requirements

To support streaming, a model must:

  1. Register streaming commands with the @spi.streaming_command decorator
  2. Implement them as async generators, annotated as AsyncGenerator[Dict[str, Any], None]
  3. Yield incremental results with a consistent structure (for example, the same text, chunk, and finished keys in every item)
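Requirement 3 is easy to check in a unit test: collect what the generator yields, then confirm that every item has the same keys and that only the last one is marked finished. A minimal sketch, assuming the text/chunk/finished shape used by the Claude example; that key set is a convention of the example rather than something this guide says XUMI enforces, and check_stream_items is a hypothetical name.

```python
from typing import Any, Dict, List, Sequence


def check_stream_items(items: Sequence[Dict[str, Any]]) -> List[str]:
    """Return a list of problems found in a sequence of streamed items."""
    if not items:
        return ["stream yielded no items"]
    problems: List[str] = []
    expected_keys = set(items[0])
    for i, item in enumerate(items):
        # Every item should expose the same keys as the first one
        if set(item) != expected_keys:
            problems.append(f"item {i} has keys {sorted(item)}, expected {sorted(expected_keys)}")
        # Assumed convention: only the final item carries finished=True
        is_last = i == len(items) - 1
        if item.get("finished") != is_last:
            problems.append(f"item {i} has finished={item.get('finished')!r}")
    return problems
```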

Error Handling

When a task fails, the server sends a message with status "error" and a description of the failure:

{
  "type": "task",
  "status": "error", 
  "error": "Error message description",
  "timestamp": 1704067200.0
}
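Since an error arrives as an ordinary message, a consumer loop that only checks for "streaming" and "completed" can wait forever or silently drop the failure. One option is to wrap any message iterator (for example, the output of an NDJSON parser) so that the first error is raised as an exception. A minimal sketch; StreamError and raise_on_error are hypothetical names:

```python
from typing import Any, Dict, Iterable, Iterator


class StreamError(RuntimeError):
    """Raised when the server reports status "error" in a stream."""


def raise_on_error(messages: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Pass messages through unchanged, raising StreamError at the first error."""
    for msg in messages:
        if msg.get("status") == "error":
            raise StreamError(msg.get("error", "unknown error"))
        yield msg
```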

Content Type Headers

  • Non-streaming: application/json
  • Streaming HTTP: application/x-ndjson
  • WebSocket: JSON messages over WebSocket protocol
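An HTTP client that sometimes sets is_stream and sometimes does not can pick the decoder from the Content-Type header listed above. A minimal sketch; decode_body is a hypothetical name, and any parameters such as "; charset=utf-8" are ignored when matching the media type:

```python
import json
from typing import Any, Dict, List


def decode_body(content_type: str, body: bytes) -> List[Dict[str, Any]]:
    """Decode an HTTP response body into a list of messages based on its Content-Type."""
    media_type = content_type.split(";", 1)[0].strip().lower()
    text = body.decode("utf-8")
    if media_type == "application/x-ndjson":
        # Streaming: one JSON object per non-empty line
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    if media_type == "application/json":
        # Non-streaming: a single JSON document
        return [json.loads(text)]
    raise ValueError(f"unexpected content type: {content_type}")
```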

This architecture lets the same model deliver incremental, real-time output to interactive clients while still supporting simple request/response calls and multi-command workflows.