Skip to content

Streaming & Real-Time

Serve streaming responses (chunked, NDJSON), SSE, or WebSockets with production-ready patterns.

Streaming response

from spikard import SseEvent, sse

@sse("/events")
async def events():
    """Emit three SSE events, each carrying an incrementing tick counter."""
    for tick in range(3):
        yield SseEvent(data={"tick": tick})
import { Spikard, StreamingResponse } from "spikard";

const app = new Spikard();

// Async generator producing three NDJSON lines, one every 100 ms.
async function* makeStream() {
  for (let i = 0; i < 3; i++) {
    // Each chunk is a complete JSON document terminated by a newline (NDJSON).
    yield JSON.stringify({ tick: i }) + "\n";
    await new Promise((resolve) => setTimeout(resolve, 100));
  }
}

// Register a GET route whose handler returns a streaming response fed by the generator.
app.addRoute(
  { method: "GET", path: "/stream", handler_name: "stream", is_async: true },
  async () =>
    new StreamingResponse(makeStream(), {
      statusCode: 200,
      // application/x-ndjson signals newline-delimited JSON to consumers.
      headers: { "Content-Type": "application/x-ndjson" },
    }),
);
require "spikard"
require "json"

app = Spikard::App.new

# Stream three NDJSON lines; the Enumerator is consumed lazily by the server.
app.get "/stream" do |_params, _query, _body|
  Enumerator.new do |y|
    3.times do |i|
      # One JSON document per line (NDJSON), followed by a short delay.
      y << JSON.dump({ tick: i }) + "\n"
      sleep 0.1
    end
  end
end
<?php

declare(strict_types=1);

use Spikard\App;
use Spikard\Attributes\Get;
use Spikard\Config\ServerConfig;
use Spikard\Http\StreamingResponse;

$app = new App(new ServerConfig(port: 8000));

final class StreamController
{
    /**
     * Stream ten NDJSON chunks to the client, one every 100 ms.
     */
    #[Get('/stream')]
    public function stream(): StreamingResponse
    {
        $generator = function (): Generator {
            for ($i = 0; $i < 10; $i++) {
                // Each yielded string is sent to the client as one chunk.
                yield json_encode(['chunk' => $i]) . "\n";
                usleep(100000); // 100ms delay
            }
        };

        return new StreamingResponse(
            $generator(),
            headers: ['Content-Type' => 'application/x-ndjson']
        );
    }
}

$app = $app->registerController(new StreamController());
use spikard::prelude::*;
use tokio_stream::StreamExt;

// Stream three serialized JSON documents; `then` awaits the async
// serialization closure for each item in order.
app.route(get("/stream"), |_ctx: Context| async move {
    let stream = tokio_stream::iter(0..3).then(|i| async move {
        serde_json::to_vec(&serde_json::json!({ "tick": i }))
    });
    Ok(StreamingBody::new(stream))
})?;

Production patterns

Client disconnect handling

Detect and handle client disconnections to avoid resource leaks:

import asyncio
import time
from spikard import Spikard, SseEvent, sse

app = Spikard()

# Module-level counter so the logged total reflects ALL open connections.
# (A handler-local variable would always read 1 on connect and 0 on cleanup.)
connected_clients = 0

@sse("/events")
async def events(request):
    """Stream one timestamp event per second until the client disconnects.

    Increments/decrements the shared connection counter and always runs the
    cleanup path, even when the server cancels the task.
    """
    global connected_clients
    try:
        connected_clients += 1
        app.logger.info(f"Client connected. Total: {connected_clients}")

        while True:
            # Check if client disconnected
            if await request.is_disconnected():
                app.logger.info("Client disconnected gracefully")
                break

            # Generate and send event
            yield SseEvent(data={"timestamp": time.time()})
            await asyncio.sleep(1)

    except asyncio.CancelledError:
        # Re-raise so the framework can finish tearing down the task.
        app.logger.info("Client connection cancelled")
        raise
    finally:
        connected_clients -= 1
        app.logger.info(f"Cleanup complete. Remaining: {connected_clients}")

Backpressure handling

Manage slow consumers to prevent memory buildup:

import asyncio
from collections import deque
from spikard import Spikard, SseEvent, sse

app = Spikard()

# Bounded buffer: when full, the oldest events are silently dropped so a
# slow consumer can never cause unbounded memory growth.
event_queue = deque(maxlen=100)

async def event_producer():
    """Background task producing events.

    NOTE(review): this coroutine must be scheduled at application startup
    (e.g. via a lifespan hook / asyncio.create_task) — nothing in this
    snippet starts it.
    """
    counter = 0
    while True:
        event_queue.append({"id": counter, "data": "event data"})
        counter += 1
        await asyncio.sleep(0.1)

