Testing¶
Test your Spikard applications with built-in test clients for all languages. Each test client starts a real server for reliable HTTP, WebSocket, and SSE testing.
Quick Start¶
import pytest
from spikard import Spikard, get
from spikard.testing import TestClient
app = Spikard()
@get("/hello")
async def hello():
return {"message": "Hello, World!"}
@pytest.mark.asyncio
async def test_hello():
async with TestClient(app) as client:
response = await client.get("/hello")
assert response.status_code == 200
assert response.json() == {"message": "Hello, World!"}
import { describe, it, expect } from "vitest";
import { Spikard, TestClient } from "@spikard/app";
const app = new Spikard();
app.get("/hello", async () => ({ message: "Hello, World!" }));
describe("Hello endpoint", () => {
it("returns greeting", async () => {
const client = new TestClient(app);
const response = await client.get("/hello");
expect(response.statusCode).toBe(200);
expect(response.json()).toEqual({ message: "Hello, World!" });
});
});
require 'spikard'
require 'spec_helper'
RSpec.describe "Hello endpoint" do
let(:app) do
Spikard::App.new.tap do |a|
a.get('/hello') { { message: 'Hello, World!' } }
end
end
it "returns greeting" do
client = Spikard::Testing::TestClient.new(app)
response = client.get('/hello')
expect(response.status).to eq(200)
expect(response.json).to eq({ 'message' => 'Hello, World!' })
end
end
<?php
declare(strict_types=1);
use PHPUnit\Framework\TestCase;
use Spikard\App;
use Spikard\Testing\TestClient;
final class HelloTest extends TestCase
{
public function testHelloEndpoint(): void
{
$app = new App();
$app->get('/hello', fn() => ['message' => 'Hello, World!']);
$client = new TestClient($app);
$response = $client->get('/hello');
$this->assertEquals(200, $response->getStatusCode());
$this->assertEquals(
['message' => 'Hello, World!'],
json_decode($response->getBody(), true)
);
}
}
use axum::{routing::get, Router};
use axum_test::TestServer;
use serde_json::json;
async fn hello() -> axum::Json<serde_json::Value> {
axum::Json(json!({"message": "Hello, World!"}))
}
#[tokio::test]
async fn test_hello() {
let app = Router::new().route("/hello", get(hello));
let server = TestServer::new(app).unwrap();
let response = server.get("/hello").await;
assert_eq!(response.status_code(), 200);
assert_eq!(
response.json::<serde_json::Value>(),
json!({"message": "Hello, World!"})
);
}
Unit Testing Handlers¶
Test individual handlers with different inputs:
@pytest.mark.asyncio
async def test_user_creation():
app = Spikard()
@app.post("/users")
async def create_user(name: str, email: str):
return {"id": 1, "name": name, "email": email}
async with TestClient(app) as client:
response = await client.post(
"/users",
json={"name": "Alice", "email": "alice@example.com"}
)
assert response.status_code == 200
data = response.json()
assert data["name"] == "Alice"
assert data["email"] == "alice@example.com"
it("creates user", async () => {
const app = new Spikard();
app.post("/users", async (req) => ({
id: 1,
name: req.body.name,
email: req.body.email
}));
const client = new TestClient(app);
const response = await client.post("/users", {
json: { name: "Alice", email: "alice@example.com" }
});
expect(response.statusCode).toBe(200);
const data = response.json();
expect(data.name).toBe("Alice");
});
it "creates user" do
app = Spikard::App.new
app.post('/users') do |params, _query, body|
{ id: 1, name: body['name'], email: body['email'] }
end
client = Spikard::Testing::TestClient.new(app)
response = client.post('/users', json: { name: 'Alice', email: 'alice@example.com' })
expect(response.status).to eq(200)
data = response.json
expect(data['name']).to eq('Alice')
end
<?php
declare(strict_types=1);
use PHPUnit\Framework\TestCase;
use Spikard\App;
use Spikard\Testing\TestClient;
final class UserCreationTest extends TestCase
{
public function testCreatesUser(): void
{
$app = new App();
$app->post('/users', function ($request) {
return [
'id' => 1,
'name' => $request->body['name'],
'email' => $request->body['email'],
];
});
$client = new TestClient($app);
$response = $client->post('/users', [
'name' => 'Alice',
'email' => 'alice@example.com',
]);
$this->assertEquals(200, $response->getStatusCode());
$data = json_decode($response->getBody(), true);
$this->assertEquals('Alice', $data['name']);
$this->assertEquals('alice@example.com', $data['email']);
}
}
use axum::{routing::post, Json, Router};
use axum_test::TestServer;
use serde::{Deserialize, Serialize};
use serde_json::json;
#[derive(Serialize, Deserialize)]
struct CreateUser {
name: String,
email: String,
}
#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct User {
id: i64,
name: String,
email: String,
}
async fn create_user(Json(input): Json<CreateUser>) -> Json<User> {
Json(User {
id: 1,
name: input.name,
email: input.email,
})
}
#[tokio::test]
async fn test_user_creation() {
let app = Router::new().route("/users", post(create_user));
let server = TestServer::new(app).unwrap();
let response = server
.post("/users")
.json(&json!({
"name": "Alice",
"email": "alice@example.com"
}))
.await;
assert_eq!(response.status_code(), 200);
let user: User = response.json();
assert_eq!(user.name, "Alice");
assert_eq!(user.email, "alice@example.com");
}
Testing Validation¶
Test request and response validation with invalid inputs:
from msgspec import Struct
class UserCreate(Struct):
name: str
age: int
@pytest.mark.asyncio
async def test_validation_failure():
app = Spikard()
@app.post("/users")
async def create_user(user: UserCreate):
return {"name": user.name, "age": user.age}
async with TestClient(app) as client:
# Invalid: age is string
response = await client.post(
"/users",
json={"name": "Bob", "age": "not a number"}
)
assert response.status_code == 400
error = response.json()
assert "validation" in str(error).lower()
it("rejects invalid input", async () => {
const app = new Spikard();
app.post("/users", async (req) => {
if (!req.body.name || typeof req.body.age !== "number") {
return { status: 400, body: { error: "Validation failed" } };
}
return { name: req.body.name, age: req.body.age };
});
const client = new TestClient(app);
const response = await client.post("/users", {
json: { name: "Bob", age: "invalid" }
});
expect(response.statusCode).toBe(400);
expect(response.json().error).toContain("Validation");
});
it "rejects invalid input" do
app = Spikard::App.new
app.post('/users') do |params, _query, body|
name = body['name']
age = body['age']
raise ArgumentError, 'Invalid age' unless age.is_a?(Integer)
{ name: name, age: age }
end
client = Spikard::Testing::TestClient.new(app)
response = client.post('/users', json: { name: 'Bob', age: 'invalid' })
expect(response.status).to eq(400)
end
<?php
declare(strict_types=1);
use PHPUnit\Framework\TestCase;
use Spikard\App;
use Spikard\Testing\TestClient;
final class ValidationTest extends TestCase
{
public function testRejectsInvalidInput(): void
{
$app = new App();
$app->post('/users', function ($request) {
if (!isset($request->body['name']) || !is_int($request->body['age'])) {
return ['status' => 400, 'body' => ['error' => 'Validation failed']];
}
return ['name' => $request->body['name'], 'age' => $request->body['age']];
});
$client = new TestClient($app);
$response = $client->post('/users', [
'name' => 'Bob',
'age' => 'invalid',
]);
$this->assertEquals(400, $response->getStatusCode());
$body = json_decode($response->getBody(), true);
$this->assertStringContainsString('Validation', $body['error']);
}
}
use axum::{
http::StatusCode,
routing::post,
Json, Router,
};
use axum_test::TestServer;
use serde::{Deserialize, Serialize};
use serde_json::json;
use validator::Validate;
#[derive(Serialize, Deserialize, Validate)]
struct UserCreate {
#[validate(length(min = 1))]
name: String,
#[validate(range(min = 0, max = 150))]
age: i32,
}
async fn create_user(
Json(input): Json<UserCreate>,
) -> Result<Json<UserCreate>, (StatusCode, String)> {
input
.validate()
.map_err(|e| (StatusCode::BAD_REQUEST, format!("Validation error: {}", e)))?;
Ok(Json(input))
}
#[tokio::test]
async fn test_validation_failure() {
let app = Router::new().route("/users", post(create_user));
let server = TestServer::new(app).unwrap();
// Invalid: age is string (will fail JSON parsing)
let response = server
.post("/users")
.json(&json!({
"name": "Bob",
"age": "not a number"
}))
.await;
assert_eq!(response.status_code(), 422);
// Invalid: age out of range
let response = server
.post("/users")
.json(&json!({
"name": "Bob",
"age": 200
}))
.await;
assert_eq!(response.status_code(), 400);
let body = response.text();
assert!(body.to_lowercase().contains("validation"));
}
Testing Middleware¶
Test middleware behavior and execution order:
@pytest.mark.asyncio
async def test_auth_middleware():
app = Spikard()
@app.pre_handler
async def check_auth(request: dict) -> dict | tuple:
token = request.get("headers", {}).get("authorization", "")
if not token.startswith("Bearer "):
return {"error": "Unauthorized"}, 401
return request
@app.get("/protected")
async def protected():
return {"data": "secret"}
async with TestClient(app) as client:
# Without auth
response = await client.get("/protected")
assert response.status_code == 401
# With auth
response = await client.get(
"/protected",
headers={"authorization": "Bearer token123"}
)
assert response.status_code == 200
it("enforces auth middleware", async () => {
const app = new Spikard();
app.pre("/protected/*", async (req) => {
if (!req.headers.authorization?.startsWith("Bearer ")) {
return { status: 401, body: { error: "Unauthorized" } };
}
return req;
});
app.get("/protected/data", async () => ({ data: "secret" }));
const client = new TestClient(app);
// Without auth
let response = await client.get("/protected/data");
expect(response.statusCode).toBe(401);
// With auth
response = await client.get("/protected/data", {
authorization: "Bearer token123"
});
expect(response.statusCode).toBe(200);
});
it "enforces auth middleware" do
app = Spikard::App.new
app.pre_handler do |req|
token = req.headers['authorization']
unless token&.start_with?('Bearer ')
return [{ error: 'Unauthorized' }, 401]
end
req
end
app.get('/protected') { { data: 'secret' } }
client = Spikard::Testing::TestClient.new(app)
# Without auth
response = client.get('/protected')
expect(response.status).to eq(401)
# With auth
response = client.get('/protected', headers: { 'authorization' => 'Bearer token123' })
expect(response.status).to eq(200)
end
<?php
declare(strict_types=1);
use PHPUnit\Framework\TestCase;
use Spikard\App;
use Spikard\Testing\TestClient;
use Spikard\Config\LifecycleHooks;
use Spikard\Config\HookResult;
final class AuthMiddlewareTest extends TestCase
{
public function testEnforcesAuthMiddleware(): void
{
$hooks = LifecycleHooks::builder()
->withOnRequest(function ($request): HookResult {
$token = $request->headers['authorization'] ?? '';
if (!str_starts_with($token, 'Bearer ')) {
return HookResult::respond(['error' => 'Unauthorized'], 401);
}
return HookResult::continue();
})
->build();
$app = new App();
$app->withLifecycleHooks($hooks);
$app->get('/protected', fn() => ['data' => 'secret']);
$client = new TestClient($app);
// Without auth
$response = $client->get('/protected');
$this->assertEquals(401, $response->getStatusCode());
// With auth
$response = $client->get('/protected', [
'headers' => ['authorization' => 'Bearer token123'],
]);
$this->assertEquals(200, $response->getStatusCode());
}
}
use axum::{
body::Body,
http::{Request, StatusCode},
middleware::{self, Next},
response::Response,
routing::get,
Router,
};
use axum_test::TestServer;
use serde_json::json;
async fn auth_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
let auth_header = request
.headers()
.get("authorization")
.and_then(|v| v.to_str().ok());
match auth_header {
Some(token) if token.starts_with("Bearer ") => Ok(next.run(request).await),
_ => Err(StatusCode::UNAUTHORIZED),
}
}
async fn protected() -> axum::Json<serde_json::Value> {
axum::Json(json!({"data": "secret"}))
}
#[tokio::test]
async fn test_auth_middleware() {
let app = Router::new()
.route("/protected", get(protected))
.layer(middleware::from_fn(auth_middleware));
let server = TestServer::new(app).unwrap();
// Without auth - should fail
let response = server.get("/protected").await;
assert_eq!(response.status_code(), 401);
// With auth - should succeed
let response = server
.get("/protected")
.add_header("authorization", "Bearer token123")
.await;
assert_eq!(response.status_code(), 200);
assert_eq!(
response.json::<serde_json::Value>(),
json!({"data": "secret"})
);
}
Testing WebSocket¶
Test WebSocket connections and message exchange:
@pytest.mark.asyncio
async def test_websocket_echo():
app = Spikard()
@app.websocket("/echo")
async def echo(message):
return message
async with TestClient(app) as client:
async with client.websocket("/echo") as ws:
await ws.send("Hello")
response = await ws.recv()
assert response == "Hello"
# Send JSON
import json
await ws.send(json.dumps({"type": "ping"}))
data = await ws.recv()
assert json.loads(data) == {"type": "ping"}
it("echoes websocket messages", async () => {
const app = new Spikard();
app.websocket("/echo", async (message) => message);
const client = new TestClient(app);
const ws = await client.websocketConnect("/echo");
await ws.send_json({ hello: "world" });
const response = await ws.receive_json();
expect(response).toEqual({ hello: "world" });
});
<?php
declare(strict_types=1);
use PHPUnit\Framework\TestCase;
use Spikard\App;
use Spikard\Testing\TestClient;
final class WebSocketTest extends TestCase
{
public function testEchoesWebSocketMessages(): void
{
$app = new App();
$app->websocket('/echo', fn(string $message) => $message);
$client = new TestClient($app);
$ws = $client->websocketConnect('/echo');
$ws->sendJson(['hello' => 'world']);
$response = $ws->receiveJson();
$this->assertEquals(['hello' => 'world'], $response);
}
}
use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
response::Response,
routing::get,
Router,
};
use axum_test::TestServer;
use futures::{SinkExt, StreamExt};
async fn echo_handler(ws: WebSocketUpgrade) -> Response {
ws.on_upgrade(handle_socket)
}
async fn handle_socket(mut socket: WebSocket) {
while let Some(Ok(msg)) = socket.next().await {
if let Message::Text(text) = msg {
if socket.send(Message::Text(text)).await.is_err() {
break;
}
}
}
}
#[tokio::test]
async fn test_websocket_echo() {
let app = Router::new().route("/echo", get(echo_handler));
let server = TestServer::new(app).expect("failed to create test server");
let mut ws = server.into_websocket("/echo").await;
// Send text message
ws.send(Message::Text("Hello".into())).await.expect("failed to send message");
let response = ws.next().await.expect("failed to receive message").expect("received empty message");
assert_eq!(response, Message::Text("Hello".into()));
// Send JSON message
let json_msg = serde_json::json!({"type": "ping"}).to_string();
ws.send(Message::Text(json_msg.clone())).await.expect("failed to send message");
let response = ws.next().await.expect("failed to receive message").expect("received empty message");
assert_eq!(response, Message::Text(json_msg));
}
Testing Server-Sent Events (SSE)¶
Test SSE streams:
@pytest.mark.asyncio
async def test_sse_stream():
from spikard.streaming import SseEvent
app = Spikard()
@app.sse("/notifications")
async def notifications():
for i in range(3):
yield SseEvent(data={"count": i})
async with TestClient(app) as client:
async with client.sse("/notifications") as event_stream:
events = []
async for event in event_stream:
import json
data = json.loads(event.data)
events.append(data)
if len(events) >= 3:
break
assert len(events) == 3
assert events[0] == {"count": 0}
assert events[2] == {"count": 2}
it("streams SSE events", async () => {
const app = new Spikard();
app.sse("/events", async function* () {
for (let i = 0; i < 3; i++) {
yield { event: "message", data: { count: i } };
}
});
const client = new TestClient(app);
const response = await client.get("/events");
// For testing SSE, you might collect the stream
expect(response.statusCode).toBe(200);
});
<?php
declare(strict_types=1);
use PHPUnit\Framework\TestCase;
use Spikard\App;
use Spikard\Testing\TestClient;
final class SseTest extends TestCase
{
public function testStreamsSseEvents(): void
{
$app = new App();
$app->sse('/events', function () {
for ($i = 0; $i < 3; $i++) {
yield ['event' => 'message', 'data' => ['count' => $i]];
}
});
$client = new TestClient($app);
$response = $client->get('/events');
// SSE responses return status 200
$this->assertEquals(200, $response->getStatusCode());
}
}
use axum::{
response::sse::{Event, Sse},
routing::get,
Router,
};
use axum_test::TestServer;
use futures::stream::{self, Stream};
use serde_json::json;
use std::convert::Infallible;
fn notifications() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let stream = stream::iter(0..3).map(|i| {
Ok(Event::default().data(json!({"count": i}).to_string()))
});
Sse::new(stream)
}
#[tokio::test]
async fn test_sse_stream() {
let app = Router::new().route("/notifications", get(notifications));
let server = TestServer::new(app).unwrap();
let response = server.get("/notifications").await;
assert_eq!(response.status_code(), 200);
// Verify content type
let content_type = response
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
assert!(content_type.contains("text/event-stream"));
// Parse SSE events from response body
let body = response.text();
let events: Vec<&str> = body
.split("\n\n")
.filter(|s| !s.is_empty())
.collect();
assert_eq!(events.len(), 3);
assert!(events[0].contains(r#"{"count":0}"#));
assert!(events[2].contains(r#"{"count":2}"#));
}
Integration Testing¶
Test multiple endpoints together:
@pytest.mark.asyncio
async def test_user_workflow():
"""Test complete user creation and retrieval workflow."""
app = Spikard()
users_db = {}
@app.post("/users")
async def create_user(name: str):
user_id = len(users_db) + 1
users_db[user_id] = {"id": user_id, "name": name}
return users_db[user_id]
@app.get("/users/{user_id}")
async def get_user(user_id: int):
return users_db.get(user_id, {"error": "Not found"})
async with TestClient(app) as client:
# Create user
create_response = await client.post(
"/users",
json={"name": "Alice"}
)
assert create_response.status_code == 200
user = create_response.json()
assert user["name"] == "Alice"
# Retrieve user
get_response = await client.get(f"/users/{user['id']}")
assert get_response.status_code == 200
retrieved = get_response.json()
assert retrieved == user
it("completes user workflow", async () => {
const usersDb = new Map();
const app = new Spikard();
app.post("/users", async (req) => {
const id = usersDb.size + 1;
const user = { id, name: req.body.name };
usersDb.set(id, user);
return user;
});
app.get("/users/:id", async (req) => {
const user = usersDb.get(Number(req.params.id));
return user || { status: 404, body: { error: "Not found" } };
});
const client = new TestClient(app);
// Create user
const createRes = await client.post("/users", {
json: { name: "Alice" }
});
const user = createRes.json();
expect(user.name).toBe("Alice");
// Retrieve user
const getRes = await client.get(`/users/${user.id}`);
expect(getRes.json()).toEqual(user);
});
it "completes user workflow" do
users_db = {}
app = Spikard::App.new
app.post('/users') do |params, _query, body|
id = users_db.size + 1
user = { id: id, name: body['name'] }
users_db[id] = user
user
end
app.get('/users/:id') do |params, _query, _body|
users_db[params['id'].to_i] || { error: 'Not found' }
end
client = Spikard::Testing::TestClient.new(app)
# Create user
create_res = client.post('/users', json: { name: 'Alice' })
user = create_res.json
expect(user['name']).to eq('Alice')
# Retrieve user
get_res = client.get("/users/#{user['id']}")
expect(get_res.json).to eq(user)
end
<?php
declare(strict_types=1);
use PHPUnit\Framework\TestCase;
use Spikard\App;
use Spikard\Testing\TestClient;
final class UserWorkflowTest extends TestCase
{
public function testCompletesUserWorkflow(): void
{
$usersDb = [];
$app = new App();
$app->post('/users', function ($request) use (&$usersDb) {
$id = count($usersDb) + 1;
$user = ['id' => $id, 'name' => $request->body['name']];
$usersDb[$id] = $user;
return $user;
});
$app->get('/users/{id}', function ($request) use (&$usersDb) {
$id = (int) $request->params['id'];
return $usersDb[$id] ?? ['error' => 'Not found'];
});
$client = new TestClient($app);
// Create user
$createRes = $client->post('/users', ['name' => 'Alice']);
$user = json_decode($createRes->getBody(), true);
$this->assertEquals('Alice', $user['name']);
// Retrieve user
$getRes = $client->get('/users/' . $user['id']);
$this->assertEquals($user, json_decode($getRes->getBody(), true));
}
}
use axum::{
extract::Path,
routing::{get, post},
Json, Router,
};
use axum_test::TestServer;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)]
struct User {
id: i64,
name: String,
}
type UsersDb = Arc<Mutex<HashMap<i64, User>>>;
/// Test complete user creation and retrieval workflow.
#[tokio::test]
async fn test_user_workflow() {
let users_db: UsersDb = Arc::new(Mutex::new(HashMap::new()));
let db_clone = users_db.clone();
let app = Router::new()
.route(
"/users",
post({
let db = users_db.clone();
move |Json(payload): Json<serde_json::Value>| {
let db = db.clone();
async move {
let mut users = db.lock().expect("lock poisoned");
let id = (users.len() + 1) as i64;
let user = User {
id,
name: payload["name"].as_str().unwrap_or("").to_string(),
};
users.insert(id, user.clone());
Json(user)
}
}
}),
)
.route(
"/users/:id",
get({
let db = db_clone;
move |Path(user_id): Path<i64>| {
let db = db.clone();
async move {
let users = db.lock().expect("lock poisoned");
match users.get(&user_id) {
Some(user) => Json(json!(user)),
None => Json(json!({"error": "Not found"})),
}
}
}
}),
);
let server = TestServer::new(app).expect("failed to create test server");
// Create user
let create_response = server
.post("/users")
.json(&json!({"name": "Alice"}))
.await;
assert_eq!(create_response.status_code(), 200);
let user: User = create_response.json();
assert_eq!(user.name, "Alice");
// Retrieve user
let get_response = server.get(&format!("/users/{}", user.id)).await;
assert_eq!(get_response.status_code(), 200);
let retrieved: User = get_response.json();
assert_eq!(retrieved, user);
}
Best Practices¶
- Use context managers (Python) or cleanup (TypeScript/Ruby) to ensure servers stop
- Test error cases - don't just test happy paths
- Test validation - ensure invalid inputs are rejected
- Test middleware - verify auth, logging, etc. work correctly
- Use fixtures - reuse common test setups
- Keep tests fast - TestClient starts real servers, but they're fast
- Test streaming - verify WebSocket and SSE endpoints work correctly
Verify It Works¶
Run your tests:
Next Steps¶
- See the Validation Guide for testing validation schemas
- See the Middleware Guide for testing middleware chains
- See the Streaming Guide for advanced SSE/WebSocket testing