Protobuf/gRPC Guide¶
Build high-performance, type-safe gRPC services with Spikard. This guide covers proto3 syntax, code generation, and service implementation across all supported languages.
What You'll Learn¶
- Write `.proto` files with proto3 syntax
- Generate type-safe code with `spikard generate protobuf`
- Implement gRPC service handlers
- Handle errors with gRPC status codes
- Use streaming patterns
Why gRPC and Protobuf?¶
Protocol Buffers provide type safety, a compact binary encoding (payloads are typically 3-5x smaller than the equivalent JSON), cross-language code generation, and schema evolution support.
gRPC adds HTTP/2 multiplexing, four streaming modes, standardized error codes, and metadata support.
Part 1: Proto3 Syntax¶
Basic Service Definition¶
UserService Proto Definition¶
Basic protobuf definition for a UserService with GetUser and CreateUser methods.
syntax = "proto3";

package userservice;

service UserService {
  rpc GetUser(GetUserRequest) returns (User);
  rpc CreateUser(CreateUserRequest) returns (User);
}

message GetUserRequest {
  int32 id = 1;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
}

message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
  string created_at = 4;
}
Usage¶
This proto definition is used across all language examples in the Spikard documentation. It demonstrates:
- Service definition: Defines the RPC methods available
- Request messages: Input types for each RPC method
- Response messages: Output types returned by the service
- Field numbering: Each field has a unique number for wire format compatibility
Key concepts:
- `syntax = "proto3"`: Required (Spikard only supports proto3)
- `package`: Namespace for types (use versioning: `v1`, `v2`)
- Field numbers: Unique identifiers (1-536870911) that must never change
- Field labels: `optional`, `repeated`, or none
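To see why field numbers must never change, it can help to look at how they end up on the wire. In the protobuf wire format each field is preceded by a key computed as `(field_number << 3) | wire_type`, so the number, not the field name, identifies the data. The following is a minimal sketch of that encoding for a non-negative `int32` field; the helper names `encode_varint` and `encode_int32_field` are hypothetical and are not part of Spikard or the protobuf runtime.

```python
WIRE_TYPE_VARINT = 0  # wire type used for int32, int64, uint32, uint64, bool, enum
MAX_FIELD_NUMBER = 536_870_911  # 2**29 - 1, the upper bound quoted above


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative values")
    out = bytearray()
    while True:
        low_bits = value & 0x7F
        value >>= 7
        if value:
            out.append(low_bits | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(low_bits)
            return bytes(out)


def encode_int32_field(field_number: int, value: int) -> bytes:
    """Encode one varint field as key + value (illustrative helper)."""
    if not 1 <= field_number <= MAX_FIELD_NUMBER:
        raise ValueError("field number out of range")
    key = (field_number << 3) | WIRE_TYPE_VARINT
    return encode_varint(key) + encode_varint(value)


# GetUserRequest { id = 42 }: `id` is field number 1 in the definition above.
encoded = encode_int32_field(1, 42)
```

If `id` were renumbered from 1 to 2, the key byte would change, and readers built against the old schema would no longer find the field.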
Scalar Types¶
| Proto3 Type | Description | Example |
|---|---|---|
| `int32` | 32-bit signed integer | 42, -100 |
| `int64` | 64-bit signed integer | 9223372036854775807 |
| `uint32` | 32-bit unsigned integer | 42 |
| `uint64` | 64-bit unsigned integer | 18446744073709551615 |
| `float` | 32-bit floating point | 3.14 |
| `double` | 64-bit floating point | 3.141592653589793 |
| `bool` | Boolean | true, false |
| `string` | UTF-8 string | "Hello, World!" |
| `bytes` | Binary data | b"\x00\x01\x02" |
Enums¶
enum UserStatus {
  USER_STATUS_UNSPECIFIED = 0;  // Always have a zero value
  ACTIVE = 1;
  INACTIVE = 2;
  SUSPENDED = 3;
}
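Best practices: Always include a zero value, use UPPER_SNAKE_CASE, and prefix values with the enum name (for example, USER_STATUS_ACTIVE). Enum values share the scope of the enum itself, so unprefixed names can collide with values of other enums in the same package.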
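The zero value matters because an unset enum field decodes as 0, and a reader built against an older schema may receive numbers it does not know. A minimal sketch of that behaviour, using a plain `IntEnum` purely for illustration (this is not the type Spikard generates, and `parse_status` is a hypothetical helper):

```python
from enum import IntEnum


class UserStatus(IntEnum):
    """Mirror of the proto enum above, for illustration only."""

    USER_STATUS_UNSPECIFIED = 0
    ACTIVE = 1
    INACTIVE = 2
    SUSPENDED = 3


def parse_status(raw: int) -> UserStatus:
    """Map a wire value to UserStatus, falling back to the zero value."""
    try:
        return UserStatus(raw)
    except ValueError:
        # Unknown number, e.g. a value added in a newer schema version.
        return UserStatus.USER_STATUS_UNSPECIFIED
```

Falling back to the zero value is one possible policy; a service could equally choose to reject unknown values.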
Service Definitions¶
service UserService {
  // Unary RPC
  rpc GetUser(GetUserRequest) returns (User) {}

  // Server streaming
  rpc ListUsers(ListUsersRequest) returns (stream User) {}

  // Client streaming
  rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse) {}

  // Bidirectional streaming
  rpc Chat(stream ChatMessage) returns (stream ChatMessage) {}
}
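The four modes differ only in whether each side sends one message or a stream. The sketch below models that shape with plain `asyncio` coroutines and async generators; it deliberately does not use any Spikard API, and all function names are hypothetical stand-ins for the RPCs above.

```python
import asyncio
from typing import AsyncIterator


async def unary(request: str) -> str:
    """One request in, one response out (like GetUser)."""
    return f"user:{request}"


async def server_streaming(count: int) -> AsyncIterator[str]:
    """One request in, a stream of responses out (like ListUsers)."""
    for i in range(count):
        yield f"user:{i}"


async def client_streaming(requests: AsyncIterator[str]) -> int:
    """A stream of requests in, one response out (like CreateUsers)."""
    created = 0
    async for _ in requests:
        created += 1
    return created


async def bidirectional(requests: AsyncIterator[str]) -> AsyncIterator[str]:
    """A stream in and a stream out (like Chat)."""
    async for message in requests:
        yield message.upper()


async def _names() -> AsyncIterator[str]:
    """Stand-in for a stream of client messages."""
    for name in ("alice", "bob"):
        yield name


async def demo() -> dict:
    return {
        "unary": await unary("1"),
        "server": [u async for u in server_streaming(2)],
        "client": await client_streaming(_names()),
        "bidi": [m async for m in bidirectional(_names())],
    }


result = asyncio.run(demo())
```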
Part 2: Type Mapping¶
Protobuf Type Mapping Across Languages¶
Reference table showing how proto3 types map to native language types in all Spikard-supported languages.
Scalar Types¶
| Proto3 Type | Python | TypeScript | Ruby | PHP | Rust |
|---|---|---|---|---|---|
| `double` | float | number | Float | float | f64 |
| `float` | float | number | Float | float | f32 |
| `int32` | int | number | Integer | int | i32 |
| `int64` | int | number\|bigint | Integer | int | i64 |
| `uint32` | int | number | Integer | int | u32 |
| `uint64` | int | number\|bigint | Integer | int | u64 |
| `bool` | bool | boolean | Boolean | bool | bool |
| `string` | str | string | String | string | String |
| `bytes` | bytes | Uint8Array | String | string | Bytes |
Complex Types¶
| Proto3 Type | Python | TypeScript | Ruby | PHP | Rust |
|---|---|---|---|---|---|
| `message` | dataclass | interface | class | class | struct |
| `enum` | Literal[...] | union type | module | class | enum |
| `repeated T` | list[T] | T[] | Array<T> | array<T> | Vec<T> |
| `map<K,V>` | dict[K,V] | Map<K,V> | Hash{K=>V} | array<K,V> | HashMap<K,V> |
| `optional T` | T\|None | T\|undefined | T\|nil | ?T | Option<T> |
Default Values in Proto3¶
When a field is not set, proto3 uses these default values:
| Type | Default Value |
|---|---|
| Numbers | 0 |
| Strings | "" (empty string) |
| Booleans | false |
| Enums | First value (must be 0) |
| Messages | None/null/undefined |
| Repeated | Empty list |
| Maps | Empty map |
Optional Field Handling¶
Use the `optional` keyword to distinguish between "not set" and "default value":
message User {
  int32 id = 1;               // Never null (defaults to 0)
  string name = 2;            // Never null (defaults to "")
  optional string email = 3;  // Can be null/None/undefined
  optional int32 age = 4;     // Can be null (distinguishes 0 from unset)
}
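In Python terms, the difference between "unset" and "default" looks roughly like the sketch below. It uses a hand-written dataclass that follows the type mapping table (message as dataclass, `optional T` as `T | None`); the class and the `describe_age` helper are illustrative assumptions, not generated code. With the official Python protobuf runtime, presence of an `optional` field is checked with `HasField("age")` instead.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class User:
    """Hand-written stand-in for the User message above."""

    id: int = 0                  # plain proto3 field: default 0, no presence
    name: str = ""               # plain proto3 field: default ""
    email: Optional[str] = None  # optional: None means "not set"
    age: Optional[int] = None    # optional: 0 and "not set" are distinct


def describe_age(user: User) -> str:
    """Show that an explicit 0 is distinguishable from an unset field."""
    if user.age is None:
        return "age not provided"
    return f"age is {user.age}"
```

For the non-optional `id` field there is no such distinction: a reader cannot tell whether the sender wrote 0 or left the field out.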
Checking Optional Fields by Language¶
Python:
TypeScript:
Ruby:
PHP:
Rust:
Part 3: Code Generation¶
Installation¶
Generate Code¶
Code Generation Commands¶
Generate protobuf code for each language:
Python:
TypeScript:
# Install protobufjs CLI
npm install -g protobufjs-cli
# Generate TypeScript definitions
pbjs -t static-module -w commonjs -o user_service.js user_service.proto
pbts -o user_service.d.ts user_service.js
Ruby:
PHP:
Rust (add to build.rs):
Part 4: Implementing Handlers¶
Python gRPC Handler¶
Complete Python handler implementation for UserService with GetUser and CreateUser methods.
from spikard.grpc import GrpcHandler, GrpcRequest, GrpcResponse
import userservice_pb2  # Generated from proto
from datetime import datetime, timezone


class UserServiceHandler(GrpcHandler):
    """UserService gRPC handler implementation."""

    def __init__(self, user_repository):
        """Initialize handler with dependencies."""
        self.user_repository = user_repository

    async def handle_request(self, request: GrpcRequest) -> GrpcResponse:
        """
        Handle incoming gRPC requests.

        Routes to appropriate method based on request.method_name.
        """
        if request.method_name == "GetUser":
            return await self._get_user(request)
        elif request.method_name == "CreateUser":
            return await self._create_user(request)
        else:
            raise NotImplementedError(f"Unknown method: {request.method_name}")

    async def _get_user(self, request: GrpcRequest) -> GrpcResponse:
        """Handle GetUser RPC."""
        # 1. Deserialize request
        req = userservice_pb2.GetUserRequest()
        req.ParseFromString(request.payload)

        # 2. Validate input
        if req.id <= 0:
            raise ValueError("User ID must be positive")

        # 3. Business logic
        user = await self.user_repository.find_by_id(req.id)
        if not user:
            # Note: ValueError is reported as INVALID_ARGUMENT, not NOT_FOUND
            raise ValueError(f"User {req.id} not found")

        # 4. Build response
        response_user = userservice_pb2.User()
        response_user.id = user.id
        response_user.name = user.name
        response_user.email = user.email
        response_user.created_at = user.created_at.isoformat()

        # 5. Serialize and return
        return GrpcResponse(
            payload=response_user.SerializeToString(),
            metadata={"x-user-found": "true"},
        )

    async def _create_user(self, request: GrpcRequest) -> GrpcResponse:
        """Handle CreateUser RPC."""
        # 1. Deserialize request
        req = userservice_pb2.CreateUserRequest()
        req.ParseFromString(request.payload)

        # 2. Validate input
        if not req.name or not req.email:
            raise ValueError("Name and email are required")

        # 3. Check authorization from metadata
        auth_token = request.get_metadata("authorization")
        if not auth_token:
            # Note: PermissionError is reported as PERMISSION_DENIED
            raise PermissionError("Authentication required")

        # 4. Business logic
        user = await self.user_repository.create(
            name=req.name,
            email=req.email,
        )

        # 5. Build response
        response_user = userservice_pb2.User()
        response_user.id = user.id
        response_user.name = user.name
        response_user.email = user.email
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
        response_user.created_at = datetime.now(timezone.utc).isoformat()

        # 6. Serialize with metadata
        return GrpcResponse(
            payload=response_user.SerializeToString(),
            metadata={
                "x-user-id": str(user.id),
                "x-created": "true",
            },
        )
Key Patterns¶
- Async/await: All handlers are async for non-blocking I/O
- `ParseFromString()`: Deserializes binary protobuf to Python object
- `SerializeToString()`: Serializes Python object to binary protobuf
- Exception mapping: `ValueError` -> `INVALID_ARGUMENT`, `PermissionError` -> `PERMISSION_DENIED`
- Metadata access: `request.get_metadata(key)` returns `str | None`
Registration¶
from spikard import create_app
from spikard.grpc import GrpcService
# Create app
app = create_app()
# Create service registry
grpc_service = GrpcService()
# Register handler
handler = UserServiceHandler(user_repository=UserRepository())
grpc_service.register_handler("userservice.UserService", handler)
# App is now ready to serve gRPC requests
TypeScript gRPC Handler¶
Complete TypeScript handler implementation for UserService with GetUser and CreateUser methods.
import {
GrpcHandler,
GrpcRequest,
GrpcResponse,
GrpcError,
GrpcStatusCode,
createServiceHandler,
createUnaryHandler,
} from 'spikard';
import * as userservice from './userservice_pb'; // Generated protobufjs types
class UserServiceHandler implements GrpcHandler {
constructor(private userRepository: UserRepository) {}
async handleRequest(request: GrpcRequest): Promise<GrpcResponse> {
/**
* Handle incoming gRPC requests.
* Routes to appropriate method based on request.methodName.
*/
switch (request.methodName) {
case 'GetUser':
return this.getUser(request);
case 'CreateUser':
return this.createUser(request);
default:
throw new GrpcError(
GrpcStatusCode.UNIMPLEMENTED,
`Method ${request.methodName} not implemented`
);
}
}
private async getUser(request: GrpcRequest): Promise<GrpcResponse> {
// 1. Deserialize request
const req = userservice.GetUserRequest.decode(request.payload);
// 2. Validate input
if (req.id <= 0) {
throw new GrpcError(
GrpcStatusCode.INVALID_ARGUMENT,
'User ID must be positive'
);
}
// 3. Business logic
const user = await this.userRepository.findById(req.id);
if (!user) {
throw new GrpcError(
GrpcStatusCode.NOT_FOUND,
`User ${req.id} not found`
);
}
// 4. Build response
const responseUser = userservice.User.create({
id: user.id,
name: user.name,
email: user.email,
createdAt: user.createdAt.toISOString(),
});
// 5. Serialize and return
const encoded = userservice.User.encode(responseUser).finish();
return {
payload: Buffer.from(encoded),
metadata: { 'x-user-found': 'true' },
};
}
private async createUser(request: GrpcRequest): Promise<GrpcResponse> {
// 1. Deserialize request
const req = userservice.CreateUserRequest.decode(request.payload);
// 2. Validate input
if (!req.name || !req.email) {
throw new GrpcError(
GrpcStatusCode.INVALID_ARGUMENT,
'Name and email are required'
);
}
// 3. Check authorization from metadata
const authToken = request.metadata['authorization'];
if (!authToken) {
throw new GrpcError(
GrpcStatusCode.UNAUTHENTICATED,
'Authentication required'
);
}
// 4. Business logic
const user = await this.userRepository.create({
name: req.name,
email: req.email,
});
// 5. Build response
const responseUser = userservice.User.create({
id: user.id,
name: user.name,
email: user.email,
createdAt: new Date().toISOString(),
});
// 6. Serialize with metadata
const encoded = userservice.User.encode(responseUser).finish();
return {
payload: Buffer.from(encoded),
metadata: {
'x-user-id': user.id.toString(),
'x-created': 'true',
},
};
}
}
Key Patterns¶
- Protobufjs: Uses `.decode()` and `.encode().finish()` for serialization
- Buffer: gRPC payloads are Node.js `Buffer` objects
- GrpcError: Throw with explicit status codes for proper error responses
- Helper functions: `createUnaryHandler` and `createServiceHandler` reduce boilerplate
- Type safety: Full TypeScript type inference for protobuf messages
Registration¶
Ruby gRPC Handler¶
Complete Ruby handler implementation for UserService with GetUser and CreateUser methods.
require 'spikard/grpc'
require 'userservice_pb' # Generated from proto
class UserServiceHandler < Spikard::Grpc::Handler
def initialize(user_repository)
@user_repository = user_repository
end
def handle_request(request)
# Route based on method name
case request.method_name
when 'GetUser'
get_user(request)
when 'CreateUser'
create_user(request)
else
raise "Unknown method: #{request.method_name}"
end
end
private
def get_user(request)
# 1. Deserialize request
req = Userservice::GetUserRequest.decode(request.payload)
# 2. Validate input
raise ArgumentError, 'User ID must be positive' if req.id <= 0
# 3. Business logic
user = @user_repository.find_by_id(req.id)
raise ArgumentError, "User #{req.id} not found" unless user
# 4. Build response
response_user = Userservice::User.new(
id: user.id,
name: user.name,
email: user.email,
created_at: user.created_at.iso8601
)
# 5. Serialize and return
response = Spikard::Grpc::Response.new(
payload: Userservice::User.encode(response_user)
)
response.metadata = { 'x-user-found' => 'true' }
response
end
def create_user(request)
# 1. Deserialize request
req = Userservice::CreateUserRequest.decode(request.payload)
# 2. Validate input
if req.name.empty? || req.email.empty?
raise ArgumentError, 'Name and email are required'
end
# 3. Check authorization from metadata
auth_token = request.get_metadata('authorization')
unless auth_token
raise SecurityError, 'Authentication required'
end
# 4. Business logic
user = @user_repository.create(
name: req.name,
email: req.email
)
# 5. Build response
response_user = Userservice::User.new(
id: user.id,
name: user.name,
email: user.email,
created_at: Time.now.utc.iso8601
)
# 6. Serialize with metadata
response = Spikard::Grpc::Response.new(
payload: Userservice::User.encode(response_user)
)
response.metadata = {
'x-user-id' => user.id.to_s,
'x-created' => 'true'
}
response
end
end
Key Patterns¶
- Synchronous: Ruby handlers are synchronous (Rust runtime handles async)
- `.decode()` / `.encode()`: Ruby protobuf methods for serialization
- Metadata access: `request.get_metadata(key)` returns `String | nil`
- Response construction: Create response, then set metadata separately
- Error responses: Use `Response.error()` for error cases
- Exception mapping: Rescue exceptions and convert to gRPC status codes
Error Handling¶
class UserServiceHandler < Spikard::Grpc::Handler
def handle_request(request)
case request.method_name
when 'GetUser'
get_user(request)
else
# Return error response
Spikard::Grpc::Response.error(
"Method not implemented: #{request.method_name}"
)
end
rescue ArgumentError => e
# Invalid argument error
Spikard::Grpc::Response.error(e.message, 'INVALID_ARGUMENT')
rescue SecurityError => e
# Authentication error
Spikard::Grpc::Response.error(e.message, 'UNAUTHENTICATED')
rescue StandardError => e
# Internal error
Spikard::Grpc::Response.error("Internal error: #{e.message}")
end
end
Registration¶
PHP gRPC Handler¶
Complete PHP handler implementation for UserService with GetUser and CreateUser methods.
<?php
declare(strict_types=1);
use Spikard\Grpc\HandlerInterface;
use Spikard\Grpc\Request;
use Spikard\Grpc\Response;
use Userservice\GetUserRequest;
use Userservice\CreateUserRequest;
use Userservice\User;
class UserServiceHandler implements HandlerInterface
{
public function __construct(
private UserRepository $userRepository,
) {}
public function handleRequest(Request $request): Response
{
// Route based on method name
return match ($request->methodName) {
'GetUser' => $this->getUser($request),
'CreateUser' => $this->createUser($request),
default => Response::error("Unknown method: {$request->methodName}"),
};
}
private function getUser(Request $request): Response
{
try {
// 1. Deserialize request
$req = new GetUserRequest();
$req->mergeFromString($request->payload);
// 2. Validate input
if ($req->getId() <= 0) {
return Response::error('User ID must be positive');
}
// 3. Business logic
$user = $this->userRepository->findById($req->getId());
if (!$user) {
return Response::error("User {$req->getId()} not found");
}
// 4. Build response
$responseUser = new User();
$responseUser->setId($user->getId());
$responseUser->setName($user->getName());
$responseUser->setEmail($user->getEmail());
$responseUser->setCreatedAt($user->getCreatedAt()->format('c'));
// 5. Serialize and return
return new Response(
payload: $responseUser->serializeToString(),
metadata: ['x-user-found' => 'true']
);
} catch (\Exception $e) {
return Response::error("Error: {$e->getMessage()}");
}
}
private function createUser(Request $request): Response
{
try {
// 1. Deserialize request
$req = new CreateUserRequest();
$req->mergeFromString($request->payload);
// 2. Validate input
if (empty($req->getName()) || empty($req->getEmail())) {
return Response::error('Name and email are required');
}
// 3. Check authorization from metadata
$authToken = $request->getMetadata('authorization');
if (!$authToken) {
return Response::error(
'Authentication required',
'UNAUTHENTICATED'
);
}
// 4. Business logic
$user = $this->userRepository->create(
name: $req->getName(),
email: $req->getEmail()
);
// 5. Build response
$responseUser = new User();
$responseUser->setId($user->getId());
$responseUser->setName($user->getName());
$responseUser->setEmail($user->getEmail());
$responseUser->setCreatedAt((new \DateTime())->format('c'));
// 6. Serialize with metadata
return new Response(
payload: $responseUser->serializeToString(),
metadata: [
'x-user-id' => (string)$user->getId(),
'x-created' => 'true',
]
);
} catch (\Exception $e) {
return Response::error("Error: {$e->getMessage()}");
}
}
}
Key Patterns¶
- Synchronous: PHP handlers are synchronous
- `mergeFromString()`: Deserializes binary protobuf (use merge, not parse)
- `serializeToString()`: Serializes protobuf to binary
- Getters/Setters: PHP protobuf uses getter/setter methods
- Error responses: Return `Response::error()` instead of throwing
- Named arguments: PHP 8.0+ named arguments for clarity
- Type hints: Leverage PHP type system for safety
Registration¶
<?php
declare(strict_types=1);
use Spikard\Grpc;
// Create service registry
$service = Grpc::createService();
// Register handler
$handler = new UserServiceHandler(
userRepository: new UserRepository()
);
$service->registerHandler('userservice.UserService', $handler);
// Service ready to handle requests
Rust gRPC Handler¶
Complete Rust handler implementation for UserService with GetUser and CreateUser methods.
use bytes::Bytes;
use spikard_http::grpc::{GrpcHandler, GrpcHandlerResult, GrpcRequestData, GrpcResponseData};
use std::future::Future; // Needed for the return type of GrpcHandler::call
use std::pin::Pin;
use std::sync::Arc;
use tonic::Status;
// Generated protobuf types
mod userservice {
include!("userservice.rs"); // Generated by prost
}
pub struct UserServiceHandler {
user_repository: Arc<dyn UserRepository + Send + Sync>,
}
impl UserServiceHandler {
pub fn new(user_repository: Arc<dyn UserRepository + Send + Sync>) -> Self {
Self { user_repository }
}
async fn get_user(&self, request: GrpcRequestData) -> GrpcHandlerResult {
// 1. Deserialize request
use prost::Message;
let req = userservice::GetUserRequest::decode(request.payload)
.map_err(|e| Status::invalid_argument(format!("Invalid request: {}", e)))?;
// 2. Validate input
if req.id <= 0 {
return Err(Status::invalid_argument("User ID must be positive"));
}
// 3. Business logic
let user = self.user_repository.find_by_id(req.id).await
.map_err(|e| Status::internal(format!("Database error: {}", e)))?
.ok_or_else(|| Status::not_found(format!("User {} not found", req.id)))?;
// 4. Build response
let response_user = userservice::User {
id: user.id,
name: user.name.clone(),
email: user.email.clone(),
created_at: user.created_at.to_rfc3339(),
};
// 5. Serialize
let mut buf = Vec::new();
response_user.encode(&mut buf)
.map_err(|e| Status::internal(format!("Encoding error: {}", e)))?;
// 6. Add metadata
let mut metadata = tonic::metadata::MetadataMap::new();
metadata.insert("x-user-found", "true".parse().map_err(|_| Status::internal("failed to parse metadata value"))?);
Ok(GrpcResponseData {
payload: Bytes::from(buf),
metadata,
})
}
async fn create_user(&self, request: GrpcRequestData) -> GrpcHandlerResult {
// 1. Deserialize request
use prost::Message;
let req = userservice::CreateUserRequest::decode(request.payload)
.map_err(|e| Status::invalid_argument(format!("Invalid request: {}", e)))?;
// 2. Validate input
if req.name.is_empty() || req.email.is_empty() {
return Err(Status::invalid_argument("Name and email are required"));
}
// 3. Check authorization from metadata
let auth_token = request.metadata
.get("authorization")
.and_then(|v| v.to_str().ok());
if auth_token.is_none() {
return Err(Status::unauthenticated("Authentication required"));
}
// 4. Business logic
let user = self.user_repository.create(&req.name, &req.email).await
.map_err(|e| Status::internal(format!("Create failed: {}", e)))?;
// 5. Build response
let response_user = userservice::User {
id: user.id,
name: user.name.clone(),
email: user.email.clone(),
created_at: chrono::Utc::now().to_rfc3339(),
};
// 6. Serialize
let mut buf = Vec::new();
response_user.encode(&mut buf)
.map_err(|e| Status::internal(format!("Encoding error: {}", e)))?;
// 7. Add metadata
let mut metadata = tonic::metadata::MetadataMap::new();
metadata.insert("x-user-id", user.id.to_string().parse().map_err(|_| Status::internal("failed to parse metadata value"))?);
metadata.insert("x-created", "true".parse().map_err(|_| Status::internal("failed to parse metadata value"))?);
Ok(GrpcResponseData {
payload: Bytes::from(buf),
metadata,
})
}
}
impl GrpcHandler for UserServiceHandler {
fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
Box::pin(async move {
match request.method_name.as_str() {
"GetUser" => self.get_user(request).await,
"CreateUser" => self.create_user(request).await,
_ => Err(Status::unimplemented(format!("Unknown method: {}", request.method_name))),
}
})
}
fn service_name(&self) -> &'static str {
"userservice.UserService"
}
}
Key Patterns¶
- Async/await: Handlers return `Pin<Box<dyn Future>>`
- prost: Uses `.decode()` and `.encode()` for protobuf
- Error handling: Return `tonic::Status` directly
- Zero-copy: Uses `Bytes` for efficient payload handling
- Type safety: Full compile-time type checking
- Arc: Shared ownership for thread-safe repository access
Registration¶
use spikard_http::grpc::{GrpcRegistry, RpcMode};
use std::sync::Arc;
// Create handler
let user_repository = Arc::new(UserRepositoryImpl::new());
let handler = Arc::new(UserServiceHandler::new(user_repository));
// Register with gRPC runtime
let mut registry = GrpcRegistry::new();
registry.register("userservice.UserService", handler, RpcMode::Unary);
// Runtime ready to serve
Part 5: Error Handling¶
gRPC Status Codes¶
gRPC Status Codes Reference¶
Complete reference table for all 17 standard gRPC status codes.
| Code | Numeric | When to Use | HTTP Equivalent |
|---|---|---|---|
| OK | 0 | Request completed successfully | 200 OK |
| CANCELLED | 1 | Operation was cancelled (typically by caller) | 499 Client Closed Request |
| UNKNOWN | 2 | Unknown error or unmapped status from another system | 500 Internal Server Error |
| INVALID_ARGUMENT | 3 | Client specified an invalid argument (validation errors) | 400 Bad Request |
| DEADLINE_EXCEEDED | 4 | Operation deadline was exceeded before completion | 504 Gateway Timeout |
| NOT_FOUND | 5 | Requested entity (e.g., file, user) was not found | 404 Not Found |
| ALREADY_EXISTS | 6 | Entity that client attempted to create already exists | 409 Conflict |
| PERMISSION_DENIED | 7 | Caller lacks permission for the operation | 403 Forbidden |
| RESOURCE_EXHAUSTED | 8 | Resource has been exhausted (quota, rate limit) | 429 Too Many Requests |
| FAILED_PRECONDITION | 9 | Operation rejected because system not in required state | 400 Bad Request |
| ABORTED | 10 | Operation aborted due to concurrency issues | 409 Conflict |
| OUT_OF_RANGE | 11 | Operation attempted past valid range | 400 Bad Request |
| UNIMPLEMENTED | 12 | Operation is not implemented or not supported | 501 Not Implemented |
| INTERNAL | 13 | Internal server error | 500 Internal Server Error |
| UNAVAILABLE | 14 | Service is currently unavailable (temporary condition) | 503 Service Unavailable |
| DATA_LOSS | 15 | Unrecoverable data loss or corruption | 500 Internal Server Error |
| UNAUTHENTICATED | 16 | Request missing or invalid authentication credentials | 401 Unauthorized |
Usage Guidelines¶
- Choose the most specific code: Use the most descriptive status code that accurately represents the error condition.
- Provide helpful messages: Include clear, actionable error messages that help clients understand and resolve the issue.
- Never expose sensitive information: Don't include stack traces, database errors, or internal system details in error messages.
- Use INTERNAL for unexpected errors: When encountering unexpected server errors, return INTERNAL and log the details server-side.
- Distinguish UNAUTHENTICATED vs PERMISSION_DENIED: Use UNAUTHENTICATED for missing/invalid credentials, PERMISSION_DENIED for authenticated users lacking permissions.
- Consider retry behavior: Clients may automatically retry certain codes (UNAVAILABLE, DEADLINE_EXCEEDED) but not others (INVALID_ARGUMENT, PERMISSION_DENIED).
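The retry guideline can be pictured as a small client-side policy. The sketch below is an assumption-laden illustration, not a Spikard feature: the set of retryable codes is taken from the guideline above, while `should_retry`, `backoff_seconds`, and their default limits are hypothetical choices.

```python
# Codes the guideline above names as candidates for automatic retry.
RETRYABLE_CODES = frozenset({"UNAVAILABLE", "DEADLINE_EXCEEDED"})


def should_retry(status: str, attempt: int, max_attempts: int = 3) -> bool:
    """Return True if a call that failed with `status` may be retried.

    `attempt` is the number of attempts already made, starting at 0.
    """
    return status in RETRYABLE_CODES and attempt < max_attempts


def backoff_seconds(attempt: int, base: float = 0.5, cap: float = 8.0) -> float:
    """Exponential backoff (no jitter): base * 2**attempt, capped at `cap`."""
    return min(cap, base * (2 ** attempt))
```

Retrying is only safe for idempotent operations; a call such as CreateUser may need a deduplication key before it can be retried blindly.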
Error Handling Patterns¶
try:
    # Handler logic
    pass
except ValueError as e:
    # Maps to INVALID_ARGUMENT
    raise
except PermissionError as e:
    # Maps to PERMISSION_DENIED
    raise
except NotImplementedError as e:
    # Maps to UNIMPLEMENTED
    raise
except Exception as e:
    # Maps to INTERNAL
    raise
Mapping is automatic via FFI layer (pyerr_to_grpc_status).
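As a rough mental model of that Python mapping, the lookup below reproduces the four cases listed in the comments above, using the numeric codes from the status code table. It is only an illustration: the real conversion happens inside Spikard's FFI layer, and `StatusCode` and `status_for` are hypothetical names.

```python
from enum import IntEnum


class StatusCode(IntEnum):
    """Subset of the gRPC status codes used by the Python mapping."""

    INVALID_ARGUMENT = 3
    PERMISSION_DENIED = 7
    UNIMPLEMENTED = 12
    INTERNAL = 13


# Checked in order; the first matching exception type wins.
_EXCEPTION_TO_STATUS = (
    (ValueError, StatusCode.INVALID_ARGUMENT),
    (PermissionError, StatusCode.PERMISSION_DENIED),
    (NotImplementedError, StatusCode.UNIMPLEMENTED),
)


def status_for(exc: BaseException) -> StatusCode:
    """Return the status code a raised exception would be reported as."""
    for exc_type, code in _EXCEPTION_TO_STATUS:
        if isinstance(exc, exc_type):
            return code
    return StatusCode.INTERNAL  # anything unmapped is an internal error
```

One consequence worth noting: with this mapping there is no exception that yields NOT_FOUND or UNAUTHENTICATED, so a "user not found" raised as `ValueError` reaches the client as INVALID_ARGUMENT.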
import { GrpcError, GrpcStatusCode } from 'spikard';
// Explicit status codes
throw new GrpcError(GrpcStatusCode.INVALID_ARGUMENT, 'Invalid ID');
throw new GrpcError(GrpcStatusCode.NOT_FOUND, 'User not found');
throw new GrpcError(GrpcStatusCode.UNAUTHENTICATED, 'Auth required');
throw new GrpcError(GrpcStatusCode.PERMISSION_DENIED, 'Access denied');
throw new GrpcError(GrpcStatusCode.INTERNAL, 'Internal error');
Explicit GrpcError for all status codes.
class UserServiceHandler < Spikard::Grpc::Handler
def handle_request(request)
case request.method_name
when 'GetUser'
get_user(request)
else
# Return error response
Spikard::Grpc::Response.error(
"Method not implemented: #{request.method_name}"
)
end
rescue ArgumentError => e
# Invalid argument error
Spikard::Grpc::Response.error(e.message, 'INVALID_ARGUMENT')
rescue SecurityError => e
# Authentication error
Spikard::Grpc::Response.error(e.message, 'UNAUTHENTICATED')
rescue StandardError => e
# Internal error
Spikard::Grpc::Response.error("Internal error: #{e.message}")
end
end
<?php declare(strict_types=1);
// Return error response
return Response::error('Error message');
// With status code in metadata
return Response::error(
'Error message',
'INVALID_ARGUMENT'
);
// Try-catch pattern
try {
// Handler logic
} catch (\InvalidArgumentException $e) {
return Response::error($e->getMessage());
} catch (\Exception $e) {
return Response::error("Internal error: {$e->getMessage()}");
}
Return error responses instead of throwing.
// Direct tonic::Status
return Err(Status::invalid_argument("Invalid ID"));
return Err(Status::not_found("User not found"));
return Err(Status::unauthenticated("Auth required"));
return Err(Status::permission_denied("Access denied"));
return Err(Status::internal("Internal error"));
// With .map_err()
user_repository.find_by_id(id)
.await
.map_err(|e| Status::internal(format!("DB error: {}", e)))?;
Type-safe Result<T, Status> pattern.
Part 6: Testing¶
Python gRPC Handler Tests¶
Comprehensive test examples for gRPC handlers using pytest.
# test_user_handler.py
import pytest
from user_handler import UserServiceHandler
from spikard import GrpcRequest
import user_service_pb2 as pb


@pytest.mark.asyncio
async def test_get_user_success():
    """Test getting an existing user."""
    handler = UserServiceHandler()

    # Create request
    req = pb.GetUserRequest(user_id=1)
    grpc_request = GrpcRequest(
        service_name="userservice.v1.UserService",
        method_name="GetUser",
        payload=req.SerializeToString(),
    )

    # Call handler
    response = await handler.handle_request(grpc_request)

    # Deserialize response
    user_response = pb.UserResponse()
    user_response.ParseFromString(response.payload)

    # Assertions
    assert user_response.success is True
    assert user_response.user.id == 1
    assert user_response.user.name == "Alice"
    assert user_response.user.email == "alice@example.com"


@pytest.mark.asyncio
async def test_get_user_not_found():
    """Test getting a non-existent user."""
    handler = UserServiceHandler()

    # Create request for non-existent user
    req = pb.GetUserRequest(user_id=999)
    grpc_request = GrpcRequest(
        service_name="userservice.v1.UserService",
        method_name="GetUser",
        payload=req.SerializeToString(),
    )

    # Call handler
    response = await handler.handle_request(grpc_request)

    # Deserialize response
    user_response = pb.UserResponse()
    user_response.ParseFromString(response.payload)

    # Assertions
    assert user_response.success is False
    assert "not found" in user_response.error_message


@pytest.mark.asyncio
async def test_create_user_success():
    """Test creating a new user."""
    handler = UserServiceHandler()

    # Create request
    req = pb.CreateUserRequest(
        name="Charlie",
        email="charlie@example.com",
        phone="555-1234",
        tags=["developer", "remote"],
    )
    grpc_request = GrpcRequest(
        service_name="userservice.v1.UserService",
        method_name="CreateUser",
        payload=req.SerializeToString(),
    )

    # Call handler
    response = await handler.handle_request(grpc_request)

    # Deserialize response
    user_response = pb.UserResponse()
    user_response.ParseFromString(response.payload)

    # Assertions
    assert user_response.success is True
    assert user_response.user.id == 3  # Next available ID
    assert user_response.user.name == "Charlie"
    assert user_response.user.email == "charlie@example.com"
    assert user_response.user.phone == "555-1234"
    assert list(user_response.user.tags) == ["developer", "remote"]


@pytest.mark.asyncio
async def test_create_user_validation_error():
    """Test creating a user with missing required fields."""
    handler = UserServiceHandler()

    # Create request with missing email
    req = pb.CreateUserRequest(name="Invalid")
    grpc_request = GrpcRequest(
        service_name="userservice.v1.UserService",
        method_name="CreateUser",
        payload=req.SerializeToString(),
    )

    # Call handler
    response = await handler.handle_request(grpc_request)

    # Deserialize response
    user_response = pb.UserResponse()
    user_response.ParseFromString(response.payload)

    # Assertions
    assert user_response.success is False
    assert "required" in user_response.error_message


@pytest.mark.asyncio
async def test_unknown_method():
    """Test calling an unknown method."""
    handler = UserServiceHandler()

    grpc_request = GrpcRequest(
        service_name="userservice.v1.UserService",
        method_name="DeleteUser",  # Not implemented
        payload=b"",
    )

    # Should raise NotImplementedError
    with pytest.raises(NotImplementedError, match="Unknown method"):
        await handler.handle_request(grpc_request)


# Run tests
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
Test Patterns¶
Using Fixtures¶
import pytest
import user_service_pb2 as pb  # Needed for pb.User in the sample_user fixture
from user_handler import UserServiceHandler


@pytest.fixture
def handler():
    """Create handler with test data."""
    return UserServiceHandler()


@pytest.fixture
def sample_user():
    """Create sample user for tests."""
    return pb.User(
        id=1,
        name="Test User",
        email="test@example.com",
    )


@pytest.mark.asyncio
async def test_with_fixtures(handler, sample_user):
    # Test using fixtures
    pass
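The Python handler in Part 4 receives a `user_repository` dependency, so a fixture can hand it a fake instead of a real database. The following is a minimal sketch that assumes only the two async methods that handler calls (`find_by_id` and `create`); `InMemoryUserRepository` and `StoredUser` are hypothetical names and not part of Spikard.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional


@dataclass
class StoredUser:
    """Record shape the Part 4 handler reads: id, name, email, created_at."""

    id: int
    name: str
    email: str
    created_at: datetime


class InMemoryUserRepository:
    """Test double that keeps users in a dict and assigns sequential IDs."""

    def __init__(self) -> None:
        self._users: Dict[int, StoredUser] = {}
        self._next_id = 1

    async def find_by_id(self, user_id: int) -> Optional[StoredUser]:
        return self._users.get(user_id)

    async def create(self, name: str, email: str) -> StoredUser:
        user = StoredUser(
            id=self._next_id,
            name=name,
            email=email,
            created_at=datetime.now(timezone.utc),
        )
        self._users[user.id] = user
        self._next_id += 1
        return user


# Quick check outside pytest: create a user, then look it up again.
repo = InMemoryUserRepository()
created = asyncio.run(repo.create(name="Alice", email="alice@example.com"))
found = asyncio.run(repo.find_by_id(created.id))
```

A fixture could then return `UserServiceHandler(user_repository=InMemoryUserRepository())`, which keeps each test isolated from the others.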
Testing Error Cases¶
@pytest.mark.asyncio
async def test_handles_malformed_payload():
    """Test handler with malformed protobuf."""
    handler = UserServiceHandler()

    # Create request with invalid protobuf
    grpc_request = GrpcRequest(
        service_name="userservice.v1.UserService",
        method_name="GetUser",
        payload=b"invalid protobuf data",
    )

    # Should handle deserialization error gracefully
    with pytest.raises(Exception):  # Or specific protobuf exception
        await handler.handle_request(grpc_request)
Running Tests¶
TypeScript gRPC Handler Tests¶
Comprehensive test examples for gRPC handlers using Vitest.
// user_handler.test.ts
import { describe, it, expect } from 'vitest';
import { GrpcRequest } from '@spikard/node';
import { UserServiceHandler } from './user_handler';
import { userservice } from './user_service';
describe('UserServiceHandler', () => {
it('should get an existing user', async () => {
const handler = new UserServiceHandler();
// Create request
const req = userservice.v1.GetUserRequest.create({ userId: 1 });
const payload = userservice.v1.GetUserRequest.encode(req).finish();
const grpcRequest = new GrpcRequest({
serviceName: 'userservice.v1.UserService',
methodName: 'GetUser',
payload,
});
// Call handler
const response = await handler.handleRequest(grpcRequest);
// Deserialize response
const userResponse = userservice.v1.UserResponse.decode(response.payload);
// Assertions
expect(userResponse.success).toBe(true);
expect(userResponse.user?.id).toBe(1);
expect(userResponse.user?.name).toBe('Alice');
});
it('should return error for non-existent user', async () => {
const handler = new UserServiceHandler();
const req = userservice.v1.GetUserRequest.create({ userId: 999 });
const payload = userservice.v1.GetUserRequest.encode(req).finish();
const grpcRequest = new GrpcRequest({
serviceName: 'userservice.v1.UserService',
methodName: 'GetUser',
payload,
});
const response = await handler.handleRequest(grpcRequest);
const userResponse = userservice.v1.UserResponse.decode(response.payload);
expect(userResponse.success).toBe(false);
expect(userResponse.errorMessage).toContain('not found');
});
it('should create a new user', async () => {
const handler = new UserServiceHandler();
const req = userservice.v1.CreateUserRequest.create({
name: 'Charlie',
email: 'charlie@example.com',
tags: ['developer'],
});
const payload = userservice.v1.CreateUserRequest.encode(req).finish();
const grpcRequest = new GrpcRequest({
serviceName: 'userservice.v1.UserService',
methodName: 'CreateUser',
payload,
});
const response = await handler.handleRequest(grpcRequest);
const userResponse = userservice.v1.UserResponse.decode(response.payload);
expect(userResponse.success).toBe(true);
expect(userResponse.user?.name).toBe('Charlie');
expect(userResponse.user?.id).toBe(3);
});
it('should validate required fields on create', async () => {
const handler = new UserServiceHandler();
const req = userservice.v1.CreateUserRequest.create({
name: 'Test User',
email: '', // Missing email
});
const payload = userservice.v1.CreateUserRequest.encode(req).finish();
const grpcRequest = new GrpcRequest({
serviceName: 'userservice.v1.UserService',
methodName: 'CreateUser',
payload,
});
const response = await handler.handleRequest(grpcRequest);
const userResponse = userservice.v1.UserResponse.decode(response.payload);
expect(userResponse.success).toBe(false);
expect(userResponse.errorMessage).toContain('required');
});
it('should throw for unknown methods', async () => {
const handler = new UserServiceHandler();
const grpcRequest = new GrpcRequest({
serviceName: 'userservice.v1.UserService',
methodName: 'DeleteUser', // Not implemented
payload: new Uint8Array(),
});
await expect(handler.handleRequest(grpcRequest)).rejects.toThrow(
'Unknown method'
);
});
});
Test Patterns¶
Using Test Helpers¶
// test-helpers.ts
import { GrpcRequest } from '@spikard/node';
import { userservice } from './user_service';
export function createGetUserRequest(userId: number): GrpcRequest {
const req = userservice.v1.GetUserRequest.create({ userId });
const payload = userservice.v1.GetUserRequest.encode(req).finish();
return new GrpcRequest({
serviceName: 'userservice.v1.UserService',
methodName: 'GetUser',
payload,
});
}
export function createCreateUserRequest(
name: string,
email: string,
tags?: string[]
): GrpcRequest {
const req = userservice.v1.CreateUserRequest.create({
name,
email,
tags: tags || [],
});
const payload = userservice.v1.CreateUserRequest.encode(req).finish();
return new GrpcRequest({
serviceName: 'userservice.v1.UserService',
methodName: 'CreateUser',
payload,
});
}
Testing with Metadata¶
it('should handle authorization header', async () => {
const handler = new UserServiceHandler();
const req = userservice.v1.CreateUserRequest.create({
name: 'Test',
email: 'test@example.com',
});
const payload = userservice.v1.CreateUserRequest.encode(req).finish();
const grpcRequest = new GrpcRequest({
serviceName: 'userservice.v1.UserService',
methodName: 'CreateUser',
payload,
metadata: {
authorization: 'Bearer valid-token',
},
});
const response = await handler.handleRequest(grpcRequest);
const userResponse = userservice.v1.UserResponse.decode(response.payload);
expect(userResponse.success).toBe(true);
});
Running Tests¶
# Run all tests once
npx vitest run
# Run specific test file
npx vitest run user_handler.test.ts
Ruby gRPC Handler Tests¶
Comprehensive test examples for gRPC handlers using RSpec.
# spec/user_service_handler_spec.rb
require 'spec_helper'
require 'ostruct' # OpenStruct is used for mock users below
require 'spikard/grpc'
require 'userservice_pb'
require_relative '../lib/user_service_handler'
RSpec.describe UserServiceHandler do
let(:user_repository) { instance_double('UserRepository') }
let(:handler) { described_class.new(user_repository) }
describe '#handle_request' do
context 'GetUser' do
it 'returns an existing user successfully' do
# Setup mock data
mock_user = OpenStruct.new(
id: 1,
name: 'Alice',
email: 'alice@example.com',
created_at: Time.now.utc
)
allow(user_repository).to receive(:find_by_id).with(1).and_return(mock_user)
# Create request
req = Userservice::GetUserRequest.new(id: 1)
grpc_request = Spikard::Grpc::Request.new(
service_name: 'userservice.v1.UserService',
method_name: 'GetUser',
payload: Userservice::GetUserRequest.encode(req)
)
# Call handler
response = handler.handle_request(grpc_request)
# Deserialize response
user_response = Userservice::User.decode(response.payload)
# Assertions
expect(user_response.id).to eq(1)
expect(user_response.name).to eq('Alice')
expect(user_response.email).to eq('alice@example.com')
expect(response.metadata['x-user-found']).to eq('true')
end
it 'returns error for non-existent user' do
allow(user_repository).to receive(:find_by_id).with(999).and_return(nil)
# Create request for non-existent user
req = Userservice::GetUserRequest.new(id: 999)
grpc_request = Spikard::Grpc::Request.new(
service_name: 'userservice.v1.UserService',
method_name: 'GetUser',
payload: Userservice::GetUserRequest.encode(req)
)
# Call handler - should raise error
expect { handler.handle_request(grpc_request) }.to raise_error(
ArgumentError, /not found/
)
end
it 'validates user ID is positive' do
req = Userservice::GetUserRequest.new(id: 0)
grpc_request = Spikard::Grpc::Request.new(
service_name: 'userservice.v1.UserService',
method_name: 'GetUser',
payload: Userservice::GetUserRequest.encode(req)
)
expect { handler.handle_request(grpc_request) }.to raise_error(
ArgumentError, /must be positive/
)
end
end
context 'CreateUser' do
it 'creates a new user successfully' do
mock_user = OpenStruct.new(
id: 3,
name: 'Charlie',
email: 'charlie@example.com'
)
allow(user_repository).to receive(:create).and_return(mock_user)
# Create request with authorization metadata
req = Userservice::CreateUserRequest.new(
name: 'Charlie',
email: 'charlie@example.com'
)
grpc_request = Spikard::Grpc::Request.new(
service_name: 'userservice.v1.UserService',
method_name: 'CreateUser',
payload: Userservice::CreateUserRequest.encode(req),
metadata: { 'authorization' => 'Bearer valid-token' }
)
# Call handler
response = handler.handle_request(grpc_request)
# Deserialize response
user_response = Userservice::User.decode(response.payload)
# Assertions
expect(user_response.id).to eq(3)
expect(user_response.name).to eq('Charlie')
expect(user_response.email).to eq('charlie@example.com')
expect(response.metadata['x-user-id']).to eq('3')
expect(response.metadata['x-created']).to eq('true')
end
it 'returns error when name is missing' do
req = Userservice::CreateUserRequest.new(
name: '',
email: 'test@example.com'
)
grpc_request = Spikard::Grpc::Request.new(
service_name: 'userservice.v1.UserService',
method_name: 'CreateUser',
payload: Userservice::CreateUserRequest.encode(req),
metadata: { 'authorization' => 'Bearer token' }
)
expect { handler.handle_request(grpc_request) }.to raise_error(
ArgumentError, /required/
)
end
it 'returns error when authorization is missing' do
req = Userservice::CreateUserRequest.new(
name: 'Test',
email: 'test@example.com'
)
grpc_request = Spikard::Grpc::Request.new(
service_name: 'userservice.v1.UserService',
method_name: 'CreateUser',
payload: Userservice::CreateUserRequest.encode(req)
)
expect { handler.handle_request(grpc_request) }.to raise_error(
SecurityError, /Authentication required/
)
end
end
context 'unknown method' do
it 'raises error for unknown method' do
grpc_request = Spikard::Grpc::Request.new(
service_name: 'userservice.v1.UserService',
method_name: 'DeleteUser',
payload: ''
)
expect { handler.handle_request(grpc_request) }.to raise_error(
RuntimeError, /Unknown method/
)
end
end
end
end
Test Patterns¶
Using Shared Examples¶
RSpec.shared_examples 'authenticated endpoint' do |method_name|
it 'requires authentication' do
grpc_request = Spikard::Grpc::Request.new(
service_name: 'userservice.v1.UserService',
method_name: method_name,
payload: request_payload
)
expect { handler.handle_request(grpc_request) }.to raise_error(
SecurityError, /Authentication required/
)
end
end
RSpec.describe UserServiceHandler do
describe 'CreateUser' do
let(:request_payload) do
req = Userservice::CreateUserRequest.new(name: 'Test', email: 'test@example.com')
Userservice::CreateUserRequest.encode(req)
end
include_examples 'authenticated endpoint', 'CreateUser'
end
end
Testing Error Responses¶
RSpec.describe UserServiceHandler do
describe 'error handling' do
it 'handles malformed protobuf gracefully' do
grpc_request = Spikard::Grpc::Request.new(
service_name: 'userservice.v1.UserService',
method_name: 'GetUser',
payload: 'invalid protobuf data'
)
expect { handler.handle_request(grpc_request) }.to raise_error(
Google::Protobuf::ParseError
)
end
end
end
Using let! for Setup¶
RSpec.describe UserServiceHandler do
  let(:user_repository) { instance_double('UserRepository') }
  let(:handler) { described_class.new(user_repository) }

  # let! is evaluated eagerly, before each example runs
  let!(:alice) do
    OpenStruct.new(id: 1, name: 'Alice', email: 'alice@example.com', created_at: Time.now)
  end
  let!(:bob) do
    OpenStruct.new(id: 2, name: 'Bob', email: 'bob@example.com', created_at: Time.now)
  end

  before do
    allow(user_repository).to receive(:find_by_id).with(1).and_return(alice)
    allow(user_repository).to receive(:find_by_id).with(2).and_return(bob)
  end

  it 'retrieves a user set up with let!' do
    # Test with alice
    req = Userservice::GetUserRequest.new(id: 1)
    grpc_request = Spikard::Grpc::Request.new(
      service_name: 'userservice.v1.UserService',
      method_name: 'GetUser',
      payload: Userservice::GetUserRequest.encode(req)
    )
    response = handler.handle_request(grpc_request)
    user = Userservice::User.decode(response.payload)
    expect(user.name).to eq('Alice')
  end
end
Running Tests¶
# Run all specs
bundle exec rspec
# Run specific spec file
bundle exec rspec spec/user_service_handler_spec.rb
PHP gRPC Handler Tests¶
Comprehensive test examples for gRPC handlers using PHPUnit.
<?php
// tests/UserServiceHandlerTest.php
declare(strict_types=1);
namespace Tests;
use PHPUnit\Framework\TestCase;
use PHPUnit\Framework\MockObject\MockObject;
use Spikard\Grpc\Request;
use Spikard\Grpc\Response;
use Userservice\GetUserRequest;
use Userservice\CreateUserRequest;
use Userservice\User;
class UserServiceHandlerTest extends TestCase
{
private UserServiceHandler $handler;
private MockObject $userRepository;
protected function setUp(): void
{
$this->userRepository = $this->createMock(UserRepository::class);
$this->handler = new UserServiceHandler($this->userRepository);
}
public function testGetUserSuccess(): void
{
// Setup mock data
$mockUser = new \stdClass();
$mockUser->id = 1;
$mockUser->name = 'Alice';
$mockUser->email = 'alice@example.com';
$mockUser->createdAt = new \DateTime();
$this->userRepository
->expects($this->once())
->method('findById')
->with(1)
->willReturn($mockUser);
// Create request
$req = new GetUserRequest();
$req->setId(1);
$grpcRequest = new Request(
serviceName: 'userservice.v1.UserService',
methodName: 'GetUser',
payload: $req->serializeToString()
);
// Call handler
$response = $this->handler->handleRequest($grpcRequest);
// Deserialize response
$userResponse = new User();
$userResponse->mergeFromString($response->payload);
// Assertions
$this->assertEquals(1, $userResponse->getId());
$this->assertEquals('Alice', $userResponse->getName());
$this->assertEquals('alice@example.com', $userResponse->getEmail());
$this->assertEquals('true', $response->getMetadata('x-user-found'));
}
public function testGetUserNotFound(): void
{
$this->userRepository
->expects($this->once())
->method('findById')
->with(999)
->willReturn(null);
// Create request for non-existent user
$req = new GetUserRequest();
$req->setId(999);
$grpcRequest = new Request(
serviceName: 'userservice.v1.UserService',
methodName: 'GetUser',
payload: $req->serializeToString()
);
// Call handler
$response = $this->handler->handleRequest($grpcRequest);
// Should return error response
$this->assertTrue($response->isError());
$this->assertStringContainsString('not found', $response->errorMessage);
}
public function testGetUserInvalidId(): void
{
$req = new GetUserRequest();
$req->setId(0);
$grpcRequest = new Request(
serviceName: 'userservice.v1.UserService',
methodName: 'GetUser',
payload: $req->serializeToString()
);
$response = $this->handler->handleRequest($grpcRequest);
$this->assertTrue($response->isError());
$this->assertStringContainsString('must be positive', $response->errorMessage);
}
public function testCreateUserSuccess(): void
{
$mockUser = new \stdClass();
$mockUser->id = 3;
$mockUser->name = 'Charlie';
$mockUser->email = 'charlie@example.com';
$this->userRepository
->expects($this->once())
->method('create')
->willReturn($mockUser);
// Create request
$req = new CreateUserRequest();
$req->setName('Charlie');
$req->setEmail('charlie@example.com');
$grpcRequest = new Request(
serviceName: 'userservice.v1.UserService',
methodName: 'CreateUser',
payload: $req->serializeToString(),
metadata: ['authorization' => 'Bearer valid-token']
);
// Call handler
$response = $this->handler->handleRequest($grpcRequest);
// Deserialize response
$userResponse = new User();
$userResponse->mergeFromString($response->payload);
// Assertions
$this->assertEquals(3, $userResponse->getId());
$this->assertEquals('Charlie', $userResponse->getName());
$this->assertEquals('charlie@example.com', $userResponse->getEmail());
$this->assertEquals('3', $response->getMetadata('x-user-id'));
$this->assertEquals('true', $response->getMetadata('x-created'));
}
public function testCreateUserValidationError(): void
{
// Create request with missing email
$req = new CreateUserRequest();
$req->setName('Test User');
$req->setEmail('');
$grpcRequest = new Request(
serviceName: 'userservice.v1.UserService',
methodName: 'CreateUser',
payload: $req->serializeToString(),
metadata: ['authorization' => 'Bearer token']
);
$response = $this->handler->handleRequest($grpcRequest);
$this->assertTrue($response->isError());
$this->assertStringContainsString('required', $response->errorMessage);
}
public function testCreateUserRequiresAuthentication(): void
{
$req = new CreateUserRequest();
$req->setName('Test');
$req->setEmail('test@example.com');
// Request without authorization header
$grpcRequest = new Request(
serviceName: 'userservice.v1.UserService',
methodName: 'CreateUser',
payload: $req->serializeToString()
);
$response = $this->handler->handleRequest($grpcRequest);
$this->assertTrue($response->isError());
$this->assertStringContainsString('Authentication required', $response->errorMessage);
$this->assertEquals('16', $response->getMetadata('grpc-status'));
}
public function testUnknownMethod(): void
{
$grpcRequest = new Request(
serviceName: 'userservice.v1.UserService',
methodName: 'DeleteUser',
payload: ''
);
$response = $this->handler->handleRequest($grpcRequest);
$this->assertTrue($response->isError());
$this->assertStringContainsString('Unknown method', $response->errorMessage);
}
}
Test Patterns¶
Using Data Providers¶
<?php
class UserServiceHandlerTest extends TestCase
{
/**
* @dataProvider invalidUserIdProvider
*/
public function testGetUserWithInvalidIds(int $invalidId, string $expectedError): void
{
$req = new GetUserRequest();
$req->setId($invalidId);
$grpcRequest = new Request(
serviceName: 'userservice.v1.UserService',
methodName: 'GetUser',
payload: $req->serializeToString()
);
$response = $this->handler->handleRequest($grpcRequest);
$this->assertTrue($response->isError());
$this->assertStringContainsString($expectedError, $response->errorMessage);
}
public static function invalidUserIdProvider(): array
{
return [
'zero id' => [0, 'must be positive'],
'negative id' => [-1, 'must be positive'],
];
}
}
Testing Error Handling¶
<?php
class UserServiceHandlerTest extends TestCase
{
public function testHandlesMalformedPayload(): void
{
$grpcRequest = new Request(
serviceName: 'userservice.v1.UserService',
methodName: 'GetUser',
payload: 'invalid protobuf data'
);
$response = $this->handler->handleRequest($grpcRequest);
// Should return error, not throw exception
$this->assertTrue($response->isError());
$this->assertStringContainsString('Error', $response->errorMessage);
}
public function testHandlesRepositoryException(): void
{
$this->userRepository
->method('findById')
->willThrowException(new \RuntimeException('Database connection failed'));
$req = new GetUserRequest();
$req->setId(1);
$grpcRequest = new Request(
serviceName: 'userservice.v1.UserService',
methodName: 'GetUser',
payload: $req->serializeToString()
);
$response = $this->handler->handleRequest($grpcRequest);
$this->assertTrue($response->isError());
$this->assertStringContainsString('Database connection failed', $response->errorMessage);
}
}
Testing with Metadata¶
<?php
class UserServiceHandlerTest extends TestCase
{
public function testHandlerReadsCustomMetadata(): void
{
$mockUser = new \stdClass();
$mockUser->id = 1;
$mockUser->name = 'Alice';
$mockUser->email = 'alice@example.com';
$mockUser->createdAt = new \DateTime();
$this->userRepository->method('findById')->willReturn($mockUser);
$req = new GetUserRequest();
$req->setId(1);
$grpcRequest = new Request(
serviceName: 'userservice.v1.UserService',
methodName: 'GetUser',
payload: $req->serializeToString(),
metadata: [
'x-request-id' => 'abc-123',
'x-trace-id' => 'trace-456',
]
);
$response = $this->handler->handleRequest($grpcRequest);
$this->assertFalse($response->isError());
}
}
Running Tests¶
# Run all tests
./vendor/bin/phpunit
# Run with verbose output (PHPUnit 9 and earlier; the flag was removed in PHPUnit 10)
./vendor/bin/phpunit --verbose
# Run specific test file
./vendor/bin/phpunit tests/UserServiceHandlerTest.php
# Run specific test method
./vendor/bin/phpunit --filter testGetUserSuccess
# Run with coverage
./vendor/bin/phpunit --coverage-html coverage/
Rust gRPC Handler Tests¶
Comprehensive test examples for gRPC handlers using Tokio's #[tokio::test] attribute.
// tests/user_handler_test.rs
use bytes::Bytes;
use spikard_http::grpc::{GrpcHandler, GrpcRequestData, GrpcResponseData};
use std::sync::Arc;
use tonic::metadata::MetadataMap;
mod userservice {
include!("../src/userservice.rs");
}
use crate::user_handler::UserServiceHandler;
// Mock repository for testing
struct MockUserRepository {
users: std::sync::RwLock<Vec<userservice::User>>,
}
impl MockUserRepository {
fn new() -> Self {
let users = vec![
userservice::User {
id: 1,
name: "Alice".to_string(),
email: "alice@example.com".to_string(),
created_at: "2024-01-01T00:00:00Z".to_string(),
},
userservice::User {
id: 2,
name: "Bob".to_string(),
email: "bob@example.com".to_string(),
created_at: "2024-01-02T00:00:00Z".to_string(),
},
];
Self {
users: std::sync::RwLock::new(users),
}
}
}
#[async_trait::async_trait]
impl UserRepository for MockUserRepository {
async fn find_by_id(&self, id: i32) -> Result<Option<userservice::User>, String> {
let users = self.users.read().expect("lock poisoned");
Ok(users.iter().find(|u| u.id == id).cloned())
}
async fn create(&self, name: &str, email: &str) -> Result<userservice::User, String> {
let mut users = self.users.write().expect("lock poisoned");
let new_id = users.len() as i32 + 1;
let user = userservice::User {
id: new_id,
name: name.to_string(),
email: email.to_string(),
created_at: chrono::Utc::now().to_rfc3339(),
};
users.push(user.clone());
Ok(user)
}
}
fn create_handler() -> UserServiceHandler {
let repo = Arc::new(MockUserRepository::new());
UserServiceHandler::new(repo)
}
#[tokio::test]
async fn test_get_user_success() {
use prost::Message;
let handler = create_handler();
// Create request
let req = userservice::GetUserRequest { id: 1 };
let mut buf = Vec::new();
req.encode(&mut buf).expect("failed to encode protobuf request");
let grpc_request = GrpcRequestData {
service_name: "userservice.v1.UserService".to_string(),
method_name: "GetUser".to_string(),
payload: Bytes::from(buf),
metadata: MetadataMap::new(),
};
// Call handler
let response = handler.call(grpc_request).await.expect("handler call failed");
// Deserialize response
let user_response = userservice::User::decode(response.payload).expect("failed to decode response payload");
// Assertions
assert_eq!(user_response.id, 1);
assert_eq!(user_response.name, "Alice");
assert_eq!(user_response.email, "alice@example.com");
assert_eq!(
response.metadata.get("x-user-found").expect("x-user-found header missing").to_str().expect("invalid metadata value"),
"true"
);
}
#[tokio::test]
async fn test_get_user_not_found() {
use prost::Message;
let handler = create_handler();
// Create request for non-existent user
let req = userservice::GetUserRequest { id: 999 };
let mut buf = Vec::new();
req.encode(&mut buf).expect("failed to encode protobuf request");
let grpc_request = GrpcRequestData {
service_name: "userservice.v1.UserService".to_string(),
method_name: "GetUser".to_string(),
payload: Bytes::from(buf),
metadata: MetadataMap::new(),
};
// Call handler - should return error
let result = handler.call(grpc_request).await;
assert!(result.is_err());
let status = result.unwrap_err();
assert_eq!(status.code(), tonic::Code::NotFound);
assert!(status.message().contains("not found"));
}
#[tokio::test]
async fn test_get_user_invalid_id() {
use prost::Message;
let handler = create_handler();
// Create request with invalid ID
let req = userservice::GetUserRequest { id: 0 };
let mut buf = Vec::new();
req.encode(&mut buf).expect("failed to encode protobuf request");
let grpc_request = GrpcRequestData {
service_name: "userservice.v1.UserService".to_string(),
method_name: "GetUser".to_string(),
payload: Bytes::from(buf),
metadata: MetadataMap::new(),
};
let result = handler.call(grpc_request).await;
assert!(result.is_err());
let status = result.unwrap_err();
assert_eq!(status.code(), tonic::Code::InvalidArgument);
assert!(status.message().contains("must be positive"));
}
#[tokio::test]
async fn test_create_user_success() {
use prost::Message;
let handler = create_handler();
// Create request
let req = userservice::CreateUserRequest {
name: "Charlie".to_string(),
email: "charlie@example.com".to_string(),
};
let mut buf = Vec::new();
req.encode(&mut buf).expect("failed to encode protobuf request");
// Add authorization metadata
let mut metadata = MetadataMap::new();
metadata.insert("authorization", "Bearer valid-token".parse().expect("failed to parse authorization header"));
let grpc_request = GrpcRequestData {
service_name: "userservice.v1.UserService".to_string(),
method_name: "CreateUser".to_string(),
payload: Bytes::from(buf),
metadata,
};
// Call handler
let response = handler.call(grpc_request).await.expect("handler call failed");
// Deserialize response
let user_response = userservice::User::decode(response.payload).expect("failed to decode response payload");
// Assertions
assert_eq!(user_response.id, 3); // Next available ID
assert_eq!(user_response.name, "Charlie");
assert_eq!(user_response.email, "charlie@example.com");
assert_eq!(
response.metadata.get("x-user-id").expect("x-user-id header missing").to_str().expect("invalid metadata value"),
"3"
);
assert_eq!(
response.metadata.get("x-created").expect("x-created header missing").to_str().expect("invalid metadata value"),
"true"
);
}
#[tokio::test]
async fn test_create_user_validation_error() {
use prost::Message;
let handler = create_handler();
// Create request with missing email
let req = userservice::CreateUserRequest {
name: "Test User".to_string(),
email: "".to_string(),
};
let mut buf = Vec::new();
req.encode(&mut buf).expect("failed to encode protobuf request");
let mut metadata = MetadataMap::new();
metadata.insert("authorization", "Bearer token".parse().expect("failed to parse authorization header"));
let grpc_request = GrpcRequestData {
service_name: "userservice.v1.UserService".to_string(),
method_name: "CreateUser".to_string(),
payload: Bytes::from(buf),
metadata,
};
let result = handler.call(grpc_request).await;
assert!(result.is_err());
let status = result.unwrap_err();
assert_eq!(status.code(), tonic::Code::InvalidArgument);
assert!(status.message().contains("required"));
}
#[tokio::test]
async fn test_create_user_requires_authentication() {
use prost::Message;
let handler = create_handler();
let req = userservice::CreateUserRequest {
name: "Test".to_string(),
email: "test@example.com".to_string(),
};
let mut buf = Vec::new();
req.encode(&mut buf).expect("failed to encode protobuf request");
// Request without authorization header
let grpc_request = GrpcRequestData {
service_name: "userservice.v1.UserService".to_string(),
method_name: "CreateUser".to_string(),
payload: Bytes::from(buf),
metadata: MetadataMap::new(),
};
let result = handler.call(grpc_request).await;
assert!(result.is_err());
let status = result.unwrap_err();
assert_eq!(status.code(), tonic::Code::Unauthenticated);
assert!(status.message().contains("Authentication required"));
}
#[tokio::test]
async fn test_unknown_method() {
let handler = create_handler();
let grpc_request = GrpcRequestData {
service_name: "userservice.v1.UserService".to_string(),
method_name: "DeleteUser".to_string(),
payload: Bytes::new(),
metadata: MetadataMap::new(),
};
let result = handler.call(grpc_request).await;
assert!(result.is_err());
let status = result.unwrap_err();
assert_eq!(status.code(), tonic::Code::Unimplemented);
assert!(status.message().contains("Unknown method"));
}
Test Patterns¶
Using Test Fixtures¶
use once_cell::sync::Lazy;
static TEST_HANDLER: Lazy<UserServiceHandler> = Lazy::new(|| {
let repo = Arc::new(MockUserRepository::new());
UserServiceHandler::new(repo)
});
#[tokio::test]
async fn test_with_shared_handler() {
use prost::Message;
let req = userservice::GetUserRequest { id: 1 };
let mut buf = Vec::new();
req.encode(&mut buf).expect("failed to encode protobuf request");
let grpc_request = GrpcRequestData {
service_name: "userservice.v1.UserService".to_string(),
method_name: "GetUser".to_string(),
payload: Bytes::from(buf),
metadata: MetadataMap::new(),
};
let response = TEST_HANDLER.call(grpc_request).await.expect("handler call failed");
let user = userservice::User::decode(response.payload).expect("failed to decode response payload");
assert_eq!(user.name, "Alice");
}
Testing Error Cases¶
#[tokio::test]
async fn test_handles_malformed_payload() {
let handler = create_handler();
let grpc_request = GrpcRequestData {
service_name: "userservice.v1.UserService".to_string(),
method_name: "GetUser".to_string(),
payload: Bytes::from("invalid protobuf data"),
metadata: MetadataMap::new(),
};
let result = handler.call(grpc_request).await;
assert!(result.is_err());
let status = result.unwrap_err();
assert_eq!(status.code(), tonic::Code::InvalidArgument);
}
Helper Functions¶
fn create_get_user_request(user_id: i32) -> GrpcRequestData {
use prost::Message;
let req = userservice::GetUserRequest { id: user_id };
let mut buf = Vec::new();
req.encode(&mut buf).expect("failed to encode protobuf request");
GrpcRequestData {
service_name: "userservice.v1.UserService".to_string(),
method_name: "GetUser".to_string(),
payload: Bytes::from(buf),
metadata: MetadataMap::new(),
}
}
fn create_create_user_request(name: &str, email: &str, auth_token: Option<&str>) -> GrpcRequestData {
use prost::Message;
let req = userservice::CreateUserRequest {
name: name.to_string(),
email: email.to_string(),
};
let mut buf = Vec::new();
req.encode(&mut buf).expect("failed to encode protobuf request");
let mut metadata = MetadataMap::new();
if let Some(token) = auth_token {
metadata.insert("authorization", token.parse().expect("failed to parse authorization header"));
}
GrpcRequestData {
service_name: "userservice.v1.UserService".to_string(),
method_name: "CreateUser".to_string(),
payload: Bytes::from(buf),
metadata,
}
}
#[tokio::test]
async fn test_with_helpers() {
    use prost::Message; // brings `decode` into scope

    let handler = create_handler();
    let request = create_get_user_request(1);
    let response = handler.call(request).await.expect("handler call failed");
    let user = userservice::User::decode(response.payload).expect("failed to decode response payload");
    assert_eq!(user.id, 1);
}
Running Tests¶
# Run all tests
cargo test
# Run specific test file
cargo test --test user_handler_test
# Run specific test function
cargo test test_get_user_success
Part 7: Streaming¶
Streaming Modes¶
| Mode | Definition | Use Cases |
|---|---|---|
| Unary | rpc Method(Request) returns (Response) | CRUD operations, simple queries |
| Server Streaming | rpc Method(Request) returns (stream Response) | Large result sets, real-time updates |
| Client Streaming | rpc Method(stream Request) returns (Response) | File uploads, batch operations |
| Bidirectional | rpc Method(stream Request) returns (stream Response) | Chat, real-time collaboration |
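The four modes in the table differ only in whether each side sends one message or a stream. The sketch below illustrates those shapes with plain `asyncio` functions and async generators. It uses no Spikard or gRPC APIs, and every name in it (`unary`, `server_streaming`, `client_streaming`, `bidirectional`, `numbers`, `demo`) is hypothetical.

```python
import asyncio
from typing import AsyncIterator, List


async def unary(request: int) -> int:
    """Unary: one request in, one response out."""
    return request * 2


async def server_streaming(request: int) -> AsyncIterator[int]:
    """Server streaming: one request in, a stream of responses out."""
    for i in range(request):
        yield i


async def client_streaming(requests: AsyncIterator[int]) -> int:
    """Client streaming: a stream of requests in, one response out."""
    total = 0
    async for value in requests:
        total += value
    return total


async def bidirectional(requests: AsyncIterator[int]) -> AsyncIterator[int]:
    """Bidirectional: a response is emitted for each request as it arrives."""
    async for value in requests:
        yield value * value


async def numbers(values: List[int]) -> AsyncIterator[int]:
    """Turn a list into an async stream of requests."""
    for value in values:
        yield value


async def demo() -> dict:
    """Exercise each mode once and collect the results."""
    return {
        "unary": await unary(21),
        "server_streaming": [item async for item in server_streaming(3)],
        "client_streaming": await client_streaming(numbers([1, 2, 3])),
        "bidirectional": [item async for item in bidirectional(numbers([1, 2, 3]))],
    }
```

Running `asyncio.run(demo())` exercises all four shapes in one call.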
Support Status¶
| Mode | Python | TypeScript | Ruby | PHP | Rust |
|---|---|---|---|---|---|
| Unary | Supported | Supported | Supported | Supported | Supported |
| Server Streaming | Supported | Supported | Supported | Supported | Supported |
| Client Streaming | Supported | Supported | Supported | Supported | Supported |
| Bidirectional | Supported | Supported | Supported | Supported | Supported |
Documentation¶
The sections below show client streaming and bidirectional streaming handler patterns for Python, TypeScript, Ruby, and PHP.
Streaming Handler Implementations¶
Complete examples of client streaming and bidirectional streaming handlers:
Python gRPC Streaming Handlers¶
Complete Python implementation examples for client streaming and bidirectional streaming gRPC handlers in Spikard.
Client Streaming Handler¶
In a client streaming RPC, the client sends a sequence of messages and the server replies with a single response once it has received them all.
Handler Signature¶
from typing import List
from spikard.grpc import GrpcClientStreamRequest, GrpcResponse

class GrpcClientStreamRequest:
    """Request object for client streaming RPC."""
    service_name: str
    method_name: str
    metadata: dict[str, str]
    messages: List[bytes]  # All client messages collected as list

async def handle_client_stream(
    request: GrpcClientStreamRequest,
) -> GrpcResponse:
    """Process all messages and return single response."""
    pass
Example: Batch Message Processing¶
from typing import List
from spikard.grpc import GrpcClientStreamRequest, GrpcResponse
import messageservice_pb2  # Generated from proto
from datetime import datetime, timezone
import uuid

async def handle_batch_create(
    request: GrpcClientStreamRequest,
) -> GrpcResponse:
    """
    Handle client streaming: receive multiple messages, send single response.

    Pattern: Collect all input messages, process together, return aggregated response.
    """
    messages = request.messages
    metadata = request.metadata

    # Validate authorization
    auth_token = metadata.get('authorization')
    if not auth_token:
        raise PermissionError('Authentication required')

    # Step 1: Deserialize all input messages
    items = []
    for i, msg in enumerate(messages):
        try:
            item = messageservice_pb2.Item()
            item.ParseFromString(msg)
            items.append(item)
        except Exception as err:
            raise ValueError(f'Failed to decode message {i}: {err}') from err

    # Validate all items before processing
    for item in items:
        if not item.name or not item.value:
            raise ValueError('Each item must have name and value')

    # Step 2: Process all items atomically
    success_count = 0
    total_value = 0
    try:
        # Simulate database transaction
        async with database_transaction() as tx:
            for item in items:
                # Simulate creating item in database
                await tx.items.create(
                    name=item.name,
                    value=item.value,
                )
                success_count += 1
                total_value += item.value
    except Exception as err:
        raise RuntimeError(f'Transaction failed: {err}') from err

    # Step 3: Build aggregate response
    response = messageservice_pb2.BatchCreateResponse()
    response.success_count = success_count
    response.total_value = total_value
    response.batch_id = str(uuid.uuid4())
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
    response.timestamp = datetime.now(timezone.utc).isoformat()

    # Step 4: Serialize and return
    return GrpcResponse(
        payload=response.SerializeToString(),
        metadata={
            'x-batch-id': response.batch_id,
            'x-count': str(success_count),
        },
    )
def database_transaction():
    """Return an async context manager standing in for a real database transaction."""
    # Placeholder for actual database transaction logic.
    # Defined with plain def: "async with" needs the Transaction object itself,
    # not a coroutine that would first have to be awaited.
    class Transaction:
        class Items:
            async def create(self, name: str, value: int):
                pass
        def __init__(self):
            self.items = self.Items()
        async def __aenter__(self):
            return self
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            pass
    return Transaction()
Bidirectional Streaming Handler¶
Bidirectional streaming RPC allows the client to send multiple messages and the server to send multiple messages back.
Handler Signature¶
from typing import List
from spikard.grpc import GrpcBidiStreamRequest, GrpcBidiStreamResponse
class GrpcBidiStreamRequest:
"""Request object for bidirectional streaming RPC."""
service_name: str
method_name: str
metadata: dict[str, str]
messages: List[bytes] # All input messages collected as list
class GrpcBidiStreamResponse:
"""Response object for bidirectional streaming RPC."""
messages: List[bytes] # Array of response messages
metadata: dict[str, str]
async def handle_bidi_stream(
request: GrpcBidiStreamRequest,
) -> GrpcBidiStreamResponse:
"""Process input messages and generate output messages."""
pass
Example: Message Transformation Pipeline¶
from typing import List
from spikard.grpc import GrpcBidiStreamRequest, GrpcBidiStreamResponse
import transformservice_pb2 # Generated from proto
from datetime import datetime
async def handle_transform_stream(
request: GrpcBidiStreamRequest,
) -> GrpcBidiStreamResponse:
"""
Handle bidirectional streaming: receive multiple messages, send multiple messages.
Pattern: Collect all input messages, process, generate output messages, return array.
"""
messages = request.messages
metadata = request.metadata
# Step 1: Deserialize all input messages
input_documents = []
for index, msg in enumerate(messages):
try:
document = transformservice_pb2.Document()
document.ParseFromString(msg)
input_documents.append({
'index': index,
'document': document,
})
except Exception as err:
raise ValueError(
f'Failed to decode message {index}: {str(err)}'
)
# Step 2: Process each document and generate response
output_messages: List[bytes] = []
for item in input_documents:
index = item['index']
document = item['document']
try:
# Transform the document
transformed = await transform_document(document)
# Build response message
result = transformservice_pb2.TransformResult()
result.original_id = document.id
result.transformed_content = transformed['content']
result.transformed_at = datetime.utcnow().isoformat()
result.status = 'SUCCESS' if transformed['success'] else 'PARTIAL'
result.metadata['original_size'] = str(len(document.content))
result.metadata['transformed_size'] = str(len(transformed['content']))
# Serialize response message
output_messages.append(result.SerializeToString())
except Exception as err:
# Create error response for this document
error_result = transformservice_pb2.TransformResult()
error_result.original_id = document.id
error_result.status = 'ERROR'
error_result.error_message = str(err)
output_messages.append(error_result.SerializeToString())
# Step 3: Return all response messages
return GrpcBidiStreamResponse(
messages=output_messages,
metadata={
'x-processed-count': str(len(output_messages)),
'x-timestamp': datetime.utcnow().isoformat(),
},
)
async def transform_document(doc) -> dict:
"""Simulate document transformation."""
# Simulate async transformation work
return {
'content': doc.content.upper(),
'success': True,
}
Advanced Example: Filtering and Aggregation¶
Bidirectional Stream with Filtering¶
from typing import List
from spikard.grpc import GrpcBidiStreamRequest, GrpcBidiStreamResponse
import recordservice_pb2  # Generated from proto
async def handle_filter_stream(
    request: GrpcBidiStreamRequest,
) -> GrpcBidiStreamResponse:
    """
    Filter records and apply transformations in bidirectional stream.
    Pattern: Filter input based on metadata criteria, transform, return filtered output.
    """
    messages = request.messages
    metadata = request.metadata
    # Parse filter criteria from metadata
    filter_type = metadata.get('x-filter-type', 'all')
    min_value = int(metadata.get('x-min-value', '0'))
    # Step 1: Deserialize and filter
    filtered_items = []
    for index, msg in enumerate(messages):
        item = recordservice_pb2.Record()
        try:
            item.ParseFromString(msg)
        except Exception as err:
            # Re-raise as ValueError so malformed input maps to INVALID_ARGUMENT
            raise ValueError(f'Failed to decode message {index}: {err}') from err
        # Apply filter logic
        if (filter_type == 'all' or
                (filter_type == 'high-value' and item.value >= min_value)):
            filtered_items.append(item)
    # Step 2: Transform filtered items
    output_messages: List[bytes] = []
    for item in filtered_items:
        response = recordservice_pb2.ProcessedRecord()
        response.id = item.id
        response.original_value = item.value
        response.processed_value = item.value * 1.1  # Apply multiplier
        response.filtered = False
        output_messages.append(response.SerializeToString())
    # Step 3: Return response with statistics
    return GrpcBidiStreamResponse(
        messages=output_messages,
        metadata={
            'x-input-count': str(len(messages)),
            'x-output-count': str(len(output_messages)),
            'x-filtered-count': str(len(messages) - len(output_messages)),
        },
    )
Key Patterns¶
Message Collection¶
- All client messages are collected in a single messages list
- Messages are provided as bytes objects (protobuf serialized)
- No streaming iteration needed: the full list is provided
- Order of messages is preserved
Processing Strategy¶
- Collect: All input messages received as list
- Validate: Check all messages before processing
- Transform: Process and generate output
- Serialize: Encode response messages as bytes
- Return: List of response message bytes
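The five steps above can be sketched without Spikard or generated protobuf code. In this minimal sketch, JSON-encoded bytes stand in for serialized protobuf messages, and decode_item, encode_result, and process_stream are hypothetical names chosen for illustration. A real handler would call ParseFromString and SerializeToString on the generated message classes instead.

```python
import json
from typing import Dict, List


def decode_item(raw: bytes) -> Dict[str, object]:
    """Stand-in for ParseFromString: JSON bytes instead of protobuf (assumption)."""
    try:
        item = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as err:
        raise ValueError(f"Failed to decode message: {err}") from err
    if not isinstance(item, dict):
        raise ValueError("Failed to decode message: expected an object")
    return item


def encode_result(result: Dict[str, object]) -> bytes:
    """Stand-in for SerializeToString (assumption)."""
    return json.dumps(result, sort_keys=True).encode("utf-8")


def process_stream(messages: List[bytes]) -> List[bytes]:
    # Collect: the full list is already in memory, so decode it in order.
    items = [decode_item(raw) for raw in messages]

    # Validate: check every message before doing any work.
    for index, item in enumerate(items):
        if not item.get("name"):
            raise ValueError(f"Message {index} is missing a name")

    # Transform, then serialize each output message to bytes.
    outputs = [{"name": str(item["name"]).upper()} for item in items]
    return [encode_result(out) for out in outputs]
```

Validating the whole batch before transforming anything means a bad message is rejected before any work is done on the earlier ones.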
Error Handling¶
- Raise Python exceptions with appropriate types:
  - ValueError maps to INVALID_ARGUMENT status
  - PermissionError maps to PERMISSION_DENIED status
  - NotImplementedError maps to UNIMPLEMENTED status
  - RuntimeError or other exceptions map to INTERNAL status
- Errors in message deserialization should use ValueError
- Processing errors should use RuntimeError or domain-specific exceptions
- Per-message errors can be included in response messages (with ERROR status)
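The exception-to-status mapping listed above can be written down as a small lookup, which is handy for asserting in tests that a handler raises the exception type you intend. This is an illustration of the documented mapping, not Spikard's internal implementation. The names EXCEPTION_TO_STATUS and status_for are hypothetical, and matching subclasses with isinstance is an assumption.

```python
from typing import Dict, Type

# Mapping as described in the list above; status names are plain strings here.
EXCEPTION_TO_STATUS: Dict[Type[BaseException], str] = {
    ValueError: "INVALID_ARGUMENT",
    PermissionError: "PERMISSION_DENIED",
    NotImplementedError: "UNIMPLEMENTED",
}


def status_for(exc: BaseException) -> str:
    """Return the gRPC status name for an exception, defaulting to INTERNAL."""
    for exc_type, status in EXCEPTION_TO_STATUS.items():
        if isinstance(exc, exc_type):
            return status
    # RuntimeError and anything unlisted fall through to INTERNAL.
    return "INTERNAL"
```

NotImplementedError is a subclass of RuntimeError in Python, so it has to be checked before the INTERNAL fallback, as it is here.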
Metadata¶
- Client streaming: Metadata passed in request, can be included in response
- Bidirectional streaming: Metadata passed in request, can be included in response
- Use metadata for non-payload information (timestamps, counts, filters)
- Access with metadata.get(key) and provide defaults
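Reading metadata with defaults can be kept in one place so that malformed values fail with a clear message. The sketch below reuses the x-filter-type and x-min-value keys from the filtering example. The helper name parse_filter_options is hypothetical.

```python
from typing import Dict, Tuple


def parse_filter_options(metadata: Dict[str, str]) -> Tuple[str, int]:
    """Read filter settings from request metadata, falling back to defaults."""
    filter_type = metadata.get("x-filter-type", "all")
    raw_min = metadata.get("x-min-value", "0")
    try:
        min_value = int(raw_min)
    except ValueError as err:
        # ValueError maps to INVALID_ARGUMENT under the error-handling rules above.
        raise ValueError(
            f"x-min-value must be an integer, got {raw_min!r}"
        ) from err
    return filter_type, min_value
```

Metadata values arrive as strings, so numeric options always need an explicit conversion like this.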
Limits and Constraints¶
- MAX_STREAM_MESSAGES: 10,000 messages per stream
- Resource Exhaustion: Streams exceeding limit raise MemoryError
- Memory: All messages collected in memory; appropriate for moderate message counts
- Atomicity: All messages processed together (transaction-like semantics)
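Because every message in a stream is held in memory and the count is capped, a caller with a larger workload may need to split it across several streams. The sketch below shows one way to do that on the sending side. The helper chunk_messages is a hypothetical name, and splitting gives up atomicity across chunks, since each chunk becomes a separate call.

```python
from typing import Iterator, List, Sequence

MAX_STREAM_MESSAGES = 10_000  # limit stated in the list above


def chunk_messages(
    messages: Sequence[bytes], size: int = MAX_STREAM_MESSAGES
) -> Iterator[List[bytes]]:
    """Yield consecutive slices of at most `size` messages, preserving order."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(messages), size):
        yield list(messages[start:start + size])
```

Each yielded list can then be sent as its own client or bidirectional stream.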
Testing Client Streaming¶
import pytest
from typing import List
from spikard.grpc import GrpcClientStreamRequest, GrpcResponse
from batch_handler import handle_batch_create
import messageservice_pb2 as pb
@pytest.mark.asyncio
async def test_batch_create_success():
"""Test processing batch of items."""
# Create multiple input messages
items = [
pb.Item(name='Item 1', value=100),
pb.Item(name='Item 2', value=200),
pb.Item(name='Item 3', value=300),
]
messages = [item.SerializeToString() for item in items]
# Create request
request = GrpcClientStreamRequest(
service_name='myservice.MyService',
method_name='BatchCreate',
metadata={'authorization': 'Bearer token'},
messages=messages,
)
# Call handler
response = await handle_batch_create(request)
# Verify response
result = pb.BatchCreateResponse()
result.ParseFromString(response.payload)
assert result.success_count == 3
assert result.total_value == 600
assert 'x-batch-id' in response.metadata
assert response.metadata['x-count'] == '3'
@pytest.mark.asyncio
async def test_batch_create_missing_authorization():
"""Test batch create without authorization."""
items = [pb.Item(name='Item 1', value=100)]
messages = [item.SerializeToString() for item in items]
request = GrpcClientStreamRequest(
service_name='myservice.MyService',
method_name='BatchCreate',
metadata={}, # No authorization
messages=messages,
)
# Should raise PermissionError
with pytest.raises(PermissionError, match='Authentication required'):
await handle_batch_create(request)
@pytest.mark.asyncio
async def test_batch_create_invalid_item():
"""Test batch create with invalid item (missing required fields)."""
# Item with missing value
items = [pb.Item(name='Item 1')] # No value
messages = [item.SerializeToString() for item in items]
request = GrpcClientStreamRequest(
service_name='myservice.MyService',
method_name='BatchCreate',
metadata={'authorization': 'Bearer token'},
messages=messages,
)
# Should raise ValueError
with pytest.raises(ValueError, match='must have name and value'):
await handle_batch_create(request)
@pytest.mark.asyncio
async def test_batch_create_malformed_message():
"""Test batch create with malformed protobuf."""
messages = [b'invalid protobuf data']
request = GrpcClientStreamRequest(
service_name='myservice.MyService',
method_name='BatchCreate',
metadata={'authorization': 'Bearer token'},
messages=messages,
)
# Should raise ValueError for decode error
with pytest.raises(ValueError, match='Failed to decode message'):
await handle_batch_create(request)
Testing Bidirectional Streaming¶
import pytest
from typing import List
from spikard.grpc import GrpcBidiStreamRequest, GrpcBidiStreamResponse
from transform_handler import handle_transform_stream
import transformservice_pb2 as pb
@pytest.mark.asyncio
async def test_transform_stream_success():
"""Test transforming multiple documents."""
# Create input messages
documents = [
pb.Document(id=1, content='hello world'),
pb.Document(id=2, content='goodbye world'),
]
messages = [doc.SerializeToString() for doc in documents]
# Create request
request = GrpcBidiStreamRequest(
service_name='myservice.MyService',
method_name='TransformStream',
metadata={},
messages=messages,
)
# Call handler
response = await handle_transform_stream(request)
# Verify multiple response messages
assert len(response.messages) == 2
assert 'x-processed-count' in response.metadata
assert response.metadata['x-processed-count'] == '2'
# Verify each response
for msg in response.messages:
result = pb.TransformResult()
result.ParseFromString(msg)
assert result.status == 'SUCCESS'
assert result.transformed_content # Should have transformed content
assert 'HELLO' in result.transformed_content or 'GOODBYE' in result.transformed_content
@pytest.mark.asyncio
async def test_transform_stream_partial_failure():
"""Test transform stream with some valid and some invalid documents."""
documents = [
pb.Document(id=1, content='valid content'),
pb.Document(id=2, content='another valid'),
]
messages = [doc.SerializeToString() for doc in documents]
request = GrpcBidiStreamRequest(
service_name='myservice.MyService',
method_name='TransformStream',
metadata={},
messages=messages,
)
response = await handle_transform_stream(request)
# All documents should get responses (some may be errors)
assert len(response.messages) == 2
# Check that we get both SUCCESS and possibly ERROR responses
statuses = []
for msg in response.messages:
result = pb.TransformResult()
result.ParseFromString(msg)
statuses.append(result.status)
assert 'SUCCESS' in statuses or 'ERROR' in statuses
@pytest.mark.asyncio
async def test_filter_stream_high_value():
"""Test filtering records by minimum value."""
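from filter_handler import handle_filter_stream
import recordservice_pb2 as pb  # Record types come from recordservice_pb2, not transformservice_pb2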
records = [
pb.Record(id=1, value=50),
pb.Record(id=2, value=150),
pb.Record(id=3, value=250),
]
messages = [rec.SerializeToString() for rec in records]
request = GrpcBidiStreamRequest(
service_name='myservice.MyService',
method_name='FilterStream',
metadata={
'x-filter-type': 'high-value',
'x-min-value': '100',
},
messages=messages,
)
response = await handle_filter_stream(request)
# Only records with value >= 100 should be returned
assert len(response.messages) == 2
assert response.metadata['x-output-count'] == '2'
assert response.metadata['x-filtered-count'] == '1'
# Verify output values
for msg in response.messages:
result = pb.ProcessedRecord()
result.ParseFromString(msg)
assert result.original_value >= 100
@pytest.mark.asyncio
async def test_filter_stream_all_records():
"""Test returning all records without filtering."""
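from filter_handler import handle_filter_stream
import recordservice_pb2 as pb  # Record types come from recordservice_pb2, not transformservice_pb2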
records = [
pb.Record(id=1, value=50),
pb.Record(id=2, value=75),
pb.Record(id=3, value=150),
]
messages = [rec.SerializeToString() for rec in records]
request = GrpcBidiStreamRequest(
service_name='myservice.MyService',
method_name='FilterStream',
metadata={'x-filter-type': 'all'},
messages=messages,
)
response = await handle_filter_stream(request)
# All records should be returned
assert len(response.messages) == 3
assert response.metadata['x-output-count'] == '3'
assert response.metadata['x-filtered-count'] == '0'
@pytest.mark.asyncio
async def test_filter_stream_malformed_input():
"""Test filter stream with malformed protobuf."""
from filter_handler import handle_filter_stream
messages = [b'invalid protobuf']
request = GrpcBidiStreamRequest(
service_name='myservice.MyService',
method_name='FilterStream',
metadata={'x-filter-type': 'all'},
messages=messages,
)
# Should raise ValueError
with pytest.raises(ValueError, match='Failed to decode'):
await handle_filter_stream(request)
Running Tests¶
# Run all streaming tests
pytest test_streaming_handlers.py -v
# Run specific test
pytest test_streaming_handlers.py::test_batch_create_success -v
# Run with async support and verbose output
pytest test_streaming_handlers.py -v -k "batch" --asyncio-mode=auto
Comparison with Other Patterns¶
| Aspect | Client Streaming | Bidirectional | Unary |
|---|---|---|---|
| Input | Multiple messages | Multiple messages | Single message |
| Output | Single response | Multiple messages | Single response |
| Use case | Batch operations | Stream processing | Simple requests |
| Message order | Important | Important | N/A |
| Atomicity | Full batch atomic | Per-message or batch | Single atomic |
Common Pitfalls¶
1. Forgetting to Deserialize Messages¶
# WRONG: Using raw bytes
for msg in messages:
print(msg) # prints raw bytes
# CORRECT: Deserialize protobuf
for msg in messages:
item = messageservice_pb2.Item()
item.ParseFromString(msg)
print(item.name)
2. Not Handling All Message Errors¶
# WRONG: Failing entire stream on first error
for msg in messages:
item = messageservice_pb2.Item()
item.ParseFromString(msg) # May raise exception
# CORRECT: Handle per-message errors
for msg in messages:
try:
item = messageservice_pb2.Item()
item.ParseFromString(msg)
except Exception as e:
# Generate error response for this message
error_result = messageservice_pb2.ItemResult()
error_result.status = 'ERROR'
error_result.error_message = str(e)
output_messages.append(error_result.SerializeToString())
3. Forgetting to Serialize Response Messages¶
# WRONG: Returning protobuf objects instead of bytes
response = pb.Item(name='test')
return GrpcBidiStreamResponse(messages=[response]) # Wrong!
# CORRECT: Serialize to bytes
response = pb.Item(name='test')
return GrpcBidiStreamResponse(
messages=[response.SerializeToString()] # Correct
)
4. Not Using Async Functions¶
# WRONG: Synchronous handler with a blocking call
def handle_batch_create(request: GrpcClientStreamRequest) -> GrpcResponse:
    time.sleep(1)  # Blocks entire server!
    return response
# CORRECT: Use async/await
async def handle_batch_create(request: GrpcClientStreamRequest) -> GrpcResponse:
    await asyncio.sleep(1)  # Non-blocking
    return response
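When a handler has to call code that only offers a blocking interface, one option is to push that call onto a worker thread. The sketch below uses asyncio.to_thread from the standard library (Python 3.9+). The functions blocking_lookup and handle_with_blocking_call are hypothetical stand-ins rather than part of Spikard.

```python
import asyncio
import time


def blocking_lookup(key: str) -> str:
    """Hypothetical synchronous call, e.g. a legacy client library."""
    time.sleep(0.05)
    return key.upper()


async def handle_with_blocking_call(key: str) -> str:
    # Run the blocking function in a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_lookup, key)
```

The handler stays async and other requests keep being served while the worker thread waits.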
See the gRPC documentation for more examples.
TypeScript gRPC Streaming Handlers¶
The TypeScript package does not yet expose public streaming gRPC helper types.
Current public gRPC APIs in @spikard/node are:
- GrpcHandler
- GrpcResponse
- GrpcError
- GrpcStatusCode
- GrpcService
- createUnaryHandler(...)
- createServiceHandler(...)
For the current TypeScript surface, use unary handlers and register them through GrpcService.
Ruby gRPC Streaming Handlers¶
Complete Ruby implementation examples for client streaming and bidirectional streaming gRPC handlers in Spikard.
Client Streaming Handler¶
Client streaming RPC lets a client send multiple messages, after which the server responds with a single message.
Handler Signature¶
class ClientStreamRequest
attr_reader :service_name, :method_name, :metadata, :messages
def initialize(service_name:, method_name:, metadata:, messages:)
@service_name = service_name
@method_name = method_name
@metadata = metadata
@messages = messages # All client messages collected as array
end
end
def handle_client_stream(request)
# Process all messages and return single response
end
Example: Batch Message Processing¶
require 'securerandom' # SecureRandom.uuid
require 'time' # Time#iso8601
require 'spikard/grpc'
require 'messageservice_pb'
class BatchCreateHandler < Spikard::Grpc::Handler
def initialize(database)
@database = database
end
def handle_request(request)
case request.method_name
when 'BatchCreate'
handle_batch_create(request)
else
raise "Unknown method: #{request.method_name}"
end
end
private
def handle_batch_create(request)
# Validate authorization
auth_token = request.metadata['authorization']
unless auth_token
raise SecurityError, 'Authentication required'
end
# Step 1: Deserialize all input messages
items = request.messages.map.with_index do |msg, index|
begin
Messageservice::Item.decode(msg)
rescue Google::Protobuf::ParseError => e
raise ArgumentError, "Failed to decode message #{index}: #{e.message}"
end
end
# Validate all items before processing
items.each do |item|
unless item.name && !item.name.empty? && item.value > 0
raise ArgumentError, 'Each item must have name and positive value'
end
end
# Step 2: Process all items atomically
success_count = 0
total_value = 0
batch_id = SecureRandom.uuid
begin
# Begin transaction
@database.transaction do
items.each do |item|
created = @database[:items].insert(
name: item.name,
value: item.value,
batch_id: batch_id
)
success_count += 1
total_value += item.value
end
end
rescue StandardError => e
raise StandardError, "Transaction failed: #{e.message}"
end
# Step 3: Build aggregate response
response_proto = Messageservice::BatchCreateResponse.new(
success_count: success_count,
total_value: total_value,
batch_id: batch_id,
timestamp: Time.now.utc.iso8601
)
# Step 4: Serialize and return
response = Spikard::Grpc::Response.new(
payload: Messageservice::BatchCreateResponse.encode(response_proto)
)
response.metadata = {
'x-batch-id' => batch_id,
'x-count' => success_count.to_s
}
response
rescue ArgumentError => e
Spikard::Grpc::Response.error(e.message, 'INVALID_ARGUMENT')
rescue SecurityError => e
Spikard::Grpc::Response.error(e.message, 'UNAUTHENTICATED')
rescue StandardError => e
Spikard::Grpc::Response.error("Internal error: #{e.message}", 'INTERNAL')
end
end
Bidirectional Streaming Handler¶
Bidirectional streaming RPC allows the client to send multiple messages and the server to send multiple messages back.
Handler Signature¶
class BidiStreamRequest
attr_reader :service_name, :method_name, :metadata, :messages
def initialize(service_name:, method_name:, metadata:, messages:)
@service_name = service_name
@method_name = method_name
@metadata = metadata
@messages = messages # All input messages collected as array
end
end
class BidiStreamResponse
attr_accessor :messages, :metadata
def initialize(messages:, metadata: {})
@messages = messages
@metadata = metadata
end
end
def handle_bidi_stream(request)
# Process input messages and generate output messages
# Return hash with :messages and :metadata keys
end
Example: Message Transformation Pipeline¶
require 'time' # Time#iso8601
require 'spikard/grpc'
require 'transformservice_pb'
class TransformStreamHandler < Spikard::Grpc::Handler
def initialize(transformer_service)
@transformer_service = transformer_service
end
def handle_request(request)
case request.method_name
when 'TransformStream'
handle_transform_stream(request)
else
raise "Unknown method: #{request.method_name}"
end
end
private
def handle_transform_stream(request)
# Step 1: Deserialize all input messages
input_documents = request.messages.map.with_index do |msg, index|
begin
{
index: index,
document: Transformservice::Document.decode(msg)
}
rescue Google::Protobuf::ParseError => e
raise ArgumentError, "Failed to decode message #{index}: #{e.message}"
end
end
# Step 2: Process each document and generate response
output_messages = []
input_documents.each do |input|
index = input[:index]
document = input[:document]
begin
# Transform the document
transformed = transform_document(document)
# Build response message
result = Transformservice::TransformResult.new(
original_id: document.id,
transformed_content: transformed[:content],
transformed_at: Time.now.utc.iso8601,
status: transformed[:success] ? 'SUCCESS' : 'PARTIAL',
metadata: {
'original_size' => document.content.length.to_s,
'transformed_size' => transformed[:content].length.to_s
}
)
# Serialize response message
encoded = Transformservice::TransformResult.encode(result)
output_messages << encoded
rescue StandardError => e
# Create error response for this document
error_result = Transformservice::TransformResult.new(
original_id: document.id,
status: 'ERROR',
error_message: e.message
)
encoded = Transformservice::TransformResult.encode(error_result)
output_messages << encoded
end
end
# Step 3: Return all response messages
{
messages: output_messages,
metadata: {
'x-processed-count' => output_messages.length.to_s,
'x-timestamp' => Time.now.utc.iso8601
}
}
rescue ArgumentError => e
Spikard::Grpc::Response.error(e.message, 'INVALID_ARGUMENT')
rescue StandardError => e
Spikard::Grpc::Response.error("Internal error: #{e.message}", 'INTERNAL')
end
def transform_document(doc)
# Simulate document transformation
{
content: doc.content.upcase,
success: true
}
end
end
Advanced Example: Filtering and Aggregation¶
Bidirectional Stream with Filtering¶
class FilterStreamHandler < Spikard::Grpc::Handler
def initialize(record_service)
@record_service = record_service
end
def handle_request(request)
case request.method_name
when 'FilterStream'
handle_filter_stream(request)
else
raise "Unknown method: #{request.method_name}"
end
end
private
def handle_filter_stream(request)
# Parse filter criteria from metadata
filter_type = request.metadata['x-filter-type'] || 'all'
min_value = (request.metadata['x-min-value'] || '0').to_i
# Step 1: Deserialize and filter
filtered_items = request.messages.filter_map do |msg|
item = Recordservice::Record.decode(msg)
# Apply filter logic
if filter_type == 'all' || (filter_type == 'high-value' && item.value >= min_value)
item
end
end
# Step 2: Transform filtered items
output_messages = filtered_items.map do |item|
response = Recordservice::ProcessedRecord.new(
id: item.id,
original_value: item.value,
processed_value: item.value * 1.1, # Apply multiplier
filtered: false
)
Recordservice::ProcessedRecord.encode(response)
end
# Step 3: Return response with statistics
{
messages: output_messages,
metadata: {
'x-input-count' => request.messages.length.to_s,
'x-output-count' => output_messages.length.to_s,
'x-filtered-count' => (request.messages.length - output_messages.length).to_s
}
}
rescue ArgumentError => e
Spikard::Grpc::Response.error(e.message, 'INVALID_ARGUMENT')
rescue StandardError => e
Spikard::Grpc::Response.error("Internal error: #{e.message}", 'INTERNAL')
end
end
Key Patterns¶
Message Collection¶
- All client messages are collected in a single messages array via request.messages
- Messages are binary strings (serialized protobuf)
- No streaming iteration needed: the full array is provided
- Use .each.with_index or .map.with_index to process with index
Processing Strategy¶
- Collect: All input messages received as array via request.messages
- Validate: Check all messages before processing with .each or .filter_map
- Transform: Process and generate output using blocks
- Serialize: Encode response messages with protobuf .encode() method
- Return: Hash with :messages and :metadata keys for bidi; Response object for client stream
Error Handling¶
- Use Ruby exceptions (ArgumentError, StandardError, custom errors) to signal failures
- Catch exceptions in handle_request and convert to Spikard::Grpc::Response.error(message, status_code)
- Errors in message deserialization should use INVALID_ARGUMENT status
- Processing errors should use INTERNAL or domain-specific codes
- Per-message errors can be included in response messages (with ERROR status)
Metadata¶
- Client streaming: Metadata passed in request, can be included in response
- Bidirectional streaming: Metadata passed in request, can be included in response
- Use metadata for non-payload information (timestamps, counts, filters)
- Access request metadata via request.metadata[key], which returns a String or nil
Limits and Constraints¶
- MAX_STREAM_MESSAGES: 10,000 messages per stream
- Resource Exhaustion: Streams exceeding limit return RESOURCE_EXHAUSTED error
- Memory: All messages collected in memory; appropriate for moderate message counts
- Atomicity: All messages processed together (transaction-like semantics)
Testing Client Streaming¶
require 'rspec'
require 'spikard/grpc'
require 'messageservice_pb'
require_relative '../lib/batch_create_handler'
RSpec.describe BatchCreateHandler do
let(:database) { instance_double('Database') }
let(:handler) { described_class.new(database) }
describe '#handle_request' do
context 'BatchCreate' do
it 'should process batch of items' do
# Create multiple input messages
items = [
Messageservice::Item.new(name: 'Item 1', value: 100),
Messageservice::Item.new(name: 'Item 2', value: 200),
Messageservice::Item.new(name: 'Item 3', value: 300)
]
messages = items.map { |item| Messageservice::Item.encode(item) }
# Setup database mock
allow(database).to receive(:transaction).and_yield(database)
allow(database).to receive(:[]).and_return(
instance_double('Table', insert: true)
)
# Create request
request = Spikard::Grpc::Request.new(
service_name: 'messageservice.MessageService',
method_name: 'BatchCreate',
metadata: { 'authorization' => 'Bearer token' },
payload: '' # Not used in client streaming
)
request.instance_variable_set(:@messages, messages)
# Call handler
response = handler.handle_request(request)
# Verify response
result = Messageservice::BatchCreateResponse.decode(response.payload)
expect(result.success_count).to eq(3)
expect(result.total_value).to eq(600)
expect(response.metadata['x-count']).to eq('3')
end
it 'returns error when authorization is missing' do
items = [
Messageservice::Item.new(name: 'Item 1', value: 100)
]
messages = items.map { |item| Messageservice::Item.encode(item) }
request = Spikard::Grpc::Request.new(
service_name: 'messageservice.MessageService',
method_name: 'BatchCreate',
metadata: {}, # Missing authorization
payload: ''
)
request.instance_variable_set(:@messages, messages)
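# handle_batch_create rescues SecurityError and converts it to an error response
expect(Spikard::Grpc::Response).to receive(:error)
.with(/Authentication required/, 'UNAUTHENTICATED')
handler.handle_request(request)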
end
it 'validates all items have values' do
items = [
Messageservice::Item.new(name: 'Item 1', value: 100),
Messageservice::Item.new(name: 'Item 2', value: 0) # Invalid
]
messages = items.map { |item| Messageservice::Item.encode(item) }
request = Spikard::Grpc::Request.new(
service_name: 'messageservice.MessageService',
method_name: 'BatchCreate',
metadata: { 'authorization' => 'Bearer token' },
payload: ''
)
request.instance_variable_set(:@messages, messages)
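# handle_batch_create rescues ArgumentError and converts it to an error response
expect(Spikard::Grpc::Response).to receive(:error)
.with(/positive value/, 'INVALID_ARGUMENT')
handler.handle_request(request)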
end
end
end
end
Testing Bidirectional Streaming¶
require 'rspec'
require 'spikard/grpc'
require 'transformservice_pb'
require_relative '../lib/transform_stream_handler'
RSpec.describe TransformStreamHandler do
let(:transformer_service) { instance_double('TransformerService') }
let(:handler) { described_class.new(transformer_service) }
describe '#handle_request' do
context 'TransformStream' do
it 'should transform and return multiple messages' do
# Create input messages
documents = [
Transformservice::Document.new(id: '1', content: 'hello'),
Transformservice::Document.new(id: '2', content: 'world')
]
messages = documents.map { |doc| Transformservice::Document.encode(doc) }
# Create request
request = Spikard::Grpc::Request.new(
service_name: 'transformservice.TransformService',
method_name: 'TransformStream',
metadata: { 'authorization' => 'Bearer token' },
payload: ''
)
request.instance_variable_set(:@messages, messages)
# Call handler
response = handler.handle_request(request)
# Verify response
expect(response).to include(:messages, :metadata)
expect(response[:messages].length).to eq(2)
response[:messages].each do |msg|
result = Transformservice::TransformResult.decode(msg)
expect(result.status).to eq('SUCCESS')
expect(result.transformed_content).not_to be_empty
end
expect(response[:metadata]['x-processed-count']).to eq('2')
end
it 'handles per-message errors gracefully' do
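documents = [
Transformservice::Document.new(id: '1', content: 'valid'),
Transformservice::Document.new(id: '2', content: 'will fail')
]
messages = documents.map { |doc| Transformservice::Document.encode(doc) }
# Simulate a failure for the second document by stubbing transform_document;
# a plain proto3 string field rejects nil, so content: nil cannot be used here
allow(handler).to receive(:transform_document).and_wrap_original do |original, doc|
raise StandardError, 'transform failed' if doc.id == '2'
original.call(doc)
end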
request = Spikard::Grpc::Request.new(
service_name: 'transformservice.TransformService',
method_name: 'TransformStream',
metadata: { 'authorization' => 'Bearer token' },
payload: ''
)
request.instance_variable_set(:@messages, messages)
# Call handler - should return response with error for second message
response = handler.handle_request(request)
expect(response[:messages].length).to eq(2)
# First message should be SUCCESS
result1 = Transformservice::TransformResult.decode(response[:messages][0])
expect(result1.status).to eq('SUCCESS')
# Second message should be ERROR
result2 = Transformservice::TransformResult.decode(response[:messages][1])
expect(result2.status).to eq('ERROR')
expect(result2.error_message).not_to be_empty
end
it 'filters items based on metadata' do
records = [
Recordservice::Record.new(id: '1', value: 50),
Recordservice::Record.new(id: '2', value: 100),
Recordservice::Record.new(id: '3', value: 150)
]
messages = records.map { |rec| Recordservice::Record.encode(rec) }
request = Spikard::Grpc::Request.new(
service_name: 'recordservice.RecordService',
method_name: 'FilterStream',
metadata: {
'x-filter-type' => 'high-value',
'x-min-value' => '100'
},
payload: ''
)
request.instance_variable_set(:@messages, messages)
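# Call the filter handler; the TransformStreamHandler under test does not route FilterStream.
# This assumes recordservice_pb and the FilterStreamHandler file are loaded.
filter_handler = FilterStreamHandler.new(instance_double('RecordService'))
response = filter_handler.handle_request(request)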
# Only 2 items should pass filter (value >= 100)
expect(response[:messages].length).to eq(2)
expect(response[:metadata]['x-output-count']).to eq('2')
expect(response[:metadata]['x-filtered-count']).to eq('1')
end
end
end
end
Comparison with Other Patterns¶
| Aspect | Client Streaming | Bidirectional | Unary |
|---|---|---|---|
| Input | Multiple messages | Multiple messages | Single message |
| Output | Single response | Multiple messages | Single response |
| Use case | Batch operations | Stream processing | Simple requests |
| Message order | Important | Important | N/A |
| Atomicity | Full batch atomic | Per-message or batch | Single atomic |
Ruby-Specific Patterns¶
Using Enumerables Effectively¶
# Filter and transform in one pass
output_messages = request.messages.filter_map do |msg|
item = decode_message(msg)
next unless item.valid? # Filters out invalid items
encode_response(item) # Transforms to response
end
# Using each_with_index for detailed indexing
request.messages.each_with_index do |msg, idx|
process_with_position(msg, idx)
end
# Collect with error handling
results = request.messages.map do |msg|
decode_and_process(msg)
rescue StandardError => e
handle_error(e)
end
Exception Handling in Streaming¶
# Catch and convert exceptions to gRPC responses
def handle_request(request)
case request.method_name
when 'BatchCreate'
handle_batch_create(request)
else
raise "Unknown method"
end
rescue ArgumentError => e
Spikard::Grpc::Response.error(e.message, 'INVALID_ARGUMENT')
rescue SecurityError => e
Spikard::Grpc::Response.error(e.message, 'UNAUTHENTICATED')
rescue StandardError => e
Spikard::Grpc::Response.error("Internal error: #{e.message}", 'INTERNAL')
end
Transaction Patterns with Blocks¶
begin
@database.transaction do
items.each do |item|
@database[:items].insert(item.to_h)
end
end
rescue StandardError => e
raise StandardError, "Transaction failed: #{e.message}"
end
See the gRPC documentation for more examples.
PHP gRPC Streaming Handlers¶
Complete PHP implementation examples for client streaming and bidirectional streaming gRPC handlers in Spikard.
Client Streaming Handler¶
Client streaming RPC allows a client to send multiple messages and then the server responds with a single message.
Handler Signature¶
<?php
declare(strict_types=1);
use Spikard\Grpc\ClientStreamRequest;
use Spikard\Grpc\Response;
/**
* Client streaming handler signature
*/
final class ExampleClientStreamHandler
{
public function handleClientStream(ClientStreamRequest $request): Response
{
// $request->serviceName: string
// $request->methodName: string
// $request->metadata: array<string, string>
// $request->messages: string[] // Array of serialized messages
// Process all messages and return single response
}
}
Example: Batch Message Processing¶
<?php
declare(strict_types=1);
use Spikard\Grpc\HandlerInterface;
use Spikard\Grpc\ClientStreamRequest;
use Spikard\Grpc\Response;
use Messageservice\Item;
use Messageservice\BatchCreateResponse;
class MessageServiceHandler implements HandlerInterface
{
public function __construct(
private MessageRepository $repository,
) {}
/**
* Handle client streaming: receive multiple messages, send single response
*
* Pattern: Collect all input messages, process together, return aggregated response
*/
public function handleClientStream(ClientStreamRequest $request): Response
{
try {
// Step 1: Validate authorization
$authToken = $request->getMetadata('authorization');
if (!$authToken) {
return Response::error(
'Authentication required',
'UNAUTHENTICATED'
);
}
// Step 2: Deserialize all input messages
$items = [];
$successCount = 0;
$totalValue = 0;
foreach ($request->messages as $index => $messagePayload) {
try {
$item = new Item();
$item->mergeFromString($messagePayload);
// Validate item
if (empty($item->getName()) || $item->getValue() <= 0) {
return Response::error(
"Invalid item at index {$index}: name and positive value required"
);
}
$items[] = $item;
} catch (\Exception $e) {
return Response::error(
"Failed to decode message at index {$index}: {$e->getMessage()}"
);
}
}
if (empty($items)) {
return Response::error('At least one item is required');
}
// Step 3: Process all items atomically
try {
$this->repository->transaction(function () use ($items, &$successCount, &$totalValue): void {
foreach ($items as $item) {
$this->repository->create(
name: $item->getName(),
value: $item->getValue()
);
$successCount++;
$totalValue += $item->getValue();
}
});
} catch (\Exception $e) {
return Response::error("Transaction failed: {$e->getMessage()}");
}
// Step 4: Build aggregate response
$batchId = bin2hex(random_bytes(8));
$response = new BatchCreateResponse();
$response->setSuccessCount($successCount);
$response->setTotalValue($totalValue);
$response->setBatchId($batchId);
$response->setTimestamp((new \DateTime())->format('c'));
// Step 5: Serialize and return
return new Response(
payload: $response->serializeToString(),
metadata: [
'x-batch-id' => $batchId,
'x-count' => (string)$successCount,
]
);
} catch (\Exception $e) {
return Response::error("Error: {$e->getMessage()}");
}
}
public function handleRequest(\Spikard\Grpc\Request $request): Response
{
// Route unary requests
return match ($request->methodName) {
default => Response::error("Unknown method: {$request->methodName}"),
};
}
}
Bidirectional Streaming Handler¶
Bidirectional streaming RPC allows the client to send multiple messages and the server to send multiple messages back.
Handler Signature¶
<?php
declare(strict_types=1);
use Spikard\Grpc\BidiStreamRequest;
use Spikard\Grpc\BidiStreamResponse;
/**
* Bidirectional streaming handler signature
*/
final class ExampleBidiStreamHandler
{
public function handleBidiStream(BidiStreamRequest $request): BidiStreamResponse
{
// $request->serviceName: string
// $request->methodName: string
// $request->metadata: array<string, string>
// $request->messages: string[] // Array of serialized input messages
// Process input messages and generate output messages
// return new BidiStreamResponse(
// messages: string[], // Array of serialized response messages
// metadata: array<string, string>
// );
}
}
Example: Message Transformation Pipeline¶
<?php
declare(strict_types=1);
use Spikard\Grpc\HandlerInterface;
use Spikard\Grpc\BidiStreamRequest;
use Spikard\Grpc\BidiStreamResponse;
use Spikard\Grpc\Request;
use Spikard\Grpc\Response;
use Transformservice\Document;
use Transformservice\TransformResult;
class TransformServiceHandler implements HandlerInterface
{
public function __construct(
private DocumentTransformer $transformer,
) {}
/**
* Handle bidirectional streaming: receive multiple messages, send multiple messages
*
* Pattern: Collect all input messages, process, generate output messages, return array
*/
public function handleBidiStream(BidiStreamRequest $request): BidiStreamResponse
{
try {
// Step 1: Deserialize all input messages
$inputDocuments = [];
foreach ($request->messages as $index => $messagePayload) {
try {
$document = new Document();
$document->mergeFromString($messagePayload);
$inputDocuments[] = [
'index' => $index,
'document' => $document,
];
} catch (\Exception $e) {
// Create error response for this document
$errorResult = new TransformResult();
$errorResult->setStatus('ERROR');
$errorResult->setErrorMessage(
"Failed to decode message at index {$index}: {$e->getMessage()}"
);
// Still return partial response
return new BidiStreamResponse(
messages: [$errorResult->serializeToString()],
metadata: [
'x-error-at-index' => (string)$index,
]
);
}
}
if (empty($inputDocuments)) {
return new BidiStreamResponse(
messages: [],
metadata: ['x-processed-count' => '0']
);
}
// Step 2: Process each document and generate response
$outputMessages = [];
$processedCount = 0;
$errorCount = 0;
foreach ($inputDocuments as $input) {
try {
$document = $input['document'];
// Transform the document
$transformedContent = $this->transformer->transform(
$document->getContent()
);
// Build response message
$result = new TransformResult();
$result->setOriginalId($document->getId());
$result->setTransformedContent($transformedContent);
$result->setTransformedAt((new \DateTime())->format('c'));
$result->setStatus('SUCCESS');
// Add metadata about transformation
$metadata = new \Google\Protobuf\StringValue();
$metadata->setValue(json_encode([
'original_size' => strlen($document->getContent()),
'transformed_size' => strlen($transformedContent),
], JSON_THROW_ON_ERROR));
$result->setMetadata($metadata);
// Serialize and collect response
$outputMessages[] = $result->serializeToString();
$processedCount++;
} catch (\Exception $e) {
// Create error response for this document
$errorResult = new TransformResult();
$errorResult->setOriginalId($input['document']->getId());
$errorResult->setStatus('ERROR');
$errorResult->setErrorMessage($e->getMessage());
$outputMessages[] = $errorResult->serializeToString();
$errorCount++;
}
}
// Step 3: Return all response messages
return new BidiStreamResponse(
messages: $outputMessages,
metadata: [
'x-processed-count' => (string)$processedCount,
'x-error-count' => (string)$errorCount,
'x-timestamp' => (new \DateTime())->format('c'),
]
);
} catch (\Exception $e) {
// Return error response
$errorResult = new TransformResult();
$errorResult->setStatus('ERROR');
$errorResult->setErrorMessage("Stream processing failed: {$e->getMessage()}");
return new BidiStreamResponse(
messages: [$errorResult->serializeToString()],
metadata: ['x-error' => 'true']
);
}
}
public function handleRequest(Request $request): Response
{
// Route unary requests
return Response::error("Unknown method: {$request->methodName}");
}
}
Advanced Example: Filtering and Aggregation¶
Bidirectional Stream with Filtering¶
<?php
declare(strict_types=1);
use Spikard\Grpc\BidiStreamRequest;
use Spikard\Grpc\BidiStreamResponse;
use Recordservice\Record;
use Recordservice\ProcessedRecord;
class RecordServiceHandler
{
public function handleBidiStream(BidiStreamRequest $request): BidiStreamResponse
{
try {
// Parse filter criteria from metadata
$filterType = $request->getMetadata('x-filter-type') ?? 'all';
$minValue = (int)($request->getMetadata('x-min-value') ?? '0');
$maxValue = (int)($request->getMetadata('x-max-value') ?? PHP_INT_MAX);
// Step 1: Deserialize and filter
$filteredRecords = [];
$inputCount = count($request->messages);
foreach ($request->messages as $messagePayload) {
$record = new Record();
$record->mergeFromString($messagePayload);
// Apply filter logic
if ($this->matchesFilter($record, $filterType, $minValue, $maxValue)) {
$filteredRecords[] = $record;
}
}
// Step 2: Transform filtered records
$outputMessages = [];
foreach ($filteredRecords as $record) {
$processed = new ProcessedRecord();
$processed->setId($record->getId());
$processed->setOriginalValue($record->getValue());
// Apply business logic: apply multiplier
$multiplier = $this->calculateMultiplier($record);
$processed->setProcessedValue($record->getValue() * $multiplier);
$processed->setFiltered(false);
$processed->setProcessedAt((new \DateTime())->format('c'));
$outputMessages[] = $processed->serializeToString();
}
// Step 3: Return response with statistics
return new BidiStreamResponse(
messages: $outputMessages,
metadata: [
'x-input-count' => (string)$inputCount,
'x-output-count' => (string)count($outputMessages),
'x-filtered-count' => (string)($inputCount - count($outputMessages)),
'x-filter-type' => $filterType,
]
);
} catch (\Exception $e) {
$errorResult = new ProcessedRecord();
$errorResult->setId('error');
return new BidiStreamResponse(
messages: [$errorResult->serializeToString()],
metadata: ['x-error' => $e->getMessage()]
);
}
}
private function matchesFilter(
Record $record,
string $filterType,
int $minValue,
int $maxValue
): bool {
return match ($filterType) {
'all' => true,
'high-value' => $record->getValue() >= $minValue && $record->getValue() <= $maxValue,
'range' => $record->getValue() >= $minValue && $record->getValue() <= $maxValue,
'low-value' => $record->getValue() < $minValue,
default => true,
};
}
private function calculateMultiplier(Record $record): float
{
return match (true) {
$record->getValue() > 1000 => 1.05,
$record->getValue() > 500 => 1.03,
default => 1.01,
};
}
}
Key Patterns¶
Message Collection¶
- All client messages are collected in the messages array property
- Messages are already serialized as binary strings (use mergeFromString() to deserialize)
- No streaming iteration needed - full array is provided
Processing Strategy¶
- Collect: All input messages received as array
- Validate: Check all messages before processing
- Transform: Process and generate output
- Serialize: Encode response messages with serializeToString()
- Return: Array of response messages in response object
Error Handling¶
- Return Response::error() for fatal errors that abort the stream
- Per-message errors can be included in response messages (with ERROR status)
- Deserialization errors should return error response immediately
- Processing errors can return partial results (successful messages + error messages)
Metadata¶
- Client streaming: Metadata passed in request, can be included in response
- Bidirectional streaming: Metadata passed in request, can be included in response
- Use metadata for non-payload information (timestamps, counts, filters)
Limits and Constraints¶
- MAX_STREAM_MESSAGES: 10,000 messages per stream
- Resource Exhaustion: Streams exceeding limit return error response
- Memory: All messages collected in memory - appropriate for moderate message counts
- Atomicity: All messages processed together (transaction-like semantics for client streaming)
Testing Client Streaming¶
<?php
declare(strict_types=1);
namespace Tests;
use PHPUnit\Framework\TestCase;
use PHPUnit\Framework\MockObject\MockObject;
use Spikard\Grpc\ClientStreamRequest;
use Messageservice\Item;
use Messageservice\BatchCreateResponse;
class ClientStreamHandlerTest extends TestCase
{
private MessageServiceHandler $handler;
private MockObject $repository;
protected function setUp(): void
{
$this->repository = $this->createMock(MessageRepository::class);
$this->handler = new MessageServiceHandler($this->repository);
}
public function testClientStreamBatchCreate(): void
{
// Setup mock
$this->repository
->expects($this->once())
->method('transaction')
->willReturnCallback(function (callable $callback): void {
$callback();
});
$this->repository
->expects($this->exactly(3))
->method('create')
->willReturn(true);
// Create multiple input messages
$items = [
$this->createItem('Item 1', 100),
$this->createItem('Item 2', 200),
$this->createItem('Item 3', 300),
];
$messages = array_map(
fn(Item $item) => $item->serializeToString(),
$items
);
// Create request
$request = new ClientStreamRequest(
serviceName: 'messageservice.MessageService',
methodName: 'BatchCreate',
messages: $messages,
metadata: ['authorization' => 'Bearer token']
);
// Call handler
$response = $this->handler->handleClientStream($request);
// Verify response
$this->assertFalse($response->isError());
$result = new BatchCreateResponse();
$result->mergeFromString($response->payload);
$this->assertEquals(3, $result->getSuccessCount());
$this->assertEquals(600, $result->getTotalValue());
$this->assertNotEmpty($result->getBatchId());
$this->assertEquals('3', $response->getMetadata('x-count'));
}
public function testClientStreamRequiresAuthentication(): void
{
// Create request without auth
$item = $this->createItem('Item 1', 100);
$request = new ClientStreamRequest(
serviceName: 'messageservice.MessageService',
methodName: 'BatchCreate',
messages: [$item->serializeToString()],
metadata: [] // No authorization
);
// Call handler
$response = $this->handler->handleClientStream($request);
// Should return auth error
$this->assertTrue($response->isError());
$this->assertStringContainsString('Authentication required', $response->errorMessage);
$this->assertEquals('16', $response->getMetadata('grpc-status'));
}
public function testClientStreamValidatesItems(): void
{
// Create item with invalid value
$item = $this->createItem('Item 1', -100);
$request = new ClientStreamRequest(
serviceName: 'messageservice.MessageService',
methodName: 'BatchCreate',
messages: [$item->serializeToString()],
metadata: ['authorization' => 'Bearer token']
);
// Call handler
$response = $this->handler->handleClientStream($request);
// Should return validation error
$this->assertTrue($response->isError());
$this->assertStringContainsString('positive value required', $response->errorMessage);
}
public function testClientStreamHandlesDeserializationError(): void
{
$request = new ClientStreamRequest(
serviceName: 'messageservice.MessageService',
methodName: 'BatchCreate',
messages: ['invalid protobuf data'],
metadata: ['authorization' => 'Bearer token']
);
// Call handler
$response = $this->handler->handleClientStream($request);
// Should return deserialization error
$this->assertTrue($response->isError());
$this->assertStringContainsString('Failed to decode', $response->errorMessage);
}
private function createItem(string $name, int $value): Item
{
$item = new Item();
$item->setName($name);
$item->setValue($value);
return $item;
}
}
Testing Bidirectional Streaming¶
<?php
declare(strict_types=1);
namespace Tests;
use PHPUnit\Framework\TestCase;
use PHPUnit\Framework\MockObject\MockObject;
use Spikard\Grpc\BidiStreamRequest;
use Transformservice\Document;
use Transformservice\TransformResult;
class BidiStreamHandlerTest extends TestCase
{
private TransformServiceHandler $handler;
private MockObject $transformer;
protected function setUp(): void
{
$this->transformer = $this->createMock(DocumentTransformer::class);
$this->handler = new TransformServiceHandler($this->transformer);
}
public function testBidiStreamTransformMultipleDocuments(): void
{
// Setup mock transformer
$this->transformer
->method('transform')
->willReturnCallback(fn(string $content) => strtoupper($content));
// Create input documents
$documents = [
$this->createDocument(1, 'hello world'),
$this->createDocument(2, 'foo bar'),
$this->createDocument(3, 'test content'),
];
$messages = array_map(
fn(Document $doc) => $doc->serializeToString(),
$documents
);
// Create request
$request = new BidiStreamRequest(
serviceName: 'transformservice.TransformService',
methodName: 'TransformStream',
messages: $messages,
metadata: ['authorization' => 'Bearer token']
);
// Call handler
$response = $this->handler->handleBidiStream($request);
// Verify response contains multiple messages
$this->assertCount(3, $response->messages);
// Verify each response message
foreach ($response->messages as $index => $messagePayload) {
$result = new TransformResult();
$result->mergeFromString($messagePayload);
$this->assertEquals('SUCCESS', $result->getStatus());
$this->assertNotEmpty($result->getTransformedContent());
$this->assertEquals((string)($index + 1), $result->getOriginalId());
}
// Verify metadata
$this->assertEquals('3', $response->getMetadata('x-processed-count'));
$this->assertEquals('0', $response->getMetadata('x-error-count'));
}
public function testBidiStreamPartialErrors(): void
{
// Setup transformer to fail on specific content
$this->transformer
->method('transform')
->willReturnCallback(function (string $content) {
if ($content === 'error') {
throw new \Exception('Transform failed');
}
return strtoupper($content);
});
// Create documents, one will fail
$documents = [
$this->createDocument(1, 'success'),
$this->createDocument(2, 'error'),
$this->createDocument(3, 'success'),
];
$messages = array_map(
fn(Document $doc) => $doc->serializeToString(),
$documents
);
$request = new BidiStreamRequest(
serviceName: 'transformservice.TransformService',
methodName: 'TransformStream',
messages: $messages,
metadata: []
);
// Call handler
$response = $this->handler->handleBidiStream($request);
// Should have response for all documents
$this->assertCount(3, $response->messages);
// Check results
$results = [];
foreach ($response->messages as $messagePayload) {
$result = new TransformResult();
$result->mergeFromString($messagePayload);
$results[] = $result->getStatus();
}
$this->assertContains('SUCCESS', $results);
$this->assertContains('ERROR', $results);
// Verify error count in metadata
$this->assertEquals('1', $response->getMetadata('x-error-count'));
}
public function testBidiStreamEmptyInput(): void
{
$request = new BidiStreamRequest(
serviceName: 'transformservice.TransformService',
methodName: 'TransformStream',
messages: [],
metadata: []
);
// Call handler
$response = $this->handler->handleBidiStream($request);
// Should return empty messages
$this->assertEmpty($response->messages);
$this->assertEquals('0', $response->getMetadata('x-processed-count'));
}
private function createDocument(int $id, string $content): Document
{
$document = new Document();
$document->setId((string)$id);
$document->setContent($content);
return $document;
}
}
Testing Filtering and Aggregation¶
<?php
declare(strict_types=1);
namespace Tests;
use PHPUnit\Framework\TestCase;
use Spikard\Grpc\BidiStreamRequest;
use Recordservice\Record;
use Recordservice\ProcessedRecord;
class FilterStreamHandlerTest extends TestCase
{
private RecordServiceHandler $handler;
protected function setUp(): void
{
$this->handler = new RecordServiceHandler();
}
/**
* @dataProvider filterTypeProvider
*/
public function testBidiStreamWithFilters(
string $filterType,
int $minValue,
int $expectedOutputCount
): void {
// Create records with various values
$records = [
$this->createRecord('1', 100),
$this->createRecord('2', 500),
$this->createRecord('3', 1500),
$this->createRecord('4', 2500),
];
$messages = array_map(
fn(Record $record) => $record->serializeToString(),
$records
);
$request = new BidiStreamRequest(
serviceName: 'recordservice.RecordService',
methodName: 'FilterStream',
messages: $messages,
metadata: [
'x-filter-type' => $filterType,
'x-min-value' => (string)$minValue,
]
);
// Call handler
$response = $this->handler->handleBidiStream($request);
// Verify filtered count
$this->assertCount($expectedOutputCount, $response->messages);
$this->assertEquals(
(string)$expectedOutputCount,
$response->getMetadata('x-output-count')
);
}
public static function filterTypeProvider(): array
{
return [
'all records' => ['all', 0, 4],
'high value (>=500)' => ['high-value', 500, 3],
'range (>=1000, no x-max-value sent)' => ['range', 1000, 2],
'low value (<500)' => ['low-value', 500, 1],
];
}
public function testBidiStreamAppliesMultiplier(): void
{
$record = $this->createRecord('1', 1500);
$request = new BidiStreamRequest(
serviceName: 'recordservice.RecordService',
methodName: 'FilterStream',
messages: [$record->serializeToString()],
metadata: ['x-filter-type' => 'all']
);
$response = $this->handler->handleBidiStream($request);
// Parse response
$result = new ProcessedRecord();
$result->mergeFromString($response->messages[0]);
// Verify multiplier applied (1500 > 1000, so multiplier = 1.05)
$expectedValue = 1500 * 1.05;
$this->assertEquals($expectedValue, $result->getProcessedValue());
}
private function createRecord(string $id, int $value): Record
{
$record = new Record();
$record->setId($id);
$record->setValue($value);
return $record;
}
}
Comparison with Other Patterns¶
| Aspect | Client Streaming | Bidirectional | Unary |
|---|---|---|---|
| Input | Multiple messages | Multiple messages | Single message |
| Output | Single response | Multiple messages | Single response |
| Use case | Batch operations | Stream processing | Simple requests |
| Message order | Important | Important | N/A |
| Atomicity | Full batch atomic | Per-message or batch | Single atomic |
| PHP Pattern | handleClientStream() | handleBidiStream() | handleRequest() |
Key PHP-Specific Patterns¶
Using serializeToString() and mergeFromString()¶
- Always use mergeFromString() to deserialize protobuf messages
- Always use serializeToString() to serialize protobuf responses
- Never use parse() for deserialization in handlers
Error Handling¶
- Prefer returning error responses over throwing exceptions
- Use try-catch to handle deserialization and business logic errors
- Include context in error messages (index, original value, etc.)
Metadata Access¶
- Use getMetadata(string $key) with null coalescing (??) to access metadata safely
- Return metadata in response for logging and debugging
- Use standard header names (e.g., 'x-batch-id', 'x-count')
Type Hints¶
- Use strict type hints on all function parameters
- Return types must be Response or BidiStreamResponse
- Use declare(strict_types=1) in all files
See the gRPC documentation for more examples.
Rust gRPC Streaming Handlers¶
Complete Rust implementation examples for client streaming and bidirectional streaming gRPC handlers in Spikard.
Client Streaming Handler¶
Client streaming RPC allows a client to send multiple messages and then the server responds with a single message.
Handler Signature¶
use bytes::Bytes;
use spikard_http::grpc::{GrpcClientStreamRequest, GrpcResponse};
use tonic::metadata::MetadataMap;
// Shape of the imported request type, shown for reference (do not redefine it in handler code)
pub struct GrpcClientStreamRequest {
/// Service name (e.g., "myservice.MyService")
pub service_name: String,
/// Method name (e.g., "BatchCreate")
pub method_name: String,
/// Request metadata from client
pub metadata: MetadataMap,
/// All client messages collected as Vec
pub messages: Vec<Bytes>,
}
async fn handle_client_stream(
request: GrpcClientStreamRequest,
) -> Result<GrpcResponse, tonic::Status> {
// Process all messages and return single response
}
Example: Batch Message Processing¶
use bytes::Bytes;
use prost::Message;
use spikard_http::grpc::{GrpcClientStreamRequest, GrpcResponse};
use tonic::{Status, metadata::MetadataMap};
use std::sync::Arc;
// Generated protobuf types
mod messageservice {
include!("messageservice.rs"); // Generated by prost
}
/// Handle client streaming: receive multiple messages, send single response.
///
/// Pattern: Collect all input messages, process together, return aggregated response.
pub async fn handle_batch_create(
request: GrpcClientStreamRequest,
repository: Arc<dyn ItemRepository + Send + Sync>,
) -> Result<GrpcResponse, Status> {
let messages = request.messages;
let metadata = request.metadata;
// Validate authorization
let auth_token = metadata
.get("authorization")
.and_then(|v| v.to_str().ok());
if auth_token.is_none() {
return Err(Status::unauthenticated("Authentication required"));
}
// Step 1: Deserialize all input messages
let mut items = Vec::with_capacity(messages.len());
for (index, msg) in messages.iter().enumerate() {
let item = messageservice::Item::decode(msg.clone())
.map_err(|e| {
Status::invalid_argument(format!("Failed to decode message {}: {}", index, e))
})?;
items.push(item);
}
// Validate all items before processing
for item in &items {
if item.name.is_empty() || item.value == 0 {
return Err(Status::invalid_argument(
"Each item must have name and value"
));
}
}
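// Step 2: Process all items atomically
// Simulate database transaction. The counters live inside the async block and
// are returned from it: an `async move` block would only update its own copies
// of counters declared outside it.
let (success_count, total_value) = repository.with_transaction(|tx| async move {
let mut success_count = 0u32;
let mut total_value = 0i32;
for item in items {
tx.items().create(&item.name, item.value).await
.map_err(|e| Status::internal(format!("Transaction failed: {}", e)))?;
success_count += 1;
total_value += item.value;
}
Ok::<_, Status>((success_count, total_value))
}).await
// with_transaction returns a boxed error, which `?` cannot convert to Status
.map_err(|e| Status::internal(format!("Transaction failed: {}", e)))?;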
// Step 3: Build aggregate response
let batch_id = uuid::Uuid::new_v4().to_string();
let timestamp = chrono::Utc::now().to_rfc3339();
let response = messageservice::BatchCreateResponse {
success_count,
total_value,
batch_id: batch_id.clone(),
timestamp,
};
// Step 4: Serialize and return
let mut buf = Vec::new();
response.encode(&mut buf)
.map_err(|e| Status::internal(format!("Encoding error: {}", e)))?;
let mut response_metadata = MetadataMap::new();
response_metadata.insert(
"x-batch-id",
batch_id.parse()
.map_err(|e| Status::internal(format!("Invalid metadata value: {}", e)))?
);
response_metadata.insert(
"x-count",
success_count.to_string().parse()
.map_err(|e| Status::internal(format!("Invalid metadata value: {}", e)))?
);
Ok(GrpcResponse {
payload: Bytes::from(buf),
metadata: response_metadata,
})
}
// Mock trait for demonstration
#[async_trait::async_trait]
pub trait ItemRepository {
async fn with_transaction<F, Fut, T>(&self, f: F) -> Result<T, Box<dyn std::error::Error>>
where
F: FnOnce(&dyn Transaction) -> Fut + Send,
Fut: std::future::Future<Output = Result<T, Status>> + Send;
}
#[async_trait::async_trait]
pub trait Transaction: Send + Sync {
fn items(&self) -> &dyn ItemStore;
}
#[async_trait::async_trait]
pub trait ItemStore: Send + Sync {
async fn create(&self, name: &str, value: i32) -> Result<(), Box<dyn std::error::Error>>;
}
Bidirectional Streaming Handler¶
Bidirectional streaming RPC allows the client to send multiple messages and the server to send multiple messages back.
Handler Signature¶
use bytes::Bytes;
use spikard_http::grpc::{GrpcBidiStreamRequest, GrpcBidiStreamResponse};
use tonic::metadata::MetadataMap;
// Shape of the imported request type, shown for reference (do not redefine it in handler code)
pub struct GrpcBidiStreamRequest {
/// Service name (e.g., "myservice.MyService")
pub service_name: String,
/// Method name (e.g., "TransformStream")
pub method_name: String,
/// Request metadata from client
pub metadata: MetadataMap,
/// All input messages collected as Vec
pub messages: Vec<Bytes>,
}
// Shape of the imported response type, shown for reference
pub struct GrpcBidiStreamResponse {
/// Array of response messages
pub messages: Vec<Bytes>,
/// Response metadata
pub metadata: MetadataMap,
}
async fn handle_bidi_stream(
request: GrpcBidiStreamRequest,
) -> Result<GrpcBidiStreamResponse, tonic::Status> {
// Process input messages and generate output messages
}
Example: Message Transformation Pipeline¶
use bytes::Bytes;
use prost::Message;
use spikard_http::grpc::{GrpcBidiStreamRequest, GrpcBidiStreamResponse};
use tonic::{Status, metadata::MetadataMap};
// Generated protobuf types
mod transformservice {
include!("transformservice.rs"); // Generated by prost
}
/// Handle bidirectional streaming: receive multiple messages, send multiple messages.
///
/// Pattern: Collect all input messages, process, generate output messages, return Vec.
pub async fn handle_transform_stream(
request: GrpcBidiStreamRequest,
) -> Result<GrpcBidiStreamResponse, Status> {
let messages = request.messages;
let _metadata = request.metadata;
// Step 1: Deserialize all input messages
let mut input_documents = Vec::with_capacity(messages.len());
for (index, msg) in messages.iter().enumerate() {
let document = transformservice::Document::decode(msg.clone())
.map_err(|e| {
Status::invalid_argument(format!("Failed to decode message {}: {}", index, e))
})?;
input_documents.push((index, document));
}
// Step 2: Process each document and generate response
let mut output_messages = Vec::with_capacity(input_documents.len());
for (index, document) in input_documents {
match transform_document(&document).await {
Ok(transformed) => {
// Build response message
let mut metadata_map = std::collections::HashMap::new();
metadata_map.insert(
"original_size".to_string(),
document.content.len().to_string(),
);
metadata_map.insert(
"transformed_size".to_string(),
transformed.content.len().to_string(),
);
let result = transformservice::TransformResult {
original_id: document.id,
transformed_content: transformed.content,
transformed_at: chrono::Utc::now().to_rfc3339(),
status: if transformed.success { "SUCCESS" } else { "PARTIAL" }.to_string(),
metadata: metadata_map,
error_message: String::new(),
};
// Serialize response message
let mut buf = Vec::new();
result.encode(&mut buf)
.map_err(|e| Status::internal(format!("Encoding error: {}", e)))?;
output_messages.push(Bytes::from(buf));
}
Err(err) => {
// Create error response for this document
let error_result = transformservice::TransformResult {
original_id: document.id,
transformed_content: String::new(),
transformed_at: String::new(),
status: "ERROR".to_string(),
metadata: std::collections::HashMap::new(),
error_message: err.to_string(),
};
let mut buf = Vec::new();
error_result.encode(&mut buf)
.map_err(|e| Status::internal(format!("Encoding error: {}", e)))?;
output_messages.push(Bytes::from(buf));
}
}
}
// Step 3: Return all response messages
let mut response_metadata = MetadataMap::new();
response_metadata.insert(
"x-processed-count",
output_messages.len().to_string().parse()
.map_err(|e| Status::internal(format!("Invalid metadata value: {}", e)))?
);
response_metadata.insert(
"x-timestamp",
chrono::Utc::now().to_rfc3339().parse()
.map_err(|e| Status::internal(format!("Invalid metadata value: {}", e)))?
);
Ok(GrpcBidiStreamResponse {
messages: output_messages,
metadata: response_metadata,
})
}
struct TransformResult {
content: String,
success: bool,
}
async fn transform_document(doc: &transformservice::Document) -> Result<TransformResult, Box<dyn std::error::Error>> {
// Simulate async transformation work
Ok(TransformResult {
content: doc.content.to_uppercase(),
success: true,
})
}
Advanced Example: Filtering and Aggregation¶
Bidirectional Stream with Filtering¶
use bytes::Bytes;
use prost::Message;
use spikard_http::grpc::{GrpcBidiStreamRequest, GrpcBidiStreamResponse};
use tonic::{Status, metadata::MetadataMap};
// Generated protobuf types
mod recordservice {
include!("recordservice.rs"); // Generated by prost
}
/// Filter records and apply transformations in bidirectional stream.
///
/// Pattern: Filter input based on metadata criteria, transform, return filtered output.
pub async fn handle_filter_stream(
request: GrpcBidiStreamRequest,
) -> Result<GrpcBidiStreamResponse, Status> {
let messages = request.messages;
let metadata = request.metadata;
// Parse filter criteria from metadata
let filter_type = metadata
.get("x-filter-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("all");
let min_value = metadata
.get("x-min-value")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<i32>().ok())
.unwrap_or(0);
// Step 1: Deserialize and filter
let mut filtered_items = Vec::new();
for msg in messages.iter() {
let item = recordservice::Record::decode(msg.clone())
.map_err(|e| Status::invalid_argument(format!("Failed to decode: {}", e)))?;
// Apply filter logic
if filter_type == "all" || (filter_type == "high-value" && item.value >= min_value) {
filtered_items.push(item);
}
}
// Step 2: Transform filtered items
let mut output_messages = Vec::with_capacity(filtered_items.len());
for item in filtered_items {
let response = recordservice::ProcessedRecord {
id: item.id,
original_value: item.value,
processed_value: (item.value as f64 * 1.1) as i32, // Apply multiplier
filtered: false,
};
let mut buf = Vec::new();
response.encode(&mut buf)
.map_err(|e| Status::internal(format!("Encoding error: {}", e)))?;
output_messages.push(Bytes::from(buf));
}
// Step 3: Return response with statistics
let mut response_metadata = MetadataMap::new();
response_metadata.insert(
"x-input-count",
messages.len().to_string().parse()
.map_err(|e| Status::internal(format!("Invalid metadata value: {}", e)))?
);
response_metadata.insert(
"x-output-count",
output_messages.len().to_string().parse()
.map_err(|e| Status::internal(format!("Invalid metadata value: {}", e)))?
);
response_metadata.insert(
"x-filtered-count",
(messages.len() - output_messages.len()).to_string().parse()
.map_err(|e| Status::internal(format!("Invalid metadata value: {}", e)))?
);
Ok(GrpcBidiStreamResponse {
messages: output_messages,
metadata: response_metadata,
})
}
Key Patterns¶
Message Collection¶
- All client messages are collected in a single `messages: Vec<Bytes>`
- Messages are provided as `Bytes` objects (protobuf serialized)
- No streaming iteration needed: the full `Vec` is provided
- Order of messages is preserved
Processing Strategy¶
- Collect: All input messages are received as a `Vec`
- Validate: Check all messages before processing (use the `?` operator)
- Transform: Process and generate output
- Serialize: Encode response messages as `Bytes`
- Return: `Vec` of response message `Bytes` wrapped in `Result`
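The validate-before-processing step can be sketched with standard-library types only. This is an illustrative stand-in, not Spikard's API: `RawMessage`, `decode_item`, and `process_batch` are hypothetical names, and a UTF-8 `name=value` payload substitutes for a protobuf-encoded `Item`. Collecting into a `Result` stops at the first invalid message, so nothing is transformed unless every message decodes.

```rust
// Hypothetical stand-in for a serialized protobuf message.
type RawMessage = Vec<u8>;

#[derive(Debug, PartialEq)]
struct Item {
    name: String,
    value: i32,
}

// Assumption: "name=value" text replaces real protobuf decoding.
fn decode_item(raw: &RawMessage) -> Result<Item, String> {
    let text = std::str::from_utf8(raw).map_err(|e| format!("Failed to decode: {}", e))?;
    let (name, value) = text
        .split_once('=')
        .ok_or_else(|| format!("Failed to decode: missing '=' in {:?}", text))?;
    let value = value
        .parse::<i32>()
        .map_err(|e| format!("Failed to decode: {}", e))?;
    Ok(Item { name: name.to_string(), value })
}

// Collect -> validate -> transform -> serialize -> return.
fn process_batch(messages: &[RawMessage]) -> Result<Vec<RawMessage>, String> {
    // Validate: decode everything first; the first error aborts the batch.
    let items = messages
        .iter()
        .map(decode_item)
        .collect::<Result<Vec<Item>, String>>()?;

    // Transform and serialize: one output message per input, order preserved.
    let output = items
        .into_iter()
        .map(|item| format!("{}={}", item.name.to_uppercase(), item.value).into_bytes())
        .collect();
    Ok(output)
}
```

In a real handler the error type would be `tonic::Status` and the decode step would call the prost-generated `decode`, as in the handlers shown earlier.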
Error Handling¶
- Return `Result<T, tonic::Status>` for all handlers
- Use the `?` operator for error propagation (no `.unwrap()`)
- Map domain errors to appropriate Status codes:
  - `Status::invalid_argument()` for validation errors
  - `Status::unauthenticated()` for auth failures
  - `Status::not_found()` for missing resources
  - `Status::internal()` for processing errors
- Use `.map_err()` to convert errors to Status
- Per-message errors can be included in response messages (with ERROR status)
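The domain-error mapping above might be centralized in one function, as in this sketch. To stay self-contained it uses a hypothetical `Code` enum that mirrors the four gRPC codes named in the list; in a real handler the same `match` would build `tonic::Status` values with `Status::invalid_argument()`, `Status::not_found()`, and so on. `DomainError`, its variants, and the helper names are illustrative assumptions.

```rust
use std::fmt;

// Hypothetical mirror of the gRPC status codes listed above.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Code {
    InvalidArgument,
    Unauthenticated,
    NotFound,
    Internal,
}

// Hypothetical application errors; not part of Spikard.
#[derive(Debug)]
enum DomainError {
    Validation(String),
    MissingToken,
    UnknownItem(i32),
    Storage(String),
}

impl fmt::Display for DomainError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DomainError::Validation(msg) => write!(f, "{}", msg),
            DomainError::MissingToken => write!(f, "Authentication required"),
            DomainError::UnknownItem(id) => write!(f, "item {} not found", id),
            DomainError::Storage(msg) => write!(f, "storage failure: {}", msg),
        }
    }
}

// One place that decides which code each domain error maps to.
fn to_status(err: &DomainError) -> (Code, String) {
    let code = match err {
        DomainError::Validation(_) => Code::InvalidArgument,
        DomainError::MissingToken => Code::Unauthenticated,
        DomainError::UnknownItem(_) => Code::NotFound,
        DomainError::Storage(_) => Code::Internal,
    };
    (code, err.to_string())
}

fn check_item(name: &str, value: i32) -> Result<(), DomainError> {
    if name.is_empty() || value == 0 {
        return Err(DomainError::Validation("Item must have name and value".to_string()));
    }
    Ok(())
}

// `map_err` converts at the call site so `?` can propagate the status.
fn handle(name: &str, value: i32) -> Result<i32, (Code, String)> {
    check_item(name, value).map_err(|e| to_status(&e))?;
    Ok(value)
}
```

Keeping the mapping in a single function means a new error variant forces a decision about its status code at compile time, because the `match` must stay exhaustive.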
Metadata¶
- Client streaming: Metadata passed in request, can be included in response
- Bidirectional streaming: Metadata passed in request, can be included in response
- Use metadata for non-payload information (timestamps, counts, filters)
- Access with `metadata.get(key).and_then(|v| v.to_str().ok())`
- Provide defaults with `.unwrap_or()` or `.unwrap_or_default()`
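The access-with-default pattern can be wrapped in small helpers. The sketch below assumes a plain `HashMap<String, String>` as a stand-in for tonic's `MetadataMap` so that it compiles without external crates; `Metadata`, `text_or`, and `parsed_or` are hypothetical names. With a real `MetadataMap`, the lookup would go through `.get(key).and_then(|v| v.to_str().ok())` as shown in the handlers.

```rust
use std::collections::HashMap;
use std::str::FromStr;

// Assumption: a plain HashMap stands in for tonic's MetadataMap.
type Metadata = HashMap<String, String>;

// Read a string entry, falling back to a default when the key is absent.
fn text_or<'a>(metadata: &'a Metadata, key: &str, default: &'a str) -> &'a str {
    metadata.get(key).map(String::as_str).unwrap_or(default)
}

// Read and parse an entry; missing or unparseable values yield the default.
fn parsed_or<T: FromStr>(metadata: &Metadata, key: &str, default: T) -> T {
    metadata
        .get(key)
        .and_then(|v| v.parse::<T>().ok())
        .unwrap_or(default)
}
```

Note that a silent fallback hides malformed client input. If a value such as `x-min-value` must be valid when present, returning `Status::invalid_argument()` on a parse failure may be the better choice.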
Limits and Constraints¶
- MAX_STREAM_MESSAGES: 10,000 messages per stream
- Resource Exhaustion: Streams exceeding the limit return `RESOURCE_EXHAUSTED` status
- Memory: All messages are collected in memory, which is appropriate for moderate message counts
- Atomicity: All messages processed together (transaction-like semantics)
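The list above describes a limit that is reported as `RESOURCE_EXHAUSTED`. An application-level guard of the same shape (for example, to apply the cap before any decoding work in a test or a custom pipeline) could look like the sketch below. `StreamLimitExceeded` and `check_stream_limit` are hypothetical names; only the constant's value comes from this guide, and a real handler would surface the failure as a `tonic::Status` with the `RESOURCE_EXHAUSTED` code.

```rust
// Value taken from the limit stated above.
const MAX_STREAM_MESSAGES: usize = 10_000;

// Hypothetical error type; stands in for a RESOURCE_EXHAUSTED status.
#[derive(Debug, PartialEq)]
struct StreamLimitExceeded {
    received: usize,
    limit: usize,
}

// Reject oversized batches before any decoding work is done.
// A stream with exactly MAX_STREAM_MESSAGES messages is still accepted.
fn check_stream_limit(message_count: usize) -> Result<(), StreamLimitExceeded> {
    if message_count > MAX_STREAM_MESSAGES {
        Err(StreamLimitExceeded {
            received: message_count,
            limit: MAX_STREAM_MESSAGES,
        })
    } else {
        Ok(())
    }
}
```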
Testing Client Streaming¶
#[cfg(test)]
mod tests {
use super::*;
use prost::Message;
use bytes::Bytes;
use tonic::metadata::MetadataMap;
use std::sync::Arc; // Needed for Arc::new(MockItemRepository::new()) below
mod messageservice {
include!("messageservice.rs");
}
#[tokio::test]
async fn test_batch_create_success() {
// Create multiple input messages
let items = vec![
messageservice::Item {
name: "Item 1".to_string(),
value: 100,
},
messageservice::Item {
name: "Item 2".to_string(),
value: 200,
},
messageservice::Item {
name: "Item 3".to_string(),
value: 300,
},
];
let messages: Vec<Bytes> = items
.iter()
.map(|item| {
let mut buf = Vec::new();
item.encode(&mut buf).expect("Failed to encode Item protobuf");
Bytes::from(buf)
})
.collect();
// Create request
let mut metadata = MetadataMap::new();
metadata.insert("authorization", "Bearer token".parse().expect("Failed to parse authorization header"));
let request = GrpcClientStreamRequest {
service_name: "myservice.MyService".to_string(),
method_name: "BatchCreate".to_string(),
metadata,
messages,
};
// Mock repository
let repository = Arc::new(MockItemRepository::new());
// Call handler
let response = handle_batch_create(request, repository).await.expect("Handler call failed");
// Verify response
let result = messageservice::BatchCreateResponse::decode(response.payload)
.expect("Failed to decode BatchCreateResponse");
assert_eq!(result.success_count, 3);
assert_eq!(result.total_value, 600);
assert!(response.metadata.get("x-batch-id").is_some());
assert_eq!(
response.metadata.get("x-count").expect("Missing x-count header").to_str().expect("Invalid x-count header"),
"3"
);
}
#[tokio::test]
async fn test_batch_create_missing_authorization() {
// Create request without authorization
let items = vec![messageservice::Item {
name: "Item 1".to_string(),
value: 100,
}];
let messages: Vec<Bytes> = items
.iter()
.map(|item| {
let mut buf = Vec::new();
item.encode(&mut buf).expect("Failed to encode Item protobuf");
Bytes::from(buf)
})
.collect();
let request = GrpcClientStreamRequest {
service_name: "myservice.MyService".to_string(),
method_name: "BatchCreate".to_string(),
metadata: MetadataMap::new(), // No authorization
messages,
};
let repository = Arc::new(MockItemRepository::new());
// Should return Unauthenticated error
let result = handle_batch_create(request, repository).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.code(), tonic::Code::Unauthenticated);
assert!(err.message().contains("Authentication required"));
}
#[tokio::test]
async fn test_batch_create_invalid_item() {
// Create request with invalid item (missing value)
let items = vec![messageservice::Item {
name: "Item 1".to_string(),
value: 0, // Invalid: value is 0
}];
let messages: Vec<Bytes> = items
.iter()
.map(|item| {
let mut buf = Vec::new();
item.encode(&mut buf).expect("Failed to encode Item protobuf");
Bytes::from(buf)
})
.collect();
let mut metadata = MetadataMap::new();
metadata.insert("authorization", "Bearer token".parse().expect("Failed to parse authorization header"));
let request = GrpcClientStreamRequest {
service_name: "myservice.MyService".to_string(),
method_name: "BatchCreate".to_string(),
metadata,
messages,
};
let repository = Arc::new(MockItemRepository::new());
// Should return InvalidArgument error
let result = handle_batch_create(request, repository).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.code(), tonic::Code::InvalidArgument);
assert!(err.message().contains("must have name and value"));
}
#[tokio::test]
async fn test_batch_create_malformed_message() {
// Create request with malformed protobuf
let messages = vec![Bytes::from("invalid protobuf data")];
let mut metadata = MetadataMap::new();
metadata.insert("authorization", "Bearer token".parse().expect("Failed to parse authorization header"));
let request = GrpcClientStreamRequest {
service_name: "myservice.MyService".to_string(),
method_name: "BatchCreate".to_string(),
metadata,
messages,
};
let repository = Arc::new(MockItemRepository::new());
// Should return InvalidArgument error for decode error
let result = handle_batch_create(request, repository).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.code(), tonic::Code::InvalidArgument);
assert!(err.message().contains("Failed to decode message"));
}
// Mock implementation for testing
struct MockItemRepository;
impl MockItemRepository {
fn new() -> Self {
Self
}
}
#[async_trait::async_trait]
impl ItemRepository for MockItemRepository {
async fn with_transaction<F, Fut, T>(&self, f: F) -> Result<T, Box<dyn std::error::Error>>
where
F: FnOnce(&dyn Transaction) -> Fut + Send,
Fut: std::future::Future<Output = Result<T, Status>> + Send,
{
let tx = MockTransaction;
f(&tx).await.map_err(|e| e.to_string().into())
}
}
struct MockTransaction;
impl Transaction for MockTransaction {
fn items(&self) -> &dyn ItemStore {
&MockItemStore
}
}
struct MockItemStore;
#[async_trait::async_trait]
impl ItemStore for MockItemStore {
async fn create(&self, _name: &str, _value: i32) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
}
}
Testing Bidirectional Streaming¶
#[cfg(test)]
mod bidi_tests {
use super::*;
use prost::Message;
use bytes::Bytes;
use tonic::metadata::MetadataMap;
mod transformservice {
include!("transformservice.rs");
}
#[tokio::test]
async fn test_transform_stream_success() {
// Create input messages
let documents = vec![
transformservice::Document {
id: 1,
content: "hello world".to_string(),
},
transformservice::Document {
id: 2,
content: "goodbye world".to_string(),
},
];
let messages: Vec<Bytes> = documents
.iter()
.map(|doc| {
let mut buf = Vec::new();
doc.encode(&mut buf).expect("Failed to encode Document protobuf");
Bytes::from(buf)
})
.collect();
// Create request
let request = GrpcBidiStreamRequest {
service_name: "myservice.MyService".to_string(),
method_name: "TransformStream".to_string(),
metadata: MetadataMap::new(),
messages,
};
// Call handler
let response = handle_transform_stream(request).await.expect("Handler call failed");
// Verify multiple response messages
assert_eq!(response.messages.len(), 2);
assert!(response.metadata.get("x-processed-count").is_some());
assert_eq!(
response.metadata.get("x-processed-count").expect("Missing x-processed-count header").to_str().expect("Invalid x-processed-count header"),
"2"
);
// Verify each response
for msg in response.messages {
let result = transformservice::TransformResult::decode(msg).expect("Failed to decode TransformResult");
assert_eq!(result.status, "SUCCESS");
assert!(!result.transformed_content.is_empty());
assert!(
result.transformed_content.contains("HELLO") ||
result.transformed_content.contains("GOODBYE")
);
}
}
#[tokio::test]
async fn test_filter_stream_high_value() {
mod recordservice {
include!("recordservice.rs");
}
let records = vec![
recordservice::Record { id: 1, value: 50 },
recordservice::Record { id: 2, value: 150 },
recordservice::Record { id: 3, value: 250 },
];
let messages: Vec<Bytes> = records
.iter()
.map(|rec| {
let mut buf = Vec::new();
rec.encode(&mut buf).expect("Failed to encode Record protobuf");
Bytes::from(buf)
})
.collect();
let mut metadata = MetadataMap::new();
metadata.insert("x-filter-type", "high-value".parse().expect("Failed to parse filter-type header"));
metadata.insert("x-min-value", "100".parse().expect("Failed to parse min-value header"));
let request = GrpcBidiStreamRequest {
service_name: "myservice.MyService".to_string(),
method_name: "FilterStream".to_string(),
metadata,
messages,
};
let response = handle_filter_stream(request).await.expect("Handler call failed");
// Only records with value >= 100 should be returned
assert_eq!(response.messages.len(), 2);
assert_eq!(
response.metadata.get("x-output-count").expect("Missing x-output-count header").to_str().expect("Invalid x-output-count header"),
"2"
);
assert_eq!(
response.metadata.get("x-filtered-count").expect("Missing x-filtered-count header").to_str().expect("Invalid x-filtered-count header"),
"1"
);
// Verify output values
for msg in response.messages {
let result = recordservice::ProcessedRecord::decode(msg).expect("Failed to decode ProcessedRecord");
assert!(result.original_value >= 100);
}
}
#[tokio::test]
async fn test_filter_stream_malformed_input() {
let messages = vec![Bytes::from("invalid protobuf")];
let mut metadata = MetadataMap::new();
metadata.insert("x-filter-type", "all".parse().expect("Failed to parse filter-type header"));
let request = GrpcBidiStreamRequest {
service_name: "myservice.MyService".to_string(),
method_name: "FilterStream".to_string(),
metadata,
messages,
};
// Should return InvalidArgument error
let result = handle_filter_stream(request).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.code(), tonic::Code::InvalidArgument);
assert!(err.message().contains("Failed to decode"));
}
}
Running Tests¶
# Run all streaming tests
cargo test --package myservice
# Run specific test
cargo test test_batch_create_success
# Run with output
cargo test -- --nocapture
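# Run with a crate feature named tokio-test enabled (only if your Cargo.toml defines it;
# this does not restrict the run to async tests, so use a name filter for that)
cargo test --features tokio-test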
Comparison with Other Patterns¶
| Aspect | Client Streaming | Bidirectional | Unary |
|---|---|---|---|
| Input | Multiple messages | Multiple messages | Single message |
| Output | Single response | Multiple messages | Single response |
| Use case | Batch operations | Stream processing | Simple requests |
| Message order | Important | Important | N/A |
| Atomicity | Full batch atomic | Per-message or batch | Single atomic |
Common Pitfalls¶
1. Using .unwrap() Instead of ? Operator¶
// WRONG: Using .unwrap() causes panics
let item = messageservice::Item::decode(msg).unwrap();
// CORRECT: Use ? operator with map_err
let item = messageservice::Item::decode(msg)
.map_err(|e| Status::invalid_argument(format!("Decode failed: {}", e)))?;
2. Not Handling All Message Errors¶
// WRONG: Failing entire stream on first error
for msg in messages {
let item = messageservice::Item::decode(msg)?; // Stops on first error
}
// CORRECT: Handle per-message errors
for msg in messages {
match messageservice::Item::decode(msg) {
Ok(item) => { /* process */ },
Err(e) => {
// Generate error response for this message
let error_result = messageservice::ItemResult {
status: "ERROR".to_string(),
error_message: e.to_string(),
..Default::default()
};
let mut buf = Vec::new();
error_result.encode(&mut buf)
.map_err(|e| Status::internal(format!("Encoding error: {}", e)))?;
output_messages.push(Bytes::from(buf));
}
}
}
3. Forgetting to Serialize Response Messages¶
// WRONG: Returning protobuf objects instead of Bytes
let response = messageservice::Item { name: "test".to_string(), value: 1 };
output_messages.push(response); // Type error!
// CORRECT: Serialize to Bytes
let response = messageservice::Item { name: "test".to_string(), value: 1 };
let mut buf = Vec::new();
response.encode(&mut buf)
.map_err(|e| Status::internal(format!("Encoding error: {}", e)))?;
output_messages.push(Bytes::from(buf));
4. Blocking Operations in Async Context¶
// WRONG: Blocking operation in async handler
async fn handle_batch_create(request: GrpcClientStreamRequest) -> Result<GrpcResponse, Status> {
std::thread::sleep(std::time::Duration::from_secs(1)); // Blocks entire executor!
Ok(response)
}
// CORRECT: Use async/await
async fn handle_batch_create(request: GrpcClientStreamRequest) -> Result<GrpcResponse, Status> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await; // Non-blocking
Ok(response)
}
See the gRPC documentation for more examples.
Part 8: Best Practices¶
Project Structure¶
project/
+-- proto/
| +-- user/v1/
| | +-- user.proto
| | +-- user_service.proto
| +-- common/v1/
| +-- types.proto
+-- generated/
+-- python/
+-- typescript/
+-- ruby/
Schema Evolution¶
- Use packages with versioning: `package company.user.v1;`
- Reserve deleted field numbers: `reserved 2, 15, 9 to 11;`
- Document everything: Add comments to services and fields
- Design for evolution: Wrapper responses allow adding metadata later
Next Steps¶
- Getting Started with gRPC - Step-by-step tutorial
- ADR 0010 - Implementation details
- Proto3 Language Guide
- gRPC Core Concepts
Summary¶
You've learned proto3 syntax, type mapping across languages, code generation, handler implementation, error handling, and testing patterns. gRPC with Spikard gives you type-safe, high-performance APIs with cross-language support.