@sse("/events")
async def events(request):
    """Forward the newest queued event to the client, at most twice a second.

    Intermediate events are intentionally coalesced: only the most recent
    queue entry is inspected, so a slow client skips ahead instead of
    falling ever further behind.
    """
    last_id = 0

    try:
        while True:
            if await request.is_disconnected():
                break

            # Only send when the queue holds an event newer than the last sent.
            if event_queue:
                event = event_queue[-1]
                if event["id"] > last_id:
                    yield SseEvent(data=event["data"], id=str(event["id"]))
                    last_id = event["id"]

            await asyncio.sleep(0.5)  # Throttle to prevent overwhelming client

    except asyncio.CancelledError:
        # Normal teardown path when the server cancels the stream task.
        pass

Error handling and recovery

Handle errors gracefully with retry logic:

import asyncio
from spikard import Spikard, SseEvent, sse

app = Spikard()

@sse("/events")
async def events(request):
    """Stream updates with retry-on-failure and exponential backoff.

    After ``max_retries`` consecutive failures an "error" event is sent and
    the stream ends; the ``retry`` field tells the client how soon to
    reconnect after a disconnect.
    """
    retry_count = 0
    max_retries = 3

    try:
        while True:
            if await request.is_disconnected():
                break

            try:
                # NOTE(review): fetch_data() must be defined elsewhere; it may
                # raise on transient failures, which the retry logic handles.
                data = await fetch_data()
                retry_count = 0  # Reset on success

                yield SseEvent(
                    data=data,
                    retry=1000,  # Client should retry after 1s
                    event="update"
                )

            except Exception as e:
                retry_count += 1
                app.logger.error(f"Error fetching data: {e}")

                if retry_count >= max_retries:
                    yield SseEvent(
                        data={"error": "Service temporarily unavailable"},
                        event="error"
                    )
                    break

                # Exponential backoff, then retry right away — skip the
                # regular 1s pacing sleep below so we don't wait twice.
                await asyncio.sleep(2 ** retry_count)
                continue

            await asyncio.sleep(1)

    except asyncio.CancelledError:
        app.logger.info("Stream cancelled")
    finally:
        app.logger.info("Stream cleanup complete")

WebSockets

from spikard import Spikard, websocket

app = Spikard()

@websocket("/ws")
async def echo(message: dict) -> dict | None:
    """Echo every incoming JSON message back to the client."""
    reply = {"echo": message}
    return reply
import { Spikard } from "spikard";

const app = new Spikard();

// Echo server: read every incoming message and send it straight back.
app.addRoute(
  { method: "WS", path: "/ws", handler_name: "ws", is_async: true },
  async (socket) => {
    // The socket is an async iterable; the loop ends when the client disconnects.
    for await (const message of socket) {
      await socket.send({ echo: message });
    }
  },
);
require "spikard"

app = Spikard::App.new

# WebSocket handler: the value returned from handle_message is sent back
# to the client.
class ChatHandler < Spikard::WebSocketHandler
  def handle_message(message)
    # Echo JSON message back
    message
  end
end

# The block builds the handler — presumably invoked once per connection;
# verify against the framework's websocket docs.
app.websocket("/chat") { ChatHandler.new }
<?php

declare(strict_types=1);

use Spikard\App;
use Spikard\Config\ServerConfig;
use Spikard\Handlers\WebSocketHandlerInterface;

/**
 * Logs the WebSocket lifecycle: connect, each received message, and close.
 */
final class ChatHandler implements WebSocketHandlerInterface
{
    public function onConnect(): void
    {
        error_log('Client connected');
    }

    public function onMessage(string $message): void
    {
        // Decode for logging only; nothing is sent back to the client here.
        $data = json_decode($message, true);
        error_log('Received: ' . json_encode($data));
    }

    public function onClose(int $code, ?string $reason = null): void
    {
        // $reason may be null; only append it when present.
        error_log("Client disconnected: {$code}" . ($reason ? " ({$reason})" : ''));
    }
}

$app = (new App(new ServerConfig(port: 8000)))
    ->addWebSocket('/ws', new ChatHandler());

$app->run();
use spikard::prelude::*;
use futures::StreamExt;

// Echo loop: read frames until the stream ends, sending each payload back.
app.websocket("/ws", |mut socket| async move {
    while let Some(msg) = socket.next().await {
        // NOTE(review): unwrap_or_default() silently swallows receive errors
        // by echoing an empty payload — confirm this is the intended behavior.
        let text = msg.unwrap_or_default();
        // Ignore send failures (the client may already have disconnected).
        socket.send(text).await.ok();
    }
});

