Quickback Docs

Custom Queue Handlers

Quickback lets you define custom queue handlers that the compiler merges into the generated queue consumer. They suit background processing such as material extraction pipelines, batch jobs, and other asynchronous workflows.

Overview

Custom queue handlers live in the services/queues/ directory and are created with the defineQueue helper. The compiler extracts each handler's logic and generates a single queue consumer that dispatches messages by their type field.

quickback/
├── definitions/
│   ├── features/           # Data layer (CRUD, actions, security)
│   │   ├── materials/
│   │   └── claims/
│   └── services/           # Infrastructure layer
│       └── queues/
│           ├── ingest.ts          # Material processing
│           └── claim-batches.ts   # Batch processing

Defining a Queue Handler

Use defineQueue to create a handler:

// services/queues/ingest.ts
import { defineQueue } from 'quickback';
import { eq } from 'drizzle-orm';

interface ProcessMaterialMessage {
  type: 'process_material';
  materialId: string;
  organizationId: string;
}

export default defineQueue<ProcessMaterialMessage>({
  name: 'ingest',
  messageType: 'process_material',
  description: 'Process materials through extraction pipeline',

  execute: async ({ message, db, env, services, ack, retry }) => {
    const { materialId, organizationId } = message;

    // Dynamic import of the generated schema (path is relative to the generated src/ directory)
    const { materials } = await import('./features/materials/schema');

    // Get material from database
    const [material] = await db
      .select()
      .from(materials)
      .where(eq(materials.id, materialId))
      .limit(1);

    if (!material) {
      console.error('[IngestQueue] Material not found:', materialId);
      return { success: false, error: 'Material not found' };
    }

    // Process the material...
    await db
      .update(materials)
      .set({ extractionStatus: 'processing' })
      .where(eq(materials.id, materialId));

    // Your processing logic here

    return { success: true };
  },
});
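
A handler only runs once a message of its type reaches the queue. The following is a minimal sketch of the producer side, assuming the INGEST_QUEUE binding from the wrangler.toml section is the queue this handler consumes. The QueueProducer interface and the enqueueMaterial helper are hypothetical names introduced for illustration; they are not part of Quickback.

```typescript
// Message shape from the handler above.
interface ProcessMaterialMessage {
  type: 'process_material';
  materialId: string;
  organizationId: string;
}

// Minimal structural type for a queue producer binding (assumption: the
// Cloudflare Queue binding exposes a compatible send() method).
interface QueueProducer<T> {
  send(body: T): Promise<void>;
}

// Hypothetical helper: builds the message and sends it to the given queue.
async function enqueueMaterial(
  queue: QueueProducer<ProcessMaterialMessage>,
  materialId: string,
  organizationId: string,
): Promise<ProcessMaterialMessage> {
  const message: ProcessMaterialMessage = {
    type: 'process_material', // must match the handler's messageType
    materialId,
    organizationId,
  };
  await queue.send(message);
  return message;
}
```

From code that has access to env, this could be called as enqueueMaterial(env.INGEST_QUEUE, material.id, organizationId).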

Configuration Options

| Option | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | Yes | Handler identifier (used in logs) |
| messageType | string | Yes | Message type to match (e.g., 'process_material') |
| description | string | No | Human-readable description |
| execute | function | Yes | Handler function |

Execute Function

The execute function receives a context object:

execute: async ({ message, db, env, services, ack, retry }) => {
  // message - The message payload (typed via generic)
  // db - Drizzle database instance
  // env - Cloudflare bindings (AI, queues, etc.)
  // services - Generated services (ai, etc.)
  // ack() - Acknowledge message (auto-called on success)
  // retry() - Retry message (auto-called on failure)

  return { success: true }; // or { success: false, error: 'reason' }
}
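
The context fields above can be written down as types, which helps when factoring handler logic into helper functions. The sketch below is an assumption inferred from the fields listed here: QueueResult, QueueContext, and outcomeFor are illustrative names, and the types Quickback actually exports may differ.

```typescript
// Assumed result shape, taken from the two return values shown above.
type QueueResult = { success: true } | { success: false; error: string };

// Assumed context shape. db, env, and services are left generic because
// their concrete types depend on the generated project.
interface QueueContext<TMessage, TDb = unknown, TEnv = unknown, TServices = unknown> {
  message: TMessage;
  db: TDb;
  env: TEnv;
  services: TServices;
  ack: () => void;
  retry: () => void;
}

interface ProcessMaterialMessage {
  type: 'process_material';
  materialId: string;
  organizationId: string;
}

// Mirrors the documented rule: ack() is called automatically on success,
// retry() on failure.
function outcomeFor(result: QueueResult): 'ack' | 'retry' {
  return result.success ? 'ack' : 'retry';
}

// Handler body written against the assumed context type.
const execute = async (
  { message }: QueueContext<ProcessMaterialMessage>,
): Promise<QueueResult> => {
  if (!message.materialId) {
    return { success: false, error: 'materialId is missing' };
  }
  // Processing logic would go here.
  return { success: true };
};
```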

Constants and Imports

The compiler extracts top-level constants and imports from your handler file and includes them in the generated queue consumer:

// services/queues/ingest.ts
import { defineQueue } from 'quickback';
import { eq } from 'drizzle-orm';
import { generateId } from '@news-assist/shared';

const TEXT_GENERATION_MODEL = '@cf/meta/llama-3.1-8b-instruct';

const EXTRACTION_PROMPT = `You are a news analyst. Extract factual claims from this article.

For each claim, output a JSON object with:
- content: The claim as a complete, standalone sentence
- claimType: One of NEWS, QUOTE, OFFICIAL, ALLEGATION, STATISTIC, FINDING
- urgency: 1-5 (5 = most urgent/breaking)

Output ONLY a JSON array of claims, no other text.`;

export default defineQueue({
  // ... handler definition
});

The compiler:

  1. Extracts all import statements
  2. Extracts top-level const declarations (including multi-line template literals)
  3. Inlines them in the generated queue consumer

Dynamic Schema Imports

Since your schema files are generated by Quickback, use dynamic imports to reference them:

execute: async ({ message, db }) => {
  // Dynamic import - path is relative to generated src/ directory
  const { materials, claimExtractionBatches } = await import('./features/materials/schema');

  const [material] = await db
    .select()
    .from(materials)
    .where(eq(materials.id, message.materialId));

  // ...
}

Sending to Other Queues

Chain queue handlers by sending messages to other queues:

execute: async ({ message, db, env }) => {
  const { materialId } = message;

  // Process and create batch...
  const batchId = generateId('batch');

  // Send to next stage
  if (env.CLAIM_BATCHES_QUEUE) {
    await env.CLAIM_BATCHES_QUEUE.send({
      type: 'claim_batch_stage',
      batch_id: batchId,
      material_id: materialId,
      stage: 'subeditor',
    });
  }

  return { success: true };
}
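
The handler that consumes this message has to agree with the producer on the payload shape. A minimal sketch of a shared message type and a runtime guard follows. ClaimBatchStageMessage and isClaimBatchStageMessage are hypothetical names; the field list is taken from the send call above, and stage is typed as a plain string because 'subeditor' is the only value that appears in this document.

```typescript
// Payload sent to CLAIM_BATCHES_QUEUE in the example above.
interface ClaimBatchStageMessage {
  type: 'claim_batch_stage';
  batch_id: string;
  material_id: string;
  stage: string;
}

// Runtime check that an unknown message body has the expected fields.
function isClaimBatchStageMessage(body: unknown): body is ClaimBatchStageMessage {
  if (typeof body !== 'object' || body === null) return false;
  const candidate = body as Record<string, unknown>;
  return (
    candidate['type'] === 'claim_batch_stage' &&
    typeof candidate['batch_id'] === 'string' &&
    typeof candidate['material_id'] === 'string' &&
    typeof candidate['stage'] === 'string'
  );
}
```

The same interface could serve as the generic argument to defineQueue in services/queues/claim-batches.ts, so that producer and consumer share one definition.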

Generated Output

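When queue handlers are defined, the compiler generates a single queue-consumer.ts. As the dispatcher below shows, it reads the type from the first message of each batch and routes the whole batch on it, so each queue should carry only one message type: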

// src/queue-consumer.ts (generated)
import { drizzle } from 'drizzle-orm/d1';
import { eq } from 'drizzle-orm';
import { generateId } from '@news-assist/shared';
import { createServices } from './lib/services';

const TEXT_GENERATION_MODEL = '@cf/meta/llama-3.1-8b-instruct';
const EXTRACTION_PROMPT = `...`;

// Embedding handler (if embeddings configured)
const embeddingQueueHandler = async (batch, env) => { ... };

// Custom handler: ingest
const ingestQueueHandler = async (batch, env) => {
  const db = drizzle(env.DB);
  const services = createServices(env);

  for (const message of batch.messages) {
    try {
      const result = await executeHandler({
        message: message.body,
        env, db, services,
        ack: () => message.ack(),
        retry: () => message.retry(),
      });

      if (result.success) message.ack();
      else message.retry();
    } catch (error) {
      message.retry();
    }
  }
};

// Unified dispatcher
export const queue = async (batch, env) => {
  const messageType = batch.messages[0]?.body?.type;

  switch (messageType) {
    case 'embedding':
      await embeddingQueueHandler(batch, env);
      break;
    case 'process_material':
      await ingestQueueHandler(batch, env);
      break;
    default:
      console.warn('[Queue] Unknown message type:', messageType);
      for (const msg of batch.messages) msg.ack();
  }
};

wrangler.toml Configuration

Configure your queues in wrangler.toml. Each producer binding becomes a property on env (for example, env.CLAIM_BATCHES_QUEUE):

# Queue producers (send to queues)
[[queues.producers]]
queue = "my-app-ingest-queue"
binding = "INGEST_QUEUE"

[[queues.producers]]
queue = "my-app-claim-batches-queue"
binding = "CLAIM_BATCHES_QUEUE"

# Queue consumers (receive from queues)
[[queues.consumers]]
queue = "my-app-ingest-queue"
max_batch_size = 10
max_batch_timeout = 30
max_retries = 3

[[queues.consumers]]
queue = "my-app-claim-batches-queue"
max_batch_size = 10
max_batch_timeout = 30
max_retries = 3

Deployment

After defining queue handlers:

  1. Create the queues:

    wrangler queues create my-app-ingest-queue
    wrangler queues create my-app-claim-batches-queue

  2. Compile:

    quickback compile

  3. Deploy:

    wrangler deploy

Error Handling

The queue consumer handles errors automatically:

  • Return { success: true } - Message is acknowledged
  • Return { success: false, error: '...' } - Message is retried
  • Throw an exception - Message is retried

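After max_retries attempts (default 3), a message that keeps failing is dropped, or moved to a dead-letter queue if the consumer has one configured (dead_letter_queue in wrangler.toml). To return a descriptive error instead of letting an exception propagate, catch it inside the handler: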

execute: async ({ message, db }) => {
  try {
    // Your processing logic
    return { success: true };
  } catch (error) {
    console.error('[Handler] Failed:', error);
    return {
      success: false,
      error: error instanceof Error ? error.message : 'Unknown error'
    };
  }
}
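
Because every failed result is retried, a message that can never succeed (for example, one that references a deleted record) uses up all of its retries before it is dropped. One possible pattern, sketched below, is to log such messages and acknowledge them instead. PermanentError, isPermanent, and toQueueResult are hypothetical names, not Quickback APIs.

```typescript
type QueueResult = { success: true } | { success: false; error: string };

// Hypothetical marker for failures that a retry cannot fix.
class PermanentError extends Error {
  readonly permanent = true;
}

// Checks the marker property rather than the prototype chain.
function isPermanent(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { permanent?: unknown }).permanent === true
  );
}

// Converts a caught error into a handler result.
function toQueueResult(error: unknown): QueueResult {
  if (isPermanent(error)) {
    // Returning success acknowledges the message, so it is not retried.
    console.error('[Handler] Dropping message:', error);
    return { success: true };
  }
  return {
    success: false,
    error: error instanceof Error ? error.message : 'Unknown error',
  };
}
```

In a catch block like the one above, return toQueueResult(error); would take the place of the inline result object.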

Combining with Embeddings

Custom queue handlers work alongside automatic embeddings. The compiler generates a single queue consumer that handles both:

switch (messageType) {
  case 'embedding':
    // Auto-generated embedding handler
    await embeddingQueueHandler(batch, env);
    break;
  case 'process_material':
    // Your custom handler
    await ingestQueueHandler(batch, env);
    break;
}

See Automatic Embeddings for configuring auto-embedding on CRUD operations.
