Realtime
Quickback can generate realtime notification helpers for broadcasting changes to connected clients. This enables live updates via WebSocket using Cloudflare Durable Objects.
Enabling Realtime
Add realtime configuration to individual table definitions:
// features/applications/applications.ts
import { sqliteTable, text } from "drizzle-orm/sqlite-core";
import { defineTable } from "@quickback/compiler";
export const applications = sqliteTable("applications", {
id: text("id").primaryKey(),
candidateId: text("candidate_id").notNull(),
jobId: text("job_id").notNull(),
stage: text("stage").notNull(),
organizationId: text("organization_id").notNull(),
});
export default defineTable(applications, {
firewall: { organization: {} },
realtime: {
enabled: true, // Enable realtime for this table
onInsert: true, // Broadcast on INSERT (default: true)
onUpdate: true, // Broadcast on UPDATE (default: true)
onDelete: true, // Broadcast on DELETE (default: true)
requiredRoles: ["recruiter", "hiring-manager"], // Who receives broadcasts
fields: ["id", "candidateId", "stage"], // Fields to include (optional)
},
// ... guards, crud
});
When any table has realtime enabled, the compiler generates src/lib/realtime.ts with helper functions for sending notifications.
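The realtime options above amount to two decisions per change: whether to broadcast it at all, and which fields to send. The sketch below illustrates that reading. It is not the generated code; `RealtimeTableConfig`, `shouldBroadcast`, and `pickFields` are hypothetical names, and the only behaviour taken from the documentation is that the three `on*` flags default to true and that `fields` is optional.

```typescript
// Hypothetical shape mirroring the `realtime` options of defineTable.
interface RealtimeTableConfig {
  enabled: boolean;
  onInsert?: boolean;
  onUpdate?: boolean;
  onDelete?: boolean;
  requiredRoles?: string[];
  fields?: string[];
}

type ChangeKind = "INSERT" | "UPDATE" | "DELETE";

// Decide whether a change is broadcast; each flag defaults to true.
function shouldBroadcast(config: RealtimeTableConfig, kind: ChangeKind): boolean {
  if (!config.enabled) return false;
  switch (kind) {
    case "INSERT":
      return config.onInsert ?? true;
    case "UPDATE":
      return config.onUpdate ?? true;
    case "DELETE":
      return config.onDelete ?? true;
  }
}

// Keep only the configured fields; without a `fields` list, copy the record as-is.
function pickFields(
  record: Record<string, unknown>,
  fields?: string[],
): Record<string, unknown> {
  if (!fields) return { ...record };
  const out: Record<string, unknown> = {};
  for (const key of fields) {
    if (key in record) out[key] = record[key];
  }
  return out;
}
```

With `fields: ["id", "candidateId", "stage"]`, a record that also carries `jobId` and `organizationId` would be broadcast without those two columns under this reading.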
Generated Helper
The createRealtime() factory provides convenient methods for both Postgres Changes (CRUD events) and custom broadcasts:
import { createRealtime } from '../lib/realtime';
export const execute: ActionExecutor = async ({ db, ctx, input }) => {
const realtime = createRealtime(ctx.env);
// After creating a record
await realtime.insert('applications', newApplication, ctx.activeOrgId!);
// After updating a record
await realtime.update('applications', newApplication, oldApplication, ctx.activeOrgId!);
// After deleting a record
await realtime.delete('applications', { id: applicationId }, ctx.activeOrgId!);
return { success: true };
};
Event Format
Quickback broadcasts events in a structured JSON format for easy client-side handling.
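On the client it can help to describe these messages as a discriminated union keyed on `type`. The types below are inferred from the JSON examples in this section rather than taken from Quickback's generated output, and `parseRealtimeEvent` is a hypothetical helper that does only shallow validation.

```typescript
// Shapes inferred from the documented JSON examples; not the generated types.
interface PostgresChangesEvent<T = Record<string, unknown>> {
  type: "postgres_changes";
  table: string;
  eventType: "INSERT" | "UPDATE" | "DELETE";
  schema: string;
  new: T | null; // null for DELETE
  old: Partial<T> | null; // null for INSERT
}

interface BroadcastEvent<P = Record<string, unknown>> {
  type: "broadcast";
  event: string;
  payload: P;
}

type RealtimeEvent = PostgresChangesEvent | BroadcastEvent;

// Parse a raw WebSocket message; returns null for anything unrecognised.
function parseRealtimeEvent(raw: string): RealtimeEvent | null {
  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof data !== "object" || data === null) return null;
  const msg = data as { type?: unknown; table?: unknown; eventType?: unknown; event?: unknown };
  if (
    msg.type === "postgres_changes" &&
    typeof msg.table === "string" &&
    typeof msg.eventType === "string"
  ) {
    return data as PostgresChangesEvent;
  }
  if (msg.type === "broadcast" && typeof msg.event === "string") {
    return data as BroadcastEvent;
  }
  return null;
}
```

Checking `event.type` then narrows the value, so `event.eventType` is only accessible on CRUD events and `event.payload` only on custom broadcasts.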
Insert Event
await realtime.insert('applications', {
id: 'app_123',
candidateId: 'cnd_456',
stage: 'applied'
}, ctx.activeOrgId!);
Broadcasts:
{
"type": "postgres_changes",
"table": "applications",
"eventType": "INSERT",
"schema": "public",
"new": {
"id": "app_123",
"candidateId": "cnd_456",
"stage": "applied"
},
"old": null
}
Update Event
await realtime.update('applications',
{ id: 'app_123', stage: 'interview' }, // new
{ id: 'app_123', stage: 'screening' }, // old
ctx.activeOrgId!
);
Broadcasts:
{
"type": "postgres_changes",
"table": "applications",
"eventType": "UPDATE",
"schema": "public",
"new": { "id": "app_123", "stage": "interview" },
"old": { "id": "app_123", "stage": "screening" }
}
Delete Event
await realtime.delete('applications',
{ id: 'app_123' },
ctx.activeOrgId!
);
Broadcasts:
{
"type": "postgres_changes",
"table": "applications",
"eventType": "DELETE",
"schema": "public",
"new": null,
"old": { "id": "app_123" }
}
Custom Broadcasts
For arbitrary events that don't map to CRUD operations:
await realtime.broadcast('screening-complete', {
applicationId: 'app_123',
candidateId: 'cnd_456',
stage: 'interview'
}, ctx.activeOrgId!);
Broadcasts:
{
"type": "broadcast",
"event": "screening-complete",
"payload": {
"applicationId": "app_123",
"candidateId": "cnd_456",
"stage": "interview"
}
}
User-Specific Broadcasts
Target a specific user instead of the entire organization:
// Only this user receives the notification
await realtime.insert('notifications', newNotification, ctx.activeOrgId!, {
userId: ctx.userId
});
// Notify a hiring manager of a new application
await realtime.broadcast('application-received', {
applicationId: 'app_123',
jobId: 'job_789'
}, ctx.activeOrgId!, {
userId: hiringManagerId
});
Role-Based Filtering
Limit which roles receive a broadcast using targetRoles:
// Only hiring managers and recruiters receive this broadcast
await realtime.insert('applications', application, ctx.activeOrgId!, {
targetRoles: ['hiring-manager', 'recruiter']
});
// Only hiring managers receive this update
await realtime.update('applications', newRecord, oldRecord, ctx.activeOrgId!, {
targetRoles: ['hiring-manager']
});
If targetRoles is not specified, all authenticated users in the organization receive the broadcast.
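The delivery rule can be pictured as a small predicate evaluated per subscriber. This is a sketch only: `Subscriber`, `BroadcastTarget`, and `shouldDeliver` are hypothetical names, and combining `userId` and `targetRoles` with a logical AND is an assumption, since the documentation does not say how the two options interact.

```typescript
// Hypothetical view of a connected client and of the options passed to a broadcast.
interface Subscriber {
  userId: string;
  role: string;
}

interface BroadcastTarget {
  targetRoles?: string[];
  userId?: string;
}

// Returns true when the subscriber should receive the message.
// Assumption: when both options are set, both must match.
function shouldDeliver(sub: Subscriber, target: BroadcastTarget = {}): boolean {
  if (target.userId !== undefined && sub.userId !== target.userId) return false;
  // No targetRoles: every authenticated user in the organization receives it.
  if (target.targetRoles === undefined) return true;
  // Note: an empty array matches nobody in this sketch.
  return target.targetRoles.includes(sub.role);
}
```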
Per-Role Field Masking
Apply different field masking based on the subscriber's role using maskingConfig:
await realtime.insert('candidates', newCandidate, ctx.activeOrgId!, {
maskingConfig: {
phone: { type: 'phone', show: { roles: ['owner', 'hiring-manager', 'recruiter'] } },
email: { type: 'email', show: { roles: ['owner', 'hiring-manager', 'recruiter'] } },
resumeUrl: { type: 'redact', show: { roles: ['owner', 'hiring-manager', 'recruiter'] } },
},
});
How masking works:
- Each subscriber receives a payload masked according to their role
- Roles in the show.roles array see unmasked values
- All other roles see the masked version
- Masking is pre-computed per-role (O(roles) not O(subscribers))
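To illustrate why the cost scales with the number of roles rather than subscribers, the sketch below builds one masked payload per distinct role and reuses it for every subscriber holding that role. It is an assumption about the approach, not Quickback's implementation; `MaskRule` is a simplified stand-in for a maskingConfig entry (a mask function plus the roles that see the raw value), and all names are hypothetical.

```typescript
type Payload = Record<string, unknown>;

// Simplified, hypothetical stand-in for one maskingConfig entry.
interface MaskRule {
  mask: (value: unknown) => unknown;
  showRoles: string[];
}

type MaskingConfig = Record<string, MaskRule>;

// Build the payload a single role is allowed to see.
function maskForRole(payload: Payload, config: MaskingConfig, role: string): Payload {
  const out: Payload = { ...payload };
  for (const field of Object.keys(config)) {
    const rule = config[field];
    if (field in out && !rule.showRoles.includes(role)) {
      out[field] = rule.mask(out[field]);
    }
  }
  return out;
}

// Compute each distinct role's payload once, however many subscribers share it.
function precomputeByRole(
  payload: Payload,
  config: MaskingConfig,
  subscriberRoles: string[],
): Map<string, Payload> {
  const byRole = new Map<string, Payload>();
  for (const role of subscriberRoles) {
    if (!byRole.has(role)) byRole.set(role, maskForRole(payload, config, role));
  }
  return byRole;
}
```

Delivery then becomes a map lookup per subscriber, so three interviewers connected at once trigger a single masking pass.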
Available mask types:
| Type | Example Output |
|---|---|
| email | j***@y*********.com |
| phone | ******4567 |
| ssn | *****6789 |
| creditCard | **** **** **** 1111 |
| name | J*** |
| redact | [REDACTED] |
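The exact masking rules are not spelled out here, so the functions below are a guess that yields output in the same style as the table: keep the first character for emails and names, keep the last four digits for phone, SSN, and card numbers. Treat `maskValue` and its helpers as hypothetical, not as the library's API.

```typescript
type MaskType = "email" | "phone" | "ssn" | "creditCard" | "name" | "redact";

const stars = (n: number): string => "*".repeat(Math.max(0, n));

// Keep the first character and star out the rest.
const keepFirst = (s: string): string => (s ? s.charAt(0) + stars(s.length - 1) : s);

// Drop non-digits, then keep only the last four digits visible.
const keepLastFourDigits = (s: string): string => {
  const digits = s.replace(/\D/g, "");
  return stars(digits.length - 4) + digits.slice(-4);
};

// Mask the local part and the host, leaving the top-level domain readable.
function maskEmail(email: string): string {
  const at = email.lastIndexOf("@");
  if (at < 0) return keepFirst(email);
  const local = email.slice(0, at);
  const domain = email.slice(at + 1);
  const dot = domain.lastIndexOf(".");
  const host = dot < 0 ? domain : domain.slice(0, dot);
  const tld = dot < 0 ? "" : domain.slice(dot);
  return `${keepFirst(local)}@${keepFirst(host)}${tld}`;
}

function maskValue(type: MaskType, value: string): string {
  switch (type) {
    case "email":
      return maskEmail(value);
    case "phone":
    case "ssn":
      return keepLastFourDigits(value);
    case "creditCard":
      return `**** **** **** ${value.replace(/\D/g, "").slice(-4)}`;
    case "name":
      return keepFirst(value);
    case "redact":
      return "[REDACTED]";
  }
}
```

One caveat of this sketch: values with four or fewer digits pass through `keepLastFourDigits` unmasked, which a production implementation would likely want to handle differently.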
Owner-Based Masking
Show unmasked data to the record owner:
maskingConfig: {
phone: {
type: 'phone',
show: { roles: ['hiring-manager'], or: 'owner' } // Hiring manager OR owner sees unmasked
},
}
Client-Side Subscription
WebSocket Authentication
Quickback uses ticket-based authentication for WebSocket connections. The client first obtains a short-lived ticket from the main API, then connects with it as a URL parameter. The Broadcaster verifies the ticket cryptographically at upgrade time — no HTTP round-trip needed.
// 1. Get a ws-ticket from your API (requires session auth)
const response = await fetch('/realtime/v1/ws-ticket', {
method: 'POST',
headers: { Authorization: `Bearer ${sessionToken}` },
});
const { wsTicket } = await response.json();
// 2. Connect with ticket as URL param
const ws = new WebSocket(
`wss://api.yourdomain.com/realtime/v1/websocket?ws_ticket=${wsTicket}&organizationId=${activeOrgId}`
);
ws.onopen = () => {
console.log('Connected and authenticated!');
// No auth message needed — pre-authenticated at upgrade
};
Tickets expire after 60 seconds, so fetch a fresh one before each connection or reconnection.
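Because tickets are short-lived, a reconnect path has to request a new ticket on every attempt rather than reuse the previous URL. The helpers below sketch one way to structure that. All three function names are hypothetical, the backoff values are arbitrary, and the ticket fetch and socket constructor are injected so the same code works in a browser or in a test.

```typescript
// Build the WebSocket URL with properly encoded query parameters.
function buildRealtimeUrl(base: string, wsTicket: string, organizationId: string): string {
  const url = new URL("/realtime/v1/websocket", base);
  url.searchParams.set("ws_ticket", wsTicket);
  url.searchParams.set("organizationId", organizationId);
  return url.toString();
}

// Exponential backoff between reconnect attempts, capped at maxMs.
function reconnectDelayMs(attempt: number, baseMs = 500, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Fetch a fresh ticket, then open the socket. Call this again for every reconnect.
async function connectWithFreshTicket<S>(
  base: string,
  organizationId: string,
  fetchTicket: () => Promise<string>,
  openSocket: (url: string) => S,
): Promise<S> {
  const wsTicket = await fetchTicket(); // never reuse a ticket from an earlier attempt
  return openSocket(buildRealtimeUrl(base, wsTicket, organizationId));
}
```

In a browser, `fetchTicket` would wrap the `POST /realtime/v1/ws-ticket` call and `openSocket` would be `(url) => new WebSocket(url)`; an `onclose` handler could wait `reconnectDelayMs(attempt)` before calling `connectWithFreshTicket` again.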
Handling Messages
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
// Handle CRUD events
if (msg.type === 'postgres_changes') {
const { table, eventType, new: newRecord, old: oldRecord } = msg;
if (eventType === 'INSERT') {
addRecord(table, newRecord);
} else if (eventType === 'UPDATE') {
updateRecord(table, newRecord);
} else if (eventType === 'DELETE') {
removeRecord(table, oldRecord.id);
}
}
// Handle custom broadcasts
if (msg.type === 'broadcast') {
const { event, payload } = msg;
if (event === 'screening-complete') {
refreshApplication(payload.applicationId);
} else if (event === 'interview-scheduled') {
showInterviewNotification(payload.applicationId);
}
}
};
Required Environment Variables
| Variable | Where | Description |
|---|---|---|
| REALTIME_URL | Main API | URL of the broadcast/realtime worker |
| ACCESS_TOKEN | Both | Shared secret for broadcast API auth and ws-ticket signing |
The ACCESS_TOKEN serves double duty: it authenticates internal broadcast API calls from the main worker, and it signs/verifies WebSocket tickets. Set it as a secret on the broadcast worker:
cd cloudflare-workers/broadcast && wrangler secret put ACCESS_TOKEN
Architecture
                        1. POST /ws-ticket
┌──────────────┐ ◄──────────────────────────────── ┌─────────────────┐
│  API Worker  │ ────────────────────────────────► │ Browser Client  │
│ (Quickback)  │    { wsTicket, expiresIn: 60 }    │                 │
└──────┬───────┘                                   └───────┬─────────┘
       │                                                   │
       │ POST /broadcast              2. WS ?ws_ticket=... │
       │                                                   │
       ▼                                                   ▼
┌──────────────────┐                              Verified at upgrade
│ Realtime Worker  │ ◄──────── WebSocket ──────     (no HTTP call)
│ (Durable Object) │
└──────────────────┘
- Client gets a ticket: POST /realtime/v1/ws-ticket on the main API (session-authenticated). Returns a 60-second HMAC-signed ticket.
- Client connects: opens a WebSocket with ?ws_ticket=<ticket>. The Durable Object verifies the ticket cryptographically at upgrade.
- API broadcasts: after CRUD ops, the main API calls POST /broadcast on the realtime worker, which delivers to matching subscribers.
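The ticket step works without a round-trip because both workers share ACCESS_TOKEN: one signs, the other recomputes the signature and compares. The sketch below shows the general HMAC pattern only. The ticket layout (`payload.signature`, both base64url), the claim names, and the function names are assumptions, not Quickback's actual format, and it uses Node's crypto module for brevity where a Worker might more typically use the Web Crypto API.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Hypothetical claims; `exp` is an expiry time in epoch milliseconds.
interface TicketClaims {
  userId: string;
  organizationId: string;
  exp: number;
}

const sign = (body: string, secret: string): string =>
  createHmac("sha256", secret).update(body).digest("base64url");

// API worker side: issue a ticket valid for ttlMs (60 seconds by default).
function issueTicket(
  claims: Omit<TicketClaims, "exp">,
  secret: string,
  now: number = Date.now(),
  ttlMs = 60000,
): string {
  const body = Buffer.from(JSON.stringify({ ...claims, exp: now + ttlMs })).toString("base64url");
  return `${body}.${sign(body, secret)}`;
}

// Realtime worker side: verify signature and expiry locally, with no HTTP call.
function verifyTicket(ticket: string, secret: string, now: number = Date.now()): TicketClaims | null {
  const parts = ticket.split(".");
  if (parts.length !== 2 || !parts[0] || !parts[1]) return null;
  const [body, signature] = parts;
  const expected = Buffer.from(sign(body, secret));
  const actual = Buffer.from(signature);
  // Constant-time comparison; lengths must match before timingSafeEqual is called.
  if (expected.length !== actual.length || !timingSafeEqual(expected, actual)) return null;
  const claims = JSON.parse(Buffer.from(body, "base64url").toString("utf8")) as TicketClaims;
  return claims.exp > now ? claims : null;
}
```

The signature is checked before the payload is parsed, so a forged ticket never reaches `JSON.parse`.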
Best Practices
Broadcast After Commit
Always broadcast after the database operation succeeds:
// Good - broadcast after successful insert
const [newApp] = await db.insert(applications).values(data).returning();
await realtime.insert('applications', newApp, ctx.activeOrgId!);
// Bad - don't broadcast before confirming success
await realtime.insert('applications', data, ctx.activeOrgId!);
await db.insert(applications).values(data); // Could fail!
Minimal Payloads
Only include necessary data in broadcasts:
// Good - minimal payload
await realtime.update('applications',
{ id: record.id, stage: 'interview' },
{ id: record.id, stage: 'screening' },
ctx.activeOrgId!
);
// Avoid - sending entire record with large content
await realtime.update('applications', fullRecord, oldRecord, ctx.activeOrgId!);
Use Broadcasts for Complex Events
For events that don't map cleanly to CRUD:
// Hiring pipeline stage completed
await realtime.broadcast('pipeline-stage-complete', {
applicationId: application.id,
stages: ['applied', 'screening', 'interview'],
currentStage: 'offer',
candidateId: application.candidateId
}, ctx.activeOrgId!);
Custom Event Namespaces
For complex applications with many custom events, you can define typed event namespaces using defineRealtime. This generates strongly-typed helper methods for your custom events.
Defining Event Namespaces
Create a file in services/realtime/:
// services/realtime/screening.ts
import { defineRealtime } from '@quickback/compiler';
export default defineRealtime({
name: 'screening',
events: ['started', 'progress', 'completed', 'failed'],
description: 'Candidate screening pipeline events',
});
Generated Helper Methods
After compilation, the createRealtime() helper includes your custom namespace:
import { createRealtime } from '../lib/realtime';
export const execute: ActionExecutor = async ({ ctx, input }) => {
const realtime = createRealtime(ctx.env);
// Type-safe custom event methods
await realtime.screening.started({
applicationId: input.applicationId,
}, ctx.activeOrgId!);
// Progress updates
await realtime.screening.progress({
applicationId: input.applicationId,
percent: 50,
step: 'background-check',
}, ctx.activeOrgId!);
// Completion
await realtime.screening.completed({
applicationId: input.applicationId,
result: 'passed',
duration: 1234,
}, ctx.activeOrgId!);
return { success: true };
};
Event Format
Custom namespace events use the broadcast type with namespaced event names:
{
"type": "broadcast",
"event": "screening:started",
"payload": {
"applicationId": "app_123"
}
}
Multiple Namespaces
Define multiple namespaces for different parts of your application:
// services/realtime/notifications.ts
import { defineRealtime } from '@quickback/compiler';
export default defineRealtime({
name: 'notifications',
events: ['interview-reminder', 'offer-update', 'application-received'],
description: 'Recruitment notification events',
});
// services/realtime/presence.ts
import { defineRealtime } from '@quickback/compiler';
export default defineRealtime({
name: 'presence',
events: ['joined', 'left', 'reviewing', 'idle'],
description: 'Who is reviewing which candidate',
});
Usage:
const realtime = createRealtime(ctx.env);
// Notification events
await realtime.notifications['interview-reminder']({ ... }, ctx.activeOrgId!);
// Presence events — who's reviewing which candidate
await realtime.presence.reviewing({ userId: ctx.userId, candidateId: 'cnd_456' }, ctx.activeOrgId!);
Namespace vs Generic Broadcast
| Use Case | Approach |
|---|---|
| One-off custom event | realtime.broadcast('event-name', payload, orgId) |
| Repeated event patterns | defineRealtime namespace |
| Type-safe events | defineRealtime namespace |
| Event discovery | defineRealtime (appears in generated types) |
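To make the relationship between the two approaches concrete, a namespace can be thought of as a typed wrapper over the generic broadcast call that prefixes each event name. The sketch below is an illustration under that assumption, not the compiler's output; `makeNamespace` and `BroadcastFn` are hypothetical, and the only detail taken from the documentation is the `namespace:event` naming.

```typescript
type EventPayload = Record<string, unknown>;
type BroadcastFn = (event: string, payload: EventPayload, orgId: string) => Promise<void>;
type Emit = (payload: EventPayload, orgId: string) => Promise<void>;

// Build one method per declared event; each forwards to the generic
// broadcast with a "name:event" event name.
function makeNamespace<E extends string>(
  name: string,
  events: readonly E[],
  broadcast: BroadcastFn,
): Record<E, Emit> {
  const ns = {} as Record<E, Emit>;
  for (const event of events) {
    ns[event] = (payload, orgId) => broadcast(`${name}:${event}`, payload, orgId);
  }
  return ns;
}
```

Because `E` is inferred from the `events` array, calling a method that was not declared (for example `screening.cancelled`) fails at compile time, which is the type-safety and discovery benefit listed in the table.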