WebSocket connection lifecycle

Production WebSocket handler with complete lifecycle management:

from spikard import Spikard, websocket

app = Spikard()

@websocket("/ws")
async def chat(message: dict) -> dict | None:
    """Handle one message: a "ping" gets a "pong"; anything else is echoed."""
    is_ping = message.get("type") == "ping"
    return {"type": "pong"} if is_ping else {"type": "echo", "data": message}


def _on_connect() -> None:
    """Log every new WebSocket connection."""
    app.logger.info("WebSocket client connected")


def _on_disconnect() -> None:
    """Log every WebSocket disconnection."""
    app.logger.info("WebSocket client disconnected")


# Optional lifecycle hooks, attached as attributes on the handler.
chat.on_connect = _on_connect
chat.on_disconnect = _on_disconnect

Server-Sent Events (SSE)

from spikard import Spikard, SseEvent, sse

app = Spikard()

@sse("/events")
async def events():
    """Yield three SSE events carrying tick counters 0, 1, 2."""
    for tick in range(3):
        yield SseEvent(data={"tick": tick})
import { Spikard, StreamingResponse } from "spikard";

const app = new Spikard();

// Produce three events in raw SSE wire format: "data: <json>\n\n".
async function* sseStream() {
  for (let i = 0; i < 3; i++) {
    yield `data: ${JSON.stringify({ tick: i })}\n\n`;
  }
}

// The text/event-stream content type is what makes clients treat this as SSE.
app.addRoute(
  { method: "GET", path: "/events", handler_name: "events", is_async: true },
  async () =>
    new StreamingResponse(sseStream(), {
      statusCode: 200,
      headers: { "Content-Type": "text/event-stream" },
    }),
);
require "spikard"
require "json"

app = Spikard::App.new

# Emit three SSE frames ("data: <json>\n\n"), pausing 100ms between them.
app.get "/events" do |_params, _query, _body|
  Enumerator.new do |y|
    3.times do |i|
      y << "data: #{JSON.dump({ tick: i })}\n\n"
      sleep 0.1
    end
  end
end
<?php

declare(strict_types=1);

use Spikard\App;
use Spikard\Attributes\Get;
use Spikard\Config\ServerConfig;
use Spikard\Http\StreamingResponse;

final class EventsController
{
    /**
     * Stream five timestamped tick events (one per second), then a final
     * "done" message, all in SSE wire format.
     */
    #[Get('/events')]
    public function events(): StreamingResponse
    {
        $generator = function (): Generator {
            for ($i = 0; $i < 5; $i++) {
                $data = json_encode(['tick' => $i, 'time' => time()]);
                // An SSE frame is "data: <payload>" terminated by a blank line.
                yield "data: {$data}\n\n";
                sleep(1);
            }
            yield "data: " . json_encode(['message' => 'done']) . "\n\n";
        };

        // The sse() factory presumably sets the text/event-stream headers —
        // verify against the StreamingResponse API.
        return StreamingResponse::sse($generator());
    }
}

$app = (new App(new ServerConfig(port: 8000)))
    ->registerController(new EventsController());
use spikard::prelude::*;
use tokio_stream::StreamExt;

// Map each tick to an SSE frame and mark the response as an event stream.
app.route(get("/events"), |_ctx: Context| async move {
    let stream = tokio_stream::iter(0..3).map(|i| {
        format!("data: {}\n\n", serde_json::json!({"tick": i}))
    });
    Ok(StreamingBody::new(stream).with_header("content-type", "text/event-stream"))
})?;

Production SSE with keepalive

SSE handler with keepalive, reconnection logic, and error handling:

import asyncio
import time
from spikard import Spikard, SseEvent, sse

app = Spikard()

