Realtime
Quickback can generate realtime notification helpers for broadcasting changes to connected clients. This enables live updates via WebSocket using Cloudflare Durable Objects.
Enabling Realtime
Add realtime configuration to individual table definitions:
// features/claims/claims.ts
import { sqliteTable, text } from "drizzle-orm/sqlite-core";
import { defineTable } from "@quickback/compiler";
export const claims = sqliteTable("claims", {
id: text("id").primaryKey(),
title: text("title").notNull(),
status: text("status").notNull(),
organizationId: text("organization_id").notNull(),
});
export default defineTable(claims, {
firewall: { organization: {} },
realtime: {
enabled: true, // Enable realtime for this table
onInsert: true, // Broadcast on INSERT (default: true)
onUpdate: true, // Broadcast on UPDATE (default: true)
onDelete: true, // Broadcast on DELETE (default: true)
requiredRoles: ["member", "admin"], // Who receives broadcasts
fields: ["id", "title", "status"], // Fields to include (optional)
},
// ... guards, crud
});When any table has realtime enabled, the compiler generates src/lib/realtime.ts with helper functions for sending notifications.
Generated Helper
The createRealtime() factory provides convenient methods for both Postgres Changes (CRUD events) and custom broadcasts:
import { createRealtime } from '../lib/realtime';
export const execute: ActionExecutor = async ({ db, ctx, input }) => {
const realtime = createRealtime(ctx.env);
// After creating a record
await realtime.insert('claims', newClaim, ctx.activeOrgId!);
// After updating a record
await realtime.update('claims', newClaim, oldClaim, ctx.activeOrgId!);
// After deleting a record
await realtime.delete('claims', { id: claimId }, ctx.activeOrgId!);
return { success: true };
};Event Format
Quickback broadcasts events in a structured JSON format for easy client-side handling.
Insert Event
await realtime.insert('materials', {
id: 'mat_123',
title: 'Breaking News',
status: 'pending'
}, ctx.activeOrgId!);Broadcasts:
{
"type": "postgres_changes",
"table": "materials",
"eventType": "INSERT",
"schema": "public",
"new": {
"id": "mat_123",
"title": "Breaking News",
"status": "pending"
},
"old": null
}Update Event
await realtime.update('materials',
{ id: 'mat_123', status: 'completed' }, // new
{ id: 'mat_123', status: 'pending' }, // old
ctx.activeOrgId!
);Broadcasts:
{
"type": "postgres_changes",
"table": "materials",
"eventType": "UPDATE",
"schema": "public",
"new": { "id": "mat_123", "status": "completed" },
"old": { "id": "mat_123", "status": "pending" }
}Delete Event
await realtime.delete('materials',
{ id: 'mat_123' },
ctx.activeOrgId!
);Broadcasts:
{
"type": "postgres_changes",
"table": "materials",
"eventType": "DELETE",
"schema": "public",
"new": null,
"old": { "id": "mat_123" }
}Custom Broadcasts
For arbitrary events that don't map to CRUD operations:
await realtime.broadcast('processing-complete', {
materialId: 'mat_123',
claimCount: 5,
duration: 1234
}, ctx.activeOrgId!);Broadcasts:
{
"type": "broadcast",
"event": "processing-complete",
"payload": {
"materialId": "mat_123",
"claimCount": 5,
"duration": 1234
}
}User-Specific Broadcasts
Target a specific user instead of the entire organization:
// Only this user receives the notification
await realtime.insert('notifications', newNotification, ctx.activeOrgId!, {
userId: ctx.userId
});
await realtime.broadcast('task-assigned', {
taskId: 'task_123'
}, ctx.activeOrgId!, {
userId: assigneeId
});Role-Based Filtering
Limit which roles receive a broadcast using targetRoles:
// Only admins and editors receive this broadcast
await realtime.insert('admin-actions', action, ctx.activeOrgId!, {
targetRoles: ['admin', 'editor']
});
// Members won't see this update
await realtime.update('sensitive-data', newRecord, oldRecord, ctx.activeOrgId!, {
targetRoles: ['admin']
});If targetRoles is not specified, all authenticated users in the organization receive the broadcast.
Per-Role Field Masking
Apply different field masking based on the subscriber's role using maskingConfig:
await realtime.insert('employees', newEmployee, ctx.activeOrgId!, {
maskingConfig: {
ssn: { type: 'ssn', show: { roles: ['admin', 'hr'] } },
salary: { type: 'redact', show: { roles: ['admin'] } },
email: { type: 'email', show: { roles: ['admin', 'hr', 'manager'] } },
},
});How masking works:
- Each subscriber receives a payload masked according to their role
- Roles in the
show.rolesarray see unmasked values - All other roles see the masked version
- Masking is pre-computed per-role (O(roles) not O(subscribers))
Available mask types:
| Type | Example Output |
|---|---|
email | j***@y*********.com |
phone | ******4567 |
ssn | *****6789 |
creditCard | **** **** **** 1111 |
name | J*** |
redact | [REDACTED] |
Owner-Based Masking
Show unmasked data to the record owner:
maskingConfig: {
ssn: {
type: 'ssn',
show: { roles: ['admin'], or: 'owner' } // Admin OR owner sees unmasked
},
}Client-Side Subscription
WebSocket Authentication
The Broadcaster supports two authentication methods:
Session Token (Browser/App):
const ws = new WebSocket('wss://api.yourdomain.com/realtime/v1/websocket');
ws.onopen = () => {
ws.send(JSON.stringify({
type: 'auth',
token: sessionToken, // JWT from Better Auth session
organizationId: activeOrgId,
}));
};
ws.onmessage = (e) => {
const msg = JSON.parse(e.data);
if (msg.type === 'auth_success') {
console.log('Authenticated:', {
role: msg.role,
roles: msg.roles,
authMethod: msg.authMethod, // 'session'
});
}
};API Key (Server/CLI):
const ws = new WebSocket('wss://api.yourdomain.com/realtime/v1/websocket');
ws.onopen = () => {
ws.send(JSON.stringify({
type: 'auth',
token: apiKey, // API key for machine-to-machine auth
organizationId: orgId,
}));
};
// authMethod will be 'api_key' on successHandling Messages
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
// Handle CRUD events
if (msg.type === 'postgres_changes') {
const { table, eventType, new: newRecord, old: oldRecord } = msg;
if (eventType === 'INSERT') {
addRecord(table, newRecord);
} else if (eventType === 'UPDATE') {
updateRecord(table, newRecord);
} else if (eventType === 'DELETE') {
removeRecord(table, oldRecord.id);
}
}
// Handle custom broadcasts
if (msg.type === 'broadcast') {
const { event, payload } = msg;
if (event === 'processing-complete') {
refreshMaterial(payload.materialId);
} else if (event === 'task-assigned') {
showTaskNotification(payload.taskId);
}
}
};Required Environment Variables
| Variable | Description |
|---|---|
REALTIME_URL | URL of the broadcast/realtime worker |
ACCESS_TOKEN | Internal service-to-service auth token |
Example configuration:
# wrangler.toml
[vars]
REALTIME_URL = "https://your-realtime-worker.workers.dev"
ACCESS_TOKEN = "your-internal-secret-token"Architecture
┌─────────────┐ POST /broadcast ┌──────────────────┐
│ API Worker │ ───────────────────────► │ Realtime Worker │
│ (Quickback)│ │ (Durable Object)│
└─────────────┘ └────────┬─────────┘
│
WebSocket│
│
┌────────▼─────────┐
│ Browser Clients │
│ (WebSocket) │
└──────────────────┘- API Worker - Your Quickback-generated API. Calls
realtime.insert()etc. after CRUD operations. - Realtime Worker - Separate worker with Durable Object for managing WebSocket connections.
- Browser Clients - Connect via WebSocket, subscribe to channels.
Best Practices
Broadcast After Commit
Always broadcast after the database operation succeeds:
// Good - broadcast after successful insert
const [newClaim] = await db.insert(claims).values(data).returning();
await realtime.insert('claims', newClaim, ctx.activeOrgId!);
// Bad - don't broadcast before confirming success
await realtime.insert('claims', data, ctx.activeOrgId!);
await db.insert(claims).values(data); // Could fail!Minimal Payloads
Only include necessary data in broadcasts:
// Good - minimal payload
await realtime.update('materials',
{ id: record.id, status: 'completed' },
{ id: record.id, status: 'pending' },
ctx.activeOrgId!
);
// Avoid - sending entire record with large content
await realtime.update('materials', fullRecord, oldRecord, ctx.activeOrgId!);Use Broadcasts for Complex Events
For events that don't map cleanly to CRUD:
// Processing pipeline completed
await realtime.broadcast('pipeline-complete', {
materialId: material.id,
stages: ['fetch', 'extract', 'analyze'],
claimsCreated: 5,
quotesExtracted: 3
}, ctx.activeOrgId!);Custom Event Namespaces
For complex applications with many custom events, you can define typed event namespaces using defineRealtime. This generates strongly-typed helper methods for your custom events.
Defining Event Namespaces
Create a file in services/realtime/:
// services/realtime/extraction.ts
import { defineRealtime } from '@quickback/compiler';
export default defineRealtime({
name: 'extraction',
events: ['started', 'progress', 'completed', 'failed'],
description: 'Material extraction pipeline events',
});Generated Helper Methods
After compilation, the createRealtime() helper includes your custom namespace:
import { createRealtime } from '../lib/realtime';
export const execute: ActionExecutor = async ({ ctx, input }) => {
const realtime = createRealtime(ctx.env);
// Type-safe custom event methods
await realtime.extraction.started({
materialId: input.materialId,
}, ctx.activeOrgId!);
// Progress updates
await realtime.extraction.progress({
materialId: input.materialId,
percent: 50,
stage: 'extracting',
}, ctx.activeOrgId!);
// Completion
await realtime.extraction.completed({
materialId: input.materialId,
claimsExtracted: 15,
duration: 1234,
}, ctx.activeOrgId!);
return { success: true };
};Event Format
Custom namespace events use the broadcast type with namespaced event names:
{
"type": "broadcast",
"event": "extraction:started",
"payload": {
"materialId": "mat_123"
}
}Multiple Namespaces
Define multiple namespaces for different parts of your application:
// services/realtime/notifications.ts
export default defineRealtime({
name: 'notifications',
events: ['new', 'read', 'dismissed'],
description: 'User notification events',
});
// services/realtime/presence.ts
export default defineRealtime({
name: 'presence',
events: ['joined', 'left', 'typing', 'idle'],
description: 'User presence events',
});Usage:
const realtime = createRealtime(ctx.env);
// Notification events
await realtime.notifications.new({ ... }, ctx.activeOrgId!);
// Presence events
await realtime.presence.joined({ userId: ctx.userId }, ctx.activeOrgId!);Namespace vs Generic Broadcast
| Use Case | Approach |
|---|---|
| One-off custom event | realtime.broadcast('event-name', payload, orgId) |
| Repeated event patterns | defineRealtime namespace |
| Type-safe events | defineRealtime namespace |
| Event discovery | defineRealtime (appears in generated types) |