Python Binding¶
Spikard's Python binding uses PyO3 with msgspec-first validation. Decorators feel like FastAPI/Litestar while the Rust core handles routing, middleware, and streaming.
Quickstart¶
from spikard import Spikard
from spikard.config import ServerConfig
from msgspec import Struct

class User(Struct):
    id: int
    name: str

app = Spikard()

@app.get("/users/{id:int}")
async def get_user(id: int) -> User:
    return User(id=id, name="Alice")

if __name__ == "__main__":
    app.run(config=ServerConfig(port=8000))
Router¶
Use Router for modular route organization:
from spikard.routing import Router

users = Router(prefix="/users")

@users.get("/{user_id}")
async def get_user(user_id: int) -> dict:
    return {"id": user_id}

app.include_router(users)
Validation¶
- msgspec (default): fastest; use Struct types for request/response validation.
- Pydantic v2 / dataclasses / TypedDict / attrs: auto-detected when used as handler params.
from spikard import Body
from msgspec import Struct

class Payment(Struct):
    id: str
    amount: float

@app.post("/payments")
async def create_payment(payment: Body[Payment]) -> Payment:
    return payment
Dependency Injection¶
Type-based injection (recommended):
from spikard.di import Provide

class DatabasePool:
    pass

# create_pool is your own factory that returns a DatabasePool (not shown here).
app.provide(DatabasePool, Provide(create_pool, singleton=True))

@app.get("/data")
async def get_data(pool: DatabasePool) -> dict:
    return {"status": "ok"}
Lifecycle hooks¶
@app.on_request
async def logging_hook(request: dict[str, object]):
    print(f"{request['method']} {request['path']}")
    return request
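As a slightly fuller sketch of the same hook, the version below logs through the standard logging module instead of print. It assumes only the "method" and "path" keys used above; the logger name is hypothetical. It is written as a plain coroutine so it can be exercised without a running server, and would be registered with @app.on_request exactly as shown above.

```python
import asyncio
import logging

logger = logging.getLogger("spikard.requests")  # hypothetical logger name


async def logging_hook(request: dict[str, object]) -> dict[str, object]:
    """Log the method and path, then pass the request through unchanged."""
    # .get() guards against absent keys; only "method" and "path" are
    # taken from the example above.
    logger.info("%s %s", request.get("method", "?"), request.get("path", "?"))
    return request


# Exercise the hook directly with a hand-built request dict.
sample = {"method": "GET", "path": "/data"}
result = asyncio.run(logging_hook(sample))
```

Returning the same dict keeps the hook a pure pass-through, which matches the example above.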
Async Server¶
import asyncio
from spikard.config import ServerConfig

async def main():
    await app.serve(config=ServerConfig(host="0.0.0.0", port=8080))

asyncio.run(main())
Testing¶
from spikard.testing import TestClient

async def test_endpoint():
    async with TestClient(app) as client:
        response = await client.get("/data")
        assert response.status_code == 200
Requests & Responses¶
Request Parameters¶
Use parameter decorators to extract and validate request data:
Response Types¶
Return typed responses or use Response/StreamingResponse for custom control:
Configuration¶
Control server behavior with ServerConfig:
from spikard import ServerConfig
from spikard.config import (
    OpenApiConfig,
    ServerInfo,
    ContactInfo,
    LicenseInfo,
)

config = ServerConfig(
    openapi=OpenApiConfig(
        enabled=True,
        title="My API",
        version="1.0.0",
        description="API documentation",
        servers=[
            ServerInfo(url="https://api.example.com", description="Production"),
            ServerInfo(url="http://localhost:8000", description="Development"),
        ],
        contact=ContactInfo(
            name="API Support",
            email="support@example.com",
        ),
        license=LicenseInfo(
            name="MIT",
            url="https://opensource.org/licenses/MIT",
        ),
    )
)
File Uploads¶
Accept multipart file uploads with UploadFile:
from dataclasses import dataclass
from spikard import Spikard
from spikard.datastructures import UploadFile

app = Spikard()

@dataclass
class FileUpload:
    file: UploadFile
    description: str

@app.post("/upload")
async def upload_file(body: FileUpload):
    content = body.file.read()  # or await body.file.aread()
    return {
        "filename": body.file.filename,
        "size": body.file.size,
        "content_type": body.file.content_type,
        "description": body.description,
    }
UploadFile supports both sync and async operations:
- read(size=-1) / aread(size=-1) - Read file contents
- write(data) / awrite(data) - Write data
- seek(offset, whence=0) / aseek(offset, whence=0) - Seek to position
- close() / aclose() - Close file
- as_bytes_io() - Get BytesIO object
- rolled_to_disk - Check if spooled to disk (files > 1MB by default)
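The rolled_to_disk flag suggests memory-first spooling that moves to disk past a size threshold. The standard library's tempfile.SpooledTemporaryFile behaves this way, so the sketch below uses it to illustrate the idea. It is an analogy, not Spikard's implementation; the 1 MB threshold comes from the note above, and `_rolled` is a private CPython attribute read here for illustration only.

```python
import tempfile

ONE_MB = 1024 * 1024  # threshold taken from the "1MB by default" note above

with tempfile.SpooledTemporaryFile(max_size=ONE_MB) as spool:
    spool.write(b"x" * 1024)      # 1 KB: still held in memory
    small_rolled = spool._rolled  # private CPython attribute, illustration only

    spool.write(b"x" * ONE_MB)    # total now exceeds max_size: rolls over to disk
    large_rolled = spool._rolled

    spool.seek(0)                 # rewind before reading everything back
    total_size = len(spool.read())
```

Reads and seeks work the same before and after the rollover, which is why handler code does not usually need to care where the bytes live.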
WebSocket Support¶
Define WebSocket handlers with the @websocket decorator:
from spikard import Spikard, websocket

app = Spikard()

@app.websocket("/chat")
async def chat_handler(message: dict) -> dict | None:
    return {"echo": message}
WebSocket handlers receive JSON messages and can return dicts to send as responses:
- The handler is called with the parsed JSON message
- Return a dict to send a JSON response, or None to send nothing
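To illustrate the return contract, here is a sketch of a handler that replies to some messages and stays silent on others. The "type" and "text" fields are hypothetical message conventions, not part of Spikard. The function is a plain coroutine, so it can be called directly; it would be registered with the decorator shown above.

```python
import asyncio
from typing import Optional


async def chat_handler(message: dict) -> Optional[dict]:
    """Reply to 'ping' and 'say' messages; ignore everything else."""
    kind = message.get("type")  # hypothetical field, not defined by Spikard
    if kind == "ping":
        return {"type": "pong"}
    if kind == "say":
        return {"type": "echo", "text": message.get("text", "")}
    return None  # nothing is sent back for unrecognized messages


# Call the handler directly with hand-built messages.
pong = asyncio.run(chat_handler({"type": "ping"}))
echo = asyncio.run(chat_handler({"type": "say", "text": "hi"}))
ignored = asyncio.run(chat_handler({"type": "unknown"}))
```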
Server-Sent Events (SSE)¶
Stream events to clients with the @sse decorator:
from spikard import Spikard, sse
import asyncio

app = Spikard()

@app.sse("/notifications")
async def notifications():
    for i in range(10):
        await asyncio.sleep(1)
        yield {"message": f"Notification {i}", "count": i}
SSE handlers are async generators that yield event dicts. Each dict is sent as a Server-Sent Event with JSON serialization.
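In the Server-Sent Events format, an event is a set of "field: value" lines terminated by a blank line, with the payload carried in a data field. The sketch below shows how a yielded dict could map to such a frame. `format_sse` is a hypothetical helper, and the exact bytes Spikard emits (for example, whether it adds id or event fields) are an assumption.

```python
import json


def format_sse(event: dict) -> str:
    """Render one event dict as a Server-Sent Events frame."""
    # Compact separators keep the JSON payload on a single line.
    payload = json.dumps(event, separators=(",", ":"))
    # The trailing blank line marks the end of the event.
    return f"data: {payload}\n\n"


frame = format_sse({"message": "Notification 0", "count": 0})
```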
gRPC Support¶
Implement gRPC handlers for protobuf services:
from spikard.grpc import GrpcHandler, GrpcRequest, GrpcResponse
import user_pb2

# get_items, aggregate_items, and process_bidi_item are application-defined
# helpers (not shown here).

class UserServiceHandler(GrpcHandler):
    async def handle_request(self, request: GrpcRequest) -> GrpcResponse:
        # Deserialize protobuf
        req = user_pb2.GetUserRequest()
        req.ParseFromString(request.payload)
        # Process
        user = user_pb2.User(id=req.id, name="John Doe")
        # Serialize and return
        return GrpcResponse(payload=user.SerializeToString())

    async def handle_server_stream(self, request: GrpcRequest):
        # Server streaming RPC
        req = user_pb2.StreamRequest()
        req.ParseFromString(request.payload)
        for item in get_items(req):
            response = user_pb2.StreamResponse(data=item)
            yield GrpcResponse(payload=response.SerializeToString())

    async def handle_client_stream(self, request_stream):
        # Client streaming RPC
        items = []
        async for request in request_stream:
            req = user_pb2.StreamItem()
            req.ParseFromString(request.payload)
            items.append(req)
        result = aggregate_items(items)
        response = user_pb2.AggregateResponse(data=result)
        return GrpcResponse(payload=response.SerializeToString())

    async def handle_bidi_stream(self, request_stream):
        # Bidirectional streaming RPC
        async for request in request_stream:
            req = user_pb2.BidiRequest()
            req.ParseFromString(request.payload)
            response_data = await process_bidi_item(req)
            response = user_pb2.BidiResponse(data=response_data)
            yield GrpcResponse(payload=response.SerializeToString())
GrpcRequest attributes:
- payload - Serialized protobuf bytes
- method_name - Name of the method called
- service_name - Name of the service
- metadata - Request metadata dict
GrpcResponse attributes:
- payload - Serialized protobuf response bytes
- metadata - Optional response metadata dict
Testing¶
TestClient (In-Process)¶
Fast, in-process testing using Rust directly:
from spikard.testing import TestClient

async def test_get_user():
    async with TestClient(app) as client:
        response = await client.get("/users/1")
        assert response.status_code == 200
        assert response.json() == {"id": 1, "name": "Alice"}
TestResponse methods:
- status_code - HTTP status code
- headers - Response headers dict
- bytes() - Response body as bytes
- text() - Response body as text
- json() - Response body parsed as JSON
- assert_status(code) - Assert status code (chainable)
- assert_status_ok() - Assert status is 200 (chainable)
LiveTestClient (Real Server)¶
Start a real server for specialized testing:
from spikard.testing import LiveTestClient

async def test_with_real_server():
    async with LiveTestClient(app) as client:
        response = await client.get("/users/1")
        assert response.status_code == 200
LiveTestClient starts a real server in a subprocess, useful for testing server behavior, port binding, and signal handling.
Deployment¶
- Local: python app.py or await app.serve().
- Production: build with the binding and set SPIKARD_PORT/SPIKARD_HOST via env.
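If you prefer to read these variables explicitly before building the server config, a minimal sketch could look like the following. `bind_settings` and both default values are hypothetical, and how Spikard itself consumes SPIKARD_HOST and SPIKARD_PORT is not shown here.

```python
import os
from typing import Mapping


def bind_settings(env: Mapping[str, str] = os.environ) -> tuple[str, int]:
    """Return (host, port) read from SPIKARD_HOST / SPIKARD_PORT."""
    host = env.get("SPIKARD_HOST", "127.0.0.1")  # hypothetical default
    port = int(env.get("SPIKARD_PORT", "8000"))  # hypothetical default
    if not 0 < port < 65536:
        raise ValueError(f"SPIKARD_PORT out of range: {port}")
    return host, port


# Pass a plain dict to try it without touching the real environment.
host, port = bind_settings({"SPIKARD_HOST": "0.0.0.0", "SPIKARD_PORT": "8080"})
```

The resulting values could then be handed to ServerConfig(host=host, port=port), as in the Async Server example.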
Event Loop Integration¶
The binding uses pyo3_async_runtimes to convert Python coroutines directly to Tokio futures, eliminating the overhead of a dedicated event loop thread. See Python architecture.
Troubleshooting¶
- Ensure Python 3.10+ is installed.
- Normal installs use prebuilt wheels on macOS, Linux, and Windows.
- Rust is only required when building the extension from source.
- If you see import errors while developing from source, rebuild with maturin develop or task build:py.
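A quick way to rule out the first item is to check the interpreter version from Python itself. This is a small sketch; `python_is_supported` is a hypothetical helper, and the 3.10 minimum comes from the list above.

```python
import sys

MIN_PYTHON = (3, 10)  # minimum version from the list above


def python_is_supported(version=None) -> bool:
    """Return True if the given (major, minor) meets the minimum."""
    # Fall back to the running interpreter when no version is passed in.
    major_minor = tuple(version or sys.version_info[:2])
    return major_minor >= MIN_PYTHON


supported_here = python_is_supported()
```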