@sse("/events")
async def events(request):
    """SSE stream with resume support (Last-Event-ID), keepalives, and
    per-event retry hints.

    A reconnecting client may send a Last-Event-ID header; numbering
    resumes from that value so IDs stay monotonic across reconnects.
    """
    client_id = request.headers.get("X-Client-ID", "unknown")
    last_event_id = request.headers.get("Last-Event-ID", "0")

    app.logger.info(f"Client {client_id} connected, last_event_id: {last_event_id}")

    # Header values are untrusted text; fall back to 0 on garbage.
    try:
        event_id = int(last_event_id)
    except ValueError:
        event_id = 0

    keepalive_interval = 15  # Send keepalive every 15s
    last_keepalive = time.time()

    try:
        while True:
            # Check for client disconnect
            if await request.is_disconnected():
                app.logger.info(f"Client {client_id} disconnected")
                break

            current_time = time.time()

            # Send keepalive comment to prevent timeout
            # (SSE comments are ignored by clients but keep idle proxies
            # from closing the connection).
            if current_time - last_keepalive > keepalive_interval:
                yield SseEvent(comment="keepalive")
                last_keepalive = current_time

            # Fetch and send data
            try:
                data = await fetch_event_data()
                event_id += 1

                yield SseEvent(
                    data=data,
                    id=str(event_id),
                    event="update",
                    retry=3000  # Client should retry after 3s on disconnect
                )

                # A data event counts as activity, so push the next
                # keepalive out by a full interval.
                last_keepalive = current_time

            except Exception as e:
                # On failure the client still receives a frame, so the
                # connection stays alive while the next iteration retries.
                app.logger.error(f"Error fetching data: {e}")
                yield SseEvent(
                    data={"error": "Temporary error, retrying..."},
                    event="error"
                )

            await asyncio.sleep(1)

    except asyncio.CancelledError:
        # Re-raise so the framework can complete task teardown.
        app.logger.info(f"Client {client_id} stream cancelled")
        raise
    finally:
        app.logger.info(f"Client {client_id} cleanup complete")

async def fetch_event_data():
    """Simulate fetching data from a source"""
    return {"timestamp": time.time(), "value": "data"}

Testing streaming handlers

Testing SSE endpoints

import pytest
from spikard.testing import TestClient

@pytest.mark.asyncio
async def test_sse_stream():
    """Read the first three SSE data lines and verify the stream headers."""
    # NOTE(review): `app` must be importable in this test module (e.g. from
    # the application package) — it is not defined in this snippet.
    async with TestClient(app) as client:
        # Connect to SSE endpoint
        async with client.stream("GET", "/events") as response:
            assert response.status_code == 200
            assert response.headers["content-type"] == "text/event-stream"

            # Read first few events
            events = []
            async for line in response.aiter_lines():
                if line.startswith("data:"):
                    events.append(line)
                if len(events) >= 3:
                    break

            assert len(events) == 3

@pytest.mark.asyncio
async def test_sse_reconnection():
    """Test SSE reconnection with Last-Event-ID"""
    async with TestClient(app) as client:
        headers = {"Last-Event-ID": "5"}
        async with client.stream("GET", "/events", headers=headers) as response:
            # Should resume from event ID 5
            assert response.status_code == 200

Testing WebSocket handlers

import pytest
from spikard.testing import TestClient

@pytest.mark.asyncio
async def test_websocket_echo():
    """Round-trip an echo message and a ping over the /ws endpoint."""
    async with TestClient(app) as client:
        async with client.websocket_connect("/ws") as websocket:
            # Test echo
            await websocket.send_json({"type": "echo", "message": "hello"})
            response = await websocket.receive_json()
            assert response["type"] == "echo"

            # Test ping/pong
            await websocket.send_json({"type": "ping"})
            response = await websocket.receive_json()
            assert response["type"] == "pong"

@pytest.mark.asyncio
async def test_websocket_disconnect():
    """Test proper cleanup on disconnect"""
    # NOTE(review): this test exercises the disconnect path but asserts
    # nothing about cleanup itself — verification is manual, via logs.
    async with TestClient(app) as client:
        async with client.websocket_connect("/ws") as websocket:
            # Get initial connection count
            await websocket.send_json({"type": "ping"})
            await websocket.receive_json()

        # Connection should be cleaned up after context exit
        # Verify by checking active_connections count in logs

Testing streaming responses

import pytest
from spikard.testing import TestClient

@pytest.mark.asyncio
async def test_streaming_response():
    """Consume the first five chunks of /stream and check each is non-empty."""
    async with TestClient(app) as client:
        async with client.stream("GET", "/stream") as response:
            assert response.status_code == 200

            chunks = []
            async for chunk in response.aiter_bytes():
                chunks.append(chunk)
                if len(chunks) >= 5:
                    break

            assert len(chunks) == 5
            assert all(len(chunk) > 0 for chunk in chunks)

Best practices

  • Set appropriate content types (application/x-ndjson, text/event-stream)
  • Handle client disconnects gracefully; stop producing when the connection closes
  • Implement keepalive for long-lived connections to prevent proxy timeouts
  • Use exponential backoff for retry logic
  • Track active connections and clean up resources in finally blocks
  • Add authentication middleware before handler execution
  • Test disconnect scenarios and ensure proper cleanup
  • Monitor memory usage with slow consumers and implement backpressure