Background Tasks
Offload non-critical work from request handlers to maintain fast response times and improve reliability.
Enqueue work
from spikard import Spikard
from celery import Celery
from msgspec import Struct
import redis
# celery_app.py
# Celery application named 'tasks'. Redis database 0 is the broker and
# database 1 is the result backend.
celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
)

# Task payloads and results are exchanged as JSON only.
celery_app.conf.task_serializer = 'json'
celery_app.conf.accept_content = ['json']
celery_app.conf.result_serializer = 'json'
# Delivery and state-tracking options; see the Celery configuration
# reference for the exact semantics of each setting.
celery_app.conf.task_track_started = True
celery_app.conf.task_acks_late = True
celery_app.conf.worker_prefetch_multiplier = 1
@celery_app.task(bind=True, max_retries=3)
def process_upload(self, file_id: int):
    """Analyse the file identified by ``file_id`` and announce the result.

    Any exception raised while fetching, analysing or notifying is passed to
    ``self.retry`` with a countdown of ``2 ** retries`` seconds.

    Returns:
        A dict with ``status`` set to ``'completed'`` and the ``file_id``.
    """
    try:
        # Heavy file processing
        analysis = perform_analysis(get_file(file_id))
        notify_completion(file_id, analysis)
    except Exception as exc:
        # Exponential backoff retry
        backoff_seconds = 2 ** self.request.retries
        raise self.retry(exc=exc, countdown=backoff_seconds)
    return {'status': 'completed', 'file_id': file_id}
# max_retries alone never triggers a retry; autoretry_for makes Celery
# re-queue the task (with exponential backoff) when sending raises.
@celery_app.task(autoretry_for=(Exception,), retry_backoff=True, max_retries=5)
def send_email(user_id: int, template: str, params: dict):
    """Send the ``template`` email, rendered with ``params``, to a user.

    Args:
        user_id: Identifier passed to ``get_user`` to look up the recipient.
        template: Name of the template handed to ``send_template``.
        params: Values passed through to ``send_template``.

    Raises:
        Exception: Whatever lookup or sending raised, once the five
            automatic retries are exhausted.
    """
    user = get_user(user_id)
    email_service = EmailService(user.email)
    email_service.send_template(template, params)
# main.py
# Application instance that the route decorators below register handlers on.
app = Spikard()


class User(Struct):
    """User payload with ``id``, ``email`` and ``name`` fields."""

    id: int
    email: str
    name: str
@app.post("/upload")
async def upload_file(file_id: int) -> dict:
    """Queue ``process_upload`` for ``file_id`` and report the queued task's id."""
    queued = process_upload.delay(file_id)
    return dict(status="processing", task_id=queued.id)
@app.post("/signup")
async def signup(user: User) -> User:
    """Persist the user, queue a welcome email, and return the saved user."""
    # Save user first
    saved_user = save_user(user)
    welcome_params = {"name": user.name}
    send_email.delay(saved_user.id, "welcome", welcome_params)
    return saved_user
import { Spikard } from "spikard";
import Bull, { Queue, Job } from "bull";
import Redis from "ioredis";
// queues.ts
// Redis connection settings shared by every queue; REDIS_HOST / REDIS_PORT
// override the localhost defaults.
const redisConfig = {
  host: process.env.REDIS_HOST || "localhost",
  // Explicit radix so the port is always parsed as decimal.
  port: parseInt(process.env.REDIS_PORT || "6379", 10),
};

// `Queue` is only a type in bull; the constructor is the default export
// (`Bull`), so the queues are created through it.
export const uploadQueue = new Bull("upload-processing", {
  redis: redisConfig,
});

// Email jobs get up to 5 attempts with exponential backoff starting at 2000 ms.
export const emailQueue = new Bull("email-sending", {
  redis: redisConfig,
  defaultJobOptions: {
    attempts: 5,
    backoff: {
      type: "exponential",
      delay: 2000,
    },
  },
});
// Process upload jobs
// Fetches the file, processes it and notifies completion, reporting progress
// (10 / 50 / 90 / 100) after each stage. Any failure is rethrown with an
// "Upload processing failed" prefix so the job is marked as failed.
uploadQueue.process(async (job: Job) => {
  const { fileId } = job.data;
  try {
    await job.progress(10);
    const file = await getFile(fileId);
    await job.progress(50);
    const result = await processFile(file);
    await job.progress(90);
    await notifyCompletion(fileId, result);
    await job.progress(100);
    return { status: "completed", fileId };
  } catch (error) {
    // A thrown value is not guaranteed to be an Error; narrow it before
    // reading `.message` so the failure reason is never "undefined".
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`Upload processing failed: ${reason}`);
  }
});
// Process email jobs
// Looks up the recipient by `userId` and sends the requested template to
// that user's email address.
emailQueue.process(async ({ data }: Job) => {
  const { userId, template, params } = data;
  const recipient = await getUser(userId);
  await new EmailService(recipient.email).sendTemplate(template, params);
});
// main.ts
// Application instance that the routes below are registered on.
const app = new Spikard();

