
Background Tasks

Offload non-critical work from request handlers to maintain fast response times and improve reliability.

Enqueue work

# celery_app.py
from celery import Celery

celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    task_track_started=True,
    task_acks_late=True,           # ack after completion so a crashed worker's task is redelivered
    worker_prefetch_multiplier=1,  # fetch one task at a time; fairer for long-running jobs
)

@celery_app.task(bind=True, max_retries=3)
def process_upload(self, file_id: int):
    try:
        # Heavy file processing
        file = get_file(file_id)
        result = perform_analysis(file)
        notify_completion(file_id, result)
        return {'status': 'completed', 'file_id': file_id}
    except Exception as e:
        # Exponential backoff retry
        raise self.retry(exc=e, countdown=2 ** self.request.retries)

@celery_app.task(autoretry_for=(Exception,), retry_backoff=True, max_retries=5)
def send_email(user_id: int, template: str, params: dict):
    user = get_user(user_id)
    email_service = EmailService(user.email)
    email_service.send_template(template, params)

# main.py
from spikard import Spikard
from msgspec import Struct

from celery_app import process_upload, send_email

app = Spikard()

class User(Struct):
    id: int
    email: str
    name: str

@app.post("/upload")
async def upload_file(file_id: int) -> dict:
    task = process_upload.delay(file_id)
    return {"status": "processing", "task_id": task.id}

@app.post("/signup")
async def signup(user: User) -> User:
    # Save user first
    saved_user = save_user(user)
    send_email.delay(saved_user.id, "welcome", {"name": user.name})
    return saved_user
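
delay() is shorthand for apply_async(). When a call needs per-invocation options such as an initial delay, a dedicated queue, or an expiry, use the longer form. The uploads queue name below is illustrative; a worker consumes it only if started with -Q uploads:

# Start 10 seconds from now on a dedicated queue; discard the task
# if no worker picks it up within 5 minutes
process_upload.apply_async(
    args=[file_id],
    countdown=10,
    queue='uploads',
    expires=300,
)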
// queues.ts
import { Queue, Job } from "bull";

const redisConfig = {
  host: process.env.REDIS_HOST || "localhost",
  port: parseInt(process.env.REDIS_PORT || "6379"),
};

export const uploadQueue = new Queue("upload-processing", {
  redis: redisConfig,
});

export const emailQueue = new Queue("email-sending", {
  redis: redisConfig,
  defaultJobOptions: {
    attempts: 5,
    backoff: {
      type: "exponential",
      delay: 2000,
    },
  },
});

// Process upload jobs
uploadQueue.process(async (job: Job) => {
  const { fileId } = job.data;
  try {
    await job.progress(10);
    const file = await getFile(fileId);

    await job.progress(50);
    const result = await processFile(file);

    await job.progress(90);
    await notifyCompletion(fileId, result);

    await job.progress(100);
    return { status: "completed", fileId };
  } catch (error) {
    throw new Error(`Upload processing failed: ${(error as Error).message}`);
  }
});

// Process email jobs
emailQueue.process(async (job: Job) => {
  const { userId, template, params } = job.data;
  const user = await getUser(userId);
  const emailService = new EmailService(user.email);
  await emailService.sendTemplate(template, params);
});

// main.ts
import { Spikard } from "spikard";
import { uploadQueue, emailQueue } from "./queues";

const app = new Spikard();

interface User {
  id: number;
  email: string;
  name: string;
}

app.addRoute(
  { method: "POST", path: "/upload", handler_name: "upload", is_async: true },
  async (req) => {
    const { fileId } = req.json<{ fileId: number }>();
    const job = await uploadQueue.add({ fileId });
    return { status: "processing", jobId: job.id };
  }
);

app.addRoute(
  { method: "POST", path: "/signup", handler_name: "signup", is_async: true },
  async (req) => {
    const user = req.json<User>();
    const savedUser = await saveUser(user);
    await emailQueue.add({
      userId: savedUser.id,
      template: "welcome",
      params: { name: user.name }
    });
    return savedUser;
  }
);
# Gemfile
gem 'sidekiq'
gem 'redis'

# config/sidekiq.rb
require 'sidekiq'

Sidekiq.configure_server do |config|
  config.redis = { url: ENV['REDIS_URL'] || 'redis://localhost:6379/0' }
end

Sidekiq.configure_client do |config|
  config.redis = { url: ENV['REDIS_URL'] || 'redis://localhost:6379/0' }
end

# app/jobs/process_upload_job.rb
class ProcessUploadJob
  include Sidekiq::Job
  sidekiq_options queue: :default, retry: 3

  def perform(file_id)
    # "UploadedFile" stands in for your app's file model; Ruby's core File has no .find
    file = UploadedFile.find_by(id: file_id)
    raise "File not found: #{file_id}" unless file

    # Process file asynchronously
    process_file(file)
    notify_completion(file)
  rescue StandardError => e
    logger.error("Failed to process file #{file_id}: #{e.message}")
    raise # Let Sidekiq handle retry
  end

  private

  def process_file(file)
    # Heavy processing work
    sleep 5 # Simulate long operation
  end

  def notify_completion(file)
    # Send notification
  end
end

# app/jobs/send_email_job.rb
class SendEmailJob
  include Sidekiq::Job
  sidekiq_options queue: :mailers, retry: 5

  def perform(user_id, template, params = {})
    user = User.find(user_id)
    mailer = EmailService.new(user.email)
    mailer.send_template(template, params)
  end
end

# In Spikard handler
require 'spikard'
require_relative 'jobs/process_upload_job'
require_relative 'jobs/send_email_job'

app = Spikard::App.new

app.post '/upload' do |params, _query, body|
  file_id = body['file_id']
  jid = ProcessUploadJob.perform_async(file_id)

  { status: 'processing', job_id: jid }
end

app.post '/signup' do |params, _query, body|
  user = User.create!(body)
  SendEmailJob.perform_async(user.id, 'welcome', { name: user.name })

  { id: user.id, email: user.email }
end
<?php

declare(strict_types=1);

use Spikard\App;
use Spikard\Attributes\Post;
use Spikard\Background\BackgroundTask;
use Spikard\Config\ServerConfig;
use Spikard\Http\Request;
use Spikard\Http\Response;

function sendWelcomeEmail(int $userId, string $email): void
{
    error_log("Sending welcome email to {$email} (user: {$userId})");
    // Simulate email sending
    sleep(2);
    error_log("Email sent to {$email}");
}

final class SignupController
{
    #[Post('/signup')]
    public function signup(Request $request): Response
    {
        $user = $request->body;
        $userId = random_int(1000, 9999);
        $email = $user['email'] ?? 'unknown@example.com';

        // Fire-and-forget background task
        BackgroundTask::run(function () use ($userId, $email): void {
            sendWelcomeEmail($userId, $email);
        });

        return Response::json([
            'id' => $userId,
            'email' => $email,
            'status' => 'created'
        ], 201);
    }
}

$app = (new App(new ServerConfig(port: 8000)))
    ->registerController(new SignupController());
use spikard::prelude::*;
use tokio::task;

app.route(post("/send-email"), |ctx: Context| async move {
    let email = ctx.body_as::<EmailRequest>().await?;

    // Spawn background task without waiting for completion
    task::spawn(async move {
        if let Err(e) = send_email_internal(&email).await {
            eprintln!("Email send failed: {}", e);
        }
    });

    Ok(Json(json!({
        "status": "queued",
        "message": "Email will be sent shortly"
    })))
})?;

async fn send_email_internal(email: &EmailRequest) -> Result<(), Box<dyn std::error::Error>> {
    // Simulate async email sending
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    println!("Email sent to: {}", email.to);
    Ok(())
}

#[derive(serde::Deserialize)]
struct EmailRequest {
    to: String,
    subject: String,
}
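
A detail the snippets above leave implicit: with Celery and Sidekiq, nothing runs until a separate worker process is attached to the broker. With the file layout shown:

celery -A celery_app worker --loglevel=info      # Python
bundle exec sidekiq -r ./config/sidekiq.rb       # Ruby (outside Rails, -r points at a file that loads your jobs)

The Bull processors and the tokio::spawn examples, by contrast, execute jobs inside the Node and Rust server processes themselves.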

Error Recovery Patterns

Handle failures gracefully with retries and dead letter queues.

from celery import Task

class ResilientTask(Task):
    autoretry_for = (Exception,)
    retry_kwargs = {'max_retries': 5}
    retry_backoff = True        # exponential delay between retries
    retry_backoff_max = 600     # cap the delay at 10 minutes
    retry_jitter = True         # randomize delays to avoid thundering herds

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Called only after autoretry gives up; send to dead letter queue
        send_to_dlq({'task_id': task_id, 'args': args, 'kwargs': kwargs})

@celery_app.task(base=ResilientTask)
def process_with_recovery(data: dict):
    # Check idempotency
    if is_already_processed(data['id']):
        return {'status': 'already_processed'}

    result = external_api_call(data)
    mark_processed(data['id'])
    return result
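
send_to_dlq is referenced above but never defined. A minimal Redis-backed version is sketched below; the dead_letter list name and db 2 are arbitrary choices, not a Celery convention:

import json
import redis

dlq = redis.Redis(host='localhost', port=6379, db=2)

def send_to_dlq(payload: dict) -> None:
    # Park the failed payload on a plain Redis list for later inspection
    dlq.rpush('dead_letter', json.dumps(payload, default=str))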
import { Queue, Job } from "bull";

// Jobs that exhaust their retries are parked here for manual inspection
const deadLetterQueue = new Queue("dead-letter", { redis: redisConfig });

const resilientQueue = new Queue("resilient-tasks", {
  redis: redisConfig,
  defaultJobOptions: {
    attempts: 5,
    backoff: {
      type: "exponential",
      delay: 2000,
    },
    removeOnComplete: 100,
    removeOnFail: false,
  },
});

resilientQueue.process(async (job: Job) => {
  const { id, data } = job.data;

  // Idempotency check
  if (await isAlreadyProcessed(id)) {
    return { status: "already_processed" };
  }

  try {
    const result = await externalApiCall(data);
    await markProcessed(id);
    return result;
  } catch (error) {
    if (job.attemptsMade >= (job.opts.attempts ?? 1)) {
      // Last attempt: send to dead letter queue
      await deadLetterQueue.add({ originalJob: job.data, error: (error as Error).message });
    }
    throw error;
  }
});
# app/jobs/resilient_job.rb
class ResilientJob
  include Sidekiq::Job
  sidekiq_options retry: 5, dead: true

  sidekiq_retries_exhausted do |msg, _ex|
    # Move to dead letter queue and log; this block runs at class level,
    # so use Sidekiq.logger rather than the job's logger helper
    DeadLetterQueue.push(msg)
    Sidekiq.logger.error("Job exhausted retries: #{msg['jid']}")
  end

  def perform(data)
    # Idempotent operation
    return if already_processed?(data['id'])

    process_with_retry(data)
    mark_processed(data['id'])
  end

  private

  def process_with_retry(data)
    # Short in-process retries for transient blips; Sidekiq's own retry
    # schedule (retry: 5 above) still applies once these are exhausted
    attempt = 0
    max_attempts = 3

    begin
      external_api_call(data)
    rescue StandardError
      attempt += 1
      raise if attempt >= max_attempts
      sleep(2**attempt)
      retry
    end
  end
end
<?php

declare(strict_types=1);

use Spikard\Background\BackgroundTask;
use Spikard\Background\DeadLetterQueue;

// Error recovery with retry logic
final class ResilientTask
{
    private int $maxRetries = 5;
    private int $retryCount = 0;

    public function execute(array $data): array
    {
        // Idempotency check
        if ($this->isAlreadyProcessed($data['id'])) {
            return ['status' => 'already_processed'];
        }

        try {
            $result = $this->externalApiCall($data);
            $this->markProcessed($data['id']);
            return $result;
        } catch (\Exception $e) {
            $this->retryCount++;
            if ($this->retryCount >= $this->maxRetries) {
                DeadLetterQueue::push($data, $e->getMessage());
            }
            throw $e;
        }
    }

    private function isAlreadyProcessed(string $id): bool
    {
        // Check if already processed
        return false;
    }

    private function markProcessed(string $id): void
    {
        // Mark as processed
    }

    private function externalApiCall(array $data): array
    {
        // Call external API
        return ['success' => true];
    }
}
use spikard::prelude::*;
use tokio::time::{sleep, Duration};

type BoxError = Box<dyn std::error::Error + Send + Sync>;

async fn retry_with_backoff<F, T>(mut f: F, max_retries: u32) -> Result<T, BoxError>
where
    // Send bounds let the retried futures run inside tokio::spawn
    F: FnMut() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, BoxError>> + Send>>,
{
    let mut retries = 0;
    loop {
        match f().await {
            Ok(result) => return Ok(result),
            Err(e) => {
                retries += 1;
                if retries >= max_retries {
                    return Err(e);
                }
                // Exponential backoff: 1s, 2s, 4s, 8s...
                let backoff = Duration::from_secs(2_u64.pow(retries - 1));
                eprintln!("Retry attempt {} after {:?}: {}", retries, backoff, e);
                sleep(backoff).await;
            }
        }
    }
}

app.route(post("/process"), |ctx: Context| async move {
    let data = ctx.body_as::<ProcessRequest>().await?;

    tokio::spawn(async move {
        if let Err(e) = retry_with_backoff(
            move || {
                // Clone per attempt so each boxed future owns its input
                let data = data.clone();
                Box::pin(async move { process_data_internal(&data).await })
            },
            3,
        )
        .await
        {
            eprintln!("Processing failed after retries: {}", e);
        }
    });

    Ok(Json(json!({"status": "processing"})))
})?;

async fn process_data_internal(_data: &ProcessRequest) -> Result<(), BoxError> {
    // Simulate processing that may fail
    Ok(())
}

#[derive(serde::Deserialize, Clone)]
struct ProcessRequest {
    id: String,
}

Queue Monitoring

Monitor job health and status in production.

from celery.result import AsyncResult

@app.get("/health/jobs")
async def job_health():
    inspector = celery_app.control.inspect()
    active = inspector.active() or {}
    scheduled = inspector.scheduled() or {}

    return {
        "active_jobs": sum(len(tasks) for tasks in active.values()),
        "scheduled_jobs": sum(len(tasks) for tasks in scheduled.values()),
        "workers": len(active.keys()),
    }

@app.get("/jobs/{task_id}/status")
async def job_status(task_id: str):
    result = AsyncResult(task_id, app=celery_app)

    return {
        "task_id": task_id,
        "status": result.status,
        "result": result.result if result.ready() else None,
        "traceback": str(result.traceback) if result.failed() else None,
    }
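
inspect() reports what workers currently hold, not how deep the broker backlog is. With the Redis broker, pending tasks sit in a Redis list named after the queue ('celery' by default), so depth is one LLEN away. A minimal sketch: the 1000-message alert threshold is arbitrary, and the synchronous Redis call is kept for brevity:

import redis

broker = redis.Redis.from_url('redis://localhost:6379/0')

@app.get("/health/queue-depth")
async def queue_depth():
    # Length of the default Celery queue's Redis list
    depth = broker.llen('celery')
    return {"queue": "celery", "depth": depth, "alert": depth > 1000}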
app.addRoute(
  { method: "GET", path: "/health/jobs", handler_name: "job_health", is_async: true },
  async (req) => {
    const [uploadCounts, emailCounts] = await Promise.all([
      uploadQueue.getJobCounts(),
      emailQueue.getJobCounts(),
    ]);

    return {
      queues: {
        upload: uploadCounts,
        email: emailCounts,
      },
    };
  }
);

app.addRoute(
  { method: "GET", path: "/jobs/:jobId/status", handler_name: "job_status", is_async: true },
  async (req) => {
    const jobId = req.params.jobId;
    const job = await uploadQueue.getJob(jobId) || await emailQueue.getJob(jobId);

    if (!job) {
      return { error: "Job not found", jobId };
    }

    const state = await job.getState();
    const progress = job.progress();

    return {
      jobId: job.id,
      state,
      progress,
      data: job.data,
      finishedOn: job.finishedOn,
      failedReason: job.failedReason,
    };
  }
);
require 'spikard'
require 'sidekiq/api'

# Health check endpoint
app.get '/health/jobs' do
  stats = Sidekiq::Stats.new
  queues = Sidekiq::Queue.all.map do |queue|
    {
      name: queue.name,
      size: queue.size,
      latency: queue.latency
    }
  end

  {
    processed: stats.processed,
    failed: stats.failed,
    scheduled_size: stats.scheduled_size,
    retry_size: stats.retry_size,
    dead_size: stats.dead_size,
    queues: queues
  }
end

# Check specific job status
app.get '/jobs/:jid/status' do |params, _query, _body|
  jid = params['jid']

  # Check if the job is still queued or processing. Sidekiq::Queue.new
  # scans only the "default" queue; check every queue you actually use.
  status = if Sidekiq::Queue.new.find_job(jid)
    'queued'
  elsif Sidekiq::Workers.new.any? { |_, _, work| work['payload']['jid'] == jid }
    'processing'
  else
    'completed_or_failed'
  end

  { job_id: jid, status: status }
end
<?php

declare(strict_types=1);

use Spikard\App;
use Spikard\Attributes\Get;
use Spikard\Background\QueueStats;
use Spikard\Http\Request;
use Spikard\Http\Response;

final class JobHealthController
{
    #[Get('/health/jobs')]
    public function jobHealth(Request $request): Response
    {
        $stats = QueueStats::all();

        return Response::json([
            'active_jobs' => $stats->activeCount(),
            'scheduled_jobs' => $stats->scheduledCount(),
            'workers' => $stats->workerCount(),
        ]);
    }

    #[Get('/jobs/{taskId}/status')]
    public function jobStatus(Request $request): Response
    {
        $taskId = $request->pathParams['taskId'];
        $result = QueueStats::getTask($taskId);

        if ($result === null) {
            return Response::json(['error' => 'Job not found', 'taskId' => $taskId], 404);
        }

        return Response::json([
            'task_id' => $taskId,
            'status' => $result->status,
            'result' => $result->isReady() ? $result->result : null,
        ]);
    }
}
use spikard::prelude::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

struct TaskMonitor {
    active_tasks: Arc<AtomicU32>,
    completed_tasks: Arc<AtomicU32>,
}

impl TaskMonitor {
    fn new() -> Self {
        Self {
            active_tasks: Arc::new(AtomicU32::new(0)),
            completed_tasks: Arc::new(AtomicU32::new(0)),
        }
    }

    fn record_start(&self) {
        self.active_tasks.fetch_add(1, Ordering::SeqCst);
    }

    fn record_complete(&self) {
        self.active_tasks.fetch_sub(1, Ordering::SeqCst);
        self.completed_tasks.fetch_add(1, Ordering::SeqCst);
    }
}

let monitor = Arc::new(TaskMonitor::new());

app.route(get("/task-stats"), {
    let monitor = Arc::clone(&monitor);
    move |_ctx: Context| async move {
        Ok(Json(json!({
            "active_tasks": monitor.active_tasks.load(Ordering::SeqCst),
            "completed_tasks": monitor.completed_tasks.load(Ordering::SeqCst),
        })))
    }
})?;

app.route(post("/process"), {
    let monitor = Arc::clone(&monitor);
    move |ctx: Context| async move {
        let data = ctx.body_as::<ProcessRequest>().await?;

        let monitor_clone = Arc::clone(&monitor);
        tokio::spawn(async move {
            monitor_clone.record_start();
            if let Err(e) = process_data(&data).await {
                eprintln!("Processing failed: {}", e);
            }
            monitor_clone.record_complete();
        });

        Ok(Json(json!({"status": "processing"})))
    }
})?;

async fn process_data(data: &ProcessRequest) -> Result<(), Box<dyn std::error::Error>> {
    Ok(())
}

#[derive(serde::Deserialize)]
struct ProcessRequest {
    id: String,
}

Testing Background Jobs

Test asynchronous jobs reliably in your test suite.

# conftest.py
import pytest

# Pulls in the celery_app / celery_worker fixtures used below
pytest_plugins = ('celery.contrib.pytest',)

@pytest.fixture
def celery_config():
    return {
        'broker_url': 'memory://',
        'result_backend': 'cache+memory://',
    }

@pytest.fixture
def celery_worker_parameters():
    return {
        'perform_ping_check': False,
    }

# test_tasks.py
import pytest
from unittest.mock import patch, MagicMock

from celery_app import process_upload

def test_process_upload_enqueued(celery_app, celery_worker):
    result = process_upload.delay(123)
    assert result.task_id is not None

def test_process_upload_success(celery_app, celery_worker):
    with patch('celery_app.get_file') as mock_get_file:
        mock_file = MagicMock()
        mock_get_file.return_value = mock_file

        result = process_upload.apply(args=[123])

        assert result.successful()
        assert result.result['status'] == 'completed'
        mock_get_file.assert_called_once_with(123)

def test_process_upload_retry_on_failure(celery_app, celery_worker):
    with patch('celery_app.get_file', side_effect=Exception("Temporary error")):
        result = process_upload.apply(args=[123])

        assert result.failed()
        # Check retry was attempted
        assert result.traceback is not None

@pytest.mark.asyncio
async def test_upload_endpoint_creates_task(client):
    response = await client.post("/upload", json={"file_id": 123})

    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "processing"
    assert "task_id" in data
import { Queue, Job } from "bull";
import { jest } from "@jest/globals";

describe("Upload Queue", () => {
  let testQueue: Queue;

  beforeEach(() => {
    testQueue = new Queue("test-upload", {
      redis: { host: "localhost", port: 6379, db: 1 },
    });
  });

  afterEach(async () => {
    await testQueue.close();
  });

  it("enqueues upload job with correct data", async () => {
    const job = await testQueue.add({ fileId: 123 });

    expect(job.id).toBeDefined();
    expect(job.data.fileId).toBe(123);
  });

  it("processes upload successfully", async () => {
    const getFile = jest.fn().mockResolvedValue({ id: 123, name: "test.txt" });
    const processFile = jest.fn().mockResolvedValue({ success: true });

    testQueue.process(async (job: Job) => {
      const file = await getFile(job.data.fileId);
      return await processFile(file);
    });

    const job = await testQueue.add({ fileId: 123 });
    await job.finished();

    expect(getFile).toHaveBeenCalledWith(123);
    expect(processFile).toHaveBeenCalled();
  });

  it("retries on failure", async () => {
    let attempts = 0;

    testQueue.process(async (job: Job) => {
      attempts++;
      if (attempts < 3) {
        throw new Error("Temporary failure");
      }
      return { success: true };
    });

    const job = await testQueue.add(
      { fileId: 123 },
      { attempts: 3, backoff: 100 }
    );

    const result = await job.finished();
    expect(attempts).toBe(3);
    expect(result.success).toBe(true);
  });

  it("moves to failed after max retries", async () => {
    testQueue.process(async (job: Job) => {
      throw new Error("Persistent failure");
    });

    const job = await testQueue.add(
      { fileId: 123 },
      { attempts: 2 }
    );

    await expect(job.finished()).rejects.toThrow("Persistent failure");
    const state = await job.getState();
    expect(state).toBe("failed");
  });
});
# test/jobs/process_upload_job_test.rb
require 'test_helper'
require 'sidekiq/testing'

class ProcessUploadJobTest < ActiveSupport::TestCase
  setup do
    Sidekiq::Testing.fake! # Queue jobs, don't process
  end

  test "enqueues job with correct arguments" do
    assert_equal 0, ProcessUploadJob.jobs.size

    ProcessUploadJob.perform_async(123)

    assert_equal 1, ProcessUploadJob.jobs.size
    assert_equal [123], ProcessUploadJob.jobs.last['args']
  end

  test "processes upload successfully" do
    Sidekiq::Testing.inline! do # Actually run the job
      file = files(:sample_file)

      assert_difference -> { ProcessedFile.count }, 1 do
        ProcessUploadJob.perform_async(file.id)
      end
    end
  end

  test "handles missing file gracefully" do
    Sidekiq::Testing.inline! do
      assert_raises(RuntimeError) do
        ProcessUploadJob.perform_async(999999)
      end
    end
  end

  test "retries on transient failures" do
    Sidekiq::Testing.inline! do
      file = files(:sample_file)

      # Simulate transient error
      FileProcessor.stub(:process, -> { raise "Temporary error" }) do
        assert_raises(RuntimeError) do
          ProcessUploadJob.perform_async(file.id)
        end
      end
    end
  end
end
<?php

declare(strict_types=1);

use PHPUnit\Framework\TestCase;
use Spikard\Background\BackgroundTask;
use Spikard\Testing\QueueFake;

final class BackgroundTaskTest extends TestCase
{
    protected function setUp(): void
    {
        QueueFake::fake(); // Don't actually process jobs
    }

    public function testEnqueuesJobWithCorrectArguments(): void
    {
        $this->assertCount(0, QueueFake::jobs());

        BackgroundTask::run(function (): void {
            sendEmail(123);
        });

        $this->assertCount(1, QueueFake::jobs());
    }

    public function testProcessesTaskSuccessfully(): void
    {
        QueueFake::processSync(); // Actually run jobs

        $result = BackgroundTask::run(function (): array {
            return ['status' => 'completed'];
        });

        $this->assertEquals('completed', $result['status']);
    }

    public function testHandlesFailureGracefully(): void
    {
        QueueFake::processSync();

        $this->expectException(\RuntimeException::class);

        BackgroundTask::run(function (): void {
            throw new \RuntimeException('Processing failed');
        });
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, timeout, Duration};

    #[tokio::test]
    async fn test_background_task_completion() {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(1);

        tokio::spawn(async move {
            sleep(Duration::from_millis(100)).await;
            let _ = tx.send("task_complete".to_string()).await;
        });

        let result = timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("timeout")
            .expect("channel closed");

        assert_eq!(result, "task_complete");
    }

    #[tokio::test]
    async fn test_multiple_background_tasks() {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<u32>(10);

        for i in 0..5 {
            let tx_clone = tx.clone();
            tokio::spawn(async move {
                sleep(Duration::from_millis(50 * u64::from(i))).await;
                let _ = tx_clone.send(i).await;
            });
        }
        drop(tx); // Close sender to allow recv to complete

        let mut results = Vec::new();
        while let Ok(value) = timeout(Duration::from_secs(1), rx.recv()).await {
            if let Some(v) = value {
                results.push(v);
            } else {
                break;
            }
        }

        assert_eq!(results.len(), 5);
    }

    #[tokio::test]
    async fn test_task_error_handling() {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<Result<String, String>>(1);

        tokio::spawn(async move {
            sleep(Duration::from_millis(50)).await;
            let _ = tx.send(Err("task_failed".to_string())).await;
        });

        let result = timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("timeout")
            .expect("channel closed");

        assert!(result.is_err());
        assert_eq!(result.err().unwrap(), "task_failed");
    }
}

Tips

  • Keep request handlers fast: Enqueue email, notifications, and ETL jobs instead of blocking responses.
  • Use durable queues: Prefer Redis, SQS, or RabbitMQ over in-process threads for production workloads.
  • Ensure idempotency: Jobs should be safe to retry. Check whether the work was already completed before processing (see the sketch after this list).
  • Monitor queue depth: Alert when queues grow too large or latency increases.
  • Set appropriate timeouts: Prevent jobs from running indefinitely (time_limit in the sketch below).
  • Use dead letter queues: Capture failed jobs for manual investigation.
  • Test both success and failure paths: Verify retry logic and error handling work as expected.
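
Two of these tips compress into a few lines of Celery. The idempotency check can be an atomic Redis SET NX, so two concurrent deliveries cannot both claim the work, and time_limit bounds the runtime. A minimal sketch; the processed: key prefix, db 3, and the one-day expiry are arbitrary choices:

import redis

r = redis.Redis(host='localhost', port=6379, db=3)

@celery_app.task(time_limit=300)  # hard time limit: the task is killed after 5 minutes
def idempotent_job(job_id: str):
    # SET NX returns None when the key already exists: the work was claimed
    if not r.set(f"processed:{job_id}", 1, nx=True, ex=86400):
        return {'status': 'already_processed'}
    do_work(job_id)  # placeholder for the actual processing
    return {'status': 'completed'}

If do_work can fail after the key is claimed, pair this with a delete-on-failure handler or a shorter expiry so retries are not locked out.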