
Protobuf/gRPC Guide

Build high-performance, type-safe gRPC services with Spikard. This guide covers proto3 syntax, code generation, and service implementation across all supported languages.

What You'll Learn

  • Write .proto files with proto3 syntax
  • Generate type-safe code with spikard generate protobuf
  • Implement gRPC service handlers
  • Handle errors with gRPC status codes
  • Use streaming patterns

Why gRPC and Protobuf?

Protocol Buffers provide type safety, payloads that are typically 3-5x smaller than the equivalent JSON (the exact ratio depends on the message shape), cross-language code generation, and schema evolution support.

gRPC adds HTTP/2 multiplexing, four streaming modes, standardized error codes, and metadata support.

Part 1: Proto3 Syntax

Basic Service Definition

UserService Proto Definition

Basic protobuf definition for a UserService with GetUser and CreateUser methods.

syntax = "proto3";

package userservice;

service UserService {
  rpc GetUser(GetUserRequest) returns (User);
  rpc CreateUser(CreateUserRequest) returns (User);
}

message GetUserRequest {
  int32 id = 1;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
}

message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
  string created_at = 4;
}

Usage

This proto definition is used across all language examples in the Spikard documentation. It demonstrates:

  • Service definition: Defines the RPC methods available
  • Request messages: Input types for each RPC method
  • Response messages: Output types returned by the service
  • Field numbering: Each field has a unique number for wire format compatibility

Key concepts:

  1. syntax = "proto3": Required (Spikard only supports proto3)
  2. package: Namespace for types (use versioning: v1, v2)
  3. Field numbers: Unique identifiers (1-536870911, excluding the reserved range 19000-19999) that must never change once a message is in use
  4. Field labels: optional, repeated, or none
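Field numbers matter because they, not the field names, are what ends up on the wire. The sketch below hand-encodes two fields from the messages above to show how a field number and a wire type combine into a tag. It is a minimal illustration of the protobuf encoding, not a replacement for generated code; the helper names (encode_varint, encode_tag, and so on) are made up for this example.

```python
# Hand-rolled protobuf encoding for illustration only.
# Helper names are hypothetical; generated code does this for you.

VARINT = 0  # wire type for int32, int64, uint32, uint64, bool, enum
LEN = 2     # wire type for string, bytes, embedded messages


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative values")
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # set the continuation bit
        else:
            out.append(byte)
            return bytes(out)


def encode_tag(field_number: int, wire_type: int) -> bytes:
    """A tag is the field number shifted left by 3, OR-ed with the wire type."""
    return encode_varint((field_number << 3) | wire_type)


def encode_int32_field(field_number: int, value: int) -> bytes:
    return encode_tag(field_number, VARINT) + encode_varint(value)


def encode_string_field(field_number: int, text: str) -> bytes:
    data = text.encode("utf-8")
    return encode_tag(field_number, LEN) + encode_varint(len(data)) + data


# GetUserRequest { id: 150 } -> tag 0x08, then 150 as a two-byte varint
get_user_request = encode_int32_field(1, 150)

# CreateUserRequest { name: "Al" } -> tag 0x0a, length 2, then the UTF-8 bytes
create_user_name = encode_string_field(1, "Al")
```

Because the tag embeds the field number, renumbering a field changes how every previously serialized message is interpreted, which is why numbers must stay fixed.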

Scalar Types

| Proto3 Type | Description | Example |
|---|---|---|
| int32 | 32-bit signed integer | 42, -100 |
| int64 | 64-bit signed integer | 9223372036854775807 |
| uint32 | 32-bit unsigned integer | 42 |
| uint64 | 64-bit unsigned integer | 18446744073709551615 |
| float | 32-bit floating point | 3.14 |
| double | 64-bit floating point | 3.141592653589793 |
| bool | Boolean | true, false |
| string | UTF-8 string | "Hello, World!" |
| bytes | Binary data | b"\x00\x01\x02" |

Enums

enum UserStatus {
  USER_STATUS_UNSPECIFIED = 0;  // Always have a zero value
  USER_STATUS_ACTIVE = 1;
  USER_STATUS_INACTIVE = 2;
  USER_STATUS_SUSPENDED = 3;
}

Best practices: Always include a zero-value, use UPPER_SNAKE_CASE, prefix values with enum name.

Service Definitions

service UserService {
  // Unary RPC
  rpc GetUser(GetUserRequest) returns (User) {}

  // Server streaming
  rpc ListUsers(ListUsersRequest) returns (stream User) {}

  // Client streaming
  rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse) {}

  // Bidirectional streaming
  rpc Chat(stream ChatMessage) returns (stream ChatMessage) {}
}
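The four RPC shapes differ only in whether each side sends one message or a stream. The following sketch models them with plain asyncio coroutines and async generators so the data flow is visible. It does not use Spikard's streaming API (which this section does not show); every function name and the string "messages" are stand-ins for illustration.

```python
import asyncio
from typing import AsyncIterator, Iterable


async def get_user(user_id: int) -> str:
    """Unary: one request in, one response out."""
    return f"user-{user_id}"


async def list_users(limit: int) -> AsyncIterator[str]:
    """Server streaming: one request in, many responses out."""
    for i in range(1, limit + 1):
        yield f"user-{i}"


async def create_users(names: AsyncIterator[str]) -> int:
    """Client streaming: many requests in, one summary response out."""
    created = 0
    async for _name in names:
        created += 1
    return created


async def chat(messages: AsyncIterator[str]) -> AsyncIterator[str]:
    """Bidirectional streaming: responses are produced while requests arrive."""
    async for message in messages:
        yield f"echo: {message}"


async def as_stream(items: Iterable[str]) -> AsyncIterator[str]:
    """Helper that turns a list into an async stream of requests."""
    for item in items:
        yield item


async def demo() -> dict:
    return {
        "unary": await get_user(1),
        "server_streaming": [u async for u in list_users(3)],
        "client_streaming": await create_users(as_stream(["a", "b"])),
        "bidirectional": [r async for r in chat(as_stream(["hi", "bye"]))],
    }


result = asyncio.run(demo())
```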

Part 2: Type Mapping

Protobuf Type Mapping Across Languages

Reference table showing how proto3 types map to native language types in all Spikard-supported languages.

Scalar Types

| Proto3 Type | Python | TypeScript | Ruby | PHP | Rust |
|---|---|---|---|---|---|
| double | float | number | Float | float | f64 |
| float | float | number | Float | float | f32 |
| int32 | int | number | Integer | int | i32 |
| int64 | int | number\|bigint | Integer | int | i64 |
| uint32 | int | number | Integer | int | u32 |
| uint64 | int | number\|bigint | Integer | int | u64 |
| bool | bool | boolean | Boolean | bool | bool |
| string | str | string | String | string | String |
| bytes | bytes | Uint8Array | String | string | Bytes |

Complex Types

| Proto3 Type | Python | TypeScript | Ruby | PHP | Rust |
|---|---|---|---|---|---|
| message | dataclass | interface | class | class | struct |
| enum | Literal[...] | union type | module | class | enum |
| repeated T | list[T] | T[] | Array<T> | array<T> | Vec<T> |
| map<K,V> | dict[K,V] | Map<K,V> | Hash{K=>V} | array<K,V> | HashMap<K,V> |
| optional T | T\|None | T\|undefined | T\|nil | ?T | Option<T> |

Default Values in Proto3

When a field is not set, proto3 uses these default values:

| Type | Default Value |
|---|---|
| Numbers | 0 |
| Strings | "" (empty string) |
| Booleans | false |
| Enums | First value (must be 0) |
| Messages | None/null/undefined |
| Repeated | Empty list |
| Maps | Empty map |

Optional Field Handling

Use the optional keyword to distinguish between "not set" and "default value":

message User {
  int32 id = 1;                // Never null (defaults to 0)
  string name = 2;             // Never null (defaults to "")
  optional string email = 3;   // Can be null/None/undefined
  optional int32 age = 4;      // Can be null (distinguishes 0 from unset)
}
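To see why presence tracking matters, consider an age of 0: without optional, a reader cannot tell whether the sender wrote 0 or wrote nothing. The sketch below mirrors the message with a plain Python dataclass, following the optional T to T|None mapping from the table above. It is a stand-in for illustration, not the code the generator emits, and describe_age is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class User:
    """Plain-Python stand-in for the User message above (not generated code)."""
    id: int = 0                  # non-optional: unset and 0 look the same
    name: str = ""               # non-optional: unset and "" look the same
    email: Optional[str] = None  # optional: None means "not set"
    age: Optional[int] = None    # optional: None is distinct from 0


def describe_age(user: User) -> str:
    """Report the age while keeping 'unset' and 'zero' apart."""
    if user.age is None:
        return "age not provided"
    return f"age is {user.age}"


unset_user = User(id=1, name="Alice")
newborn = User(id=2, name="Bob", age=0)
```

Here describe_age(unset_user) reports that no age was provided, while describe_age(newborn) reports an age of 0; with a non-optional field both would collapse to 0.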

Checking Optional Fields by Language

Python:

if user.HasField("email"):
    print(f"Email: {user.email}")

TypeScript:

if (user.email != null) {  // covers both null and undefined for unset fields
    console.log(`Email: ${user.email}`);
}

Ruby:

if user.has_email?
    puts "Email: #{user.email}"
end

PHP:

if ($user->hasEmail()) {
    echo "Email: " . $user->getEmail();
}

Rust:

if let Some(email) = &user.email {
    println!("Email: {}", email);
}

Part 3: Code Generation

Installation

cargo install spikard-cli

Generate Code

Code Generation Commands

Generate protobuf code for each language:

Python:

protoc --python_out=. user_service.proto

TypeScript:

# Install protobufjs CLI
npm install -g protobufjs-cli

# Generate TypeScript definitions
pbjs -t static-module -w commonjs -o user_service.js user_service.proto
pbts -o user_service.d.ts user_service.js

Ruby:

grpc_tools_ruby_protoc --ruby_out=. user_service.proto

PHP:

protoc --php_out=. user_service.proto

Rust (add to build.rs):

fn main() {
    prost_build::compile_protos(&["user_service.proto"], &["."]).unwrap();
}

Part 4: Implementing Handlers

Python gRPC Handler

Complete Python handler implementation for UserService with GetUser and CreateUser methods.

from spikard.grpc import GrpcHandler, GrpcRequest, GrpcResponse
import user_service_pb2 as userservice_pb2  # Generated from user_service.proto
from datetime import datetime


class UserServiceHandler(GrpcHandler):
    """UserService gRPC handler implementation."""

    def __init__(self, user_repository):
        """Initialize handler with dependencies."""
        self.user_repository = user_repository

    async def handle_request(self, request: GrpcRequest) -> GrpcResponse:
        """
        Handle incoming gRPC requests.

        Routes to appropriate method based on request.method_name.
        """
        if request.method_name == "GetUser":
            return await self._get_user(request)
        elif request.method_name == "CreateUser":
            return await self._create_user(request)
        else:
            raise NotImplementedError(f"Unknown method: {request.method_name}")

    async def _get_user(self, request: GrpcRequest) -> GrpcResponse:
        """Handle GetUser RPC."""
        # 1. Deserialize request
        req = userservice_pb2.GetUserRequest()
        req.ParseFromString(request.payload)

        # 2. Validate input
        if req.id <= 0:
            raise ValueError("User ID must be positive")

        # 3. Business logic
        user = await self.user_repository.find_by_id(req.id)
        if not user:
            # NOTE: ValueError maps to INVALID_ARGUMENT; NOT_FOUND would be the more specific status
            raise ValueError(f"User {req.id} not found")

        # 4. Build response
        response_user = userservice_pb2.User()
        response_user.id = user.id
        response_user.name = user.name
        response_user.email = user.email
        response_user.created_at = user.created_at.isoformat()

        # 5. Serialize and return
        return GrpcResponse(
            payload=response_user.SerializeToString(),
            metadata={"x-user-found": "true"}
        )

    async def _create_user(self, request: GrpcRequest) -> GrpcResponse:
        """Handle CreateUser RPC."""
        # 1. Deserialize request
        req = userservice_pb2.CreateUserRequest()
        req.ParseFromString(request.payload)

        # 2. Validate input
        if not req.name or not req.email:
            raise ValueError("Name and email are required")

        # 3. Check authorization from metadata
        auth_token = request.get_metadata("authorization")
        if not auth_token:
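            # NOTE: PermissionError maps to PERMISSION_DENIED; UNAUTHENTICATED is the
            # more specific status for missing credentials
            raise PermissionError("Authentication required")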

        # 4. Business logic
        user = await self.user_repository.create(
            name=req.name,
            email=req.email
        )

        # 5. Build response
        response_user = userservice_pb2.User()
        response_user.id = user.id
        response_user.name = user.name
        response_user.email = user.email
        response_user.created_at = datetime.utcnow().isoformat()

        # 6. Serialize with metadata
        return GrpcResponse(
            payload=response_user.SerializeToString(),
            metadata={
                "x-user-id": str(user.id),
                "x-created": "true"
            }
        )

Key Patterns

  • Async/await: All handlers are async for non-blocking I/O
  • ParseFromString(): Deserializes binary protobuf to Python object
  • SerializeToString(): Serializes Python object to binary protobuf
  • Exception mapping: ValueError -> INVALID_ARGUMENT, PermissionError -> PERMISSION_DENIED
  • Metadata access: request.get_metadata(key) returns str | None
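As the number of RPC methods grows, the if/elif routing in handle_request can be replaced by a lookup table. The sketch below shows that variation using a hypothetical StubRequest dataclass in place of GrpcRequest and raw bytes in place of GrpcResponse, so it runs without Spikard installed; the routing idea carries over, the stand-in types do not.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict


@dataclass
class StubRequest:
    """Hypothetical stand-in for GrpcRequest with only the fields used here."""
    method_name: str
    payload: bytes = b""


class DispatchingHandler:
    """Routes requests through a method-name lookup table."""

    def __init__(self) -> None:
        self._routes: Dict[str, Callable[[StubRequest], Awaitable[bytes]]] = {
            "GetUser": self._get_user,
            "CreateUser": self._create_user,
        }

    async def handle_request(self, request: StubRequest) -> bytes:
        route = self._routes.get(request.method_name)
        if route is None:
            # Same exception the handler above raises for unknown methods
            raise NotImplementedError(f"Unknown method: {request.method_name}")
        return await route(request)

    async def _get_user(self, request: StubRequest) -> bytes:
        return b"get:" + request.payload

    async def _create_user(self, request: StubRequest) -> bytes:
        return b"create:" + request.payload


handler = DispatchingHandler()
reply = asyncio.run(handler.handle_request(StubRequest("GetUser", b"\x08\x01")))
```

Adding a method then means adding one dictionary entry rather than another elif branch.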

Registration

from spikard import create_app
from spikard.grpc import GrpcService

# Create app
app = create_app()

# Create service registry
grpc_service = GrpcService()

# Register handler
handler = UserServiceHandler(user_repository=UserRepository())
grpc_service.register_handler("userservice.UserService", handler)

# App is now ready to serve gRPC requests
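The registration snippet passes a UserRepository that this guide does not define. A minimal in-memory version, assuming only the two async methods the handler calls (find_by_id and create) and the attributes it reads (id, name, email, created_at), could look like the sketch below. The class names InMemoryUserRepository and UserRecord are hypothetical; a real repository would talk to a database.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional


@dataclass
class UserRecord:
    """Hypothetical record type with the attributes the handler reads."""
    id: int
    name: str
    email: str
    created_at: datetime


class InMemoryUserRepository:
    """Dictionary-backed repository, useful for local runs and tests."""

    def __init__(self) -> None:
        self._users: Dict[int, UserRecord] = {}
        self._next_id = 1

    async def find_by_id(self, user_id: int) -> Optional[UserRecord]:
        # Returns None when the user does not exist
        return self._users.get(user_id)

    async def create(self, name: str, email: str) -> UserRecord:
        record = UserRecord(
            id=self._next_id,
            name=name,
            email=email,
            created_at=datetime.now(timezone.utc),
        )
        self._users[record.id] = record
        self._next_id += 1
        return record


async def demo() -> tuple:
    repo = InMemoryUserRepository()
    created = await repo.create(name="Alice", email="alice@example.com")
    found = await repo.find_by_id(created.id)
    missing = await repo.find_by_id(999)
    return created, found, missing


created, found, missing = asyncio.run(demo())
```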

TypeScript gRPC Handler

Complete TypeScript handler implementation for UserService with GetUser and CreateUser methods.

import {
  GrpcHandler,
  GrpcRequest,
  GrpcResponse,
  GrpcError,
  GrpcStatusCode,
  createServiceHandler,
  createUnaryHandler,
} from 'spikard';
import { userservice } from './user_service';  // Generated by pbjs/pbts from user_service.proto

class UserServiceHandler implements GrpcHandler {
  constructor(private userRepository: UserRepository) {}

  /**
   * Handle incoming gRPC requests.
   * Routes to the appropriate method based on request.methodName.
   */
  async handleRequest(request: GrpcRequest): Promise<GrpcResponse> {
    switch (request.methodName) {
      case 'GetUser':
        return this.getUser(request);
      case 'CreateUser':
        return this.createUser(request);
      default:
        throw new GrpcError(
          GrpcStatusCode.UNIMPLEMENTED,
          `Method ${request.methodName} not implemented`
        );
    }
  }

  private async getUser(request: GrpcRequest): Promise<GrpcResponse> {
    // 1. Deserialize request
    const req = userservice.GetUserRequest.decode(request.payload);

    // 2. Validate input
    if (req.id <= 0) {
      throw new GrpcError(
        GrpcStatusCode.INVALID_ARGUMENT,
        'User ID must be positive'
      );
    }

    // 3. Business logic
    const user = await this.userRepository.findById(req.id);
    if (!user) {
      throw new GrpcError(
        GrpcStatusCode.NOT_FOUND,
        `User ${req.id} not found`
      );
    }

    // 4. Build response
    const responseUser = userservice.User.create({
      id: user.id,
      name: user.name,
      email: user.email,
      createdAt: user.createdAt.toISOString(),
    });

    // 5. Serialize and return
    const encoded = userservice.User.encode(responseUser).finish();
    return {
      payload: Buffer.from(encoded),
      metadata: { 'x-user-found': 'true' },
    };
  }

  private async createUser(request: GrpcRequest): Promise<GrpcResponse> {
    // 1. Deserialize request
    const req = userservice.CreateUserRequest.decode(request.payload);

    // 2. Validate input
    if (!req.name || !req.email) {
      throw new GrpcError(
        GrpcStatusCode.INVALID_ARGUMENT,
        'Name and email are required'
      );
    }

    // 3. Check authorization from metadata
    const authToken = request.metadata['authorization'];
    if (!authToken) {
      throw new GrpcError(
        GrpcStatusCode.UNAUTHENTICATED,
        'Authentication required'
      );
    }

    // 4. Business logic
    const user = await this.userRepository.create({
      name: req.name,
      email: req.email,
    });

    // 5. Build response
    const responseUser = userservice.User.create({
      id: user.id,
      name: user.name,
      email: user.email,
      createdAt: new Date().toISOString(),
    });

    // 6. Serialize with metadata
    const encoded = userservice.User.encode(responseUser).finish();
    return {
      payload: Buffer.from(encoded),
      metadata: {
        'x-user-id': user.id.toString(),
        'x-created': 'true',
      },
    };
  }
}

Key Patterns

  • Protobufjs: Uses .decode() and .encode().finish() for serialization
  • Buffer: gRPC payloads are Node.js Buffer objects
  • GrpcError: Throw with explicit status codes for proper error responses
  • Helper functions: createUnaryHandler and createServiceHandler reduce boilerplate
  • Type safety: Full TypeScript type inference for protobuf messages

Registration

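import { GrpcService, Spikard } from 'spikard';

// `userRepository` is assumed to be an existing UserRepository instance
const userServiceHandler = new UserServiceHandler(userRepository);

const grpcService = new GrpcService();

// Register each unary method the handler routes
grpcService.registerUnary('userservice.UserService', 'GetUser', userServiceHandler);
grpcService.registerUnary('userservice.UserService', 'CreateUser', userServiceHandler);

const app = new Spikard();
app.useGrpc(grpcService);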

Ruby gRPC Handler

Complete Ruby handler implementation for UserService with GetUser and CreateUser methods.

require 'spikard/grpc'
require 'user_service_pb'  # Generated from user_service.proto

class UserServiceHandler < Spikard::Grpc::Handler
  def initialize(user_repository)
    @user_repository = user_repository
  end

  def handle_request(request)
    # Route based on method name
    case request.method_name
    when 'GetUser'
      get_user(request)
    when 'CreateUser'
      create_user(request)
    else
      raise "Unknown method: #{request.method_name}"
    end
  end

  private

  def get_user(request)
    # 1. Deserialize request
    req = Userservice::GetUserRequest.decode(request.payload)

    # 2. Validate input
    raise ArgumentError, 'User ID must be positive' if req.id <= 0

    # 3. Business logic
    user = @user_repository.find_by_id(req.id)
    raise ArgumentError, "User #{req.id} not found" unless user

    # 4. Build response
    response_user = Userservice::User.new(
      id: user.id,
      name: user.name,
      email: user.email,
      created_at: user.created_at.iso8601
    )

    # 5. Serialize and return
    response = Spikard::Grpc::Response.new(
      payload: Userservice::User.encode(response_user)
    )
    response.metadata = { 'x-user-found' => 'true' }
    response
  end

  def create_user(request)
    # 1. Deserialize request
    req = Userservice::CreateUserRequest.decode(request.payload)

    # 2. Validate input
    if req.name.empty? || req.email.empty?
      raise ArgumentError, 'Name and email are required'
    end

    # 3. Check authorization from metadata
    auth_token = request.get_metadata('authorization')
    unless auth_token
      raise SecurityError, 'Authentication required'
    end

    # 4. Business logic
    user = @user_repository.create(
      name: req.name,
      email: req.email
    )

    # 5. Build response
    response_user = Userservice::User.new(
      id: user.id,
      name: user.name,
      email: user.email,
      created_at: Time.now.utc.iso8601
    )

    # 6. Serialize with metadata
    response = Spikard::Grpc::Response.new(
      payload: Userservice::User.encode(response_user)
    )
    response.metadata = {
      'x-user-id' => user.id.to_s,
      'x-created' => 'true'
    }
    response
  end
end

Key Patterns

  • Synchronous: Ruby handlers are synchronous (Rust runtime handles async)
  • .decode() / .encode(): Ruby protobuf methods for serialization
  • Metadata access: request.get_metadata(key) returns String | nil
  • Response construction: Create response, then set metadata separately
  • Error responses: Use Response.error() for error cases
  • Exception mapping: Rescue exceptions and convert to gRPC status codes

Error Handling

class UserServiceHandler < Spikard::Grpc::Handler
  def handle_request(request)
    case request.method_name
    when 'GetUser'
      get_user(request)
    else
      # Return error response
      Spikard::Grpc::Response.error(
        "Method not implemented: #{request.method_name}",
        'UNIMPLEMENTED'
      )
    end
  rescue ArgumentError => e
    # Invalid argument error
    Spikard::Grpc::Response.error(e.message, 'INVALID_ARGUMENT')
  rescue SecurityError => e
    # Authentication error
    Spikard::Grpc::Response.error(e.message, 'UNAUTHENTICATED')
  rescue StandardError => e
    # Internal error
    Spikard::Grpc::Response.error("Internal error: #{e.message}")
  end
end

Registration

require 'spikard'

# Create service registry
service = Spikard::Grpc::Service.new

# Register handler
handler = UserServiceHandler.new(UserRepository.new)
service.register_handler('userservice.UserService', handler)

# Service ready to handle requests

PHP gRPC Handler

Complete PHP handler implementation for UserService with GetUser and CreateUser methods.

<?php

declare(strict_types=1);

use Spikard\Grpc\HandlerInterface;
use Spikard\Grpc\Request;
use Spikard\Grpc\Response;
use Userservice\GetUserRequest;
use Userservice\CreateUserRequest;
use Userservice\User;

class UserServiceHandler implements HandlerInterface
{
    public function __construct(
        private UserRepository $userRepository,
    ) {}

    public function handleRequest(Request $request): Response
    {
        // Route based on method name
        return match ($request->methodName) {
            'GetUser' => $this->getUser($request),
            'CreateUser' => $this->createUser($request),
            default => Response::error("Unknown method: {$request->methodName}", 'UNIMPLEMENTED'),
        };
    }

    private function getUser(Request $request): Response
    {
        try {
            // 1. Deserialize request
            $req = new GetUserRequest();
            $req->mergeFromString($request->payload);

            // 2. Validate input
            if ($req->getId() <= 0) {
                return Response::error('User ID must be positive', 'INVALID_ARGUMENT');
            }

            // 3. Business logic
            $user = $this->userRepository->findById($req->getId());
            if (!$user) {
                return Response::error("User {$req->getId()} not found", 'NOT_FOUND');
            }

            // 4. Build response
            $responseUser = new User();
            $responseUser->setId($user->getId());
            $responseUser->setName($user->getName());
            $responseUser->setEmail($user->getEmail());
            $responseUser->setCreatedAt($user->getCreatedAt()->format('c'));

            // 5. Serialize and return
            return new Response(
                payload: $responseUser->serializeToString(),
                metadata: ['x-user-found' => 'true']
            );

        } catch (\Exception $e) {
            return Response::error("Error: {$e->getMessage()}");
        }
    }

    private function createUser(Request $request): Response
    {
        try {
            // 1. Deserialize request
            $req = new CreateUserRequest();
            $req->mergeFromString($request->payload);

            // 2. Validate input
            if (empty($req->getName()) || empty($req->getEmail())) {
                return Response::error('Name and email are required', 'INVALID_ARGUMENT');
            }

            // 3. Check authorization from metadata
            $authToken = $request->getMetadata('authorization');
            if (!$authToken) {
                return Response::error(
                    'Authentication required',
                    'UNAUTHENTICATED'
                );
            }

            // 4. Business logic
            $user = $this->userRepository->create(
                name: $req->getName(),
                email: $req->getEmail()
            );

            // 5. Build response
            $responseUser = new User();
            $responseUser->setId($user->getId());
            $responseUser->setName($user->getName());
            $responseUser->setEmail($user->getEmail());
            $responseUser->setCreatedAt((new \DateTime())->format('c'));

            // 6. Serialize with metadata
            return new Response(
                payload: $responseUser->serializeToString(),
                metadata: [
                    'x-user-id' => (string)$user->getId(),
                    'x-created' => 'true',
                ]
            );

        } catch (\Exception $e) {
            return Response::error("Error: {$e->getMessage()}");
        }
    }
}

Key Patterns

  • Synchronous: PHP handlers are synchronous
  • mergeFromString(): Deserializes binary protobuf (use merge, not parse)
  • serializeToString(): Serializes protobuf to binary
  • Getters/Setters: PHP protobuf uses getter/setter methods
  • Error responses: Return Response::error() instead of throwing
  • Named arguments: PHP 8.0+ named arguments for clarity
  • Type hints: Leverage PHP type system for safety

Registration

<?php

declare(strict_types=1);

use Spikard\Grpc;

// Create service registry
$service = Grpc::createService();

// Register handler
$handler = new UserServiceHandler(
    userRepository: new UserRepository()
);

$service->registerHandler('userservice.UserService', $handler);

// Service ready to handle requests

Rust gRPC Handler

Complete Rust handler implementation for UserService with GetUser and CreateUser methods.

use bytes::Bytes;
use spikard_http::grpc::{GrpcHandler, GrpcHandlerResult, GrpcRequestData, GrpcResponseData};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tonic::Status;

// Generated protobuf types
mod userservice {
    // prost-build writes generated code to OUT_DIR, named after the proto package
    include!(concat!(env!("OUT_DIR"), "/userservice.rs"));
}

pub struct UserServiceHandler {
    user_repository: Arc<dyn UserRepository + Send + Sync>,
}

impl UserServiceHandler {
    pub fn new(user_repository: Arc<dyn UserRepository + Send + Sync>) -> Self {
        Self { user_repository }
    }

    async fn get_user(&self, request: GrpcRequestData) -> GrpcHandlerResult {
        // 1. Deserialize request
        use prost::Message;
        let req = userservice::GetUserRequest::decode(request.payload)
            .map_err(|e| Status::invalid_argument(format!("Invalid request: {}", e)))?;

        // 2. Validate input
        if req.id <= 0 {
            return Err(Status::invalid_argument("User ID must be positive"));
        }

        // 3. Business logic
        let user = self.user_repository.find_by_id(req.id).await
            .map_err(|e| Status::internal(format!("Database error: {}", e)))?
            .ok_or_else(|| Status::not_found(format!("User {} not found", req.id)))?;

        // 4. Build response
        let response_user = userservice::User {
            id: user.id,
            name: user.name.clone(),
            email: user.email.clone(),
            created_at: user.created_at.to_rfc3339(),
        };

        // 5. Serialize
        let mut buf = Vec::new();
        response_user.encode(&mut buf)
            .map_err(|e| Status::internal(format!("Encoding error: {}", e)))?;

        // 6. Add metadata
        let mut metadata = tonic::metadata::MetadataMap::new();
        metadata.insert("x-user-found", "true".parse().map_err(|_| Status::internal("failed to parse metadata value"))?);

        Ok(GrpcResponseData {
            payload: Bytes::from(buf),
            metadata,
        })
    }

    async fn create_user(&self, request: GrpcRequestData) -> GrpcHandlerResult {
        // 1. Deserialize request
        use prost::Message;
        let req = userservice::CreateUserRequest::decode(request.payload)
            .map_err(|e| Status::invalid_argument(format!("Invalid request: {}", e)))?;

        // 2. Validate input
        if req.name.is_empty() || req.email.is_empty() {
            return Err(Status::invalid_argument("Name and email are required"));
        }

        // 3. Check authorization from metadata
        let auth_token = request.metadata
            .get("authorization")
            .and_then(|v| v.to_str().ok());

        if auth_token.is_none() {
            return Err(Status::unauthenticated("Authentication required"));
        }

        // 4. Business logic
        let user = self.user_repository.create(&req.name, &req.email).await
            .map_err(|e| Status::internal(format!("Create failed: {}", e)))?;

        // 5. Build response
        let response_user = userservice::User {
            id: user.id,
            name: user.name.clone(),
            email: user.email.clone(),
            created_at: chrono::Utc::now().to_rfc3339(),
        };

        // 6. Serialize
        let mut buf = Vec::new();
        response_user.encode(&mut buf)
            .map_err(|e| Status::internal(format!("Encoding error: {}", e)))?;

        // 7. Add metadata
        let mut metadata = tonic::metadata::MetadataMap::new();
        metadata.insert("x-user-id", user.id.to_string().parse().map_err(|_| Status::internal("failed to parse metadata value"))?);
        metadata.insert("x-created", "true".parse().map_err(|_| Status::internal("failed to parse metadata value"))?);

        Ok(GrpcResponseData {
            payload: Bytes::from(buf),
            metadata,
        })
    }
}

impl GrpcHandler for UserServiceHandler {
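    fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
        // The boxed future must be 'static, so it cannot borrow `self`.
        // Build a cheap copy that shares the repository through the Arc instead.
        let handler = UserServiceHandler::new(Arc::clone(&self.user_repository));
        Box::pin(async move {
            match request.method_name.as_str() {
                "GetUser" => handler.get_user(request).await,
                "CreateUser" => handler.create_user(request).await,
                _ => Err(Status::unimplemented(format!("Unknown method: {}", request.method_name))),
            }
        })
    }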

    fn service_name(&self) -> &'static str {
        "userservice.UserService"
    }
}

Key Patterns

  • Async/await: Handlers return Pin<Box<dyn Future>>
  • prost: Uses .decode() and .encode() for protobuf
  • Error handling: Return tonic::Status directly
  • Zero-copy: Uses Bytes for efficient payload handling
  • Type safety: Full compile-time type checking
  • Arc: Shared ownership for thread-safe repository access

Registration

use spikard_http::grpc::{GrpcRegistry, RpcMode};
use std::sync::Arc;

// Create handler
let user_repository = Arc::new(UserRepositoryImpl::new());
let handler = Arc::new(UserServiceHandler::new(user_repository));

// Register with gRPC runtime
let mut registry = GrpcRegistry::new();
registry.register("userservice.UserService", handler, RpcMode::Unary);

// Runtime ready to serve

Part 5: Error Handling

gRPC Status Codes

gRPC Status Codes Reference

Complete reference table for all 17 standard gRPC status codes.

| Code | Numeric | When to Use | HTTP Equivalent |
|---|---|---|---|
| OK | 0 | Request completed successfully | 200 OK |
| CANCELLED | 1 | Operation was cancelled (typically by caller) | 499 Client Closed Request |
| UNKNOWN | 2 | Unknown error or unmapped status from another system | 500 Internal Server Error |
| INVALID_ARGUMENT | 3 | Client specified an invalid argument (validation errors) | 400 Bad Request |
| DEADLINE_EXCEEDED | 4 | Operation deadline was exceeded before completion | 504 Gateway Timeout |
| NOT_FOUND | 5 | Requested entity (e.g., file, user) was not found | 404 Not Found |
| ALREADY_EXISTS | 6 | Entity that client attempted to create already exists | 409 Conflict |
| PERMISSION_DENIED | 7 | Caller lacks permission for the operation | 403 Forbidden |
| RESOURCE_EXHAUSTED | 8 | Resource has been exhausted (quota, rate limit) | 429 Too Many Requests |
| FAILED_PRECONDITION | 9 | Operation rejected because system not in required state | 400 Bad Request |
| ABORTED | 10 | Operation aborted due to concurrency issues | 409 Conflict |
| OUT_OF_RANGE | 11 | Operation attempted past valid range | 400 Bad Request |
| UNIMPLEMENTED | 12 | Operation is not implemented or not supported | 501 Not Implemented |
| INTERNAL | 13 | Internal server error | 500 Internal Server Error |
| UNAVAILABLE | 14 | Service is currently unavailable (temporary condition) | 503 Service Unavailable |
| DATA_LOSS | 15 | Unrecoverable data loss or corruption | 500 Internal Server Error |
| UNAUTHENTICATED | 16 | Request missing or invalid authentication credentials | 401 Unauthorized |
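When bridging gRPC errors to an HTTP layer (a gateway or a REST facade, for example), the table can be carried in code as an enum plus a lookup. The sketch below is one way to do that in plain Python; StatusCode and http_status_for are names chosen for this example and are not part of Spikard.

```python
from enum import IntEnum


class StatusCode(IntEnum):
    """The 17 standard gRPC status codes with their numeric values."""
    OK = 0
    CANCELLED = 1
    UNKNOWN = 2
    INVALID_ARGUMENT = 3
    DEADLINE_EXCEEDED = 4
    NOT_FOUND = 5
    ALREADY_EXISTS = 6
    PERMISSION_DENIED = 7
    RESOURCE_EXHAUSTED = 8
    FAILED_PRECONDITION = 9
    ABORTED = 10
    OUT_OF_RANGE = 11
    UNIMPLEMENTED = 12
    INTERNAL = 13
    UNAVAILABLE = 14
    DATA_LOSS = 15
    UNAUTHENTICATED = 16


# HTTP equivalents, copied from the reference table
HTTP_EQUIVALENT = {
    StatusCode.OK: 200,
    StatusCode.CANCELLED: 499,
    StatusCode.UNKNOWN: 500,
    StatusCode.INVALID_ARGUMENT: 400,
    StatusCode.DEADLINE_EXCEEDED: 504,
    StatusCode.NOT_FOUND: 404,
    StatusCode.ALREADY_EXISTS: 409,
    StatusCode.PERMISSION_DENIED: 403,
    StatusCode.RESOURCE_EXHAUSTED: 429,
    StatusCode.FAILED_PRECONDITION: 400,
    StatusCode.ABORTED: 409,
    StatusCode.OUT_OF_RANGE: 400,
    StatusCode.UNIMPLEMENTED: 501,
    StatusCode.INTERNAL: 500,
    StatusCode.UNAVAILABLE: 503,
    StatusCode.DATA_LOSS: 500,
    StatusCode.UNAUTHENTICATED: 401,
}


def http_status_for(code: int) -> int:
    """Translate a numeric gRPC code to HTTP, treating unknown codes as 500."""
    try:
        return HTTP_EQUIVALENT[StatusCode(code)]
    except ValueError:
        return 500
```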

Usage Guidelines

  1. Choose the most specific code: Use the most descriptive status code that accurately represents the error condition.

  2. Provide helpful messages: Include clear, actionable error messages that help clients understand and resolve the issue.

  3. Never expose sensitive information: Don't include stack traces, database errors, or internal system details in error messages.

  4. Use INTERNAL for unexpected errors: When encountering unexpected server errors, return INTERNAL and log the details server-side.

  5. Distinguish UNAUTHENTICATED vs PERMISSION_DENIED: Use UNAUTHENTICATED for missing/invalid credentials, PERMISSION_DENIED for authenticated users lacking permissions.

  6. Consider retry behavior: Clients may automatically retry certain codes (UNAVAILABLE, DEADLINE_EXCEEDED) but not others (INVALID_ARGUMENT, PERMISSION_DENIED).
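Guideline 6 can be made concrete with a small client-side helper that retries only the codes named there. This is a sketch under stated assumptions: RpcError is a hypothetical exception carrying a status-code name, and call_with_retry is not a Spikard or gRPC library function. Real clients usually also add jitter and respect deadlines.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Codes this guide lists as candidates for automatic retry
RETRYABLE = {"UNAVAILABLE", "DEADLINE_EXCEEDED"}


class RpcError(Exception):
    """Hypothetical client-side error carrying a gRPC status code name."""

    def __init__(self, code: str, message: str) -> None:
        super().__init__(f"{code}: {message}")
        self.code = code


def call_with_retry(
    rpc: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call rpc(), retrying retryable failures with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return rpc()
        except RpcError as err:
            # Give up on non-retryable codes or when attempts are exhausted
            if err.code not in RETRYABLE or attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

The sleep parameter is injectable so tests can record the backoff delays instead of actually waiting.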

Error Handling Patterns

Python:

try:
    # Handler logic
    pass
except ValueError as e:
    # Maps to INVALID_ARGUMENT
    raise
except PermissionError as e:
    # Maps to PERMISSION_DENIED
    raise
except NotImplementedError as e:
    # Maps to UNIMPLEMENTED
    raise
except Exception as e:
    # Maps to INTERNAL
    raise

Mapping is automatic via FFI layer (pyerr_to_grpc_status).
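The real mapping lives in Spikard's FFI layer and is not shown in this guide. As a mental model only, the documented pairs can be expressed as an ordered isinstance lookup with INTERNAL as the fallback. The function name status_for_exception is hypothetical, and the actual implementation may handle more exception types or subclasses differently.

```python
from typing import List, Tuple, Type

# Exception-to-status pairs documented in this guide, checked in order
EXCEPTION_TO_STATUS: List[Tuple[Type[BaseException], str]] = [
    (ValueError, "INVALID_ARGUMENT"),
    (PermissionError, "PERMISSION_DENIED"),
    (NotImplementedError, "UNIMPLEMENTED"),
]


def status_for_exception(exc: BaseException) -> str:
    """Return the gRPC status name for an exception, defaulting to INTERNAL."""
    for exc_type, status in EXCEPTION_TO_STATUS:
        if isinstance(exc, exc_type):
            return status
    return "INTERNAL"
```

Because the lookup uses isinstance, subclasses inherit their parent's status in this sketch; a UnicodeDecodeError, for instance, is a ValueError and would map to INVALID_ARGUMENT.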

TypeScript:

import { GrpcError, GrpcStatusCode } from 'spikard';

// Explicit status codes
throw new GrpcError(GrpcStatusCode.INVALID_ARGUMENT, 'Invalid ID');
throw new GrpcError(GrpcStatusCode.NOT_FOUND, 'User not found');
throw new GrpcError(GrpcStatusCode.UNAUTHENTICATED, 'Auth required');
throw new GrpcError(GrpcStatusCode.PERMISSION_DENIED, 'Access denied');
throw new GrpcError(GrpcStatusCode.INTERNAL, 'Internal error');

Explicit GrpcError for all status codes.

Ruby:

class UserServiceHandler < Spikard::Grpc::Handler
  def handle_request(request)
    case request.method_name
    when 'GetUser'
      get_user(request)
    else
      # Return error response
      Spikard::Grpc::Response.error(
        "Method not implemented: #{request.method_name}",
        'UNIMPLEMENTED'
      )
    end
  rescue ArgumentError => e
    # Invalid argument error
    Spikard::Grpc::Response.error(e.message, 'INVALID_ARGUMENT')
  rescue SecurityError => e
    # Authentication error
    Spikard::Grpc::Response.error(e.message, 'UNAUTHENTICATED')
  rescue StandardError => e
    # Internal error
    Spikard::Grpc::Response.error("Internal error: #{e.message}")
  end
end

PHP:

<?php declare(strict_types=1);

// Return error response
return Response::error('Error message');

// With status code in metadata
return Response::error(
    'Error message',
    'INVALID_ARGUMENT'
);

// Try-catch pattern
try {
    // Handler logic
} catch (\InvalidArgumentException $e) {
    return Response::error($e->getMessage(), 'INVALID_ARGUMENT');
} catch (\Exception $e) {
    return Response::error("Internal error: {$e->getMessage()}");
}

Return error responses instead of throwing.

Rust:

// Direct tonic::Status
return Err(Status::invalid_argument("Invalid ID"));
return Err(Status::not_found("User not found"));
return Err(Status::unauthenticated("Auth required"));
return Err(Status::permission_denied("Access denied"));
return Err(Status::internal("Internal error"));

// With .map_err()
user_repository.find_by_id(id)
    .await
    .map_err(|e| Status::internal(format!("DB error: {}", e)))?;

Type-safe Result<T, Status> pattern.

Part 6: Testing

Python gRPC Handler Tests

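Comprehensive test examples for gRPC handlers using pytest. These tests target an extended variant of the service: requests use a user_id field, responses are wrapped in a UserResponse message (success, user, error_message), and users carry phone and tags fields, so they will not run unchanged against the minimal proto from Part 1.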

# test_user_handler.py
import pytest
from user_handler import UserServiceHandler
from spikard.grpc import GrpcRequest
import user_service_pb2 as pb


@pytest.mark.asyncio
async def test_get_user_success():
    """Test getting an existing user."""
    handler = UserServiceHandler()

    # Create request
    req = pb.GetUserRequest(user_id=1)
    grpc_request = GrpcRequest(
        service_name="userservice.v1.UserService",
        method_name="GetUser",
        payload=req.SerializeToString(),
    )

    # Call handler
    response = await handler.handle_request(grpc_request)

    # Deserialize response
    user_response = pb.UserResponse()
    user_response.ParseFromString(response.payload)

    # Assertions
    assert user_response.success is True
    assert user_response.user.id == 1
    assert user_response.user.name == "Alice"
    assert user_response.user.email == "alice@example.com"


@pytest.mark.asyncio
async def test_get_user_not_found():
    """Test getting a non-existent user."""
    handler = UserServiceHandler()

    # Create request for non-existent user
    req = pb.GetUserRequest(user_id=999)
    grpc_request = GrpcRequest(
        service_name="userservice.v1.UserService",
        method_name="GetUser",
        payload=req.SerializeToString(),
    )

    # Call handler
    response = await handler.handle_request(grpc_request)

    # Deserialize response
    user_response = pb.UserResponse()
    user_response.ParseFromString(response.payload)

    # Assertions
    assert user_response.success is False
    assert "not found" in user_response.error_message


@pytest.mark.asyncio
async def test_create_user_success():
    """Test creating a new user."""
    handler = UserServiceHandler()

    # Create request
    req = pb.CreateUserRequest(
        name="Charlie",
        email="charlie@example.com",
        phone="555-1234",
        tags=["developer", "remote"],
    )
    grpc_request = GrpcRequest(
        service_name="userservice.v1.UserService",
        method_name="CreateUser",
        payload=req.SerializeToString(),
    )

    # Call handler
    response = await handler.handle_request(grpc_request)

    # Deserialize response
    user_response = pb.UserResponse()
    user_response.ParseFromString(response.payload)

    # Assertions
    assert user_response.success is True
    assert user_response.user.id == 3  # Next available ID
    assert user_response.user.name == "Charlie"
    assert user_response.user.email == "charlie@example.com"
    assert user_response.user.phone == "555-1234"
    assert list(user_response.user.tags) == ["developer", "remote"]


@pytest.mark.asyncio
async def test_create_user_validation_error():
    """Test creating a user with missing required fields."""
    handler = UserServiceHandler()

    # Create request with missing email
    req = pb.CreateUserRequest(name="Invalid")
    grpc_request = GrpcRequest(
        service_name="userservice.v1.UserService",
        method_name="CreateUser",
        payload=req.SerializeToString(),
    )

    # Call handler
    response = await handler.handle_request(grpc_request)

    # Deserialize response
    user_response = pb.UserResponse()
    user_response.ParseFromString(response.payload)

    # Assertions
    assert user_response.success is False
    assert "required" in user_response.error_message


@pytest.mark.asyncio
async def test_unknown_method():
    """Test calling an unknown method."""
    handler = UserServiceHandler()

    grpc_request = GrpcRequest(
        service_name="userservice.v1.UserService",
        method_name="DeleteUser",  # Not implemented
        payload=b"",
    )

    # Should raise NotImplementedError
    with pytest.raises(NotImplementedError, match="Unknown method"):
        await handler.handle_request(grpc_request)


# Run tests
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
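The create test above expects the new user to receive ID 3, which only holds if the handler starts with two seeded users. A minimal sketch of an in-memory repository that would produce this behaviour follows. FakeUser and InMemoryUserRepository are hypothetical names, and the second seed user is an assumption; the real UserServiceHandler may store its data differently.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeUser:
    """Hypothetical stand-in for the generated User message."""
    id: int
    name: str
    email: str


def _seed_users() -> dict[int, FakeUser]:
    # Assumed seed data: two users, so the next free ID is 3.
    return {
        1: FakeUser(1, "Alice", "alice@example.com"),
        2: FakeUser(2, "Bob", "bob@example.com"),
    }


@dataclass
class InMemoryUserRepository:
    """Hypothetical async repository backed by a dict."""
    users: dict[int, FakeUser] = field(default_factory=_seed_users)

    async def find_by_id(self, user_id: int) -> Optional[FakeUser]:
        return self.users.get(user_id)

    async def create(self, name: str, email: str) -> FakeUser:
        # Next ID is one past the highest existing ID.
        next_id = max(self.users, default=0) + 1
        user = FakeUser(next_id, name, email)
        self.users[next_id] = user
        return user
```

Creating a fresh repository per test keeps the "next available ID" assertion stable regardless of test order.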

Test Patterns

Using Fixtures

import pytest
from user_handler import UserServiceHandler
import user_service_pb2 as pb

@pytest.fixture
def handler():
    """Create handler with test data."""
    return UserServiceHandler()

@pytest.fixture
def sample_user():
    """Create sample user for tests."""
    return pb.User(
        id=1,
        name="Test User",
        email="test@example.com",
    )

@pytest.mark.asyncio
async def test_with_fixtures(handler, sample_user):
    # Test using fixtures
    pass
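The tests in this section repeat the same request construction for every call. A small factory can remove that duplication. The sketch below uses a dataclass, FakeGrpcRequest, as a stand-in for spikard.GrpcRequest so that it runs on its own; make_request and the metadata keyword are assumptions for illustration and should be checked against the actual GrpcRequest constructor.

```python
from dataclasses import dataclass, field
from typing import Optional

SERVICE_NAME = "userservice.v1.UserService"


@dataclass(frozen=True)
class FakeGrpcRequest:
    """Hypothetical stand-in for spikard.GrpcRequest."""
    service_name: str
    method_name: str
    payload: bytes
    metadata: dict[str, str] = field(default_factory=dict)


def make_request(
    method_name: str,
    payload: bytes = b"",
    metadata: Optional[dict[str, str]] = None,
) -> FakeGrpcRequest:
    """Build a request for the user service with sensible defaults."""
    return FakeGrpcRequest(
        service_name=SERVICE_NAME,
        method_name=method_name,
        payload=payload,
        # Copy so callers cannot mutate the request's metadata afterwards.
        metadata=dict(metadata or {}),
    )
```

In a real test module the factory would return GrpcRequest and take the serialized protobuf message as payload, for example make_request("GetUser", req.SerializeToString()).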

Testing Error Cases

import pytest
from spikard import GrpcRequest
from user_handler import UserServiceHandler

@pytest.mark.asyncio
async def test_handles_malformed_payload():
    """Test handler with malformed protobuf."""
    handler = UserServiceHandler()

    # Create request with invalid protobuf
    grpc_request = GrpcRequest(
        service_name="userservice.v1.UserService",
        method_name="GetUser",
        payload=b"invalid protobuf data",
    )

    # The handler should surface the deserialization error rather than return a bogus user.
    # Narrow Exception to the parser's error type, e.g. google.protobuf.message.DecodeError.
    with pytest.raises(Exception):
        await handler.handle_request(grpc_request)
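To see why the payload above is rejected, it helps to walk the bytes the way a protobuf parser would. The sketch below is a simplified field scanner written for this explanation, not the parser Spikard or the protobuf runtime uses. It reads each tag as a varint, splits it into field number and wire type, and skips the value according to the standard wire types 0 (varint), 1 (64-bit), 2 (length-delimited), and 5 (32-bit).

```python
def read_varint(buf: bytes, pos: int) -> tuple[int, int]:
    """Decode a base-128 varint starting at pos; return (value, new position)."""
    result = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7
        if shift >= 64:
            raise ValueError("varint too long")


def scan_fields(buf: bytes) -> list[tuple[int, int]]:
    """Return (field number, wire type) for each field, or raise ValueError."""
    fields = []
    pos = 0
    while pos < len(buf):
        tag, pos = read_varint(buf, pos)
        field_number, wire_type = tag >> 3, tag & 0x07
        if field_number == 0:
            raise ValueError("field number 0 is invalid")
        if wire_type == 0:  # varint
            _, pos = read_varint(buf, pos)
        elif wire_type == 1:  # fixed 64-bit
            pos += 8
        elif wire_type == 2:  # length-delimited
            length, pos = read_varint(buf, pos)
            pos += length
        elif wire_type == 5:  # fixed 32-bit
            pos += 4
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        if pos > len(buf):
            raise ValueError(f"field {field_number} runs past end of payload")
        fields.append((field_number, wire_type))
    return fields
```

With this scanner, the bytes b"\x08\x01" decode as a single varint in field 1. The text b"invalid protobuf data" starts with a byte that reads as field 13 with a 64-bit value, and the scan then fails at field 14, whose declared length exceeds the remaining bytes.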

Running Tests

pytest test_user_handler.py -v

TypeScript gRPC Handler Tests

Comprehensive test examples for gRPC handlers using Vitest.

// user_handler.test.ts
import { describe, it, expect } from 'vitest';
import { GrpcRequest } from '@spikard/node';
import { UserServiceHandler } from './user_handler';
import { userservice } from './user_service';

describe('UserServiceHandler', () => {
  it('should get an existing user', async () => {
    const handler = new UserServiceHandler();

    // Create request
    const req = userservice.v1.GetUserRequest.create({ userId: 1 });
    const payload = userservice.v1.GetUserRequest.encode(req).finish();
    const grpcRequest = new GrpcRequest({
      serviceName: 'userservice.v1.UserService',
      methodName: 'GetUser',
      payload,
    });

    // Call handler
    const response = await handler.handleRequest(grpcRequest);

    // Deserialize response
    const userResponse = userservice.v1.UserResponse.decode(response.payload);

    // Assertions
    expect(userResponse.success).toBe(true);
    expect(userResponse.user?.id).toBe(1);
    expect(userResponse.user?.name).toBe('Alice');
  });

  it('should return error for non-existent user', async () => {
    const handler = new UserServiceHandler();

    const req = userservice.v1.GetUserRequest.create({ userId: 999 });
    const payload = userservice.v1.GetUserRequest.encode(req).finish();
    const grpcRequest = new GrpcRequest({
      serviceName: 'userservice.v1.UserService',
      methodName: 'GetUser',
      payload,
    });

    const response = await handler.handleRequest(grpcRequest);
    const userResponse = userservice.v1.UserResponse.decode(response.payload);

    expect(userResponse.success).toBe(false);
    expect(userResponse.errorMessage).toContain('not found');
  });

  it('should create a new user', async () => {
    const handler = new UserServiceHandler();

    const req = userservice.v1.CreateUserRequest.create({
      name: 'Charlie',
      email: 'charlie@example.com',
      tags: ['developer'],
    });
    const payload = userservice.v1.CreateUserRequest.encode(req).finish();
    const grpcRequest = new GrpcRequest({
      serviceName: 'userservice.v1.UserService',
      methodName: 'CreateUser',
      payload,
    });

    const response = await handler.handleRequest(grpcRequest);
    const userResponse = userservice.v1.UserResponse.decode(response.payload);

    expect(userResponse.success).toBe(true);
    expect(userResponse.user?.name).toBe('Charlie');
    expect(userResponse.user?.id).toBe(3);
  });

  it('should validate required fields on create', async () => {
    const handler = new UserServiceHandler();

    const req = userservice.v1.CreateUserRequest.create({
      name: 'Test User',
      email: '', // Missing email
    });
    const payload = userservice.v1.CreateUserRequest.encode(req).finish();
    const grpcRequest = new GrpcRequest({
      serviceName: 'userservice.v1.UserService',
      methodName: 'CreateUser',
      payload,
    });

    const response = await handler.handleRequest(grpcRequest);
    const userResponse = userservice.v1.UserResponse.decode(response.payload);

    expect(userResponse.success).toBe(false);
    expect(userResponse.errorMessage).toContain('required');
  });

  it('should throw for unknown methods', async () => {
    const handler = new UserServiceHandler();

    const grpcRequest = new GrpcRequest({
      serviceName: 'userservice.v1.UserService',
      methodName: 'DeleteUser', // Not implemented
      payload: new Uint8Array(),
    });

    await expect(handler.handleRequest(grpcRequest)).rejects.toThrow(
      'Unknown method'
    );
  });
});

Test Patterns

Using Test Helpers

// test-helpers.ts
import { GrpcRequest } from '@spikard/node';
import { userservice } from './user_service';

export function createGetUserRequest(userId: number): GrpcRequest {
  const req = userservice.v1.GetUserRequest.create({ userId });
  const payload = userservice.v1.GetUserRequest.encode(req).finish();
  return new GrpcRequest({
    serviceName: 'userservice.v1.UserService',
    methodName: 'GetUser',
    payload,
  });
}

export function createCreateUserRequest(
  name: string,
  email: string,
  tags?: string[]
): GrpcRequest {
  const req = userservice.v1.CreateUserRequest.create({
    name,
    email,
    tags: tags || [],
  });
  const payload = userservice.v1.CreateUserRequest.encode(req).finish();
  return new GrpcRequest({
    serviceName: 'userservice.v1.UserService',
    methodName: 'CreateUser',
    payload,
  });
}

Testing with Metadata

it('should handle authorization header', async () => {
  const handler = new UserServiceHandler();

  const req = userservice.v1.CreateUserRequest.create({
    name: 'Test',
    email: 'test@example.com',
  });
  const payload = userservice.v1.CreateUserRequest.encode(req).finish();
  const grpcRequest = new GrpcRequest({
    serviceName: 'userservice.v1.UserService',
    methodName: 'CreateUser',
    payload,
    metadata: {
      authorization: 'Bearer valid-token',
    },
  });

  const response = await handler.handleRequest(grpcRequest);
  const userResponse = userservice.v1.UserResponse.decode(response.payload);

  expect(userResponse.success).toBe(true);
});

Running Tests

# Run all tests
npx vitest

# Run with coverage
npx vitest --coverage

# Run specific file
npx vitest user_handler.test.ts

Ruby gRPC Handler Tests

Comprehensive test examples for gRPC handlers using RSpec.

# spec/user_service_handler_spec.rb
require 'spec_helper'
require 'ostruct'
require 'spikard/grpc'
require 'userservice_pb'
require_relative '../lib/user_service_handler'

RSpec.describe UserServiceHandler do
  let(:user_repository) { instance_double('UserRepository') }
  let(:handler) { described_class.new(user_repository) }

  describe '#handle_request' do
    context 'GetUser' do
      it 'returns an existing user successfully' do
        # Setup mock data
        mock_user = OpenStruct.new(
          id: 1,
          name: 'Alice',
          email: 'alice@example.com',
          created_at: Time.now.utc
        )
        allow(user_repository).to receive(:find_by_id).with(1).and_return(mock_user)

        # Create request
        req = Userservice::GetUserRequest.new(id: 1)
        grpc_request = Spikard::Grpc::Request.new(
          service_name: 'userservice.v1.UserService',
          method_name: 'GetUser',
          payload: Userservice::GetUserRequest.encode(req)
        )

        # Call handler
        response = handler.handle_request(grpc_request)

        # Deserialize response
        user_response = Userservice::User.decode(response.payload)

        # Assertions
        expect(user_response.id).to eq(1)
        expect(user_response.name).to eq('Alice')
        expect(user_response.email).to eq('alice@example.com')
        expect(response.metadata['x-user-found']).to eq('true')
      end

      it 'raises an error for a non-existent user' do
        allow(user_repository).to receive(:find_by_id).with(999).and_return(nil)

        # Create request for non-existent user
        req = Userservice::GetUserRequest.new(id: 999)
        grpc_request = Spikard::Grpc::Request.new(
          service_name: 'userservice.v1.UserService',
          method_name: 'GetUser',
          payload: Userservice::GetUserRequest.encode(req)
        )

        # Call handler - should raise error
        expect { handler.handle_request(grpc_request) }.to raise_error(
          ArgumentError, /not found/
        )
      end

      it 'validates user ID is positive' do
        req = Userservice::GetUserRequest.new(id: 0)
        grpc_request = Spikard::Grpc::Request.new(
          service_name: 'userservice.v1.UserService',
          method_name: 'GetUser',
          payload: Userservice::GetUserRequest.encode(req)
        )

        expect { handler.handle_request(grpc_request) }.to raise_error(
          ArgumentError, /must be positive/
        )
      end
    end

    context 'CreateUser' do
      it 'creates a new user successfully' do
        mock_user = OpenStruct.new(
          id: 3,
          name: 'Charlie',
          email: 'charlie@example.com'
        )
        allow(user_repository).to receive(:create).and_return(mock_user)

        # Create request with authorization metadata
        req = Userservice::CreateUserRequest.new(
          name: 'Charlie',
          email: 'charlie@example.com'
        )
        grpc_request = Spikard::Grpc::Request.new(
          service_name: 'userservice.v1.UserService',
          method_name: 'CreateUser',
          payload: Userservice::CreateUserRequest.encode(req),
          metadata: { 'authorization' => 'Bearer valid-token' }
        )

        # Call handler
        response = handler.handle_request(grpc_request)

        # Deserialize response
        user_response = Userservice::User.decode(response.payload)

        # Assertions
        expect(user_response.id).to eq(3)
        expect(user_response.name).to eq('Charlie')
        expect(user_response.email).to eq('charlie@example.com')
        expect(response.metadata['x-user-id']).to eq('3')
        expect(response.metadata['x-created']).to eq('true')
      end

      it 'raises an error when name is missing' do
        req = Userservice::CreateUserRequest.new(
          name: '',
          email: 'test@example.com'
        )
        grpc_request = Spikard::Grpc::Request.new(
          service_name: 'userservice.v1.UserService',
          method_name: 'CreateUser',
          payload: Userservice::CreateUserRequest.encode(req),
          metadata: { 'authorization' => 'Bearer token' }
        )

        expect { handler.handle_request(grpc_request) }.to raise_error(
          ArgumentError, /required/
        )
      end

      it 'raises an error when authorization is missing' do
        req = Userservice::CreateUserRequest.new(
          name: 'Test',
          email: 'test@example.com'
        )
        grpc_request = Spikard::Grpc::Request.new(
          service_name: 'userservice.v1.UserService',
          method_name: 'CreateUser',
          payload: Userservice::CreateUserRequest.encode(req)
        )

        expect { handler.handle_request(grpc_request) }.to raise_error(
          SecurityError, /Authentication required/
        )
      end
    end

    context 'unknown method' do
      it 'raises error for unknown method' do
        grpc_request = Spikard::Grpc::Request.new(
          service_name: 'userservice.v1.UserService',
          method_name: 'DeleteUser',
          payload: ''
        )

        expect { handler.handle_request(grpc_request) }.to raise_error(
          RuntimeError, /Unknown method/
        )
      end
    end
  end
end

Test Patterns

Using Shared Examples

RSpec.shared_examples 'authenticated endpoint' do |method_name|
  it 'requires authentication' do
    grpc_request = Spikard::Grpc::Request.new(
      service_name: 'userservice.v1.UserService',
      method_name: method_name,
      payload: request_payload
    )

    expect { handler.handle_request(grpc_request) }.to raise_error(
      SecurityError, /Authentication required/
    )
  end
end

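RSpec.describe UserServiceHandler do
  let(:user_repository) { instance_double('UserRepository') }
  let(:handler) { described_class.new(user_repository) }
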
  describe 'CreateUser' do
    let(:request_payload) do
      req = Userservice::CreateUserRequest.new(name: 'Test', email: 'test@example.com')
      Userservice::CreateUserRequest.encode(req)
    end

    include_examples 'authenticated endpoint', 'CreateUser'
  end
end

Testing Error Responses

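RSpec.describe UserServiceHandler do
  let(:user_repository) { instance_double('UserRepository') }
  let(:handler) { described_class.new(user_repository) }
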
  describe 'error handling' do
    it 'handles malformed protobuf gracefully' do
      grpc_request = Spikard::Grpc::Request.new(
        service_name: 'userservice.v1.UserService',
        method_name: 'GetUser',
        payload: 'invalid protobuf data'
      )

      expect { handler.handle_request(grpc_request) }.to raise_error(
        Google::Protobuf::ParseError
      )
    end
  end
end

Using let! for Setup

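RSpec.describe UserServiceHandler do
  let(:user_repository) { instance_double('UserRepository') }
  let(:handler) { described_class.new(user_repository) }
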
  let!(:alice) do
    OpenStruct.new(id: 1, name: 'Alice', email: 'alice@example.com', created_at: Time.now)
  end
  let!(:bob) do
    OpenStruct.new(id: 2, name: 'Bob', email: 'bob@example.com', created_at: Time.now)
  end

  before do
    allow(user_repository).to receive(:find_by_id).with(1).and_return(alice)
    allow(user_repository).to receive(:find_by_id).with(2).and_return(bob)
  end

  it 'retrieves different users' do
    # Test with alice
    req = Userservice::GetUserRequest.new(id: 1)
    grpc_request = Spikard::Grpc::Request.new(
      service_name: 'userservice.v1.UserService',
      method_name: 'GetUser',
      payload: Userservice::GetUserRequest.encode(req)
    )

    response = handler.handle_request(grpc_request)
    user = Userservice::User.decode(response.payload)

    expect(user.name).to eq('Alice')
  end
end

Running Tests

# Run all tests
bundle exec rspec

# Run with verbose output
bundle exec rspec --format documentation

# Run specific file
bundle exec rspec spec/user_service_handler_spec.rb

# Run specific example
bundle exec rspec spec/user_service_handler_spec.rb:15

PHP gRPC Handler Tests

Comprehensive test examples for gRPC handlers using PHPUnit.

<?php
// tests/UserServiceHandlerTest.php

declare(strict_types=1);

namespace Tests;

use PHPUnit\Framework\TestCase;
use PHPUnit\Framework\MockObject\MockObject;
use Spikard\Grpc\Request;
use Spikard\Grpc\Response;
use Userservice\GetUserRequest;
use Userservice\CreateUserRequest;
use Userservice\User;

class UserServiceHandlerTest extends TestCase
{
    private UserServiceHandler $handler;
    private MockObject $userRepository;

    protected function setUp(): void
    {
        $this->userRepository = $this->createMock(UserRepository::class);
        $this->handler = new UserServiceHandler($this->userRepository);
    }

    public function testGetUserSuccess(): void
    {
        // Setup mock data
        $mockUser = new \stdClass();
        $mockUser->id = 1;
        $mockUser->name = 'Alice';
        $mockUser->email = 'alice@example.com';
        $mockUser->createdAt = new \DateTime();

        $this->userRepository
            ->expects($this->once())
            ->method('findById')
            ->with(1)
            ->willReturn($mockUser);

        // Create request
        $req = new GetUserRequest();
        $req->setId(1);

        $grpcRequest = new Request(
            serviceName: 'userservice.v1.UserService',
            methodName: 'GetUser',
            payload: $req->serializeToString()
        );

        // Call handler
        $response = $this->handler->handleRequest($grpcRequest);

        // Deserialize response
        $userResponse = new User();
        $userResponse->mergeFromString($response->payload);

        // Assertions
        $this->assertEquals(1, $userResponse->getId());
        $this->assertEquals('Alice', $userResponse->getName());
        $this->assertEquals('alice@example.com', $userResponse->getEmail());
        $this->assertEquals('true', $response->getMetadata('x-user-found'));
    }

    public function testGetUserNotFound(): void
    {
        $this->userRepository
            ->expects($this->once())
            ->method('findById')
            ->with(999)
            ->willReturn(null);

        // Create request for non-existent user
        $req = new GetUserRequest();
        $req->setId(999);

        $grpcRequest = new Request(
            serviceName: 'userservice.v1.UserService',
            methodName: 'GetUser',
            payload: $req->serializeToString()
        );

        // Call handler
        $response = $this->handler->handleRequest($grpcRequest);

        // Should return error response
        $this->assertTrue($response->isError());
        $this->assertStringContainsString('not found', $response->errorMessage);
    }

    public function testGetUserInvalidId(): void
    {
        $req = new GetUserRequest();
        $req->setId(0);

        $grpcRequest = new Request(
            serviceName: 'userservice.v1.UserService',
            methodName: 'GetUser',
            payload: $req->serializeToString()
        );

        $response = $this->handler->handleRequest($grpcRequest);

        $this->assertTrue($response->isError());
        $this->assertStringContainsString('must be positive', $response->errorMessage);
    }

    public function testCreateUserSuccess(): void
    {
        $mockUser = new \stdClass();
        $mockUser->id = 3;
        $mockUser->name = 'Charlie';
        $mockUser->email = 'charlie@example.com';

        $this->userRepository
            ->expects($this->once())
            ->method('create')
            ->willReturn($mockUser);

        // Create request
        $req = new CreateUserRequest();
        $req->setName('Charlie');
        $req->setEmail('charlie@example.com');

        $grpcRequest = new Request(
            serviceName: 'userservice.v1.UserService',
            methodName: 'CreateUser',
            payload: $req->serializeToString(),
            metadata: ['authorization' => 'Bearer valid-token']
        );

        // Call handler
        $response = $this->handler->handleRequest($grpcRequest);

        // Deserialize response
        $userResponse = new User();
        $userResponse->mergeFromString($response->payload);

        // Assertions
        $this->assertEquals(3, $userResponse->getId());
        $this->assertEquals('Charlie', $userResponse->getName());
        $this->assertEquals('charlie@example.com', $userResponse->getEmail());
        $this->assertEquals('3', $response->getMetadata('x-user-id'));
        $this->assertEquals('true', $response->getMetadata('x-created'));
    }

    public function testCreateUserValidationError(): void
    {
        // Create request with missing email
        $req = new CreateUserRequest();
        $req->setName('Test User');
        $req->setEmail('');

        $grpcRequest = new Request(
            serviceName: 'userservice.v1.UserService',
            methodName: 'CreateUser',
            payload: $req->serializeToString(),
            metadata: ['authorization' => 'Bearer token']
        );

        $response = $this->handler->handleRequest($grpcRequest);

        $this->assertTrue($response->isError());
        $this->assertStringContainsString('required', $response->errorMessage);
    }

    public function testCreateUserRequiresAuthentication(): void
    {
        $req = new CreateUserRequest();
        $req->setName('Test');
        $req->setEmail('test@example.com');

        // Request without authorization header
        $grpcRequest = new Request(
            serviceName: 'userservice.v1.UserService',
            methodName: 'CreateUser',
            payload: $req->serializeToString()
        );

        $response = $this->handler->handleRequest($grpcRequest);

        $this->assertTrue($response->isError());
        $this->assertStringContainsString('Authentication required', $response->errorMessage);
        $this->assertEquals('16', $response->getMetadata('grpc-status'));
    }

    public function testUnknownMethod(): void
    {
        $grpcRequest = new Request(
            serviceName: 'userservice.v1.UserService',
            methodName: 'DeleteUser',
            payload: ''
        );

        $response = $this->handler->handleRequest($grpcRequest);

        $this->assertTrue($response->isError());
        $this->assertStringContainsString('Unknown method', $response->errorMessage);
    }
}

Test Patterns

Using Data Providers

<?php

class UserServiceHandlerTest extends TestCase
{
    /**
     * @dataProvider invalidUserIdProvider
     */
    public function testGetUserWithInvalidIds(int $invalidId, string $expectedError): void
    {
        $req = new GetUserRequest();
        $req->setId($invalidId);

        $grpcRequest = new Request(
            serviceName: 'userservice.v1.UserService',
            methodName: 'GetUser',
            payload: $req->serializeToString()
        );

        $response = $this->handler->handleRequest($grpcRequest);

        $this->assertTrue($response->isError());
        $this->assertStringContainsString($expectedError, $response->errorMessage);
    }

    public static function invalidUserIdProvider(): array
    {
        return [
            'zero id' => [0, 'must be positive'],
            'negative id' => [-1, 'must be positive'],
        ];
    }
}

Testing Error Handling

<?php

class UserServiceHandlerTest extends TestCase
{
    public function testHandlesMalformedPayload(): void
    {
        $grpcRequest = new Request(
            serviceName: 'userservice.v1.UserService',
            methodName: 'GetUser',
            payload: 'invalid protobuf data'
        );

        $response = $this->handler->handleRequest($grpcRequest);

        // Should return error, not throw exception
        $this->assertTrue($response->isError());
        $this->assertStringContainsString('Error', $response->errorMessage);
    }

    public function testHandlesRepositoryException(): void
    {
        $this->userRepository
            ->method('findById')
            ->willThrowException(new \RuntimeException('Database connection failed'));

        $req = new GetUserRequest();
        $req->setId(1);

        $grpcRequest = new Request(
            serviceName: 'userservice.v1.UserService',
            methodName: 'GetUser',
            payload: $req->serializeToString()
        );

        $response = $this->handler->handleRequest($grpcRequest);

        $this->assertTrue($response->isError());
        $this->assertStringContainsString('Database connection failed', $response->errorMessage);
    }
}

Testing with Metadata

<?php

class UserServiceHandlerTest extends TestCase
{
    public function testHandlerReadsCustomMetadata(): void
    {
        $mockUser = new \stdClass();
        $mockUser->id = 1;
        $mockUser->name = 'Alice';
        $mockUser->email = 'alice@example.com';
        $mockUser->createdAt = new \DateTime();

        $this->userRepository->method('findById')->willReturn($mockUser);

        $req = new GetUserRequest();
        $req->setId(1);

        $grpcRequest = new Request(
            serviceName: 'userservice.v1.UserService',
            methodName: 'GetUser',
            payload: $req->serializeToString(),
            metadata: [
                'x-request-id' => 'abc-123',
                'x-trace-id' => 'trace-456',
            ]
        );

        $response = $this->handler->handleRequest($grpcRequest);

        $this->assertFalse($response->isError());
    }
}

Running Tests

# Run all tests
./vendor/bin/phpunit

# Run with readable per-test output (--verbose was removed in PHPUnit 10)
./vendor/bin/phpunit --testdox

# Run specific test file
./vendor/bin/phpunit tests/UserServiceHandlerTest.php

# Run specific test method
./vendor/bin/phpunit --filter testGetUserSuccess

# Run with coverage
./vendor/bin/phpunit --coverage-html coverage/

Rust gRPC Handler Tests

Comprehensive test examples for gRPC handlers using #[tokio::test].

// tests/user_handler_test.rs
use bytes::Bytes;
use spikard_http::grpc::{GrpcHandler, GrpcRequestData, GrpcResponseData};
use std::sync::Arc;
use tonic::metadata::MetadataMap;

mod userservice {
    include!("../src/userservice.rs");
}

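// Assumes the UserRepository trait is exported from the same module as the handler.
use crate::user_handler::{UserRepository, UserServiceHandler};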

// Mock repository for testing
struct MockUserRepository {
    users: std::sync::RwLock<Vec<userservice::User>>,
}

impl MockUserRepository {
    fn new() -> Self {
        let users = vec![
            userservice::User {
                id: 1,
                name: "Alice".to_string(),
                email: "alice@example.com".to_string(),
                created_at: "2024-01-01T00:00:00Z".to_string(),
            },
            userservice::User {
                id: 2,
                name: "Bob".to_string(),
                email: "bob@example.com".to_string(),
                created_at: "2024-01-02T00:00:00Z".to_string(),
            },
        ];
        Self {
            users: std::sync::RwLock::new(users),
        }
    }
}

#[async_trait::async_trait]
impl UserRepository for MockUserRepository {
    async fn find_by_id(&self, id: i32) -> Result<Option<userservice::User>, String> {
        let users = self.users.read().expect("lock poisoned");
        Ok(users.iter().find(|u| u.id == id).cloned())
    }

    async fn create(&self, name: &str, email: &str) -> Result<userservice::User, String> {
        let mut users = self.users.write().expect("lock poisoned");
        let new_id = users.len() as i32 + 1;
        let user = userservice::User {
            id: new_id,
            name: name.to_string(),
            email: email.to_string(),
            created_at: chrono::Utc::now().to_rfc3339(),
        };
        users.push(user.clone());
        Ok(user)
    }
}

fn create_handler() -> UserServiceHandler {
    let repo = Arc::new(MockUserRepository::new());
    UserServiceHandler::new(repo)
}

#[tokio::test]
async fn test_get_user_success() {
    use prost::Message;

    let handler = create_handler();

    // Create request
    let req = userservice::GetUserRequest { id: 1 };
    let mut buf = Vec::new();
    req.encode(&mut buf).expect("failed to encode protobuf request");

    let grpc_request = GrpcRequestData {
        service_name: "userservice.v1.UserService".to_string(),
        method_name: "GetUser".to_string(),
        payload: Bytes::from(buf),
        metadata: MetadataMap::new(),
    };

    // Call handler
    let response = handler.call(grpc_request).await.expect("handler call failed");

    // Deserialize response
    let user_response = userservice::User::decode(response.payload).expect("failed to decode response payload");

    // Assertions
    assert_eq!(user_response.id, 1);
    assert_eq!(user_response.name, "Alice");
    assert_eq!(user_response.email, "alice@example.com");
    assert_eq!(
        response.metadata.get("x-user-found").expect("x-user-found header missing").to_str().expect("invalid metadata value"),
        "true"
    );
}

#[tokio::test]
async fn test_get_user_not_found() {
    use prost::Message;

    let handler = create_handler();

    // Create request for non-existent user
    let req = userservice::GetUserRequest { id: 999 };
    let mut buf = Vec::new();
    req.encode(&mut buf).expect("failed to encode protobuf request");

    let grpc_request = GrpcRequestData {
        service_name: "userservice.v1.UserService".to_string(),
        method_name: "GetUser".to_string(),
        payload: Bytes::from(buf),
        metadata: MetadataMap::new(),
    };

    // Call handler - should return error
    let result = handler.call(grpc_request).await;

    assert!(result.is_err());
    let status = result.unwrap_err();
    assert_eq!(status.code(), tonic::Code::NotFound);
    assert!(status.message().contains("not found"));
}

#[tokio::test]
async fn test_get_user_invalid_id() {
    use prost::Message;

    let handler = create_handler();

    // Create request with invalid ID
    let req = userservice::GetUserRequest { id: 0 };
    let mut buf = Vec::new();
    req.encode(&mut buf).expect("failed to encode protobuf request");

    let grpc_request = GrpcRequestData {
        service_name: "userservice.v1.UserService".to_string(),
        method_name: "GetUser".to_string(),
        payload: Bytes::from(buf),
        metadata: MetadataMap::new(),
    };

    let result = handler.call(grpc_request).await;

    assert!(result.is_err());
    let status = result.unwrap_err();
    assert_eq!(status.code(), tonic::Code::InvalidArgument);
    assert!(status.message().contains("must be positive"));
}

#[tokio::test]
async fn test_create_user_success() {
    use prost::Message;

    let handler = create_handler();

    // Create request
    let req = userservice::CreateUserRequest {
        name: "Charlie".to_string(),
        email: "charlie@example.com".to_string(),
    };
    let mut buf = Vec::new();
    req.encode(&mut buf).expect("failed to encode protobuf request");

    // Add authorization metadata
    let mut metadata = MetadataMap::new();
    metadata.insert("authorization", "Bearer valid-token".parse().expect("failed to parse authorization header"));

    let grpc_request = GrpcRequestData {
        service_name: "userservice.v1.UserService".to_string(),
        method_name: "CreateUser".to_string(),
        payload: Bytes::from(buf),
        metadata,
    };

    // Call handler
    let response = handler.call(grpc_request).await.expect("handler call failed");

    // Deserialize response
    let user_response = userservice::User::decode(response.payload).expect("failed to decode response payload");

    // Assertions
    assert_eq!(user_response.id, 3); // Next available ID
    assert_eq!(user_response.name, "Charlie");
    assert_eq!(user_response.email, "charlie@example.com");
    assert_eq!(
        response.metadata.get("x-user-id").expect("x-user-id header missing").to_str().expect("invalid metadata value"),
        "3"
    );
    assert_eq!(
        response.metadata.get("x-created").expect("x-created header missing").to_str().expect("invalid metadata value"),
        "true"
    );
}

#[tokio::test]
async fn test_create_user_validation_error() {
    use prost::Message;

    let handler = create_handler();

    // Create request with missing email
    let req = userservice::CreateUserRequest {
        name: "Test User".to_string(),
        email: "".to_string(),
    };
    let mut buf = Vec::new();
    req.encode(&mut buf).expect("failed to encode protobuf request");

    let mut metadata = MetadataMap::new();
    metadata.insert("authorization", "Bearer token".parse().expect("failed to parse authorization header"));

    let grpc_request = GrpcRequestData {
        service_name: "userservice.v1.UserService".to_string(),
        method_name: "CreateUser".to_string(),
        payload: Bytes::from(buf),
        metadata,
    };

    let result = handler.call(grpc_request).await;

    assert!(result.is_err());
    let status = result.unwrap_err();
    assert_eq!(status.code(), tonic::Code::InvalidArgument);
    assert!(status.message().contains("required"));
}

#[tokio::test]
async fn test_create_user_requires_authentication() {
    use prost::Message;

    let handler = create_handler();

    let req = userservice::CreateUserRequest {
        name: "Test".to_string(),
        email: "test@example.com".to_string(),
    };
    let mut buf = Vec::new();
    req.encode(&mut buf).expect("failed to encode protobuf request");

    // Request without authorization header
    let grpc_request = GrpcRequestData {
        service_name: "userservice.v1.UserService".to_string(),
        method_name: "CreateUser".to_string(),
        payload: Bytes::from(buf),
        metadata: MetadataMap::new(),
    };

    let result = handler.call(grpc_request).await;

    assert!(result.is_err());
    let status = result.unwrap_err();
    assert_eq!(status.code(), tonic::Code::Unauthenticated);
    assert!(status.message().contains("Authentication required"));
}

#[tokio::test]
async fn test_unknown_method() {
    let handler = create_handler();

    let grpc_request = GrpcRequestData {
        service_name: "userservice.v1.UserService".to_string(),
        method_name: "DeleteUser".to_string(),
        payload: Bytes::new(),
        metadata: MetadataMap::new(),
    };

    let result = handler.call(grpc_request).await;

    assert!(result.is_err());
    let status = result.unwrap_err();
    assert_eq!(status.code(), tonic::Code::Unimplemented);
    assert!(status.message().contains("Unknown method"));
}

Test Patterns

Using Test Fixtures

use once_cell::sync::Lazy;

static TEST_HANDLER: Lazy<UserServiceHandler> = Lazy::new(|| {
    let repo = Arc::new(MockUserRepository::new());
    UserServiceHandler::new(repo)
});

#[tokio::test]
async fn test_with_shared_handler() {
    use prost::Message;

    let req = userservice::GetUserRequest { id: 1 };
    let mut buf = Vec::new();
    req.encode(&mut buf).expect("failed to encode protobuf request");

    let grpc_request = GrpcRequestData {
        service_name: "userservice.v1.UserService".to_string(),
        method_name: "GetUser".to_string(),
        payload: Bytes::from(buf),
        metadata: MetadataMap::new(),
    };

    let response = TEST_HANDLER.call(grpc_request).await.expect("handler call failed");
    let user = userservice::User::decode(response.payload).expect("failed to decode response payload");

    assert_eq!(user.name, "Alice");
}

Testing Error Cases

#[tokio::test]
async fn test_handles_malformed_payload() {
    let handler = create_handler();

    let grpc_request = GrpcRequestData {
        service_name: "userservice.v1.UserService".to_string(),
        method_name: "GetUser".to_string(),
        payload: Bytes::from("invalid protobuf data"),
        metadata: MetadataMap::new(),
    };

    let result = handler.call(grpc_request).await;

    assert!(result.is_err());
    let status = result.unwrap_err();
    assert_eq!(status.code(), tonic::Code::InvalidArgument);
}

Helper Functions

fn create_get_user_request(user_id: i32) -> GrpcRequestData {
    use prost::Message;

    let req = userservice::GetUserRequest { id: user_id };
    let mut buf = Vec::new();
    req.encode(&mut buf).expect("failed to encode protobuf request");

    GrpcRequestData {
        service_name: "userservice.v1.UserService".to_string(),
        method_name: "GetUser".to_string(),
        payload: Bytes::from(buf),
        metadata: MetadataMap::new(),
    }
}

fn create_create_user_request(name: &str, email: &str, auth_token: Option<&str>) -> GrpcRequestData {
    use prost::Message;

    let req = userservice::CreateUserRequest {
        name: name.to_string(),
        email: email.to_string(),
    };
    let mut buf = Vec::new();
    req.encode(&mut buf).expect("failed to encode protobuf request");

    let mut metadata = MetadataMap::new();
    if let Some(token) = auth_token {
        metadata.insert("authorization", token.parse().expect("failed to parse authorization header"));
    }

    GrpcRequestData {
        service_name: "userservice.v1.UserService".to_string(),
        method_name: "CreateUser".to_string(),
        payload: Bytes::from(buf),
        metadata,
    }
}

#[tokio::test]
async fn test_with_helpers() {
    use prost::Message; // brings `decode` into scope, as in the other tests

    let handler = create_handler();

    let request = create_get_user_request(1);
    let response = handler.call(request).await.expect("handler call failed");

    let user = userservice::User::decode(response.payload).expect("failed to decode response payload");
    assert_eq!(user.id, 1);
}

Running Tests

# Run all tests
cargo test

# Run with output
cargo test -- --nocapture

# Run specific test
cargo test test_get_user_success

# Run tests matching pattern
cargo test test_get_user

# Run with coverage (requires cargo-tarpaulin)
cargo tarpaulin --out Html

Part 7: Streaming

Streaming Modes

| Mode | Definition | Use Cases |
| --- | --- | --- |
| Unary | rpc Method(Request) returns (Response) | CRUD operations, simple queries |
| Server Streaming | rpc Method(Request) returns (stream Response) | Large result sets, real-time updates |
| Client Streaming | rpc Method(stream Request) returns (Response) | File uploads, batch operations |
| Bidirectional | rpc Method(stream Request) returns (stream Response) | Chat, real-time collaboration |
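
The mode follows directly from where the stream keyword appears in the rpc signature. A minimal sketch that classifies a declaration, assuming a simplified single-line rpc statement; the helper name streaming_mode is hypothetical and not part of Spikard:

```python
import re

# Matches a simplified one-line rpc declaration such as:
#   rpc Chat(stream Msg) returns (stream Msg);
_RPC_PATTERN = re.compile(
    r"rpc\s+\w+\s*\(\s*(stream\s+)?[\w.]+\s*\)\s*"
    r"returns\s*\(\s*(stream\s+)?[\w.]+\s*\)"
)


def streaming_mode(rpc_line: str) -> str:
    """Return the streaming mode implied by a proto3 rpc declaration."""
    match = _RPC_PATTERN.search(rpc_line)
    if match is None:
        raise ValueError(f"not an rpc declaration: {rpc_line!r}")
    client_streams = match.group(1) is not None  # `stream` on the request side
    server_streams = match.group(2) is not None  # `stream` on the response side
    if client_streams and server_streams:
        return "bidirectional"
    if client_streams:
        return "client streaming"
    if server_streams:
        return "server streaming"
    return "unary"
```

For example, streaming_mode("rpc GetUser(GetUserRequest) returns (User);") yields "unary" for the UserService method defined in Part 1.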

Support Status

| Mode | Python | TypeScript | Ruby | PHP | Rust |
| --- | --- | --- | --- | --- | --- |
| Unary | Supported | Supported | Supported | Supported | Supported |
| Server Streaming | Supported | Supported | Supported | Supported | Supported |
| Client Streaming | Supported | Supported | Supported | Supported | Supported |
| Bidirectional | Supported | Supported | Supported | Supported | Supported |

Documentation

Streaming handler examples follow in the per-language sections below, showing client streaming and bidirectional streaming patterns for Python, Ruby, and PHP. The TypeScript section currently covers unary handlers only.

Streaming Handler Implementations

Complete examples of client streaming and bidirectional streaming handlers:

Python gRPC Streaming Handlers

Complete Python implementation examples for client streaming and bidirectional streaming gRPC handlers in Spikard.

Client Streaming Handler

Client streaming RPC allows a client to send multiple messages and then the server responds with a single message.

Handler Signature

from typing import List
from spikard.grpc import GrpcClientStreamRequest, GrpcResponse

class GrpcClientStreamRequest:
    """Request object for client streaming RPC."""
    service_name: str
    method_name: str
    metadata: dict[str, str]
    messages: List[bytes]  # All client messages collected as list

async def handle_client_stream(
    request: GrpcClientStreamRequest,
) -> GrpcResponse:
    """Process all messages and return single response."""
    pass

Example: Batch Message Processing

from typing import List
from spikard.grpc import GrpcClientStreamRequest, GrpcResponse
import messageservice_pb2  # Generated from proto
from datetime import datetime
import uuid


async def handle_batch_create(
    request: GrpcClientStreamRequest,
) -> GrpcResponse:
    """
    Handle client streaming: receive multiple messages, send single response.

    Pattern: Collect all input messages, process together, return aggregated response.
    """
    messages = request.messages
    metadata = request.metadata

    # Validate authorization
    auth_token = metadata.get('authorization')
    if not auth_token:
        raise PermissionError('Authentication required')

    # Step 1: Deserialize all input messages
    items = []
    for i, msg in enumerate(messages):
        try:
            item = messageservice_pb2.Item()
            item.ParseFromString(msg)
            items.append(item)
        except Exception as err:
            raise ValueError(f'Failed to decode message {i}: {str(err)}')

    # Validate all items before processing
    for item in items:
        if not item.name or not item.value:
            raise ValueError('Each item must have name and value')

    # Step 2: Process all items atomically
    success_count = 0
    total_value = 0

    try:
        # Simulate database transaction
        async with database_transaction() as tx:
            for item in items:
                # Simulate creating item in database
                await tx.items.create(
                    name=item.name,
                    value=item.value,
                )
                success_count += 1
                total_value += item.value
    except Exception as err:
        raise RuntimeError(f'Transaction failed: {str(err)}')

    # Step 3: Build aggregate response
    response = messageservice_pb2.BatchCreateResponse()
    response.success_count = success_count
    response.total_value = total_value
    response.batch_id = str(uuid.uuid4())
    response.timestamp = datetime.utcnow().isoformat()

    # Step 4: Serialize and return
    return GrpcResponse(
        payload=response.SerializeToString(),
        metadata={
            'x-batch-id': response.batch_id,
            'x-count': str(success_count),
        },
    )


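def database_transaction():
    """Return an async context manager for database transactions.

    This is a plain function on purpose: `async with` needs the context
    manager object itself, not a coroutine that would produce it.
    """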
    # Placeholder for actual database transaction logic
    class Transaction:
        class Items:
            async def create(self, name: str, value: int):
                pass

        def __init__(self):
            self.items = self.Items()

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            pass

    return Transaction()

Bidirectional Streaming Handler

Bidirectional streaming RPC allows the client to send multiple messages and the server to send multiple messages back.

Handler Signature

from typing import List
from spikard.grpc import GrpcBidiStreamRequest, GrpcBidiStreamResponse

class GrpcBidiStreamRequest:
    """Request object for bidirectional streaming RPC."""
    service_name: str
    method_name: str
    metadata: dict[str, str]
    messages: List[bytes]  # All input messages collected as list

class GrpcBidiStreamResponse:
    """Response object for bidirectional streaming RPC."""
    messages: List[bytes]  # Array of response messages
    metadata: dict[str, str]

async def handle_bidi_stream(
    request: GrpcBidiStreamRequest,
) -> GrpcBidiStreamResponse:
    """Process input messages and generate output messages."""
    pass

Example: Message Transformation Pipeline

from typing import List
from spikard.grpc import GrpcBidiStreamRequest, GrpcBidiStreamResponse
import transformservice_pb2  # Generated from proto
from datetime import datetime


async def handle_transform_stream(
    request: GrpcBidiStreamRequest,
) -> GrpcBidiStreamResponse:
    """
    Handle bidirectional streaming: receive multiple messages, send multiple messages.

    Pattern: Collect all input messages, process, generate output messages, return array.
    """
    messages = request.messages
    metadata = request.metadata

    # Step 1: Deserialize all input messages
    input_documents = []
    for index, msg in enumerate(messages):
        try:
            document = transformservice_pb2.Document()
            document.ParseFromString(msg)
            input_documents.append({
                'index': index,
                'document': document,
            })
        except Exception as err:
            raise ValueError(
                f'Failed to decode message {index}: {str(err)}'
            )

    # Step 2: Process each document and generate response
    output_messages: List[bytes] = []

    for item in input_documents:
        index = item['index']
        document = item['document']

        try:
            # Transform the document
            transformed = await transform_document(document)

            # Build response message
            result = transformservice_pb2.TransformResult()
            result.original_id = document.id
            result.transformed_content = transformed['content']
            result.transformed_at = datetime.utcnow().isoformat()
            result.status = 'SUCCESS' if transformed['success'] else 'PARTIAL'
            result.metadata['original_size'] = str(len(document.content))
            result.metadata['transformed_size'] = str(len(transformed['content']))

            # Serialize response message
            output_messages.append(result.SerializeToString())
        except Exception as err:
            # Create error response for this document
            error_result = transformservice_pb2.TransformResult()
            error_result.original_id = document.id
            error_result.status = 'ERROR'
            error_result.error_message = str(err)

            output_messages.append(error_result.SerializeToString())

    # Step 3: Return all response messages
    return GrpcBidiStreamResponse(
        messages=output_messages,
        metadata={
            'x-processed-count': str(len(output_messages)),
            'x-timestamp': datetime.utcnow().isoformat(),
        },
    )


async def transform_document(doc) -> dict:
    """Simulate document transformation."""
    # Simulate async transformation work
    return {
        'content': doc.content.upper(),
        'success': True,
    }

Advanced Example: Filtering and Aggregation

Bidirectional Stream with Filtering

from typing import List
from spikard.grpc import GrpcBidiStreamRequest, GrpcBidiStreamResponse
import recordservice_pb2  # Generated from proto


async def handle_filter_stream(
    request: GrpcBidiStreamRequest,
) -> GrpcBidiStreamResponse:
    """
    Filter records and apply transformations in bidirectional stream.

    Pattern: Filter input based on metadata criteria, transform, return filtered output.
    """
    messages = request.messages
    metadata = request.metadata

    # Parse filter criteria from metadata
    filter_type = metadata.get('x-filter-type', 'all')
    min_value = int(metadata.get('x-min-value', '0'))

    # Step 1: Deserialize and filter
    filtered_items = []

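    for index, msg in enumerate(messages):
        item = recordservice_pb2.Record()
        try:
            item.ParseFromString(msg)
        except Exception as err:
            # Malformed input is reported as ValueError (INVALID_ARGUMENT)
            raise ValueError(f'Failed to decode message {index}: {err}') from err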

        # Apply filter logic
        if (filter_type == 'all' or
            (filter_type == 'high-value' and item.value >= min_value)):
            filtered_items.append(item)

    # Step 2: Transform filtered items
    output_messages: List[bytes] = []

    for item in filtered_items:
        response = recordservice_pb2.ProcessedRecord()
        response.id = item.id
        response.original_value = item.value
        response.processed_value = item.value * 1.1  # Apply multiplier
        response.filtered = False

        output_messages.append(response.SerializeToString())

    # Step 3: Return response with statistics
    return GrpcBidiStreamResponse(
        messages=output_messages,
        metadata={
            'x-input-count': str(len(messages)),
            'x-output-count': str(len(output_messages)),
            'x-filtered-count': str(len(messages) - len(output_messages)),
        },
    )

Key Patterns

Message Collection

  • All client messages are collected in a single messages list
  • Messages are provided as bytes objects (protobuf serialized)
  • No streaming iteration needed - full list is provided
  • Order of messages is preserved

Processing Strategy

  1. Collect: All input messages received as list
  2. Validate: Check all messages before processing
  3. Transform: Process and generate output
  4. Serialize: Encode response messages as bytes
  5. Return: List of response message bytes
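
The five steps above can be sketched end to end. This is an illustration under stated assumptions, not Spikard code: JSON stands in for protobuf so the example runs without generated modules, and StreamResponse and process_stream are hypothetical names.

```python
import json
from dataclasses import dataclass, field


@dataclass
class StreamResponse:
    """Hypothetical stand-in for GrpcBidiStreamResponse."""
    messages: list[bytes]
    metadata: dict[str, str] = field(default_factory=dict)


def process_stream(messages: list[bytes]) -> StreamResponse:
    # 1. Collect: the full list of serialized messages is already in hand.
    # 2. Validate: decode and check every message before doing any work.
    records = []
    for index, raw in enumerate(messages):
        try:
            record = json.loads(raw)  # stand-in for ParseFromString
        except ValueError as err:
            raise ValueError(f"Failed to decode message {index}: {err}") from err
        if not isinstance(record, dict) or "value" not in record:
            raise ValueError(f"Message {index} is missing 'value'")
        records.append(record)

    # 3. Transform: one output record per input, preserving order.
    results = [{"value": record["value"] * 2} for record in records]

    # 4. Serialize: encode each result back to bytes.
    output = [json.dumps(result).encode("utf-8") for result in results]

    # 5. Return: the list of response bytes plus summary metadata.
    return StreamResponse(output, {"x-processed-count": str(len(output))})
```

Validating the whole batch before transforming anything means a bad message at the end of the stream cannot leave earlier messages half-processed.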

Error Handling

  • Raise Python exceptions with appropriate types:
      • ValueError maps to INVALID_ARGUMENT status
      • PermissionError maps to PERMISSION_DENIED status
      • NotImplementedError maps to UNIMPLEMENTED status
      • RuntimeError or other exceptions map to INTERNAL status
  • Errors in message deserialization should use ValueError
  • Processing errors should use RuntimeError or domain-specific exceptions
  • Per-message errors can be included in response messages (with ERROR status)
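
The exception-to-status mapping listed above can be pictured as a small lookup. This is a sketch only: status_for is a hypothetical name, and matching subclasses through isinstance is an assumption about the framework's behavior rather than documented Spikard API.

```python
# Illustrative mapping only; the framework performs the real translation.
_STATUS_BY_EXCEPTION: list[tuple[type[BaseException], str]] = [
    (ValueError, "INVALID_ARGUMENT"),
    (PermissionError, "PERMISSION_DENIED"),
    (NotImplementedError, "UNIMPLEMENTED"),
]


def status_for(exc: BaseException) -> str:
    """Return the gRPC status name for a raised Python exception."""
    for exc_type, status in _STATUS_BY_EXCEPTION:
        if isinstance(exc, exc_type):
            return status
    # RuntimeError and anything unlisted fall through to INTERNAL.
    return "INTERNAL"
```

Note that NotImplementedError is a subclass of RuntimeError in Python, so it has to be checked before the INTERNAL fallback applies.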

Metadata

  • Client and bidirectional streaming: Metadata is passed in the request and can be included in the response
  • Use metadata for non-payload information (timestamps, counts, filters)
  • Access with metadata.get(key) and provide defaults
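
Metadata values arrive as strings, so numeric options need parsing as well as a default. A minimal sketch, assuming metadata is a plain dict[str, str] as in the handler signatures above; int_from_metadata is a hypothetical helper:

```python
def int_from_metadata(metadata: dict[str, str], key: str, default: int) -> int:
    """Read an integer metadata value, falling back to a default when absent."""
    raw = metadata.get(key)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError as err:
        # Re-raising as ValueError keeps the failure in the INVALID_ARGUMENT family.
        raise ValueError(f"metadata {key!r} must be an integer, got {raw!r}") from err
```

A filter handler could then read its threshold with int_from_metadata(request.metadata, 'x-min-value', 0) and get a descriptive error for malformed input.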

Limits and Constraints

  • MAX_STREAM_MESSAGES: 10,000 messages per stream
  • Resource Exhaustion: Streams exceeding limit raise MemoryError
  • Memory: All messages collected in memory - appropriate for moderate message counts
  • Atomicity: All messages processed together (transaction-like semantics)
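
Because the whole stream is buffered, callers with larger workloads may want to split them across several calls. A minimal client-side sketch, assuming the 10,000-message limit quoted above; chunk_messages is a hypothetical helper, not a Spikard API:

```python
from collections.abc import Iterator

MAX_STREAM_MESSAGES = 10_000  # limit quoted in the list above


def chunk_messages(
    messages: list[bytes], limit: int = MAX_STREAM_MESSAGES
) -> Iterator[list[bytes]]:
    """Yield consecutive slices of at most `limit` messages, preserving order."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    for start in range(0, len(messages), limit):
        yield messages[start:start + limit]
```

Splitting a workload this way gives up atomicity across batches: each chunk is processed as its own stream, so a failure in a later chunk does not undo earlier ones.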

Testing Client Streaming

import pytest
from typing import List
from spikard.grpc import GrpcClientStreamRequest, GrpcResponse
from batch_handler import handle_batch_create
import messageservice_pb2 as pb


@pytest.mark.asyncio
async def test_batch_create_success():
    """Test processing batch of items."""
    # Create multiple input messages
    items = [
        pb.Item(name='Item 1', value=100),
        pb.Item(name='Item 2', value=200),
        pb.Item(name='Item 3', value=300),
    ]

    messages = [item.SerializeToString() for item in items]

    # Create request
    request = GrpcClientStreamRequest(
        service_name='myservice.MyService',
        method_name='BatchCreate',
        metadata={'authorization': 'Bearer token'},
        messages=messages,
    )

    # Call handler
    response = await handle_batch_create(request)

    # Verify response
    result = pb.BatchCreateResponse()
    result.ParseFromString(response.payload)

    assert result.success_count == 3
    assert result.total_value == 600
    assert 'x-batch-id' in response.metadata
    assert response.metadata['x-count'] == '3'


@pytest.mark.asyncio
async def test_batch_create_missing_authorization():
    """Test batch create without authorization."""
    items = [pb.Item(name='Item 1', value=100)]
    messages = [item.SerializeToString() for item in items]

    request = GrpcClientStreamRequest(
        service_name='myservice.MyService',
        method_name='BatchCreate',
        metadata={},  # No authorization
        messages=messages,
    )

    # Should raise PermissionError
    with pytest.raises(PermissionError, match='Authentication required'):
        await handle_batch_create(request)


@pytest.mark.asyncio
async def test_batch_create_invalid_item():
    """Test batch create with invalid item (missing required fields)."""
    # Item with missing value
    items = [pb.Item(name='Item 1')]  # No value
    messages = [item.SerializeToString() for item in items]

    request = GrpcClientStreamRequest(
        service_name='myservice.MyService',
        method_name='BatchCreate',
        metadata={'authorization': 'Bearer token'},
        messages=messages,
    )

    # Should raise ValueError
    with pytest.raises(ValueError, match='must have name and value'):
        await handle_batch_create(request)


@pytest.mark.asyncio
async def test_batch_create_malformed_message():
    """Test batch create with malformed protobuf."""
    messages = [b'invalid protobuf data']

    request = GrpcClientStreamRequest(
        service_name='myservice.MyService',
        method_name='BatchCreate',
        metadata={'authorization': 'Bearer token'},
        messages=messages,
    )

    # Should raise ValueError for decode error
    with pytest.raises(ValueError, match='Failed to decode message'):
        await handle_batch_create(request)

Testing Bidirectional Streaming

import pytest
from typing import List
from spikard.grpc import GrpcBidiStreamRequest, GrpcBidiStreamResponse
from transform_handler import handle_transform_stream
import transformservice_pb2 as pb


@pytest.mark.asyncio
async def test_transform_stream_success():
    """Test transforming multiple documents."""
    # Create input messages
    documents = [
        pb.Document(id=1, content='hello world'),
        pb.Document(id=2, content='goodbye world'),
    ]

    messages = [doc.SerializeToString() for doc in documents]

    # Create request
    request = GrpcBidiStreamRequest(
        service_name='myservice.MyService',
        method_name='TransformStream',
        metadata={},
        messages=messages,
    )

    # Call handler
    response = await handle_transform_stream(request)

    # Verify multiple response messages
    assert len(response.messages) == 2
    assert 'x-processed-count' in response.metadata
    assert response.metadata['x-processed-count'] == '2'

    # Verify each response
    for msg in response.messages:
        result = pb.TransformResult()
        result.ParseFromString(msg)
        assert result.status == 'SUCCESS'
        assert result.transformed_content  # Should have transformed content
        assert 'HELLO' in result.transformed_content or 'GOODBYE' in result.transformed_content


@pytest.mark.asyncio
async def test_transform_stream_partial_failure():
    """Test that every document yields one response with a known status."""
    documents = [
        pb.Document(id=1, content='valid content'),
        pb.Document(id=2, content='another valid'),
    ]

    messages = [doc.SerializeToString() for doc in documents]

    request = GrpcBidiStreamRequest(
        service_name='myservice.MyService',
        method_name='TransformStream',
        metadata={},
        messages=messages,
    )

    response = await handle_transform_stream(request)

    # All documents should get responses (some may be errors)
    assert len(response.messages) == 2

    # Each response must carry one of the statuses the handler can emit
    statuses = []
    for msg in response.messages:
        result = pb.TransformResult()
        result.ParseFromString(msg)
        statuses.append(result.status)

    assert all(status in ('SUCCESS', 'PARTIAL', 'ERROR') for status in statuses)


@pytest.mark.asyncio
async def test_filter_stream_high_value():
    """Test filtering records by minimum value."""
    from filter_handler import handle_filter_stream
    import recordservice_pb2 as record_pb  # Record types come from the record service proto

    records = [
        record_pb.Record(id=1, value=50),
        record_pb.Record(id=2, value=150),
        record_pb.Record(id=3, value=250),
    ]

    messages = [rec.SerializeToString() for rec in records]

    request = GrpcBidiStreamRequest(
        service_name='myservice.MyService',
        method_name='FilterStream',
        metadata={
            'x-filter-type': 'high-value',
            'x-min-value': '100',
        },
        messages=messages,
    )

    response = await handle_filter_stream(request)

    # Only records with value >= 100 should be returned
    assert len(response.messages) == 2
    assert response.metadata['x-output-count'] == '2'
    assert response.metadata['x-filtered-count'] == '1'

    # Verify output values
    for msg in response.messages:
        result = record_pb.ProcessedRecord()
        result.ParseFromString(msg)
        assert result.original_value >= 100


@pytest.mark.asyncio
async def test_filter_stream_all_records():
    """Test returning all records without filtering."""
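    from filter_handler import handle_filter_stream
    import recordservice_pb2 as record_pb  # Record types come from the record service proto

    records = [
        record_pb.Record(id=1, value=50),
        record_pb.Record(id=2, value=75),
        record_pb.Record(id=3, value=150),
    ]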

    messages = [rec.SerializeToString() for rec in records]

    request = GrpcBidiStreamRequest(
        service_name='myservice.MyService',
        method_name='FilterStream',
        metadata={'x-filter-type': 'all'},
        messages=messages,
    )

    response = await handle_filter_stream(request)

    # All records should be returned
    assert len(response.messages) == 3
    assert response.metadata['x-output-count'] == '3'
    assert response.metadata['x-filtered-count'] == '0'


@pytest.mark.asyncio
async def test_filter_stream_malformed_input():
    """Test filter stream with malformed protobuf."""
    from filter_handler import handle_filter_stream

    messages = [b'invalid protobuf']

    request = GrpcBidiStreamRequest(
        service_name='myservice.MyService',
        method_name='FilterStream',
        metadata={'x-filter-type': 'all'},
        messages=messages,
    )

    # Should raise ValueError
    with pytest.raises(ValueError, match='Failed to decode'):
        await handle_filter_stream(request)

Running Tests

# Run all streaming tests
pytest test_streaming_handlers.py -v

# Run specific test
pytest test_streaming_handlers.py::test_batch_create_success -v

# Run with async support and verbose output
pytest test_streaming_handlers.py -v -k "batch" --asyncio-mode=auto

Comparison with Other Patterns

| Aspect | Client Streaming | Bidirectional | Unary |
| --- | --- | --- | --- |
| Input | Multiple messages | Multiple messages | Single message |
| Output | Single response | Multiple messages | Single response |
| Use case | Batch operations | Stream processing | Simple requests |
| Message order | Important | Important | N/A |
| Atomicity | Full batch atomic | Per-message or batch | Single atomic |

Common Pitfalls

1. Forgetting to Deserialize Messages

# WRONG: Using raw bytes
for msg in messages:
    print(msg)  # prints raw bytes

# CORRECT: Deserialize protobuf
for msg in messages:
    item = messageservice_pb2.Item()
    item.ParseFromString(msg)
    print(item.name)

2. Not Handling All Message Errors

# WRONG: Failing entire stream on first error
for msg in messages:
    item = messageservice_pb2.Item()
    item.ParseFromString(msg)  # May raise exception

# CORRECT: Handle per-message errors
for msg in messages:
    try:
        item = messageservice_pb2.Item()
        item.ParseFromString(msg)
    except Exception as e:
        # Generate error response for this message
        error_result = messageservice_pb2.ItemResult()
        error_result.status = 'ERROR'
        error_result.error_message = str(e)
        output_messages.append(error_result.SerializeToString())

3. Forgetting to Serialize Response Messages

# WRONG: Returning protobuf objects instead of bytes
response = pb.Item(name='test')
return GrpcBidiStreamResponse(messages=[response])  # Wrong!

# CORRECT: Serialize to bytes
response = pb.Item(name='test')
return GrpcBidiStreamResponse(
    messages=[response.SerializeToString()]  # Correct
)

4. Not Using Async Functions

# WRONG: Blocking operation in async handler
def handle_batch_create(request: GrpcClientStreamRequest) -> GrpcResponse:
    time.sleep(1)  # Blocks entire server!
    return response

# CORRECT: Use async/await
async def handle_batch_create(request: GrpcClientStreamRequest) -> GrpcResponse:
    await asyncio.sleep(1)  # Non-blocking
    return response
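
When a blocking call cannot be replaced with an async equivalent, one option is to push it onto a worker thread. A minimal sketch using the standard library's asyncio.to_thread (Python 3.9+); blocking_lookup and handle are hypothetical names standing in for a synchronous driver call and a handler body:

```python
import asyncio
import time


def blocking_lookup(key: str) -> str:
    """Hypothetical blocking call, e.g. a synchronous database driver."""
    time.sleep(0.05)
    return key.upper()


async def handle(keys: list[str]) -> list[str]:
    # asyncio.to_thread runs each blocking call in a worker thread,
    # so the event loop stays free to serve other requests.
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_lookup, key) for key in keys)
    )
```

asyncio.gather returns results in the order the awaitables were passed, so the output order matches the input keys even though the calls run concurrently.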

See the gRPC documentation for more examples.

TypeScript gRPC Streaming Handlers

The TypeScript package does not yet expose public streaming gRPC helper types.

Current public gRPC APIs in @spikard/node are:

  • GrpcHandler
  • GrpcResponse
  • GrpcError
  • GrpcStatusCode
  • GrpcService
  • createUnaryHandler(...)
  • createServiceHandler(...)

For the current TypeScript surface, use unary handlers and register them through GrpcService.

Ruby gRPC Streaming Handlers

Complete Ruby implementation examples for client streaming and bidirectional streaming gRPC handlers in Spikard.

Client Streaming Handler

Client streaming RPC allows a client to send multiple messages and then the server responds with a single message.

Handler Signature

class ClientStreamRequest
  attr_reader :service_name, :method_name, :metadata, :messages

  def initialize(service_name:, method_name:, metadata:, messages:)
    @service_name = service_name
    @method_name = method_name
    @metadata = metadata
    @messages = messages  # All client messages collected as array
  end
end

def handle_client_stream(request)
  # Process all messages and return single response
end

Example: Batch Message Processing

require 'securerandom'  # SecureRandom.uuid
require 'time'          # Time#iso8601
require 'spikard/grpc'
require 'messageservice_pb'

class BatchCreateHandler < Spikard::Grpc::Handler
  def initialize(database)
    @database = database
  end

  def handle_request(request)
    case request.method_name
    when 'BatchCreate'
      handle_batch_create(request)
    else
      raise "Unknown method: #{request.method_name}"
    end
  end

  private

  def handle_batch_create(request)
    # Validate authorization
    auth_token = request.metadata['authorization']
    unless auth_token
      raise SecurityError, 'Authentication required'
    end

    # Step 1: Deserialize all input messages
    items = request.messages.map.with_index do |msg, index|
      begin
        Messageservice::Item.decode(msg)
      rescue Google::Protobuf::ParseError => e
        raise ArgumentError, "Failed to decode message #{index}: #{e.message}"
      end
    end

    # Validate all items before processing
    items.each do |item|
      unless item.name && !item.name.empty? && item.value > 0
        raise ArgumentError, 'Each item must have name and positive value'
      end
    end

    # Step 2: Process all items atomically
    success_count = 0
    total_value = 0
    batch_id = SecureRandom.uuid

    begin
      # Begin transaction
      @database.transaction do
        items.each do |item|
          created = @database[:items].insert(
            name: item.name,
            value: item.value,
            batch_id: batch_id
          )
          success_count += 1
          total_value += item.value
        end
      end
    rescue StandardError => e
      raise StandardError, "Transaction failed: #{e.message}"
    end

    # Step 3: Build aggregate response
    response_proto = Messageservice::BatchCreateResponse.new(
      success_count: success_count,
      total_value: total_value,
      batch_id: batch_id,
      timestamp: Time.now.utc.iso8601
    )

    # Step 4: Serialize and return
    response = Spikard::Grpc::Response.new(
      payload: Messageservice::BatchCreateResponse.encode(response_proto)
    )
    response.metadata = {
      'x-batch-id' => batch_id,
      'x-count' => success_count.to_s
    }
    response
  rescue ArgumentError => e
    Spikard::Grpc::Response.error(e.message, 'INVALID_ARGUMENT')
  rescue SecurityError => e
    Spikard::Grpc::Response.error(e.message, 'UNAUTHENTICATED')
  rescue StandardError => e
    Spikard::Grpc::Response.error("Internal error: #{e.message}", 'INTERNAL')
  end
end

Bidirectional Streaming Handler

Bidirectional streaming RPC allows the client to send multiple messages and the server to send multiple messages back.

Handler Signature

class BidiStreamRequest
  attr_reader :service_name, :method_name, :metadata, :messages

  def initialize(service_name:, method_name:, metadata:, messages:)
    @service_name = service_name
    @method_name = method_name
    @metadata = metadata
    @messages = messages  # All input messages collected as array
  end
end

class BidiStreamResponse
  attr_accessor :messages, :metadata

  def initialize(messages:, metadata: {})
    @messages = messages
    @metadata = metadata
  end
end

def handle_bidi_stream(request)
  # Process input messages and generate output messages
  # Return hash with :messages and :metadata keys
end

Example: Message Transformation Pipeline

require 'time'  # Provides Time#iso8601
require 'spikard/grpc'
require 'transformservice_pb'

class TransformStreamHandler < Spikard::Grpc::Handler
  def initialize(transformer_service)
    @transformer_service = transformer_service
  end

  def handle_request(request)
    case request.method_name
    when 'TransformStream'
      handle_transform_stream(request)
    else
      raise "Unknown method: #{request.method_name}"
    end
  end

  private

  def handle_transform_stream(request)
    # Step 1: Deserialize all input messages
    input_documents = request.messages.map.with_index do |msg, index|
      begin
        {
          index: index,
          document: Transformservice::Document.decode(msg)
        }
      rescue Google::Protobuf::ParseError => e
        raise ArgumentError, "Failed to decode message #{index}: #{e.message}"
      end
    end

    # Step 2: Process each document and generate response
    output_messages = []

    input_documents.each do |input|
      index = input[:index]
      document = input[:document]

      begin
        # Transform the document
        transformed = transform_document(document)

        # Build response message
        result = Transformservice::TransformResult.new(
          original_id: document.id,
          transformed_content: transformed[:content],
          transformed_at: Time.now.utc.iso8601,
          status: transformed[:success] ? 'SUCCESS' : 'PARTIAL',
          metadata: {
            'original_size' => document.content.length.to_s,
            'transformed_size' => transformed[:content].length.to_s
          }
        )

        # Serialize response message
        encoded = Transformservice::TransformResult.encode(result)
        output_messages << encoded
      rescue StandardError => e
        # Create error response for this document
        error_result = Transformservice::TransformResult.new(
          original_id: document.id,
          status: 'ERROR',
          error_message: e.message
        )

        encoded = Transformservice::TransformResult.encode(error_result)
        output_messages << encoded
      end
    end

    # Step 3: Return all response messages
    {
      messages: output_messages,
      metadata: {
        'x-processed-count' => output_messages.length.to_s,
        'x-timestamp' => Time.now.utc.iso8601
      }
    }
  rescue ArgumentError => e
    Spikard::Grpc::Response.error(e.message, 'INVALID_ARGUMENT')
  rescue StandardError => e
    Spikard::Grpc::Response.error("Internal error: #{e.message}", 'INTERNAL')
  end

  def transform_document(doc)
    # Simulate document transformation
    {
      content: doc.content.upcase,
      success: true
    }
  end
end

Advanced Example: Filtering and Aggregation

Bidirectional Stream with Filtering

class FilterStreamHandler < Spikard::Grpc::Handler
  def initialize(record_service)
    @record_service = record_service
  end

  def handle_request(request)
    case request.method_name
    when 'FilterStream'
      handle_filter_stream(request)
    else
      raise "Unknown method: #{request.method_name}"
    end
  end

  private

  def handle_filter_stream(request)
    # Parse filter criteria from metadata
    filter_type = request.metadata['x-filter-type'] || 'all'
    min_value = (request.metadata['x-min-value'] || '0').to_i

    # Step 1: Deserialize and filter
    filtered_items = request.messages.filter_map do |msg|
      item = Recordservice::Record.decode(msg)

      # Apply filter logic
      if filter_type == 'all' || (filter_type == 'high-value' && item.value >= min_value)
        item
      end
    end

    # Step 2: Transform filtered items
    output_messages = filtered_items.map do |item|
      response = Recordservice::ProcessedRecord.new(
        id: item.id,
        original_value: item.value,
        processed_value: item.value * 1.1,  # Apply multiplier
        filtered: false
      )

      Recordservice::ProcessedRecord.encode(response)
    end

    # Step 3: Return response with statistics
    {
      messages: output_messages,
      metadata: {
        'x-input-count' => request.messages.length.to_s,
        'x-output-count' => output_messages.length.to_s,
        'x-filtered-count' => (request.messages.length - output_messages.length).to_s
      }
    }
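  rescue Google::Protobuf::ParseError => e
    # Malformed input is a client error, not an internal one
    Spikard::Grpc::Response.error("Failed to decode message: #{e.message}", 'INVALID_ARGUMENT')
  rescue ArgumentError => e
    Spikard::Grpc::Response.error(e.message, 'INVALID_ARGUMENT')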
  rescue StandardError => e
    Spikard::Grpc::Response.error("Internal error: #{e.message}", 'INTERNAL')
  end
end

Key Patterns

Message Collection

  • All client messages are collected in a single messages array via request.messages
  • Messages are binary strings (serialized protobuf)
  • No streaming iteration needed - full array is provided
  • Use .each.with_index or .map.with_index to process with index

Processing Strategy

  1. Collect: All input messages received as array via request.messages
  2. Validate: Check all messages before processing with .each or .filter_map
  3. Transform: Process and generate output using blocks
  4. Serialize: Encode response messages with protobuf .encode() method
  5. Return: Hash with :messages and :metadata keys for bidi; Response object for client stream
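
The five steps above can be sketched without Spikard or generated protobuf classes. In this minimal illustration JSON stands in for protobuf encoding, and decode_item, encode_result, and process_stream are hypothetical helper names, not part of the Spikard API.

```ruby
require 'json'

# Hypothetical stand-ins for Item.decode / Result.encode (JSON instead of protobuf).
def decode_item(bytes)
  JSON.parse(bytes, symbolize_names: true)
end

def encode_result(result)
  JSON.generate(result)
end

def process_stream(messages)
  # 1. Collect: the full array is already available.
  # 2. Validate: decode everything first so bad input fails before any work is done.
  items = messages.each_with_index.map do |bytes, index|
    item = decode_item(bytes)
    raise ArgumentError, "Message #{index} has no name" if item[:name].to_s.empty?

    item
  end

  # 3. Transform and 4. Serialize each item into an output message.
  output = items.map do |item|
    encode_result({ name: item[:name].upcase, value: item[:value] })
  end

  # 5. Return: hash with :messages and :metadata keys (the bidirectional shape).
  { messages: output, metadata: { 'x-processed-count' => output.length.to_s } }
end

response = process_stream([
  JSON.generate({ name: 'a', value: 1 }),
  JSON.generate({ name: 'b', value: 2 })
])
```

Validating the whole batch before transforming anything keeps a single bad message from leaving partially processed output behind.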

Error Handling

  • Use Ruby exceptions (ArgumentError, StandardError, custom errors) to signal failures
  • Rescue exceptions inside the handler (in handle_request or in the per-method helper it dispatches to) and convert them to Spikard::Grpc::Response.error(message, status_code)
  • Errors in message deserialization should use INVALID_ARGUMENT status
  • Processing errors should use INTERNAL or domain-specific codes
  • Per-message errors can be included in response messages (with ERROR status)
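
The exception-to-status mapping described above can be kept in one place. The following is a minimal sketch, assuming a hypothetical ErrorResult struct in place of the object returned by Spikard::Grpc::Response.error; status_for and with_grpc_errors are illustrative names only.

```ruby
# Hypothetical stand-in for the value returned by Spikard::Grpc::Response.error.
ErrorResult = Struct.new(:message, :status_code)

# Exception class => gRPC status code, mirroring the handlers in this guide.
STATUS_BY_ERROR = {
  ArgumentError => 'INVALID_ARGUMENT',
  SecurityError => 'UNAUTHENTICATED'
}.freeze

def status_for(error)
  STATUS_BY_ERROR.each { |klass, code| return code if error.is_a?(klass) }
  'INTERNAL' # Fallback for anything unexpected
end

# SecurityError inherits from Exception, not StandardError, so it is listed explicitly.
def with_grpc_errors
  yield
rescue StandardError, SecurityError => e
  ErrorResult.new(e.message, status_for(e))
end

invalid = with_grpc_errors { raise ArgumentError, 'name is required' }
denied  = with_grpc_errors { raise SecurityError, 'Authentication required' }
```

A bare rescue (or rescue StandardError alone) would let SecurityError escape the handler, which is why the handlers in this guide name it in its own rescue clause.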

Metadata

  • Client and bidirectional streaming: metadata arrives on the request and can be attached to the response (response.metadata for client streaming, the :metadata hash key for bidirectional)
  • Use metadata for non-payload information (timestamps, counts, filters)
  • Access request metadata via request.metadata[key] returning String | nil
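
Because metadata values are strings or nil, numeric filters need explicit parsing. A minimal sketch, assuming a plain Hash for metadata and a hypothetical helper named metadata_int:

```ruby
# Read an integer metadata value, falling back to a default when the key is
# missing or the value is not a valid base-10 integer.
def metadata_int(metadata, key, default)
  raw = metadata[key]
  return default if raw.nil?

  Integer(raw, 10, exception: false) || default
end

metadata = { 'x-min-value' => '100', 'x-limit' => 'abc' }
min_value = metadata_int(metadata, 'x-min-value', 0)
limit     = metadata_int(metadata, 'x-limit', 50)
offset    = metadata_int(metadata, 'x-offset', 0)
```

Unlike String#to_i, which silently turns 'abc' into 0, this approach lets the handler tell a malformed value apart from a real zero and fall back to its own default.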

Limits and Constraints

  • MAX_STREAM_MESSAGES: 10,000 messages per stream
  • Resource Exhaustion: Streams exceeding limit return RESOURCE_EXHAUSTED error
  • Memory: All messages collected in memory - appropriate for moderate message counts
  • Atomicity: The handler receives all messages at once, so a batch can be processed inside a single transaction (all-or-nothing semantics)
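
A handler may also want a stricter, application-level limit and bounded intermediate results. The sketch below is an assumption about how that could look on the handler side; StreamTooLarge, ensure_within_limit!, and process_in_slices are hypothetical names, and only the 10,000 figure comes from the limit quoted above.

```ruby
MAX_STREAM_MESSAGES = 10_000 # Value taken from the documented per-stream limit

# Hypothetical error a handler could convert to a RESOURCE_EXHAUSTED response.
StreamTooLarge = Class.new(StandardError)

def ensure_within_limit!(messages, limit: MAX_STREAM_MESSAGES)
  return messages if messages.length <= limit

  raise StreamTooLarge, "Stream has #{messages.length} messages, limit is #{limit}"
end

# Process messages in fixed-size slices and add up the per-slice results,
# so intermediate objects stay small even for large batches.
def process_in_slices(messages, slice_size: 500)
  messages.each_slice(slice_size).sum { |slice| yield(slice) }
end

messages = Array.new(1_200) { |i| "message-#{i}" }
ensure_within_limit!(messages)
processed = process_in_slices(messages) { |slice| slice.length }
```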

Testing Client Streaming

require 'rspec'
require 'spikard/grpc'
require 'messageservice_pb'
require_relative '../lib/batch_create_handler'

RSpec.describe BatchCreateHandler do
  let(:database) { instance_double('Database') }
  let(:handler) { described_class.new(database) }

  describe '#handle_request' do
    context 'BatchCreate' do
      it 'should process batch of items' do
        # Create multiple input messages
        items = [
          Messageservice::Item.new(name: 'Item 1', value: 100),
          Messageservice::Item.new(name: 'Item 2', value: 200),
          Messageservice::Item.new(name: 'Item 3', value: 300)
        ]

        messages = items.map { |item| Messageservice::Item.encode(item) }

        # Setup database mock
        allow(database).to receive(:transaction).and_yield(database)
        allow(database).to receive(:[]).and_return(
          instance_double('Table', insert: true)
        )

        # Create request
        request = Spikard::Grpc::Request.new(
          service_name: 'messageservice.MessageService',
          method_name: 'BatchCreate',
          metadata: { 'authorization' => 'Bearer token' },
          payload: ''  # Not used in client streaming
        )
        request.instance_variable_set(:@messages, messages)

        # Call handler
        response = handler.handle_request(request)

        # Verify response
        result = Messageservice::BatchCreateResponse.decode(response.payload)
        expect(result.success_count).to eq(3)
        expect(result.total_value).to eq(600)
        expect(response.metadata['x-count']).to eq('3')
      end

      it 'returns error when authorization is missing' do
        items = [
          Messageservice::Item.new(name: 'Item 1', value: 100)
        ]
        messages = items.map { |item| Messageservice::Item.encode(item) }

        request = Spikard::Grpc::Request.new(
          service_name: 'messageservice.MessageService',
          method_name: 'BatchCreate',
          metadata: {},  # Missing authorization
          payload: ''
        )
        request.instance_variable_set(:@messages, messages)

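        # The handler rescues SecurityError and returns an error response
        expect(Spikard::Grpc::Response).to receive(:error)
          .with('Authentication required', 'UNAUTHENTICATED')
          .and_call_original

        handler.handle_request(request)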
      end

      it 'validates all items have values' do
        items = [
          Messageservice::Item.new(name: 'Item 1', value: 100),
          Messageservice::Item.new(name: 'Item 2', value: 0)  # Invalid
        ]
        messages = items.map { |item| Messageservice::Item.encode(item) }

        request = Spikard::Grpc::Request.new(
          service_name: 'messageservice.MessageService',
          method_name: 'BatchCreate',
          metadata: { 'authorization' => 'Bearer token' },
          payload: ''
        )
        request.instance_variable_set(:@messages, messages)

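        # Validation failures are rescued and reported as INVALID_ARGUMENT
        expect(Spikard::Grpc::Response).to receive(:error)
          .with(/positive value/, 'INVALID_ARGUMENT')
          .and_call_original

        handler.handle_request(request)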
      end
    end
  end
end

Testing Bidirectional Streaming

require 'rspec'
require 'spikard/grpc'
require 'transformservice_pb'
require_relative '../lib/transform_stream_handler'

RSpec.describe TransformStreamHandler do
  let(:transformer_service) { instance_double('TransformerService') }
  let(:handler) { described_class.new(transformer_service) }

  describe '#handle_request' do
    context 'TransformStream' do
      it 'should transform and return multiple messages' do
        # Create input messages
        documents = [
          Transformservice::Document.new(id: '1', content: 'hello'),
          Transformservice::Document.new(id: '2', content: 'world')
        ]

        messages = documents.map { |doc| Transformservice::Document.encode(doc) }

        # Create request
        request = Spikard::Grpc::Request.new(
          service_name: 'transformservice.TransformService',
          method_name: 'TransformStream',
          metadata: { 'authorization' => 'Bearer token' },
          payload: ''
        )
        request.instance_variable_set(:@messages, messages)

        # Call handler
        response = handler.handle_request(request)

        # Verify response
        expect(response).to include(:messages, :metadata)
        expect(response[:messages].length).to eq(2)

        response[:messages].each do |msg|
          result = Transformservice::TransformResult.decode(msg)
          expect(result.status).to eq('SUCCESS')
          expect(result.transformed_content).not_to be_empty
        end

        expect(response[:metadata]['x-processed-count']).to eq('2')
      end

      it 'handles per-message errors gracefully' do
        documents = [
          Transformservice::Document.new(id: '1', content: 'valid'),
          Transformservice::Document.new(id: '2', content: 'broken')
        ]

        messages = documents.map { |doc| Transformservice::Document.encode(doc) }

        # Force the transformation of the second document to fail
        allow(handler).to receive(:transform_document).and_wrap_original do |original, doc|
          raise StandardError, 'transform failed' if doc.id == '2'

          original.call(doc)
        end

        request = Spikard::Grpc::Request.new(
          service_name: 'transformservice.TransformService',
          method_name: 'TransformStream',
          metadata: { 'authorization' => 'Bearer token' },
          payload: ''
        )
        request.instance_variable_set(:@messages, messages)

        # Call handler - should return response with error for second message
        response = handler.handle_request(request)

        expect(response[:messages].length).to eq(2)

        # First message should be SUCCESS
        result1 = Transformservice::TransformResult.decode(response[:messages][0])
        expect(result1.status).to eq('SUCCESS')

        # Second message should be ERROR
        result2 = Transformservice::TransformResult.decode(response[:messages][1])
        expect(result2.status).to eq('ERROR')
        expect(result2.error_message).not_to be_empty
      end

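    end
  end
end

# The filtering handler is a separate class, so it gets its own example group.
# Assumes the generated recordservice_pb file and a lib/filter_stream_handler.rb path.
require 'recordservice_pb'
require_relative '../lib/filter_stream_handler'

RSpec.describe FilterStreamHandler do
  let(:record_service) { instance_double('RecordService') }
  let(:handler) { described_class.new(record_service) }

  describe '#handle_request' do
    context 'FilterStream' do
      it 'filters items based on metadata' do
        records = [
          Recordservice::Record.new(id: '1', value: 50),
          Recordservice::Record.new(id: '2', value: 100),
          Recordservice::Record.new(id: '3', value: 150)
        ]

        messages = records.map { |rec| Recordservice::Record.encode(rec) }

        request = Spikard::Grpc::Request.new(
          service_name: 'recordservice.RecordService',
          method_name: 'FilterStream',
          metadata: {
            'x-filter-type' => 'high-value',
            'x-min-value' => '100'
          },
          payload: ''
        )
        request.instance_variable_set(:@messages, messages)

        # Call handler
        response = handler.handle_request(request)

        # Only 2 items should pass filter (value >= 100)
        expect(response[:messages].length).to eq(2)
        expect(response[:metadata]['x-output-count']).to eq('2')
        expect(response[:metadata]['x-filtered-count']).to eq('1')
      end
    end
  end
end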

Comparison with Other Patterns

| Aspect | Client Streaming | Bidirectional | Unary |
|---|---|---|---|
| Input | Multiple messages | Multiple messages | Single message |
| Output | Single response | Multiple messages | Single response |
| Use case | Batch operations | Stream processing | Simple requests |
| Message order | Important | Important | N/A |
| Atomicity | Full batch atomic | Per-message or batch | Single atomic |

Ruby-Specific Patterns

Using Enumerables Effectively

# Filter and transform in one pass
output_messages = request.messages.filter_map do |msg|
  item = decode_message(msg)
  next unless item.valid?  # Filters out invalid items
  encode_response(item)    # Transforms to response
end

# Using each_with_index for detailed indexing
request.messages.each_with_index do |msg, idx|
  process_with_position(msg, idx)
end

# Collect with error handling
results = request.messages.map do |msg|
  decode_and_process(msg)
rescue StandardError => e
  handle_error(e)
end

Exception Handling in Streaming

# Catch and convert exceptions to gRPC responses
def handle_request(request)
  case request.method_name
  when 'BatchCreate'
    handle_batch_create(request)
  else
    Spikard::Grpc::Response.error("Unknown method: #{request.method_name}", 'UNIMPLEMENTED')
  end
rescue ArgumentError => e
  Spikard::Grpc::Response.error(e.message, 'INVALID_ARGUMENT')
rescue SecurityError => e
  Spikard::Grpc::Response.error(e.message, 'UNAUTHENTICATED')
rescue StandardError => e
  Spikard::Grpc::Response.error("Internal error: #{e.message}", 'INTERNAL')
end

Transaction Patterns with Blocks

begin
  @database.transaction do
    items.each do |item|
      @database[:items].insert(item.to_h)
    end
  end
rescue StandardError => e
  raise StandardError, "Transaction failed: #{e.message}"
end
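
To see the all-or-nothing behaviour of this pattern without a real database, the following sketch uses a hypothetical in-memory MemoryStore class. It is an illustration of the rollback idea only and is not how any particular database library implements transactions.

```ruby
# Hypothetical in-memory store, only to show the all-or-nothing behaviour.
class MemoryStore
  attr_reader :rows

  def initialize
    @rows = []
  end

  # Runs the block; restores the previous rows if it raises, then re-raises.
  def transaction
    snapshot = @rows.dup
    yield self
  rescue StandardError
    @rows = snapshot
    raise
  end

  def insert(row)
    raise ArgumentError, 'value must be positive' unless row[:value].positive?

    @rows << row
  end
end

store = MemoryStore.new
failure = nil

begin
  store.transaction do |tx|
    tx.insert({ name: 'ok', value: 1 })
    tx.insert({ name: 'bad', value: 0 }) # Raises, so the first insert is rolled back
  end
rescue ArgumentError => e
  failure = e.message
end
```

After the failed block, store.rows is empty again, which is the property the batch handler relies on when it wraps every insert in one transaction.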

See the gRPC documentation for more examples.

PHP gRPC Streaming Handlers

Complete PHP implementation examples for client streaming and bidirectional streaming gRPC handlers in Spikard.

Client Streaming Handler

Client streaming RPC lets a client send multiple messages, after which the server responds with a single message.

Handler Signature

<?php

declare(strict_types=1);

use Spikard\Grpc\ClientStreamRequest;
use Spikard\Grpc\Response;

/**
 * Client streaming handler signature
 */
final class ExampleClientStreamHandler
{
    public function handleClientStream(ClientStreamRequest $request): Response
    {
        // $request->serviceName: string
        // $request->methodName: string
        // $request->metadata: array<string, string>
        // $request->messages: string[]  // Array of serialized messages

        // Process all messages and return single response
    }
}

Example: Batch Message Processing

<?php

declare(strict_types=1);

use Spikard\Grpc\HandlerInterface;
use Spikard\Grpc\ClientStreamRequest;
use Spikard\Grpc\Response;
use Messageservice\Item;
use Messageservice\BatchCreateResponse;

class MessageServiceHandler implements HandlerInterface
{
    public function __construct(
        private MessageRepository $repository,
    ) {}

    /**
     * Handle client streaming: receive multiple messages, send single response
     *
     * Pattern: Collect all input messages, process together, return aggregated response
     */
    public function handleClientStream(ClientStreamRequest $request): Response
    {
        try {
            // Step 1: Validate authorization
            $authToken = $request->getMetadata('authorization');
            if (!$authToken) {
                return Response::error(
                    'Authentication required',
                    'UNAUTHENTICATED'
                );
            }

            // Step 2: Deserialize all input messages
            $items = [];
            $successCount = 0;
            $totalValue = 0;

            foreach ($request->messages as $index => $messagePayload) {
                try {
                    $item = new Item();
                    $item->mergeFromString($messagePayload);

                    // Validate item
                    if (empty($item->getName()) || $item->getValue() <= 0) {
                        return Response::error(
                            "Invalid item at index {$index}: name and positive value required",
                            'INVALID_ARGUMENT'
                        );
                    }

                    $items[] = $item;
                } catch (\Exception $e) {
                    return Response::error(
                        "Failed to decode message at index {$index}: {$e->getMessage()}",
                        'INVALID_ARGUMENT'
                    );
                }
            }

            if (empty($items)) {
                return Response::error('At least one item is required', 'INVALID_ARGUMENT');
            }

            // Step 3: Process all items atomically
            try {
                $this->repository->transaction(function () use ($items, &$successCount, &$totalValue): void {
                    foreach ($items as $item) {
                        $this->repository->create(
                            name: $item->getName(),
                            value: $item->getValue()
                        );
                        $successCount++;
                        $totalValue += $item->getValue();
                    }
                });
            } catch (\Exception $e) {
                return Response::error("Transaction failed: {$e->getMessage()}", 'INTERNAL');
            }

            // Step 4: Build aggregate response
            $batchId = bin2hex(random_bytes(8));
            $response = new BatchCreateResponse();
            $response->setSuccessCount($successCount);
            $response->setTotalValue($totalValue);
            $response->setBatchId($batchId);
            $response->setTimestamp((new \DateTime())->format('c'));

            // Step 5: Serialize and return
            return new Response(
                payload: $response->serializeToString(),
                metadata: [
                    'x-batch-id' => $batchId,
                    'x-count' => (string)$successCount,
                ]
            );

        } catch (\Exception $e) {
            return Response::error("Internal error: {$e->getMessage()}", 'INTERNAL');
        }
    }

    public function handleRequest(\Spikard\Grpc\Request $request): Response
    {
        // Route unary requests
        return match ($request->methodName) {
            default => Response::error("Unknown method: {$request->methodName}", 'UNIMPLEMENTED'),
        };
    }
}

Bidirectional Streaming Handler

Bidirectional streaming RPC allows the client to send multiple messages and the server to send multiple messages back.

Handler Signature

<?php

declare(strict_types=1);

use Spikard\Grpc\BidiStreamRequest;
use Spikard\Grpc\BidiStreamResponse;

/**
 * Bidirectional streaming handler signature
 */
final class ExampleBidiStreamHandler
{
    public function handleBidiStream(BidiStreamRequest $request): BidiStreamResponse
    {
        // $request->serviceName: string
        // $request->methodName: string
        // $request->metadata: array<string, string>
        // $request->messages: string[]  // Array of serialized input messages

        // Process input messages and generate output messages
        // return new BidiStreamResponse(
        //     messages: string[],        // Array of serialized response messages
        //     metadata: array<string, string>
        // );
    }
}

Example: Message Transformation Pipeline

<?php

declare(strict_types=1);

use Spikard\Grpc\HandlerInterface;
use Spikard\Grpc\BidiStreamRequest;
use Spikard\Grpc\BidiStreamResponse;
use Spikard\Grpc\Request;
use Spikard\Grpc\Response;
use Transformservice\Document;
use Transformservice\TransformResult;

class TransformServiceHandler implements HandlerInterface
{
    public function __construct(
        private DocumentTransformer $transformer,
    ) {}

    /**
     * Handle bidirectional streaming: receive multiple messages, send multiple messages
     *
     * Pattern: Collect all input messages, process, generate output messages, return array
     */
    public function handleBidiStream(BidiStreamRequest $request): BidiStreamResponse
    {
        try {
            // Step 1: Deserialize all input messages
            $inputDocuments = [];

            foreach ($request->messages as $index => $messagePayload) {
                try {
                    $document = new Document();
                    $document->mergeFromString($messagePayload);

                    $inputDocuments[] = [
                        'index' => $index,
                        'document' => $document,
                    ];
                } catch (\Exception $e) {
                    // Create error response for this document
                    $errorResult = new TransformResult();
                    $errorResult->setStatus('ERROR');
                    $errorResult->setErrorMessage(
                        "Failed to decode message at index {$index}: {$e->getMessage()}"
                    );

                    // Abort the stream: only the decode error is returned
                    return new BidiStreamResponse(
                        messages: [$errorResult->serializeToString()],
                        metadata: [
                            'x-error-at-index' => (string)$index,
                        ]
                    );
                }
            }

            if (empty($inputDocuments)) {
                return new BidiStreamResponse(
                    messages: [],
                    metadata: ['x-processed-count' => '0']
                );
            }

            // Step 2: Process each document and generate response
            $outputMessages = [];
            $processedCount = 0;
            $errorCount = 0;

            foreach ($inputDocuments as $input) {
                try {
                    $document = $input['document'];

                    // Transform the document
                    $transformedContent = $this->transformer->transform(
                        $document->getContent()
                    );

                    // Build response message
                    $result = new TransformResult();
                    $result->setOriginalId($document->getId());
                    $result->setTransformedContent($transformedContent);
                    $result->setTransformedAt((new \DateTime())->format('c'));
                    $result->setStatus('SUCCESS');

                    // Add metadata about transformation
                    $metadata = new \Google\Protobuf\StringValue();
                    $metadata->setValue(json_encode([
                        'original_size' => strlen($document->getContent()),
                        'transformed_size' => strlen($transformedContent),
                    ], JSON_THROW_ON_ERROR));
                    $result->setMetadata($metadata);

                    // Serialize and collect response
                    $outputMessages[] = $result->serializeToString();
                    $processedCount++;

                } catch (\Exception $e) {
                    // Create error response for this document
                    $errorResult = new TransformResult();
                    $errorResult->setOriginalId($input['document']->getId());
                    $errorResult->setStatus('ERROR');
                    $errorResult->setErrorMessage($e->getMessage());

                    $outputMessages[] = $errorResult->serializeToString();
                    $errorCount++;
                }
            }

            // Step 3: Return all response messages
            return new BidiStreamResponse(
                messages: $outputMessages,
                metadata: [
                    'x-processed-count' => (string)$processedCount,
                    'x-error-count' => (string)$errorCount,
                    'x-timestamp' => (new \DateTime())->format('c'),
                ]
            );

        } catch (\Exception $e) {
            // Return error response
            $errorResult = new TransformResult();
            $errorResult->setStatus('ERROR');
            $errorResult->setErrorMessage("Stream processing failed: {$e->getMessage()}");

            return new BidiStreamResponse(
                messages: [$errorResult->serializeToString()],
                metadata: ['x-error' => 'true']
            );
        }
    }

    public function handleRequest(Request $request): Response
    {
        // Route unary requests
        return Response::error("Unknown method: {$request->methodName}", 'UNIMPLEMENTED');
    }
}

Advanced Example: Filtering and Aggregation

Bidirectional Stream with Filtering

<?php

declare(strict_types=1);

use Spikard\Grpc\BidiStreamRequest;
use Spikard\Grpc\BidiStreamResponse;
use Recordservice\Record;
use Recordservice\ProcessedRecord;

class RecordServiceHandler
{
    public function handleBidiStream(BidiStreamRequest $request): BidiStreamResponse
    {
        try {
            // Parse filter criteria from metadata
            $filterType = $request->getMetadata('x-filter-type') ?? 'all';
            $minValue = (int)($request->getMetadata('x-min-value') ?? '0');
            $maxValue = (int)($request->getMetadata('x-max-value') ?? PHP_INT_MAX);

            // Step 1: Deserialize and filter
            $filteredRecords = [];
            $inputCount = count($request->messages);

            foreach ($request->messages as $messagePayload) {
                $record = new Record();
                $record->mergeFromString($messagePayload);

                // Apply filter logic
                if ($this->matchesFilter($record, $filterType, $minValue, $maxValue)) {
                    $filteredRecords[] = $record;
                }
            }

            // Step 2: Transform filtered records
            $outputMessages = [];

            foreach ($filteredRecords as $record) {
                $processed = new ProcessedRecord();
                $processed->setId($record->getId());
                $processed->setOriginalValue($record->getValue());

                // Apply business logic: apply multiplier
                $multiplier = $this->calculateMultiplier($record);
                $processed->setProcessedValue($record->getValue() * $multiplier);
                $processed->setFiltered(false);
                $processed->setProcessedAt((new \DateTime())->format('c'));

                $outputMessages[] = $processed->serializeToString();
            }

            // Step 3: Return response with statistics
            return new BidiStreamResponse(
                messages: $outputMessages,
                metadata: [
                    'x-input-count' => (string)$inputCount,
                    'x-output-count' => (string)count($outputMessages),
                    'x-filtered-count' => (string)($inputCount - count($outputMessages)),
                    'x-filter-type' => $filterType,
                ]
            );

        } catch (\Exception $e) {
            $errorResult = new ProcessedRecord();
            $errorResult->setId('error');

            return new BidiStreamResponse(
                messages: [$errorResult->serializeToString()],
                metadata: ['x-error' => $e->getMessage()]
            );
        }
    }

    private function matchesFilter(
        Record $record,
        string $filterType,
        int $minValue,
        int $maxValue
    ): bool {
        return match ($filterType) {
            'all' => true,
            'high-value' => $record->getValue() >= $minValue,
            'range' => $record->getValue() >= $minValue && $record->getValue() <= $maxValue,
            'low-value' => $record->getValue() < $minValue,
            default => true,
        };
    }

    private function calculateMultiplier(Record $record): float
    {
        return match (true) {
            $record->getValue() > 1000 => 1.05,
            $record->getValue() > 500 => 1.03,
            default => 1.01,
        };
    }
}

Key Patterns

Message Collection

  • All client messages are collected in the messages array property
  • Messages are already serialized as binary strings (use mergeFromString() to deserialize)
  • No streaming iteration needed - full array is provided

Processing Strategy

  1. Collect: All input messages received as array
  2. Validate: Check all messages before processing
  3. Transform: Process and generate output
  4. Serialize: Encode response messages with serializeToString()
  5. Return: Array of response messages in response object

Error Handling

  • Return Response::error() for fatal errors that abort the stream
  • Per-message errors can be included in response messages (with ERROR status)
  • Deserialization errors should return error response immediately
  • Processing errors can return partial results (successful messages + error messages)

Metadata

  • Client and bidirectional streaming: metadata arrives on the request and can be attached to the response object
  • Use metadata for non-payload information (timestamps, counts, filters)

Limits and Constraints

  • MAX_STREAM_MESSAGES: 10,000 messages per stream
  • Resource Exhaustion: Streams exceeding the limit return a RESOURCE_EXHAUSTED error response
  • Memory: All messages collected in memory - appropriate for moderate message counts
  • Atomicity: All messages processed together (transaction-like semantics for client streaming)

Testing Client Streaming

<?php

declare(strict_types=1);

namespace Tests;

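use PHPUnit\Framework\TestCase;
use PHPUnit\Framework\MockObject\MockObject;
use Spikard\Grpc\ClientStreamRequest;
use Messageservice\Item;
use Messageservice\BatchCreateResponse;
// The handler and repository are declared without a namespace in the example
// above, so they must be imported explicitly inside the Tests namespace
use MessageServiceHandler;
use MessageRepository;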

class ClientStreamHandlerTest extends TestCase
{
    private MessageServiceHandler $handler;
    private MockObject $repository;

    protected function setUp(): void
    {
        $this->repository = $this->createMock(MessageRepository::class);
        $this->handler = new MessageServiceHandler($this->repository);
    }

    public function testClientStreamBatchCreate(): void
    {
        // Setup mock
        $this->repository
            ->expects($this->once())
            ->method('transaction')
            ->willReturnCallback(function (callable $callback): void {
                $callback();
            });

        $this->repository
            ->expects($this->exactly(3))
            ->method('create')
            ->willReturn(true);

        // Create multiple input messages
        $items = [
            $this->createItem('Item 1', 100),
            $this->createItem('Item 2', 200),
            $this->createItem('Item 3', 300),
        ];

        $messages = array_map(
            fn(Item $item) => $item->serializeToString(),
            $items
        );

        // Create request
        $request = new ClientStreamRequest(
            serviceName: 'messageservice.MessageService',
            methodName: 'BatchCreate',
            messages: $messages,
            metadata: ['authorization' => 'Bearer token']
        );

        // Call handler
        $response = $this->handler->handleClientStream($request);

        // Verify response
        $this->assertFalse($response->isError());

        $result = new BatchCreateResponse();
        $result->mergeFromString($response->payload);

        $this->assertEquals(3, $result->getSuccessCount());
        $this->assertEquals(600, $result->getTotalValue());
        $this->assertNotEmpty($result->getBatchId());
        $this->assertEquals('3', $response->getMetadata('x-count'));
    }

    public function testClientStreamRequiresAuthentication(): void
    {
        // Create request without auth
        $item = $this->createItem('Item 1', 100);

        $request = new ClientStreamRequest(
            serviceName: 'messageservice.MessageService',
            methodName: 'BatchCreate',
            messages: [$item->serializeToString()],
            metadata: []  // No authorization
        );

        // Call handler
        $response = $this->handler->handleClientStream($request);

        // Should return auth error
        $this->assertTrue($response->isError());
        $this->assertStringContainsString('Authentication required', $response->errorMessage);
        $this->assertEquals('16', $response->getMetadata('grpc-status'));
    }

    public function testClientStreamValidatesItems(): void
    {
        // Create item with invalid value
        $item = $this->createItem('Item 1', -100);

        $request = new ClientStreamRequest(
            serviceName: 'messageservice.MessageService',
            methodName: 'BatchCreate',
            messages: [$item->serializeToString()],
            metadata: ['authorization' => 'Bearer token']
        );

        // Call handler
        $response = $this->handler->handleClientStream($request);

        // Should return validation error
        $this->assertTrue($response->isError());
        $this->assertStringContainsString('positive value required', $response->errorMessage);
    }

    public function testClientStreamHandlesDeserializationError(): void
    {
        $request = new ClientStreamRequest(
            serviceName: 'messageservice.MessageService',
            methodName: 'BatchCreate',
            messages: ['invalid protobuf data'],
            metadata: ['authorization' => 'Bearer token']
        );

        // Call handler
        $response = $this->handler->handleClientStream($request);

        // Should return deserialization error
        $this->assertTrue($response->isError());
        $this->assertStringContainsString('Failed to decode', $response->errorMessage);
    }

    private function createItem(string $name, int $value): Item
    {
        $item = new Item();
        $item->setName($name);
        $item->setValue($value);
        return $item;
    }
}

Testing Bidirectional Streaming

<?php

declare(strict_types=1);

namespace Tests;

use PHPUnit\Framework\TestCase;
use PHPUnit\Framework\MockObject\MockObject;
use Spikard\Grpc\BidiStreamRequest;
use Transformservice\Document;
use Transformservice\TransformResult;

class BidiStreamHandlerTest extends TestCase
{
    private TransformServiceHandler $handler;
    private MockObject $transformer;

    protected function setUp(): void
    {
        $this->transformer = $this->createMock(DocumentTransformer::class);
        $this->handler = new TransformServiceHandler($this->transformer);
    }

    public function testBidiStreamTransformMultipleDocuments(): void
    {
        // Setup mock transformer
        $this->transformer
            ->method('transform')
            ->willReturnCallback(fn(string $content) => strtoupper($content));

        // Create input documents
        $documents = [
            $this->createDocument(1, 'hello world'),
            $this->createDocument(2, 'foo bar'),
            $this->createDocument(3, 'test content'),
        ];

        $messages = array_map(
            fn(Document $doc) => $doc->serializeToString(),
            $documents
        );

        // Create request
        $request = new BidiStreamRequest(
            serviceName: 'transformservice.TransformService',
            methodName: 'TransformStream',
            messages: $messages,
            metadata: ['authorization' => 'Bearer token']
        );

        // Call handler
        $response = $this->handler->handleBidiStream($request);

        // Verify response contains multiple messages
        $this->assertCount(3, $response->messages);

        // Verify each response message
        foreach ($response->messages as $index => $messagePayload) {
            $result = new TransformResult();
            $result->mergeFromString($messagePayload);

            $this->assertEquals('SUCCESS', $result->getStatus());
            $this->assertNotEmpty($result->getTransformedContent());
            $this->assertEquals((string)($index + 1), $result->getOriginalId());
        }

        // Verify metadata
        $this->assertEquals('3', $response->getMetadata('x-processed-count'));
        $this->assertEquals('0', $response->getMetadata('x-error-count'));
    }

    public function testBidiStreamPartialErrors(): void
    {
        // Setup transformer to fail on specific content
        $this->transformer
            ->method('transform')
            ->willReturnCallback(function (string $content) {
                if ($content === 'error') {
                    throw new \Exception('Transform failed');
                }
                return strtoupper($content);
            });

        // Create documents, one will fail
        $documents = [
            $this->createDocument(1, 'success'),
            $this->createDocument(2, 'error'),
            $this->createDocument(3, 'success'),
        ];

        $messages = array_map(
            fn(Document $doc) => $doc->serializeToString(),
            $documents
        );

        $request = new BidiStreamRequest(
            serviceName: 'transformservice.TransformService',
            methodName: 'TransformStream',
            messages: $messages,
            metadata: []
        );

        // Call handler
        $response = $this->handler->handleBidiStream($request);

        // Should have response for all documents
        $this->assertCount(3, $response->messages);

        // Check results
        $results = [];
        foreach ($response->messages as $messagePayload) {
            $result = new TransformResult();
            $result->mergeFromString($messagePayload);
            $results[] = $result->getStatus();
        }

        $this->assertContains('SUCCESS', $results);
        $this->assertContains('ERROR', $results);

        // Verify error count in metadata
        $this->assertEquals('1', $response->getMetadata('x-error-count'));
    }

    public function testBidiStreamEmptyInput(): void
    {
        $request = new BidiStreamRequest(
            serviceName: 'transformservice.TransformService',
            methodName: 'TransformStream',
            messages: [],
            metadata: []
        );

        // Call handler
        $response = $this->handler->handleBidiStream($request);

        // Should return empty messages
        $this->assertEmpty($response->messages);
        $this->assertEquals('0', $response->getMetadata('x-processed-count'));
    }

    private function createDocument(int $id, string $content): Document
    {
        $document = new Document();
        $document->setId((string)$id);
        $document->setContent($content);
        return $document;
    }
}

Testing Filtering and Aggregation

<?php

declare(strict_types=1);

namespace Tests;

use PHPUnit\Framework\TestCase;
use Spikard\Grpc\BidiStreamRequest;
use Recordservice\Record;
use Recordservice\ProcessedRecord;

class FilterStreamHandlerTest extends TestCase
{
    private RecordServiceHandler $handler;

    protected function setUp(): void
    {
        $this->handler = new RecordServiceHandler();
    }

    /**
     * @dataProvider filterTypeProvider
     */
    public function testBidiStreamWithFilters(
        string $filterType,
        int $minValue,
        int $expectedOutputCount
    ): void {
        // Create records with various values
        $records = [
            $this->createRecord('1', 100),
            $this->createRecord('2', 500),
            $this->createRecord('3', 1500),
            $this->createRecord('4', 2500),
        ];

        $messages = array_map(
            fn(Record $record) => $record->serializeToString(),
            $records
        );

        $request = new BidiStreamRequest(
            serviceName: 'recordservice.RecordService',
            methodName: 'FilterStream',
            messages: $messages,
            metadata: [
                'x-filter-type' => $filterType,
                'x-min-value' => (string)$minValue,
            ]
        );

        // Call handler
        $response = $this->handler->handleBidiStream($request);

        // Verify filtered count
        $this->assertCount($expectedOutputCount, $response->messages);
        $this->assertEquals(
            (string)$expectedOutputCount,
            $response->getMetadata('x-output-count')
        );
    }

    public static function filterTypeProvider(): array
    {
        return [
            'all records' => ['all', 0, 4],
            'high value (>=500)' => ['high-value', 500, 3],
            'range (>=1000, <=2000)' => ['range', 1000, 1],
            'low value (<500)' => ['low-value', 500, 1],
        ];
    }

    public function testBidiStreamAppliesMultiplier(): void
    {
        $record = $this->createRecord('1', 1500);

        $request = new BidiStreamRequest(
            serviceName: 'recordservice.RecordService',
            methodName: 'FilterStream',
            messages: [$record->serializeToString()],
            metadata: ['x-filter-type' => 'all']
        );

        $response = $this->handler->handleBidiStream($request);

        // Parse response
        $result = new ProcessedRecord();
        $result->mergeFromString($response->messages[0]);

        // Verify multiplier applied (1500 >= 1000, so multiplier = 1.05)
        $expectedValue = 1500 * 1.05;
        $this->assertEquals($expectedValue, $result->getProcessedValue());
    }

    private function createRecord(string $id, int $value): Record
    {
        $record = new Record();
        $record->setId($id);
        $record->setValue($value);
        return $record;
    }
}

Comparison with Other Patterns

Aspect        | Client Streaming     | Bidirectional        | Unary
Input         | Multiple messages    | Multiple messages    | Single message
Output        | Single response      | Multiple messages    | Single response
Use case      | Batch operations     | Stream processing    | Simple requests
Message order | Important            | Important            | N/A
Atomicity     | Full batch atomic    | Per-message or batch | Single atomic
PHP Pattern   | handleClientStream() | handleBidiStream()   | handleRequest()

Key PHP-Specific Patterns

Using serializeToString() and mergeFromString()

  • Always use mergeFromString() to deserialize protobuf messages
  • Always use serializeToString() to serialize protobuf responses
  • Never use parse() for deserialization in handlers

Error Handling

  • Prefer returning error responses over throwing exceptions
  • Use try-catch to handle deserialization and business logic errors
  • Include context in error messages (index, original value, etc.)

Metadata Access

  • Use getMetadata(string $key) to safely access metadata with null coalescing
  • Return metadata in response for logging and debugging
  • Use standard header names (e.g., 'x-batch-id', 'x-count')

Type Hints

  • Use strict type hints on all function parameters
  • Return types must be Response or BidiStreamResponse
  • Use declare(strict_types=1) in all files

See the gRPC documentation for more examples.

Rust gRPC Streaming Handlers

Complete Rust implementation examples for client streaming and bidirectional streaming gRPC handlers in Spikard.

Client Streaming Handler

A client-streaming RPC lets the client send a sequence of messages; once the client has finished sending, the server replies with a single response.

Handler Signature

use bytes::Bytes;
use spikard_http::grpc::{GrpcClientStreamRequest, GrpcResponse};
use tonic::metadata::MetadataMap;

pub struct GrpcClientStreamRequest {
    /// Service name (e.g., "myservice.MyService")
    pub service_name: String,
    /// Method name (e.g., "BatchCreate")
    pub method_name: String,
    /// Request metadata from client
    pub metadata: MetadataMap,
    /// All client messages collected as Vec
    pub messages: Vec<Bytes>,
}

async fn handle_client_stream(
    request: GrpcClientStreamRequest,
) -> Result<GrpcResponse, tonic::Status> {
    // Process all messages and return single response
    todo!("decode request.messages, process them, and build a GrpcResponse")
}

Example: Batch Message Processing

use bytes::Bytes;
use prost::Message;
use spikard_http::grpc::{GrpcClientStreamRequest, GrpcResponse};
use tonic::{Status, metadata::MetadataMap};
use std::sync::Arc;

// Generated protobuf types
mod messageservice {
    include!("messageservice.rs");  // Generated by prost
}

/// Handle client streaming: receive multiple messages, send single response.
///
/// Pattern: Collect all input messages, process together, return aggregated response.
pub async fn handle_batch_create(
    request: GrpcClientStreamRequest,
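    // `impl` rather than `dyn`: the generic `with_transaction` method below
    // means `ItemRepository` cannot be used as a trait object.
    repository: Arc<impl ItemRepository + Send + Sync>,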
) -> Result<GrpcResponse, Status> {
    let messages = request.messages;
    let metadata = request.metadata;

    // Validate authorization
    let auth_token = metadata
        .get("authorization")
        .and_then(|v| v.to_str().ok());

    if auth_token.is_none() {
        return Err(Status::unauthenticated("Authentication required"));
    }

    // Step 1: Deserialize all input messages
    let mut items = Vec::with_capacity(messages.len());
    for (index, msg) in messages.iter().enumerate() {
        let item = messageservice::Item::decode(msg.clone())
            .map_err(|e| {
                Status::invalid_argument(format!("Failed to decode message {}: {}", index, e))
            })?;
        items.push(item);
    }

    // Validate all items before processing
    for item in &items {
        if item.name.is_empty() || item.value == 0 {
            return Err(Status::invalid_argument(
                "Each item must have name and value"
            ));
        }
    }

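    // Step 2: Process all items atomically
    // The counters are returned from the closure: `u32` and `i32` are `Copy`, so
    // mutating outer variables inside `async move` would only change private copies.
    // Simulate database transaction
    let (success_count, total_value) = repository
        .with_transaction(|tx| async move {
            let mut success_count = 0u32;
            let mut total_value = 0i32;
            for item in items {
                tx.items().create(&item.name, item.value).await
                    .map_err(|e| Status::internal(format!("Transaction failed: {}", e)))?;
                success_count += 1;
                total_value += item.value;
            }
            Ok::<_, Status>((success_count, total_value))
        })
        .await
        // The boxed error from `with_transaction` has no `From` conversion to `Status`
        .map_err(|e| Status::internal(format!("Transaction failed: {}", e)))?;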

    // Step 3: Build aggregate response
    let batch_id = uuid::Uuid::new_v4().to_string();
    let timestamp = chrono::Utc::now().to_rfc3339();

    let response = messageservice::BatchCreateResponse {
        success_count,
        total_value,
        batch_id: batch_id.clone(),
        timestamp,
    };

    // Step 4: Serialize and return
    let mut buf = Vec::new();
    response.encode(&mut buf)
        .map_err(|e| Status::internal(format!("Encoding error: {}", e)))?;

    let mut response_metadata = MetadataMap::new();
    response_metadata.insert(
        "x-batch-id",
        batch_id.parse()
            .map_err(|e| Status::internal(format!("Invalid metadata value: {}", e)))?
    );
    response_metadata.insert(
        "x-count",
        success_count.to_string().parse()
            .map_err(|e| Status::internal(format!("Invalid metadata value: {}", e)))?
    );

    Ok(GrpcResponse {
        payload: Bytes::from(buf),
        metadata: response_metadata,
    })
}

// Mock trait for demonstration
#[async_trait::async_trait]
pub trait ItemRepository {
    async fn with_transaction<F, Fut, T>(&self, f: F) -> Result<T, Box<dyn std::error::Error>>
    where
        F: FnOnce(&dyn Transaction) -> Fut + Send,
        Fut: std::future::Future<Output = Result<T, Status>> + Send;
}

#[async_trait::async_trait]
pub trait Transaction: Send + Sync {
    fn items(&self) -> &dyn ItemStore;
}

#[async_trait::async_trait]
pub trait ItemStore: Send + Sync {
    async fn create(&self, name: &str, value: i32) -> Result<(), Box<dyn std::error::Error>>;
}
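
A note on accumulating counters inside a callback such as `with_transaction`: when a `move` closure (or an `async move` block) mutates a captured `Copy` value like `u32`, it mutates its own copy and the outer variable keeps its initial value. The sketch below shows the effect with a plain closure and the usual remedy of returning the totals from the closure. `run_once`, `count_with_move`, and `count_returned` are hypothetical names used only for this illustration; they are not part of Spikard.

```rust
/// Runs `f` once. Hypothetical stand-in for a helper that invokes a callback,
/// such as a transaction wrapper.
pub fn run_once<T>(f: impl FnOnce() -> T) -> T {
    f()
}

/// Mutates a counter inside a `move` closure. The closure owns a copy of
/// `count`, so the outer variable is still 0 when it is returned.
pub fn count_with_move(values: &[i32]) -> u32 {
    let mut count = 0u32;
    run_once(move || {
        for _ in values {
            count += 1; // increments the closure's private copy
        }
    });
    count
}

/// Keeps the counter inside the closure and returns it, so the caller
/// receives the real total.
pub fn count_returned(values: &[i32]) -> u32 {
    run_once(move || {
        let mut count = 0u32;
        for _ in values {
            count += 1;
        }
        count
    })
}
```

Calling `count_with_move(&[1, 2, 3])` yields 0, while `count_returned(&[1, 2, 3])` yields 3. The same reasoning applies to totals computed inside an `async move` block.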

Bidirectional Streaming Handler

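A bidirectional-streaming RPC lets the client send multiple messages and the server return multiple messages. In the handlers shown here, all input messages are collected before the handler runs, and all output messages are returned together.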

Handler Signature

use bytes::Bytes;
use spikard_http::grpc::{GrpcBidiStreamRequest, GrpcBidiStreamResponse};
use tonic::metadata::MetadataMap;

pub struct GrpcBidiStreamRequest {
    /// Service name (e.g., "myservice.MyService")
    pub service_name: String,
    /// Method name (e.g., "TransformStream")
    pub method_name: String,
    /// Request metadata from client
    pub metadata: MetadataMap,
    /// All input messages collected as Vec
    pub messages: Vec<Bytes>,
}

pub struct GrpcBidiStreamResponse {
    /// Array of response messages
    pub messages: Vec<Bytes>,
    /// Response metadata
    pub metadata: MetadataMap,
}

async fn handle_bidi_stream(
    request: GrpcBidiStreamRequest,
) -> Result<GrpcBidiStreamResponse, tonic::Status> {
    // Process input messages and generate output messages
    todo!("decode request.messages and build a GrpcBidiStreamResponse")
}

Example: Message Transformation Pipeline

use bytes::Bytes;
use prost::Message;
use spikard_http::grpc::{GrpcBidiStreamRequest, GrpcBidiStreamResponse};
use tonic::{Status, metadata::MetadataMap};

// Generated protobuf types
mod transformservice {
    include!("transformservice.rs");  // Generated by prost
}

/// Handle bidirectional streaming: receive multiple messages, send multiple messages.
///
/// Pattern: Collect all input messages, process, generate output messages, return Vec.
pub async fn handle_transform_stream(
    request: GrpcBidiStreamRequest,
) -> Result<GrpcBidiStreamResponse, Status> {
    let messages = request.messages;
    let _metadata = request.metadata;

    // Step 1: Deserialize all input messages
    let mut input_documents = Vec::with_capacity(messages.len());
    for (index, msg) in messages.iter().enumerate() {
        let document = transformservice::Document::decode(msg.clone())
            .map_err(|e| {
                Status::invalid_argument(format!("Failed to decode message {}: {}", index, e))
            })?;
        input_documents.push((index, document));
    }

    // Step 2: Process each document and generate response
    let mut output_messages = Vec::with_capacity(input_documents.len());

    for (index, document) in input_documents {
        match transform_document(&document).await {
            Ok(transformed) => {
                // Build response message
                let mut metadata_map = std::collections::HashMap::new();
                metadata_map.insert(
                    "original_size".to_string(),
                    document.content.len().to_string(),
                );
                metadata_map.insert(
                    "transformed_size".to_string(),
                    transformed.content.len().to_string(),
                );

                let result = transformservice::TransformResult {
                    original_id: document.id,
                    transformed_content: transformed.content,
                    transformed_at: chrono::Utc::now().to_rfc3339(),
                    status: if transformed.success { "SUCCESS" } else { "PARTIAL" }.to_string(),
                    metadata: metadata_map,
                    error_message: String::new(),
                };

                // Serialize response message
                let mut buf = Vec::new();
                result.encode(&mut buf)
                    .map_err(|e| Status::internal(format!("Encoding error: {}", e)))?;
                output_messages.push(Bytes::from(buf));
            }
            Err(err) => {
                // Create error response for this document
                let error_result = transformservice::TransformResult {
                    original_id: document.id,
                    transformed_content: String::new(),
                    transformed_at: String::new(),
                    status: "ERROR".to_string(),
                    metadata: std::collections::HashMap::new(),
                    error_message: err.to_string(),
                };

                let mut buf = Vec::new();
                error_result.encode(&mut buf)
                    .map_err(|e| Status::internal(format!("Encoding error: {}", e)))?;
                output_messages.push(Bytes::from(buf));
            }
        }
    }

    // Step 3: Return all response messages
    let mut response_metadata = MetadataMap::new();
    response_metadata.insert(
        "x-processed-count",
        output_messages.len().to_string().parse()
            .map_err(|e| Status::internal(format!("Invalid metadata value: {}", e)))?
    );
    response_metadata.insert(
        "x-timestamp",
        chrono::Utc::now().to_rfc3339().parse()
            .map_err(|e| Status::internal(format!("Invalid metadata value: {}", e)))?
    );

    Ok(GrpcBidiStreamResponse {
        messages: output_messages,
        metadata: response_metadata,
    })
}

// Local helper type, named differently from the generated
// `transformservice::TransformResult` to avoid confusion.
struct TransformOutput {
    content: String,
    success: bool,
}

async fn transform_document(
    doc: &transformservice::Document,
) -> Result<TransformOutput, Box<dyn std::error::Error>> {
    // Simulate async transformation work
    Ok(TransformOutput {
        content: doc.content.to_uppercase(),
        success: true,
    })
}
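
The per-document error handling in the example above can be reduced to a small, dependency-free sketch: every input produces exactly one outcome, in input order, and a failure is recorded as an ERROR outcome instead of aborting the stream. The `Outcome` struct, the `transform` rule (reject empty content), and `transform_all` are assumptions made for illustration; a real handler would build `transformservice::TransformResult` messages and encode them with prost.

```rust
/// Simplified stand-in for one response message (hypothetical type).
#[derive(Debug, PartialEq)]
pub struct Outcome {
    pub original_id: String,
    pub status: &'static str,
    pub content: String,
    pub error_message: String,
}

/// Hypothetical transformation: upper-cases content and rejects empty input.
fn transform(content: &str) -> Result<String, String> {
    if content.is_empty() {
        Err("empty content".to_string())
    } else {
        Ok(content.to_uppercase())
    }
}

/// Produces one outcome per `(id, content)` pair, preserving input order,
/// and returns the number of failed documents alongside the outcomes.
pub fn transform_all(docs: &[(String, String)]) -> (Vec<Outcome>, usize) {
    let mut errors = 0usize;
    let outcomes: Vec<Outcome> = docs
        .iter()
        .map(|(id, content)| match transform(content) {
            Ok(out) => Outcome {
                original_id: id.clone(),
                status: "SUCCESS",
                content: out,
                error_message: String::new(),
            },
            Err(e) => {
                // Record the failure and keep going with the next document.
                errors += 1;
                Outcome {
                    original_id: id.clone(),
                    status: "ERROR",
                    content: String::new(),
                    error_message: e,
                }
            }
        })
        .collect();
    (outcomes, errors)
}
```

The returned error count is the kind of value that could be reported to the client as response metadata, next to the processed count.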

Advanced Example: Filtering and Aggregation

Bidirectional Stream with Filtering

use bytes::Bytes;
use prost::Message;
use spikard_http::grpc::{GrpcBidiStreamRequest, GrpcBidiStreamResponse};
use tonic::{Status, metadata::MetadataMap};

// Generated protobuf types
mod recordservice {
    include!("recordservice.rs");  // Generated by prost
}

/// Filter records and apply transformations in bidirectional stream.
///
/// Pattern: Filter input based on metadata criteria, transform, return filtered output.
pub async fn handle_filter_stream(
    request: GrpcBidiStreamRequest,
) -> Result<GrpcBidiStreamResponse, Status> {
    let messages = request.messages;
    let metadata = request.metadata;

    // Parse filter criteria from metadata
    let filter_type = metadata
        .get("x-filter-type")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("all");

    let min_value = metadata
        .get("x-min-value")
        .and_then(|v| v.to_str().ok())
        .and_then(|s| s.parse::<i32>().ok())
        .unwrap_or(0);

    // Step 1: Deserialize and filter
    let mut filtered_items = Vec::new();

    for msg in messages.iter() {
        let item = recordservice::Record::decode(msg.clone())
            .map_err(|e| Status::invalid_argument(format!("Failed to decode: {}", e)))?;

        // Apply filter logic
        if filter_type == "all" || (filter_type == "high-value" && item.value >= min_value) {
            filtered_items.push(item);
        }
    }

    // Step 2: Transform filtered items
    let mut output_messages = Vec::with_capacity(filtered_items.len());

    for item in filtered_items {
        let response = recordservice::ProcessedRecord {
            id: item.id,
            original_value: item.value,
            processed_value: (item.value as f64 * 1.1) as i32,  // Apply multiplier
            filtered: false,
        };

        let mut buf = Vec::new();
        response.encode(&mut buf)
            .map_err(|e| Status::internal(format!("Encoding error: {}", e)))?;
        output_messages.push(Bytes::from(buf));
    }

    // Step 3: Return response with statistics
    let mut response_metadata = MetadataMap::new();
    response_metadata.insert(
        "x-input-count",
        messages.len().to_string().parse()
            .map_err(|e| Status::internal(format!("Invalid metadata value: {}", e)))?
    );
    response_metadata.insert(
        "x-output-count",
        output_messages.len().to_string().parse()
            .map_err(|e| Status::internal(format!("Invalid metadata value: {}", e)))?
    );
    response_metadata.insert(
        "x-filtered-count",
        (messages.len() - output_messages.len()).to_string().parse()
            .map_err(|e| Status::internal(format!("Invalid metadata value: {}", e)))?
    );

    Ok(GrpcBidiStreamResponse {
        messages: output_messages,
        metadata: response_metadata,
    })
}
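
One way to keep filter logic like this easy to test is to pull the predicate into a pure function that has no protobuf or metadata dependencies. The sketch below assumes a simplified `Record` stand-in with the two fields the handler uses (`id`, `value`); `keep` and `filter_with_stats` are hypothetical helper names, not part of Spikard.

```rust
/// Simplified stand-in for the generated `Record` message.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
    pub id: String,
    pub value: i32,
}

/// Returns true when the record passes the filter named by `filter_type`.
pub fn keep(record: &Record, filter_type: &str, min_value: i32) -> bool {
    match filter_type {
        "all" => true,
        "high-value" => record.value >= min_value,
        // Unknown filter types drop every record, matching the `if` in the handler above.
        _ => false,
    }
}

/// Applies the filter and returns the kept records plus the number dropped,
/// which corresponds to the `x-filtered-count` statistic.
pub fn filter_with_stats(
    records: Vec<Record>,
    filter_type: &str,
    min_value: i32,
) -> (Vec<Record>, usize) {
    let input_count = records.len();
    let kept: Vec<Record> = records
        .into_iter()
        .filter(|r| keep(r, filter_type, min_value))
        .collect();
    let filtered_count = input_count - kept.len();
    (kept, filtered_count)
}
```

Note that an unrecognized filter type silently yields an empty result here. Depending on the service, rejecting it with an invalid-argument status may be the safer choice.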

Key Patterns

Message Collection

  • All client messages are collected in a single messages: Vec<Bytes>
  • Messages are provided as Bytes objects (protobuf serialized)
  • No streaming iteration needed - full Vec is provided
  • Order of messages is preserved

Processing Strategy

  1. Collect: All input messages received as Vec
  2. Validate: Check all messages before processing (use ? operator)
  3. Transform: Process and generate output
  4. Serialize: Encode response messages as Bytes
  5. Return: Vec of response message Bytes wrapped in Result
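
The five steps above can be sketched without any gRPC dependencies. This is an illustration under stated assumptions, not the Spikard implementation: messages use a toy UTF-8 `name:value` wire format in place of protobuf, errors are plain `String`s in place of `tonic::Status`, and `decode_item` and `process_batch` are hypothetical names.

```rust
/// Simplified stand-in for a decoded request message.
#[derive(Debug, PartialEq)]
pub struct Item {
    pub name: String,
    pub value: i32,
}

/// Decodes one message from the toy `name:value` format.
/// Stand-in for a prost `decode` call.
fn decode_item(index: usize, raw: &[u8]) -> Result<Item, String> {
    let text = std::str::from_utf8(raw)
        .map_err(|e| format!("Failed to decode message {}: {}", index, e))?;
    let (name, value) = text
        .split_once(':')
        .ok_or_else(|| format!("Failed to decode message {}: missing ':'", index))?;
    let value = value
        .parse::<i32>()
        .map_err(|e| format!("Failed to decode message {}: {}", index, e))?;
    Ok(Item { name: name.to_string(), value })
}

pub fn process_batch(messages: &[Vec<u8>]) -> Result<Vec<u8>, String> {
    // 1. Collect: decode every message; the first failure aborts the batch via `?`.
    let items = messages
        .iter()
        .enumerate()
        .map(|(i, raw)| decode_item(i, raw))
        .collect::<Result<Vec<_>, _>>()?;

    // 2. Validate: check all items before any processing starts.
    if let Some(pos) = items.iter().position(|it| it.name.is_empty() || it.value == 0) {
        return Err(format!("Item {} must have name and value", pos));
    }

    // 3. Transform: aggregate the batch.
    let total: i32 = items.iter().map(|it| it.value).sum();

    // 4. Serialize and 5. Return: encode the single response as bytes.
    Ok(format!("{}:{}", items.len(), total).into_bytes())
}
```

Validating the whole batch before step 3 means a bad message late in the stream cannot leave earlier messages half-processed.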

Error Handling

  • Return Result<T, tonic::Status> for all handlers
  • Use ? operator for error propagation (NO .unwrap())
  • Map domain errors to appropriate Status codes:
      ◦ Status::invalid_argument() for validation errors
      ◦ Status::unauthenticated() for auth failures
      ◦ Status::not_found() for missing resources
      ◦ Status::internal() for processing errors
  • Use .map_err() to convert errors to Status
  • Per-message errors can be included in response messages (with ERROR status)
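
A common way to apply the mapping above is a `From` implementation, which lets `?` convert domain errors automatically. The sketch below is self-contained and makes several assumptions: `Status` is a local stand-in for `tonic::Status`, and `DomainError`, `find_item`, and `lookup` are hypothetical. The numeric codes are the standard gRPC status code values.

```rust
use std::fmt;

/// Local stand-in for a gRPC status; a real handler would use `tonic::Status`.
#[derive(Debug, Clone, PartialEq)]
pub struct Status {
    pub code: u32,
    pub message: String,
}

/// Hypothetical domain error type, used only for this illustration.
#[derive(Debug)]
pub enum DomainError {
    Validation(String),
    MissingToken,
    NotFound(String),
    Storage(String),
}

impl fmt::Display for DomainError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DomainError::Validation(msg) => write!(f, "validation failed: {}", msg),
            DomainError::MissingToken => write!(f, "authentication required"),
            DomainError::NotFound(what) => write!(f, "{} not found", what),
            DomainError::Storage(msg) => write!(f, "storage error: {}", msg),
        }
    }
}

impl From<DomainError> for Status {
    fn from(err: DomainError) -> Self {
        let code = match &err {
            DomainError::Validation(_) => 3, // INVALID_ARGUMENT
            DomainError::NotFound(_) => 5,   // NOT_FOUND
            DomainError::Storage(_) => 13,   // INTERNAL
            DomainError::MissingToken => 16, // UNAUTHENTICATED
        };
        Status { code, message: err.to_string() }
    }
}

/// Hypothetical lookup that fails with a domain error.
fn find_item(id: u32) -> Result<&'static str, DomainError> {
    if id == 1 {
        Ok("Item 1")
    } else {
        Err(DomainError::NotFound(format!("item {}", id)))
    }
}

/// `?` converts `DomainError` into `Status` through the `From` impl above.
pub fn lookup(id: u32) -> Result<String, Status> {
    let name = find_item(id)?;
    Ok(name.to_string())
}
```

Centralizing the mapping in one `From` impl keeps handlers free of repeated `.map_err()` calls and makes the chosen status code for each error kind easy to review.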

Metadata

  • Client streaming: Metadata passed in request, can be included in response
  • Bidirectional streaming: Metadata passed in request, can be included in response
  • Use metadata for non-payload information (timestamps, counts, filters)
  • Access with metadata.get(key).and_then(|v| v.to_str().ok())
  • Provide defaults with .unwrap_or() or .unwrap_or_default()
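
The access-with-defaults pattern can be isolated into a small parsing function. In this sketch a `HashMap<String, String>` stands in for `tonic::metadata::MetadataMap` so the example runs with the standard library alone; `Metadata`, `FilterOptions`, and `parse_filter_options` are hypothetical names.

```rust
use std::collections::HashMap;

/// Stand-in for request metadata; a real handler would read a `MetadataMap`.
pub type Metadata = HashMap<String, String>;

/// Filter settings parsed from metadata (hypothetical struct for this sketch).
#[derive(Debug, PartialEq)]
pub struct FilterOptions {
    pub filter_type: String,
    pub min_value: i32,
}

pub fn parse_filter_options(metadata: &Metadata) -> FilterOptions {
    // A missing key falls back to the "all" filter.
    let filter_type = metadata
        .get("x-filter-type")
        .map(String::as_str)
        .unwrap_or("all")
        .to_string();

    // A missing or non-numeric value falls back to 0 instead of failing the call.
    let min_value = metadata
        .get("x-min-value")
        .and_then(|s| s.parse::<i32>().ok())
        .unwrap_or(0);

    FilterOptions { filter_type, min_value }
}
```

Silent defaults keep the handler lenient. If a malformed `x-min-value` should be reported to the client instead, returning an invalid-argument status from the parse step is the stricter alternative.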

Limits and Constraints

  • MAX_STREAM_MESSAGES: 10,000 messages per stream
  • Resource Exhaustion: Streams exceeding limit return RESOURCE_EXHAUSTED status
  • Memory: All messages collected in memory - appropriate for moderate message counts
  • Atomicity: All messages processed together (transaction-like semantics)
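
To make the message limit concrete, here is a minimal sketch of a bounded collector. It only illustrates the idea; how Spikard enforces the limit internally is not shown in this guide. `Status` is a local stand-in for `tonic::Status`, `collect_bounded` is a hypothetical helper, and 8 is the standard gRPC code number for RESOURCE_EXHAUSTED.

```rust
/// Limit taken from the list above.
pub const MAX_STREAM_MESSAGES: usize = 10_000;

/// gRPC status code number for RESOURCE_EXHAUSTED.
pub const RESOURCE_EXHAUSTED: u32 = 8;

/// Local stand-in for a gRPC status.
#[derive(Debug, PartialEq)]
pub struct Status {
    pub code: u32,
    pub message: String,
}

/// Collects messages from any iterator and stops with an error as soon as
/// the stream would exceed the cap, so memory use stays bounded.
pub fn collect_bounded<I>(incoming: I) -> Result<Vec<Vec<u8>>, Status>
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut messages = Vec::new();
    for msg in incoming {
        if messages.len() >= MAX_STREAM_MESSAGES {
            return Err(Status {
                code: RESOURCE_EXHAUSTED,
                message: format!("stream exceeds {} messages", MAX_STREAM_MESSAGES),
            });
        }
        messages.push(msg);
    }
    Ok(messages)
}
```

Checking the count before each push means the collector never holds more than the limit, even when the incoming stream is much longer.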

Testing Client Streaming

#[cfg(test)]
mod tests {
    use super::*;
    use prost::Message;
    use bytes::Bytes;
    use tonic::metadata::MetadataMap;

    mod messageservice {
        include!("messageservice.rs");
    }

    #[tokio::test]
    async fn test_batch_create_success() {
        // Create multiple input messages
        let items = vec![
            messageservice::Item {
                name: "Item 1".to_string(),
                value: 100,
            },
            messageservice::Item {
                name: "Item 2".to_string(),
                value: 200,
            },
            messageservice::Item {
                name: "Item 3".to_string(),
                value: 300,
            },
        ];

        let messages: Vec<Bytes> = items
            .iter()
            .map(|item| {
                let mut buf = Vec::new();
                item.encode(&mut buf).expect("Failed to encode Item protobuf");
                Bytes::from(buf)
            })
            .collect();

        // Create request
        let mut metadata = MetadataMap::new();
        metadata.insert("authorization", "Bearer token".parse().expect("Failed to parse authorization header"));

        let request = GrpcClientStreamRequest {
            service_name: "myservice.MyService".to_string(),
            method_name: "BatchCreate".to_string(),
            metadata,
            messages,
        };

        // Mock repository
        let repository = Arc::new(MockItemRepository::new());

        // Call handler
        let response = handle_batch_create(request, repository).await.expect("Handler call failed");

        // Verify response
        let result = messageservice::BatchCreateResponse::decode(response.payload)
            .expect("Failed to decode BatchCreateResponse");

        assert_eq!(result.success_count, 3);
        assert_eq!(result.total_value, 600);
        assert!(response.metadata.get("x-batch-id").is_some());
        assert_eq!(
            response.metadata.get("x-count").expect("Missing x-count header").to_str().expect("Invalid x-count header"),
            "3"
        );
    }

    #[tokio::test]
    async fn test_batch_create_missing_authorization() {
        // Create request without authorization
        let items = vec![messageservice::Item {
            name: "Item 1".to_string(),
            value: 100,
        }];

        let messages: Vec<Bytes> = items
            .iter()
            .map(|item| {
                let mut buf = Vec::new();
                item.encode(&mut buf).expect("Failed to encode Item protobuf");
                Bytes::from(buf)
            })
            .collect();

        let request = GrpcClientStreamRequest {
            service_name: "myservice.MyService".to_string(),
            method_name: "BatchCreate".to_string(),
            metadata: MetadataMap::new(),  // No authorization
            messages,
        };

        let repository = Arc::new(MockItemRepository::new());

        // Should return Unauthenticated error
        let result = handle_batch_create(request, repository).await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.code(), tonic::Code::Unauthenticated);
        assert!(err.message().contains("Authentication required"));
    }

    #[tokio::test]
    async fn test_batch_create_invalid_item() {
        // Create request with invalid item (missing value)
        let items = vec![messageservice::Item {
            name: "Item 1".to_string(),
            value: 0,  // Invalid: value is 0
        }];

        let messages: Vec<Bytes> = items
            .iter()
            .map(|item| {
                let mut buf = Vec::new();
                item.encode(&mut buf).expect("Failed to encode Item protobuf");
                Bytes::from(buf)
            })
            .collect();

        let mut metadata = MetadataMap::new();
        metadata.insert("authorization", "Bearer token".parse().expect("Failed to parse authorization header"));

        let request = GrpcClientStreamRequest {
            service_name: "myservice.MyService".to_string(),
            method_name: "BatchCreate".to_string(),
            metadata,
            messages,
        };

        let repository = Arc::new(MockItemRepository::new());

        // Should return InvalidArgument error
        let result = handle_batch_create(request, repository).await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.code(), tonic::Code::InvalidArgument);
        assert!(err.message().contains("must have name and value"));
    }

    #[tokio::test]
    async fn test_batch_create_malformed_message() {
        // Create request with malformed protobuf
        let messages = vec![Bytes::from("invalid protobuf data")];

        let mut metadata = MetadataMap::new();
        metadata.insert("authorization", "Bearer token".parse().expect("Failed to parse authorization header"));

        let request = GrpcClientStreamRequest {
            service_name: "myservice.MyService".to_string(),
            method_name: "BatchCreate".to_string(),
            metadata,
            messages,
        };

        let repository = Arc::new(MockItemRepository::new());

        // Should return InvalidArgument error for decode error
        let result = handle_batch_create(request, repository).await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.code(), tonic::Code::InvalidArgument);
        assert!(err.message().contains("Failed to decode message"));
    }

    // Mock implementation for testing
    struct MockItemRepository;

    impl MockItemRepository {
        fn new() -> Self {
            Self
        }
    }

    #[async_trait::async_trait]
    impl ItemRepository for MockItemRepository {
        async fn with_transaction<F, Fut, T>(&self, f: F) -> Result<T, Box<dyn std::error::Error>>
        where
            F: FnOnce(&dyn Transaction) -> Fut + Send,
            Fut: std::future::Future<Output = Result<T, Status>> + Send,
        {
            let tx = MockTransaction;
            f(&tx).await.map_err(|e| e.to_string().into())
        }
    }

    struct MockTransaction;

    impl Transaction for MockTransaction {
        fn items(&self) -> &dyn ItemStore {
            &MockItemStore
        }
    }

    struct MockItemStore;

    #[async_trait::async_trait]
    impl ItemStore for MockItemStore {
        async fn create(&self, _name: &str, _value: i32) -> Result<(), Box<dyn std::error::Error>> {
            Ok(())
        }
    }
}

Testing Bidirectional Streaming

#[cfg(test)]
mod bidi_tests {
    use super::*;
    use prost::Message;
    use bytes::Bytes;
    use tonic::metadata::MetadataMap;

    mod transformservice {
        include!("transformservice.rs");
    }

    #[tokio::test]
    async fn test_transform_stream_success() {
        // Create input messages
        let documents = vec![
            transformservice::Document {
                id: 1,
                content: "hello world".to_string(),
            },
            transformservice::Document {
                id: 2,
                content: "goodbye world".to_string(),
            },
        ];

        let messages: Vec<Bytes> = documents
            .iter()
            .map(|doc| {
                let mut buf = Vec::new();
                doc.encode(&mut buf).expect("Failed to encode Document protobuf");
                Bytes::from(buf)
            })
            .collect();

        // Create request
        let request = GrpcBidiStreamRequest {
            service_name: "myservice.MyService".to_string(),
            method_name: "TransformStream".to_string(),
            metadata: MetadataMap::new(),
            messages,
        };

        // Call handler
        let response = handle_transform_stream(request).await.expect("Handler call failed");

        // Verify multiple response messages
        assert_eq!(response.messages.len(), 2);
        assert!(response.metadata.get("x-processed-count").is_some());
        assert_eq!(
            response.metadata.get("x-processed-count").expect("Missing x-processed-count header").to_str().expect("Invalid x-processed-count header"),
            "2"
        );

        // Verify each response
        for msg in response.messages {
            let result = transformservice::TransformResult::decode(msg).expect("Failed to decode TransformResult");
            assert_eq!(result.status, "SUCCESS");
            assert!(!result.transformed_content.is_empty());
            assert!(
                result.transformed_content.contains("HELLO") ||
                result.transformed_content.contains("GOODBYE")
            );
        }
    }

    #[tokio::test]
    async fn test_filter_stream_high_value() {
        mod recordservice {
            include!("recordservice.rs");
        }

        let records = vec![
            recordservice::Record { id: 1, value: 50 },
            recordservice::Record { id: 2, value: 150 },
            recordservice::Record { id: 3, value: 250 },
        ];

        let messages: Vec<Bytes> = records
            .iter()
            .map(|rec| {
                let mut buf = Vec::new();
                rec.encode(&mut buf).expect("Failed to encode Record protobuf");
                Bytes::from(buf)
            })
            .collect();

        let mut metadata = MetadataMap::new();
        metadata.insert("x-filter-type", "high-value".parse().expect("Failed to parse filter-type header"));
        metadata.insert("x-min-value", "100".parse().expect("Failed to parse min-value header"));

        let request = GrpcBidiStreamRequest {
            service_name: "myservice.MyService".to_string(),
            method_name: "FilterStream".to_string(),
            metadata,
            messages,
        };

        let response = handle_filter_stream(request).await.expect("Handler call failed");

        // Only records with value >= 100 should be returned
        assert_eq!(response.messages.len(), 2);
        assert_eq!(
            response.metadata.get("x-output-count").expect("Missing x-output-count header").to_str().expect("Invalid x-output-count header"),
            "2"
        );
        assert_eq!(
            response.metadata.get("x-filtered-count").expect("Missing x-filtered-count header").to_str().expect("Invalid x-filtered-count header"),
            "1"
        );

        // Verify output values
        for msg in response.messages {
            let result = recordservice::ProcessedRecord::decode(msg).expect("Failed to decode ProcessedRecord");
            assert!(result.original_value >= 100);
        }
    }

    #[tokio::test]
    async fn test_filter_stream_malformed_input() {
        let messages = vec![Bytes::from("invalid protobuf")];

        let mut metadata = MetadataMap::new();
        metadata.insert("x-filter-type", "all".parse().expect("Failed to parse filter-type header"));

        let request = GrpcBidiStreamRequest {
            service_name: "myservice.MyService".to_string(),
            method_name: "FilterStream".to_string(),
            metadata,
            messages,
        };

        // Should return InvalidArgument error
        let result = handle_filter_stream(request).await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.code(), tonic::Code::InvalidArgument);
        assert!(err.message().contains("Failed to decode"));
    }
}

Running Tests

# Run all streaming tests
cargo test --package myservice

# Run specific test
cargo test test_batch_create_success

# Run with output
cargo test -- --nocapture

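# Run with an optional Cargo feature enabled
# (assumes the crate defines a `tokio-test` feature; features do not filter which tests run)
cargo test --features tokio-test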

Comparison with Other Patterns

| Aspect | Client Streaming | Bidirectional | Unary |
|---|---|---|---|
| Input | Multiple messages | Multiple messages | Single message |
| Output | Single response | Multiple messages | Single response |
| Use case | Batch operations | Stream processing | Simple requests |
| Message order | Important | Important | N/A |
| Atomicity | Full batch atomic | Per-message or batch | Single atomic |

Common Pitfalls

1. Using .unwrap() Instead of the ? Operator

// WRONG: .unwrap() panics when the client sends malformed input
let item = messageservice::Item::decode(msg).unwrap();

// CORRECT: Use ? operator with map_err
let item = messageservice::Item::decode(msg)
    .map_err(|e| Status::invalid_argument(format!("Decode failed: {}", e)))?;

2. Not Handling All Message Errors

// WRONG: Failing entire stream on first error
for msg in messages {
    let item = messageservice::Item::decode(msg)?;  // Stops on first error
}

// CORRECT: Handle per-message errors
for msg in messages {
    match messageservice::Item::decode(msg) {
        Ok(item) => { /* process */ },
        Err(e) => {
            // Generate error response for this message
            let error_result = messageservice::ItemResult {
                status: "ERROR".to_string(),
                error_message: e.to_string(),
                ..Default::default()
            };
            let mut buf = Vec::new();
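            // EncodeError does not convert to Status automatically, so map it explicitly
            error_result
                .encode(&mut buf)
                .map_err(|e| Status::internal(format!("Encode failed: {}", e)))?;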
            output_messages.push(Bytes::from(buf));
        }
    }
}
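
The difference between the two approaches above can be reduced to a small, dependency-free sketch. The names `decode_item`, `decode_all`, and `decode_each` are hypothetical, and `decode_item` is only a stand-in for a real protobuf decode step (it treats any non-empty UTF-8 payload as valid). Collecting into a `Result` stops at the first bad message, while collecting into a `Vec` of `Result`s keeps one outcome per input:

```rust
/// Hypothetical stand-in for a protobuf decode step: a payload counts as
/// valid when it is non-empty UTF-8 text.
fn decode_item(payload: &[u8]) -> Result<String, String> {
    if payload.is_empty() {
        return Err("empty payload".to_string());
    }
    std::str::from_utf8(payload)
        .map(|s| s.to_string())
        .map_err(|e| format!("invalid UTF-8: {e}"))
}

/// Fail-fast: the first bad message aborts the whole batch.
/// Collecting into `Result<Vec<_>, _>` short-circuits on the first `Err`.
fn decode_all(messages: &[Vec<u8>]) -> Result<Vec<String>, String> {
    messages.iter().map(|m| decode_item(m)).collect()
}

/// Per-message: every input yields exactly one output entry,
/// either a decoded value or the error for that message.
fn decode_each(messages: &[Vec<u8>]) -> Vec<Result<String, String>> {
    messages.iter().map(|m| decode_item(m)).collect()
}
```

Fail-fast behavior suits client streaming, where the batch is meant to be atomic. The per-message form suits bidirectional streaming, where each input can receive its own success or error result.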

3. Forgetting to Serialize Response Messages

// WRONG: Returning protobuf objects instead of Bytes
let response = messageservice::Item { name: "test".to_string(), value: 1 };
output_messages.push(response);  // Type error!

// CORRECT: Serialize to Bytes
let response = messageservice::Item { name: "test".to_string(), value: 1 };
let mut buf = Vec::new();
response
    .encode(&mut buf)
    .map_err(|e| Status::internal(format!("Encode failed: {}", e)))?;
output_messages.push(Bytes::from(buf));

4. Blocking Operations in Async Context

// WRONG: Blocking operation in async handler
async fn handle_batch_create(request: GrpcClientStreamRequest) -> Result<GrpcResponse, Status> {
    std::thread::sleep(std::time::Duration::from_secs(1));  // Blocks a runtime worker thread, stalling other tasks!
    Ok(response)
}

// CORRECT: Use the async equivalent and .await it
async fn handle_batch_create(request: GrpcClientStreamRequest) -> Result<GrpcResponse, Status> {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;  // Non-blocking
    Ok(response)
}

See the gRPC documentation for more examples.

Part 8: Best Practices

Project Structure

project/
+-- proto/
|   +-- user/v1/
|   |   +-- user.proto
|   |   +-- user_service.proto
|   +-- common/v1/
|       +-- types.proto
+-- generated/
    +-- python/
    +-- typescript/
    +-- ruby/

Schema Evolution

  1. Use packages with versioning: package company.user.v1;
  2. Reserve deleted field numbers so they can never be reused: reserved 2, 15, 9 to 11;
  3. Document everything: Add comments to services and fields
  4. Design for evolution: Returning a wrapper message instead of a bare type lets you add metadata fields later without changing the RPC signature
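
To see why field numbers matter for evolution, it can help to look at the wire format directly. The following is a minimal, dependency-free sketch of a hand-written decoder, not how Spikard or generated code works internally. `UserV1`, `read_varint`, and `decode_user_v1` are hypothetical names, and only the varint (0) and length-delimited (2) wire types are handled. The reader knows fields 1 (`id`) and 2 (`name`) and skips any field number it does not recognize:

```rust
/// Reads a base-128 varint from `buf` starting at `*pos`, advancing `*pos`.
fn read_varint(buf: &[u8], pos: &mut usize) -> Option<u64> {
    let mut value = 0u64;
    let mut shift = 0u32;
    while *pos < buf.len() && shift < 64 {
        let byte = buf[*pos];
        *pos += 1;
        value |= u64::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 {
            return Some(value);
        }
        shift += 7;
    }
    None // truncated or over-long varint
}

/// An "old" reader that only knows field 1 (id) and field 2 (name).
#[derive(Debug, Default, PartialEq)]
struct UserV1 {
    id: u64,
    name: String,
}

fn decode_user_v1(buf: &[u8]) -> Option<UserV1> {
    let mut user = UserV1::default();
    let mut pos = 0;
    while pos < buf.len() {
        // Each field starts with a key: (field_number << 3) | wire_type.
        let key = read_varint(buf, &mut pos)?;
        let (field, wire_type) = (key >> 3, key & 0x7);
        match wire_type {
            0 => {
                let v = read_varint(buf, &mut pos)?;
                if field == 1 {
                    user.id = v;
                } // unknown varint fields are skipped
            }
            2 => {
                let len = read_varint(buf, &mut pos)? as usize;
                let end = pos.checked_add(len)?;
                let bytes = buf.get(pos..end)?;
                pos = end;
                if field == 2 {
                    user.name = String::from_utf8(bytes.to_vec()).ok()?;
                } // unknown length-delimited fields are skipped
            }
            _ => return None, // other wire types are omitted in this sketch
        }
    }
    Some(user)
}
```

With this reader, a newer message that adds `email` as field 3 still decodes, because the unknown field is skipped. If field 2 were deleted and its number later reused for a different string, the old reader would silently report that value as `name`, which is the failure that `reserved` prevents.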

Summary

You've learned proto3 syntax, type mapping across languages, code generation, handler implementation, error handling, and testing patterns. gRPC with Spikard gives you type-safe, high-performance APIs with cross-language support.