/** User payload accepted by the /signup route. */
interface User {
  id: number;
  email: string;
  name: string;
}
// POST /upload: enqueue an upload-processing job for the posted `fileId`
// and answer immediately with the job id.
app.addRoute(
  { method: "POST", path: "/upload", handler_name: "upload", is_async: true },
  async (req) => {
    const payload = req.json<{ fileId: number }>();
    const queued = await uploadQueue.add({ fileId: payload.fileId });
    return { status: "processing", jobId: queued.id };
  }
);
// POST /signup: save the posted user, enqueue a "welcome" email job for the
// saved user's id, then return the saved user.
app.addRoute(
  { method: "POST", path: "/signup", handler_name: "signup", is_async: true },
  async (req) => {
    const user = req.json<User>();
    const savedUser = await saveUser(user);
    const welcomeJob = {
      userId: savedUser.id,
      template: "welcome",
      params: { name: user.name },
    };
    await emailQueue.add(welcomeJob);
    return savedUser;
  }
);
# Gemfile
gem 'sidekiq'
gem 'redis'
# config/sidekiq.rb
require 'sidekiq'
# Server (worker) and client (enqueuing) sides use the same Redis URL:
# REDIS_URL when set, otherwise the local default.
%i[configure_server configure_client].each do |hook|
  Sidekiq.public_send(hook) do |config|
    config.redis = { url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/0') }
  end
end
# app/jobs/process_upload_job.rb
# Sidekiq job that loads a file record, processes it and sends a completion
# notification. Configured for the :default queue with retry: 3.
class ProcessUploadJob
  include Sidekiq::Job
  sidekiq_options queue: :default, retry: 3

  # Processes the file identified by +file_id+.
  # Any StandardError is logged and re-raised so the failure reaches Sidekiq.
  def perform(file_id)
    # NOTE(review): `File` is presumably an application model; the constant
    # also names Ruby's core File class, which has no `find` -- confirm.
    file = File.find(file_id)
    # NOTE(review): only reachable if `find` returns nil rather than raising -- confirm.
    raise "File not found: #{file_id}" unless file
    # Process file asynchronously
    process_file(file)
    notify_completion(file)
  rescue StandardError => e
    logger.error("Failed to process file #{file_id}: #{e.message}")
    raise # Let Sidekiq handle retry
  end

  private

  # Placeholder for the real processing; currently just sleeps.
  def process_file(file)
    # Heavy processing work
    sleep 5 # Simulate long operation
  end

  # Placeholder; intentionally empty.
  def notify_completion(file)
    # Send notification
  end
end
# app/jobs/send_email_job.rb
# Sidekiq job that sends a templated email to a user. Configured for the
# :mailers queue with retry: 5.
class SendEmailJob
  include Sidekiq::Job
  sidekiq_options queue: :mailers, retry: 5

  # Looks up the user by +user_id+ and sends +template+ with +params+ to
  # that user's email address.
  def perform(user_id, template, params = {})
    recipient = User.find(user_id)
    EmailService.new(recipient.email).send_template(template, params)
  end
end
# In Spikard handler
require 'spikard'
require_relative 'jobs/process_upload_job'
require_relative 'jobs/send_email_job'
app = Spikard::App.new

# POST /upload: enqueue ProcessUploadJob for the posted file_id and return
# the value perform_async gives back as job_id.
app.post '/upload' do |_params, _query, body|
  job_id = ProcessUploadJob.perform_async(body['file_id'])
  { status: 'processing', job_id: job_id }
end
# POST /signup: create the user, enqueue the welcome email, and return the
# new user's id and email.
app.post '/signup' do |_params, _query, body|
  # NOTE(review): the raw request body is mass-assigned here; restrict it to
  # permitted attributes if User has fields clients must not set.
  user = User.create!(body)
  # Sidekiq job arguments must be plain JSON types, so the hash uses string
  # keys; a symbol-keyed hash is rejected by Sidekiq's strict argument check.
  SendEmailJob.perform_async(user.id, 'welcome', { 'name' => user.name })
  { id: user.id, email: user.email }
end
<?php
declare(strict_types=1);
use Spikard\App;
use Spikard\Attributes\Post;
use Spikard\Background\BackgroundTask;
use Spikard\Config\ServerConfig;
use Spikard\Http\Request;
use Spikard\Http\Response;
/**
 * Logs the start of a welcome email, waits two seconds to simulate sending,
 * then logs completion.
 */
function sendWelcomeEmail(int $userId, string $email): void
{
    error_log(sprintf('Sending welcome email to %s (user: %d)', $email, $userId));
    // Simulate email sending
    sleep(2);
    error_log('Email sent to ' . $email);
}
final class SignupController
{
    /**
     * Handles POST /signup: picks a random user id, schedules the welcome
     * email as a background task and answers 201 with the new user's data.
     */
    #[Post('/signup')]
    public function signup(Request $request): Response
    {
        $payload = $request->body;
        $userId = random_int(1000, 9999);
        $email = $payload['email'] ?? 'unknown@example.com';

        // Fire-and-forget background task
        $sendWelcome = function () use ($userId, $email): void {
            sendWelcomeEmail($userId, $email);
        };
        BackgroundTask::run($sendWelcome);

        $created = [
            'id' => $userId,
            'email' => $email,
            'status' => 'created'
        ];

        return Response::json($created, 201);
    }
}
// Application configured for port 8000 with the signup controller registered.
$app = (new App(new ServerConfig(port: 8000)))
    ->registerController(new SignupController());
use spikard::prelude::*;
use tokio::task;
// POST /send-email: parse the request, hand the send to a detached Tokio
// task, and answer immediately with a "queued" status.
app.route(post("/send-email"), |ctx: Context| async move {
    let request = ctx.body_as::<EmailRequest>().await?;

    // Spawn background task without waiting for completion
    task::spawn(async move {
        match send_email_internal(&request).await {
            Ok(()) => {}
            Err(err) => eprintln!("Email send failed: {}", err),
        }
    });

    let body = json!({
        "status": "queued",
        "message": "Email will be sent shortly"
    });
    Ok(Json(body))
})?;
/// Simulates sending an email: waits one second, prints the recipient and
/// returns `Ok(())`.
async fn send_email_internal(email: &EmailRequest) -> Result<(), Box<dyn std::error::Error>> {
    use tokio::time::{sleep, Duration};

    // Simulate async email sending
    sleep(Duration::from_secs(1)).await;
    println!("Email sent to: {}", email.to);
    Ok(())
}
/// Request body for `/send-email`, deserialized with serde.
#[derive(serde::Deserialize)]
struct EmailRequest {
    // Recipient address; printed by `send_email_internal`.
    to: String,
    // Not read by any code shown in this snippet.
    subject: String,
}
Error Recovery Patterns
Handle failures gracefully with retries and dead letter queues.
from celery import Task
from celery.exceptions import MaxRetriesExceededError
class ResilientTask(Task):
    """Base task that retries on any exception and dead-letters final failures.

    Retries are automatic: up to 5, with jittered exponential backoff capped
    at 600 seconds.
    """

    autoretry_for = (Exception,)
    retry_kwargs = {'max_retries': 5}
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Send the task's payload to the dead letter queue on final failure.

        With ``autoretry_for`` the retry is raised by Celery's wrapper around
        the task body, so the body never sees ``MaxRetriesExceededError``.
        Once retries are exhausted the original exception propagates and
        Celery calls this hook.
        """
        # The payload is the task's single ``data`` argument, which may have
        # been passed positionally or by keyword.
        payload = kwargs.get('data', args[0] if args else None)
        if payload is not None:
            # Send to dead letter queue
            send_to_dlq(payload)
        super().on_failure(exc, task_id, args, kwargs, einfo)


@celery_app.task(base=ResilientTask)
def process_with_recovery(data: dict):
    """Call the external API for ``data`` unless it was already processed.

    Returns:
        ``{'status': 'already_processed'}`` when ``data['id']`` is already
        marked as processed, otherwise the result of ``external_api_call``.

    Raises:
        Exception: Whatever ``external_api_call`` or ``mark_processed``
            raises; ``ResilientTask`` turns it into a retry and, on the final
            failure, into a dead-letter entry.
    """
    # Check idempotency
    if is_already_processed(data['id']):
        return {'status': 'already_processed'}
    result = external_api_call(data)
    mark_processed(data['id'])
    return result
import { Job } from "bull";
// Queue whose jobs get up to 5 attempts with exponential backoff starting at
// 2000 ms. Failed jobs are kept (removeOnFail: false); completed jobs are
// trimmed (removeOnComplete: 100).
// `Queue` is only a type in bull; the constructor is the default export.
const resilientQueue = new Bull("resilient-tasks", {
  redis: redisConfig,
  defaultJobOptions: {
    attempts: 5,
    backoff: {
      type: "exponential",
      delay: 2000,
    },
    removeOnComplete: 100,
    removeOnFail: false,
  },
});

// Jobs that failed on their last attempt are parked here for inspection.
const deadLetterQueue = new Bull("dead-letter", {
  redis: redisConfig,
});

resilientQueue.process(async (job: Job) => {
  const { id, data } = job.data;

  // Idempotency check
  if (await isAlreadyProcessed(id)) {
    return { status: "already_processed" };
  }

  try {
    const result = await externalApiCall(data);
    await markProcessed(id);
    return result;
  } catch (error) {
    // Bull increments `attemptsMade` only after the processor rejects, so
    // while the final attempt is running it still equals `attempts - 1`.
    const maxAttempts = job.opts.attempts ?? 1;
    if (job.attemptsMade + 1 >= maxAttempts) {
      // Send to dead letter queue
      const reason = error instanceof Error ? error.message : String(error);
      await deadLetterQueue.add({ originalJob: job.data, error: reason });
    }
    throw error;
  }
});
# app/jobs/resilient_job.rb
# Sidekiq job that performs an idempotent external API call. A few quick
# in-process attempts run before the failure is handed to Sidekiq's own retry
# mechanism (retry: 5); exhausted jobs are pushed to DeadLetterQueue.
class ResilientJob
  include Sidekiq::Job
  sidekiq_options retry: 5, dead: true

  # Number of in-process attempts made by #process_with_retry.
  MAX_INLINE_ATTEMPTS = 3

  sidekiq_retries_exhausted do |msg, _ex|
    # Move to dead letter queue or log
    DeadLetterQueue.push(msg)
    # This block is defined in the class body, where the instance-level
    # `logger` helper is not available, so use the global Sidekiq logger.
    Sidekiq.logger.error("Job exhausted retries: #{msg['jid']}")
  end

  # Processes +data+ unless its 'id' has already been handled.
  def perform(data)
    # Idempotent operation
    return if already_processed?(data['id'])

    process_with_retry(data)
    mark_processed(data['id'])
  end

  private

  # Calls the external API, sleeping 2**attempt seconds between failures.
  # Re-raises the last error once MAX_INLINE_ATTEMPTS attempts have failed.
  def process_with_retry(data)
    attempt = 0
    begin
      external_api_call(data)
    rescue StandardError
      attempt += 1
      raise if attempt >= MAX_INLINE_ATTEMPTS

      sleep(2**attempt)
      retry
    end
  end
end
<?php
declare(strict_types=1);
use Spikard\Background\BackgroundTask;
use Spikard\Background\DeadLetterQueue;
// Error recovery with retry logic
final class ResilientTask
{
    /** Number of retries allowed after the first failed attempt. */
    private int $maxRetries = 5;

    /** Retries consumed so far by failed calls to execute() on this instance. */
    private int $retryCount = 0;

    /**
     * Runs the task once, skipping work that was already processed.
     *
     * The caller is expected to call execute() again after a failure. Each
     * failure consumes one retry; when none are left the payload is pushed
     * to the dead letter queue before the exception is rethrown.
     *
     * @param array $data Payload; must contain an 'id' entry.
     * @return array ['status' => 'already_processed'] or the API result.
     * @throws \Exception Whatever the external call or bookkeeping threw.
     */
    public function execute(array $data): array
    {
        // Idempotency check
        if ($this->isAlreadyProcessed($data['id'])) {
            return ['status' => 'already_processed'];
        }

        try {
            $result = $this->externalApiCall($data);
            $this->markProcessed($data['id']);

            return $result;
        } catch (\Exception $e) {
            if ($this->retryCount >= $this->maxRetries) {
                DeadLetterQueue::push($data, $e->getMessage());
            } else {
                // Record the failure; without this the counter never reaches
                // the limit and the dead letter queue is never used.
                $this->retryCount++;
            }

            throw $e;
        }
    }

    private function isAlreadyProcessed(string $id): bool
    {
        // Check if already processed
        return false;
    }

    private function markProcessed(string $id): void
    {
        // Mark as processed
    }

    private function externalApiCall(array $data): array
    {
        // Call external API
        return ['success' => true];
    }
}
use spikard::prelude::*;
use tokio::time::{sleep, Duration};
/// Runs `f` until it succeeds or `max_retries` attempts have failed.
///
/// Between failed attempts it sleeps with exponential backoff (1s, 2s, 4s,
/// 8s, ...) and logs the error to stderr. The error from the last attempt is
/// returned unchanged.
///
/// The operation's future type is generic rather than a boxed `'static`
/// trait object, so closures may return futures that borrow local data.
/// The error is also dropped before the backoff sleep. Together these keep
/// the returned future `Send` whenever `f` and its futures are `Send`, which
/// `tokio::spawn` requires.
async fn retry_with_backoff<F, Fut, T, E>(mut f: F, max_retries: u32) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    let mut retries: u32 = 0;
    loop {
        let backoff = match f().await {
            Ok(result) => return Ok(result),
            Err(e) => {
                retries += 1;
                if retries >= max_retries {
                    return Err(e);
                }
                // Exponential backoff: 1s, 2s, 4s, 8s...
                // Saturating so a very large retry budget cannot overflow.
                let backoff = Duration::from_secs(2_u64.saturating_pow(retries - 1));
                eprintln!("Retry attempt {} after {:?}: {}", retries, backoff, e);
                backoff
            }
        };
        sleep(backoff).await;
    }
}
// POST /process: parse the request, start a detached Tokio task that runs
// `process_data_internal` through `retry_with_backoff` (limit 3), and answer
// immediately with a "processing" status.
app.route(post("/process"), |ctx: Context| async move {
    let data = ctx.body_as::<ProcessRequest>().await?;
    tokio::spawn(async move {
        if let Err(e) = retry_with_backoff(
            // A fresh boxed future is built for every attempt.
            || {
                Box::pin(async {
                    process_data_internal(&data).await
                })
            },
            3,
        )
        .await
        {
            // Only logged: the HTTP response has already been sent.
            eprintln!("Processing failed after retries: {}", e);
        }
    });
    Ok(Json(json!({"status": "processing"})))
})?;
/// Placeholder for the real processing step; currently always returns `Ok(())`.
async fn process_data_internal(data: &ProcessRequest) -> Result<(), Box<dyn std::error::Error>> {
    // Simulate processing that may fail
    Ok(())
}

/// Request body for `/process`, deserialized with serde.
#[derive(serde::Deserialize)]
struct ProcessRequest {
    id: String,
}
Queue Monitoring
Monitor job health and status in production.
from celery.result import AsyncResult
@app.get("/health/jobs")
async def job_health():
    """Report active and scheduled job counts and the number of workers.

    Returns:
        A dict with ``active_jobs``, ``scheduled_jobs`` and ``workers``.
        A missing reply from the workers is treated as no jobs.
    """
    import asyncio

    def _collect():
        # The inspect calls are synchronous and wait for worker replies, so
        # they run in a worker thread instead of on the event loop.
        inspector = celery_app.control.inspect()
        return inspector.active() or {}, inspector.scheduled() or {}

    loop = asyncio.get_running_loop()
    active, scheduled = await loop.run_in_executor(None, _collect)
    return {
        "active_jobs": sum(len(tasks) for tasks in active.values()),
        "scheduled_jobs": sum(len(tasks) for tasks in scheduled.values()),
        "workers": len(active),
    }
@app.get("/jobs/{task_id}/status")
async def job_status(task_id: str):
    """Return the state of the Celery task identified by ``task_id``.

    Returns:
        A dict with ``task_id``, ``status``, ``result`` (only for tasks that
        succeeded) and ``traceback`` (only for tasks that failed).
    """
    result = AsyncResult(task_id, app=celery_app)
    return {
        "task_id": task_id,
        "status": result.status,
        # For failed or revoked tasks `result.result` is an exception
        # instance, which is not a serializable payload; expose the value
        # only when the task actually succeeded.
        "result": result.result if result.successful() else None,
        "traceback": str(result.traceback) if result.failed() else None,
    }
// GET /health/jobs: return the job counts of the upload and email queues,
// fetched concurrently.
app.addRoute(
  { method: "GET", path: "/health/jobs", handler_name: "job_health", is_async: true },
  async (req) => {
    const counts = await Promise.all(
      [uploadQueue, emailQueue].map((queue) => queue.getJobCounts())
    );
    const [upload, email] = counts;
    return { queues: { upload, email } };
  }
);
// GET /jobs/:jobId/status: look the job up in the upload queue first, then
// in the email queue, and report its state, progress and outcome fields.
app.addRoute(
  { method: "GET", path: "/jobs/:jobId/status", handler_name: "job_status", is_async: true },
  async (req) => {
    const { jobId } = req.params;

    let job = await uploadQueue.getJob(jobId);
    if (!job) {
      job = await emailQueue.getJob(jobId);
    }
    if (!job) {
      return { error: "Job not found", jobId };
    }

    const state = await job.getState();
    return {
      jobId: job.id,
      state,
      progress: job.progress(),
      data: job.data,
      finishedOn: job.finishedOn,
      failedReason: job.failedReason,
    };
  }
);
require 'spikard'
require 'sidekiq/api'
# Health check endpoint
# Returns global Sidekiq counters plus name, size and latency of every queue.
app.get '/health/jobs' do
  stats = Sidekiq::Stats.new
  queue_rows = Sidekiq::Queue.all.map do |q|
    { name: q.name, size: q.size, latency: q.latency }
  end

  counters = %i[processed failed scheduled_size retry_size dead_size]
  summary = counters.map { |counter| [counter, stats.public_send(counter)] }.to_h
  summary.merge(queues: queue_rows)
end
# Check specific job status
# Reports 'queued', 'processing' or 'completed_or_failed' for the given jid.
app.get '/jobs/:jid/status' do |params, _query, _body|
  jid = params['jid']

  # Check if job is still queued or processing.
  # Every queue is searched: Sidekiq::Queue.new only covers "default", which
  # would miss jobs enqueued on other queues such as :mailers.
  status = if Sidekiq::Queue.all.any? { |queue| queue.find_job(jid) }
             'queued'
           # NOTE(review): the shape of `work` differs between Sidekiq
           # versions -- confirm 'payload' is a Hash in the version in use.
           elsif Sidekiq::Workers.new.any? { |_, _, work| work['payload']['jid'] == jid }
             'processing'
           else
             'completed_or_failed'
           end

  { job_id: jid, status: status }
end
<?php
declare(strict_types=1);
use Spikard\App;
use Spikard\Attributes\Get;
use Spikard\Background\QueueStats;
use Spikard\Http\Request;
use Spikard\Http\Response;
final class JobHealthController
{
    /**
     * GET /health/jobs: active, scheduled and worker counts from QueueStats.
     */
    #[Get('/health/jobs')]
    public function jobHealth(Request $request): Response
    {
        $stats = QueueStats::all();
        $summary = [
            'active_jobs' => $stats->activeCount(),
            'scheduled_jobs' => $stats->scheduledCount(),
            'workers' => $stats->workerCount(),
        ];

        return Response::json($summary);
    }

    /**
     * GET /jobs/{taskId}/status: status of one task, or a 404 payload when
     * QueueStats has no record of it. The result is included only once the
     * task reports ready.
     */
    #[Get('/jobs/{taskId}/status')]
    public function jobStatus(Request $request): Response
    {
        $taskId = $request->pathParams['taskId'];
        $task = QueueStats::getTask($taskId);

        if (null === $task) {
            return Response::json(['error' => 'Job not found', 'taskId' => $taskId], 404);
        }

        $body = [
            'task_id' => $taskId,
            'status' => $task->status,
        ];
        $body['result'] = $task->isReady() ? $task->result : null;

        return Response::json($body);
    }
}
use spikard::prelude::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
/// Counters for background tasks: how many are running and how many finished.
struct TaskMonitor {
    active_tasks: Arc<AtomicU32>,
    completed_tasks: Arc<AtomicU32>,
}

impl TaskMonitor {
    /// Creates a monitor with both counters at zero.
    fn new() -> Self {
        TaskMonitor {
            active_tasks: Arc::default(),
            completed_tasks: Arc::default(),
        }
    }

    /// Marks one more task as active.
    fn record_start(&self) {
        let _previous = self.active_tasks.fetch_add(1, Ordering::SeqCst);
    }

    /// Moves one task from active to completed.
    fn record_complete(&self) {
        let _previous_active = self.active_tasks.fetch_sub(1, Ordering::SeqCst);
        let _previous_done = self.completed_tasks.fetch_add(1, Ordering::SeqCst);
    }
}
let monitor = Arc::new(TaskMonitor::new());

// GET /task-stats: report the current active and completed task counts.
app.route(get("/task-stats"), {
    let monitor = Arc::clone(&monitor);
    // The handler owns one `Arc` and clones it for every request. Moving the
    // captured `Arc` straight into the `async move` block would consume it on
    // the first call, leaving a closure that can only run once.
    move |_ctx: Context| {
        let monitor = Arc::clone(&monitor);
        async move {
            Ok(Json(json!({
                "active_tasks": monitor.active_tasks.load(Ordering::SeqCst),
                "completed_tasks": monitor.completed_tasks.load(Ordering::SeqCst),
            })))
        }
    }
})?;
// POST /process: parse the request, run `process_data` in a detached Tokio
// task while recording its start and completion in the monitor, and answer
// immediately with a "processing" status.
app.route(post("/process"), {
    let monitor = Arc::clone(&monitor);
    // The handler owns one `Arc` and clones it for every request. Moving the
    // captured `Arc` straight into the `async move` block would consume it on
    // the first call, leaving a closure that can only run once.
    move |ctx: Context| {
        let monitor = Arc::clone(&monitor);
        async move {
            let data = ctx.body_as::<ProcessRequest>().await?;
            tokio::spawn(async move {
                monitor.record_start();
                if let Err(e) = process_data(&data).await {
                    eprintln!("Processing failed: {}", e);
                }
                // Recorded on success and on error alike.
                monitor.record_complete();
            });
            Ok(Json(json!({"status": "processing"})))
        }
    }
})?;
/// Placeholder for the real processing step; currently always returns `Ok(())`.
async fn process_data(data: &ProcessRequest) -> Result<(), Box<dyn std::error::Error>> {
    Ok(())
}

/// Request body for `/process`, deserialized with serde.
#[derive(serde::Deserialize)]
struct ProcessRequest {
    id: String,
}
Testing Background Jobs
Test asynchronous jobs reliably in your test suite.
import pytest
from celery import current_app
from unittest.mock import patch, MagicMock
# conftest.py
@pytest.fixture
def celery_config():
    """Celery settings for tests: in-memory broker and in-memory result cache."""
    return dict(
        broker_url='memory://',
        result_backend='cache+memory://',
    )


@pytest.fixture
def celery_worker_parameters():
    """Worker start-up options for tests; the ping check is switched off."""
    return dict(perform_ping_check=False)
# test_tasks.py
def test_process_upload_enqueued(celery_app, celery_worker):
    """Enqueuing ``process_upload`` yields a result handle with a task id."""
    queued = process_upload.delay(123)
    assert queued.task_id is not None
def test_process_upload_success(celery_app, celery_worker):
    """With ``tasks.get_file`` patched, the task completes and fetches file 123 once."""
    fake_file = MagicMock()
    with patch('tasks.get_file', return_value=fake_file) as mock_get_file:
        outcome = process_upload.apply(args=[123])
        assert outcome.successful()
        assert outcome.result['status'] == 'completed'
        mock_get_file.assert_called_once_with(123)
def test_process_upload_retry_on_failure(celery_app, celery_worker):
    """When ``tasks.get_file`` always raises, the task ends up failed with a traceback."""
    always_failing = patch('tasks.get_file', side_effect=Exception("Temporary error"))
    with always_failing:
        outcome = process_upload.apply(args=[123])
        assert outcome.failed()
        # Check retry was attempted
        assert outcome.traceback is not None
@pytest.mark.asyncio
async def test_upload_endpoint_creates_task(client):
    """POST /upload answers 200 with a processing status and a task id."""
    response = await client.post("/upload", json={"file_id": 123})
    assert response.status_code == 200

    payload = response.json()
    assert payload["status"] == "processing"
    assert "task_id" in payload
// `Queue` and `Job` are only types in bull; the queue constructor is the
// package's default export, imported here as `Bull`.
import Bull, { Queue, Job } from "bull";
import { jest } from "@jest/globals";

// Integration tests against the Redis at localhost:6379 (database 1).
describe("Upload Queue", () => {
  let testQueue: Queue;

  beforeEach(() => {
    testQueue = new Bull("test-upload", {
      redis: { host: "localhost", port: 6379, db: 1 },
    });
  });

  afterEach(async () => {
    await testQueue.close();
  });

  it("enqueues upload job with correct data", async () => {
    const job = await testQueue.add({ fileId: 123 });
    expect(job.id).toBeDefined();
    expect(job.data.fileId).toBe(123);
  });

  it("processes upload successfully", async () => {
    const getFile = jest.fn().mockResolvedValue({ id: 123, name: "test.txt" });
    const processFile = jest.fn().mockResolvedValue({ success: true });

    testQueue.process(async (job: Job) => {
      const file = await getFile(job.data.fileId);
      return await processFile(file);
    });

    const job = await testQueue.add({ fileId: 123 });
    await job.finished();

    expect(getFile).toHaveBeenCalledWith(123);
    expect(processFile).toHaveBeenCalled();
  });

  it("retries on failure", async () => {
    let attempts = 0;

    // Fails twice, then succeeds on the third attempt.
    testQueue.process(async (job: Job) => {
      attempts++;
      if (attempts < 3) {
        throw new Error("Temporary failure");
      }
      return { success: true };
    });

    const job = await testQueue.add(
      { fileId: 123 },
      { attempts: 3, backoff: 100 }
    );
    const result = await job.finished();

    expect(attempts).toBe(3);
    expect(result.success).toBe(true);
  });

  it("moves to failed after max retries", async () => {
    testQueue.process(async (job: Job) => {
      throw new Error("Persistent failure");
    });

    const job = await testQueue.add(
      { fileId: 123 },
      { attempts: 2 }
    );

    await expect(job.finished()).rejects.toThrow("Persistent failure");
    const state = await job.getState();
    expect(state).toBe("failed");
  });
});
# test/jobs/process_upload_job_test.rb
require 'test_helper'
require 'sidekiq/testing'
# Tests for ProcessUploadJob, using Sidekiq's fake and inline testing modes.
class ProcessUploadJobTest < ActiveSupport::TestCase
  setup do
    Sidekiq::Testing.fake! # Queue jobs, don't process
    # Fake-mode jobs pile up in memory across tests; start from an empty list
    # so the size assertions below do not depend on test order.
    ProcessUploadJob.clear
  end

  test "enqueues job with correct arguments" do
    assert_equal 0, ProcessUploadJob.jobs.size
    ProcessUploadJob.perform_async(123)
    assert_equal 1, ProcessUploadJob.jobs.size
    assert_equal [123], ProcessUploadJob.jobs.last['args']
  end

  test "processes upload successfully" do
    Sidekiq::Testing.inline! do # Actually run the job
      file = files(:sample_file)
      assert_difference -> { ProcessedFile.count }, 1 do
        ProcessUploadJob.perform_async(file.id)
      end
    end
  end

  test "handles missing file gracefully" do
    Sidekiq::Testing.inline! do
      assert_raises(RuntimeError) do
        ProcessUploadJob.perform_async(999999)
      end
    end
  end

  test "retries on transient failures" do
    Sidekiq::Testing.inline! do
      file = files(:sample_file)
      # Simulate transient error.
      # The stub is called with the stubbed method's arguments, so the lambda
      # accepts any; a zero-arity lambda would raise ArgumentError instead of
      # the RuntimeError asserted below.
      FileProcessor.stub(:process, ->(*) { raise "Temporary error" }) do
        assert_raises(RuntimeError) do
          ProcessUploadJob.perform_async(file.id)
        end
      end
    end
  end
end
<?php
declare(strict_types=1);
use PHPUnit\Framework\TestCase;
use Spikard\Background\BackgroundTask;
use Spikard\Testing\QueueFake;
/**
 * Tests for BackgroundTask using the QueueFake test double.
 */
final class BackgroundTaskTest extends TestCase
{
    protected function setUp(): void
    {
        QueueFake::fake(); // Don't actually process jobs
    }

    /** Running a task in fake mode records exactly one job. */
    public function testEnqueuesJobWithCorrectArguments(): void
    {
        $this->assertCount(0, QueueFake::jobs());

        $emailTask = function (): void {
            sendEmail(123);
        };
        BackgroundTask::run($emailTask);

        $this->assertCount(1, QueueFake::jobs());
    }

    /** In sync mode the task's return value is handed back by run(). */
    public function testProcessesTaskSuccessfully(): void
    {
        QueueFake::processSync(); // Actually run jobs

        $completedTask = function (): array {
            return ['status' => 'completed'];
        };
        $result = BackgroundTask::run($completedTask);

        $this->assertEquals('completed', $result['status']);
    }

    /** In sync mode an exception thrown by the task propagates to the caller. */
    public function testHandlesFailureGracefully(): void
    {
        QueueFake::processSync();
        $this->expectException(\RuntimeException::class);

        $failingTask = function (): void {
            throw new \RuntimeException('Processing failed');
        };
        BackgroundTask::run($failingTask);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, timeout, Duration};

    /// A spawned task reports completion over a channel within the timeout.
    #[tokio::test]
    async fn test_background_task_completion() {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(1);

        tokio::spawn(async move {
            sleep(Duration::from_millis(100)).await;
            let _ = tx.send("task_complete".to_string()).await;
        });

        let result = timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("timeout")
            .expect("channel closed");
        assert_eq!(result, "task_complete");
    }

    /// Five spawned tasks each deliver one value before the channel closes.
    #[tokio::test]
    async fn test_multiple_background_tasks() {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<u32>(10);

        for i in 0..5 {
            let tx_clone = tx.clone();
            tokio::spawn(async move {
                // `i` is a u32 (it is sent on the u32 channel) while
                // `from_millis` takes a u64, so widen it explicitly.
                sleep(Duration::from_millis(50 * u64::from(i))).await;
                let _ = tx_clone.send(i).await;
            });
        }
        drop(tx); // Close sender to allow recv to complete

        let mut results = Vec::new();
        while let Ok(value) = timeout(Duration::from_secs(1), rx.recv()).await {
            if let Some(v) = value {
                results.push(v);
            } else {
                break;
            }
        }
        assert_eq!(results.len(), 5);
    }

    /// A task can report failure by sending an `Err` over the channel.
    #[tokio::test]
    async fn test_task_error_handling() {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<Result<String, String>>(1);

        tokio::spawn(async move {
            sleep(Duration::from_millis(50)).await;
            let _ = tx.send(Err("task_failed".to_string())).await;
        });

        let result = timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("timeout")
            .expect("channel closed");
        assert!(result.is_err());
        assert_eq!(result.err().unwrap(), "task_failed");
    }
}
Tips
- Keep request handlers fast: Enqueue email, notifications, and ETL jobs instead of blocking responses.
- Use durable queues: Prefer Redis, SQS, or RabbitMQ over in-process threads for production workloads.
- Ensure idempotency: Jobs should be safe to retry. Check if work was already completed before processing.
- Monitor queue depth: Alert when queues grow too large or latency increases.
- Set appropriate timeouts: Prevent jobs from running indefinitely.
- Use dead letter queues: Capture failed jobs for manual investigation.
- Test both success and failure paths: Verify retry logic and error handling work as expected.