Custom Queue Handlers
Quickback lets you define custom queue handlers that are compiled into the generated queue consumer. This is useful for background processing such as material extraction pipelines, batch jobs, and async workflows.
Overview
Custom queue handlers are defined in the services/queues/ directory using the defineQueue helper. The compiler extracts your handler logic and generates a unified queue consumer that dispatches messages based on their type.
quickback/
├── definitions/
│   ├── features/                  # Data layer (CRUD, actions, security)
│   │   ├── materials/
│   │   └── claims/
│   └── services/                  # Infrastructure layer
│       └── queues/
│           ├── ingest.ts          # Material processing
│           └── claim-batches.ts   # Batch processing
Defining a Queue Handler
Use defineQueue to create a handler:
// services/queues/ingest.ts
import { defineQueue } from 'quickback';
import { eq } from 'drizzle-orm';

interface ProcessMaterialMessage {
  type: 'process_material';
  materialId: string;
  organizationId: string;
}

export default defineQueue<ProcessMaterialMessage>({
  name: 'ingest',
  messageType: 'process_material',
  description: 'Process materials through extraction pipeline',
  execute: async ({ message, db, env, services, ack, retry }) => {
    const { materialId, organizationId } = message;

    // Dynamic import of schema
    const { materials } = await import('./features/materials/schema');

    // Get material from database
    const [material] = await db
      .select()
      .from(materials)
      .where(eq(materials.id, materialId))
      .limit(1);

    if (!material) {
      console.error('[IngestQueue] Material not found:', materialId);
      return { success: false, error: 'Material not found' };
    }

    // Process the material...
    await db
      .update(materials)
      .set({ extractionStatus: 'processing' })
      .where(eq(materials.id, materialId));

    // Your processing logic here
    return { success: true };
  },
});
Configuration Options
| Option | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Handler identifier (used in logs) |
| messageType | string | Yes | Message type to match (e.g., 'process_material') |
| description | string | No | Human-readable description |
| execute | function | Yes | Handler function |
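Read as a TypeScript shape, the options above might look like the sketch below. The type names (HandlerResult, ExecuteContext, QueueDefinition) and the runWithStubs helper are hypothetical and inferred only from this page; the typings that quickback actually exports may differ. The context fields are the ones described in the Execute Function section.

```typescript
// Hypothetical typings inferred from the options table; not quickback's own.
type HandlerResult = { success: true } | { success: false; error: string };

interface ExecuteContext<TMessage> {
  message: TMessage;              // message payload
  db: unknown;                    // Drizzle instance in the real consumer
  env: Record<string, unknown>;   // Cloudflare bindings
  services: unknown;              // generated services
  ack: () => void;
  retry: () => void;
}

interface QueueDefinition<TMessage extends { type: string }> {
  name: string;                   // handler identifier, used in logs
  messageType: TMessage['type'];  // message type to match
  description?: string;           // optional human-readable description
  execute: (ctx: ExecuteContext<TMessage>) => Promise<HandlerResult>;
}

// Illustrative message and handler, unrelated to the ingest example.
interface PingMessage {
  type: 'ping';
  id: string;
}

const pingQueue: QueueDefinition<PingMessage> = {
  name: 'ping',
  messageType: 'ping',
  execute: async ({ message }) =>
    message.id ? { success: true } : { success: false, error: 'Missing id' },
};

// Calls a handler with stubbed context values, e.g. from a unit test.
async function runWithStubs<T extends { type: string }>(
  def: QueueDefinition<T>,
  message: T,
): Promise<HandlerResult> {
  return def.execute({
    message,
    db: null,
    env: {},
    services: null,
    ack: () => {},
    retry: () => {},
  });
}
```

Because everything a handler needs arrives through the context object, a handler written this way can be exercised without a running Worker by passing stubs for db, env, and services.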
Execute Function
The execute function receives a context object:
execute: async ({ message, db, env, services, ack, retry }) => {
  // message  - The message payload (typed via generic)
  // db       - Drizzle database instance
  // env      - Cloudflare bindings (AI, queues, etc.)
  // services - Generated services (ai, etc.)
  // ack()    - Acknowledge message (auto-called on success)
  // retry()  - Retry message (auto-called on failure)
  return { success: true }; // or { success: false, error: 'reason' }
}
Constants and Imports
The compiler extracts top-level constants and imports from your handler file and includes them in the generated queue consumer:
// services/queues/ingest.ts
import { defineQueue } from 'quickback';
import { eq } from 'drizzle-orm';
import { generateId } from '@news-assist/shared';
const TEXT_GENERATION_MODEL = '@cf/meta/llama-3.1-8b-instruct';
const EXTRACTION_PROMPT = `You are a news analyst. Extract factual claims from this article.
For each claim, output a JSON object with:
- content: The claim as a complete, standalone sentence
- claimType: One of NEWS, QUOTE, OFFICIAL, ALLEGATION, STATISTIC, FINDING
- urgency: 1-5 (5 = most urgent/breaking)
Output ONLY a JSON array of claims, no other text.`;
export default defineQueue({
  // ... handler definition
});
The compiler:
- Extracts all import statements
- Extracts top-level const declarations (including multi-line template literals)
- Inlines them in the generated queue consumer
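The EXTRACTION_PROMPT constant above asks the model for a JSON array of claims with content, claimType, and urgency fields. Model output is not guaranteed to follow that format, so a handler would probably want to validate it before writing to the database. The following is a minimal sketch of such a check; ExtractedClaim, isClaim, and parseClaims are hypothetical names and are not part of Quickback.

```typescript
// Claim types listed in EXTRACTION_PROMPT.
const CLAIM_TYPES = ['NEWS', 'QUOTE', 'OFFICIAL', 'ALLEGATION', 'STATISTIC', 'FINDING'];

interface ExtractedClaim {
  content: string;
  claimType: string; // one of CLAIM_TYPES
  urgency: number;   // integer from 1 to 5
}

// Type guard: true only when the value matches the shape the prompt asks for.
function isClaim(value: unknown): value is ExtractedClaim {
  if (typeof value !== 'object' || value === null) return false;
  const { content, claimType, urgency } = value as Record<string, unknown>;
  return (
    typeof content === 'string' &&
    content.length > 0 &&
    typeof claimType === 'string' &&
    CLAIM_TYPES.indexOf(claimType) !== -1 &&
    typeof urgency === 'number' &&
    Math.floor(urgency) === urgency &&
    urgency >= 1 &&
    urgency <= 5
  );
}

// Parses raw model output; returns only well-formed claims and
// an empty array when the output is not a JSON array at all.
function parseClaims(raw: string): ExtractedClaim[] {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return [];
  }
  if (!Array.isArray(parsed)) return [];
  return parsed.filter(isClaim);
}
```

Dropping malformed entries rather than failing the whole message is one possible policy; returning { success: false } so the message is retried is another, depending on how often the model is expected to recover on a second attempt.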
Dynamic Schema Imports
Since your schema files are generated by Quickback, use dynamic imports to reference them:
execute: async ({ message, db }) => {
  // Dynamic import - path is relative to generated src/ directory
  const { materials, claimExtractionBatches } = await import('./features/materials/schema');

  const [material] = await db
    .select()
    .from(materials)
    .where(eq(materials.id, message.materialId));
  // ...
}
Sending to Other Queues
Chain queue handlers by sending messages to other queues:
execute: async ({ message, db, env }) => {
  const { materialId } = message;

  // Process and create batch...
  const batchId = generateId('batch');

  // Send to next stage
  if (env.CLAIM_BATCHES_QUEUE) {
    await env.CLAIM_BATCHES_QUEUE.send({
      type: 'claim_batch_stage',
      batch_id: batchId,
      material_id: materialId,
      stage: 'subeditor',
    });
  }

  return { success: true };
}
Generated Output
When queue handlers are defined, the compiler generates a unified queue-consumer.ts:
// src/queue-consumer.ts (generated)
import { drizzle } from 'drizzle-orm/d1';
import { eq } from 'drizzle-orm';
import { generateId } from '@news-assist/shared';
import { createServices } from './lib/services';

const TEXT_GENERATION_MODEL = '@cf/meta/llama-3.1-8b-instruct';
const EXTRACTION_PROMPT = `...`;

// Embedding handler (if embeddings configured)
const embeddingQueueHandler = async (batch, env) => { ... };

// Custom handler: ingest
const ingestQueueHandler = async (batch, env) => {
  const db = drizzle(env.DB);
  const services = createServices(env);

  for (const message of batch.messages) {
    try {
      const result = await executeHandler({
        message: message.body,
        env, db, services,
        ack: () => message.ack(),
        retry: () => message.retry(),
      });
      if (result.success) message.ack();
      else message.retry();
    } catch (error) {
      message.retry();
    }
  }
};

// Unified dispatcher
export const queue = async (batch, env) => {
  const messageType = batch.messages[0]?.body?.type;

  switch (messageType) {
    case 'embedding':
      await embeddingQueueHandler(batch, env);
      break;
    case 'process_material':
      await ingestQueueHandler(batch, env);
      break;
    default:
      console.warn('[Queue] Unknown message type:', messageType);
      for (const msg of batch.messages) msg.ack();
  }
};
wrangler.toml Configuration
Configure your queues in wrangler.toml:
# Queue producers (send to queues)
[[queues.producers]]
queue = "my-app-ingest-queue"
binding = "INGEST_QUEUE"
[[queues.producers]]
queue = "my-app-claim-batches-queue"
binding = "CLAIM_BATCHES_QUEUE"
# Queue consumers (receive from queues)
[[queues.consumers]]
queue = "my-app-ingest-queue"
max_batch_size = 10
max_batch_timeout = 30
max_retries = 3
[[queues.consumers]]
queue = "my-app-claim-batches-queue"
max_batch_size = 10
max_batch_timeout = 30
max_retries = 3
Deployment
After defining queue handlers:
- Create the queues:
  wrangler queues create my-app-ingest-queue
  wrangler queues create my-app-claim-batches-queue
- Compile:
  quickback compile
- Deploy:
  wrangler deploy
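Once the worker is deployed, a message reaches the ingest handler when something sends a body whose type equals the handler's messageType. The following is a minimal sketch of such a producer, assuming the INGEST_QUEUE binding from the wrangler.toml example. QueueLike is a local stand-in that models only the send() method of the Cloudflare queue binding, and enqueueMaterial is a hypothetical helper name; neither is part of Quickback.

```typescript
// Local stand-in for the queue binding: only send() is modelled here.
interface QueueLike<T> {
  send(body: T): Promise<void>;
}

// Same message shape as in the ingest handler example.
interface ProcessMaterialMessage {
  type: 'process_material';
  materialId: string;
  organizationId: string;
}

interface Env {
  INGEST_QUEUE?: QueueLike<ProcessMaterialMessage>;
}

// Hypothetical helper: enqueue one material for background processing.
// Returns false when the binding is missing instead of throwing.
async function enqueueMaterial(
  env: Env,
  materialId: string,
  organizationId: string,
): Promise<boolean> {
  if (!env.INGEST_QUEUE) {
    console.warn('[Ingest] INGEST_QUEUE binding is missing; message not sent');
    return false;
  }
  await env.INGEST_QUEUE.send({
    type: 'process_material', // must match the handler's messageType
    materialId,
    organizationId,
  });
  return true;
}
```

Typing the binding with the message interface lets the compiler reject a body whose type field would not match any case in the generated dispatcher.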
Error Handling
The queue consumer handles errors automatically:
- Return { success: true }: the message is acknowledged
- Return { success: false, error: '...' }: the message is retried
- Throw an exception: the message is retried
After max_retries retries (default 3), a message that still fails is dropped, or moved to a dead-letter queue if one is configured.
execute: async ({ message, db }) => {
  try {
    // Your processing logic
    return { success: true };
  } catch (error) {
    console.error('[Handler] Failed:', error);
    return {
      success: false,
      error: error instanceof Error ? error.message : 'Unknown error'
    };
  }
}
Combining with Embeddings
Custom queue handlers work alongside automatic embeddings. The compiler generates a single queue consumer that handles both:
switch (messageType) {
  case 'embedding':
    // Auto-generated embedding handler
    await embeddingQueueHandler(batch, env);
    break;
  case 'process_material':
    // Your custom handler
    await ingestQueueHandler(batch, env);
    break;
}
See Automatic Embeddings for configuring auto-embedding on CRUD operations.
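The generated dispatcher reads the type of the first message in a batch only, which presumably relies on each queue carrying a single message type. If one queue could ever carry mixed types, an alternative would be to group the batch by type before dispatching. The sketch below illustrates that idea. It is not what Quickback generates, and QueueMessageLike, groupByType, and dispatchMixedBatch are hypothetical names.

```typescript
// Minimal stand-in for a queue message: body plus ack/retry.
interface QueueMessageLike {
  body: { type?: string };
  ack(): void;
  retry(): void;
}

type MessageHandler = (messages: QueueMessageLike[]) => Promise<void>;

// Buckets messages by body.type; messages without a type go to 'unknown'.
function groupByType(messages: QueueMessageLike[]): Record<string, QueueMessageLike[]> {
  const groups: Record<string, QueueMessageLike[]> = Object.create(null);
  for (const msg of messages) {
    const key = msg.body.type !== undefined ? msg.body.type : 'unknown';
    if (!groups[key]) groups[key] = [];
    groups[key].push(msg);
  }
  return groups;
}

// Routes each group to its handler. Groups without a handler are
// acknowledged, mirroring the default branch of the generated dispatcher.
async function dispatchMixedBatch(
  messages: QueueMessageLike[],
  handlers: Partial<Record<string, MessageHandler>>,
): Promise<void> {
  const groups = groupByType(messages);
  for (const type of Object.keys(groups)) {
    const handler = Object.prototype.hasOwnProperty.call(handlers, type)
      ? handlers[type]
      : undefined;
    if (handler) {
      await handler(groups[type]);
    } else {
      console.warn('[Queue] Unknown message type:', type);
      for (const msg of groups[type]) msg.ack();
    }
  }
}
```

Whether this is needed depends on how producers are wired: with one queue per message type, as in the wrangler.toml example, the first-message check is sufficient.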