Quickback Docs

Realtime

Quickback can generate realtime notification helpers for broadcasting changes to connected clients. This enables live updates via WebSocket using Cloudflare Durable Objects.

Enabling Realtime

Add realtime configuration to individual table definitions:

// features/claims/claims.ts
import { sqliteTable, text } from "drizzle-orm/sqlite-core";
import { defineTable } from "@quickback/compiler";

export const claims = sqliteTable("claims", {
  id: text("id").primaryKey(),
  title: text("title").notNull(),
  status: text("status").notNull(),
  organizationId: text("organization_id").notNull(),
});

export default defineTable(claims, {
  firewall: { organization: {} },
  realtime: {
    enabled: true,           // Enable realtime for this table
    onInsert: true,          // Broadcast on INSERT (default: true)
    onUpdate: true,          // Broadcast on UPDATE (default: true)
    onDelete: true,          // Broadcast on DELETE (default: true)
    requiredRoles: ["member", "admin"],  // Who receives broadcasts
    fields: ["id", "title", "status"],   // Fields to include (optional)
  },
  // ... guards, crud
});

When any table has realtime enabled, the compiler generates src/lib/realtime.ts with helper functions for sending notifications.

Generated Helper

The createRealtime() factory provides convenient methods for both Postgres Changes (CRUD events) and custom broadcasts:

import { createRealtime } from '../lib/realtime';

export const execute: ActionExecutor = async ({ db, ctx, input }) => {
  const realtime = createRealtime(ctx.env);

  // After creating a record
  await realtime.insert('claims', newClaim, ctx.activeOrgId!);

  // After updating a record
  await realtime.update('claims', newClaim, oldClaim, ctx.activeOrgId!);

  // After deleting a record
  await realtime.delete('claims', { id: claimId }, ctx.activeOrgId!);

  return { success: true };
};

Event Format

Quickback broadcasts events in a structured JSON format for easy client-side handling.

Insert Event

await realtime.insert('materials', {
  id: 'mat_123',
  title: 'Breaking News',
  status: 'pending'
}, ctx.activeOrgId!);

Broadcasts:

{
  "type": "postgres_changes",
  "table": "materials",
  "eventType": "INSERT",
  "schema": "public",
  "new": {
    "id": "mat_123",
    "title": "Breaking News",
    "status": "pending"
  },
  "old": null
}

Update Event

await realtime.update('materials',
  { id: 'mat_123', status: 'completed' },  // new
  { id: 'mat_123', status: 'pending' },    // old
  ctx.activeOrgId!
);

Broadcasts:

{
  "type": "postgres_changes",
  "table": "materials",
  "eventType": "UPDATE",
  "schema": "public",
  "new": { "id": "mat_123", "status": "completed" },
  "old": { "id": "mat_123", "status": "pending" }
}

Delete Event

await realtime.delete('materials',
  { id: 'mat_123' },
  ctx.activeOrgId!
);

Broadcasts:

{
  "type": "postgres_changes",
  "table": "materials",
  "eventType": "DELETE",
  "schema": "public",
  "new": null,
  "old": { "id": "mat_123" }
}

Custom Broadcasts

For arbitrary events that don't map to CRUD operations:

await realtime.broadcast('processing-complete', {
  materialId: 'mat_123',
  claimCount: 5,
  duration: 1234
}, ctx.activeOrgId!);

Broadcasts:

{
  "type": "broadcast",
  "event": "processing-complete",
  "payload": {
    "materialId": "mat_123",
    "claimCount": 5,
    "duration": 1234
  }
}

User-Specific Broadcasts

Target a specific user instead of the entire organization:

// Only this user receives the notification
await realtime.insert('notifications', newNotification, ctx.activeOrgId!, {
  userId: ctx.userId
});

await realtime.broadcast('task-assigned', {
  taskId: 'task_123'
}, ctx.activeOrgId!, {
  userId: assigneeId
});

Role-Based Filtering

Limit which roles receive a broadcast using targetRoles:

// Only admins and editors receive this broadcast
await realtime.insert('admin-actions', action, ctx.activeOrgId!, {
  targetRoles: ['admin', 'editor']
});

// Members won't see this update
await realtime.update('sensitive-data', newRecord, oldRecord, ctx.activeOrgId!, {
  targetRoles: ['admin']
});

If targetRoles is not specified, all authenticated users in the organization receive the broadcast.

Per-Role Field Masking

Apply different field masking based on the subscriber's role using maskingConfig:

await realtime.insert('employees', newEmployee, ctx.activeOrgId!, {
  maskingConfig: {
    ssn: { type: 'ssn', show: { roles: ['admin', 'hr'] } },
    salary: { type: 'redact', show: { roles: ['admin'] } },
    email: { type: 'email', show: { roles: ['admin', 'hr', 'manager'] } },
  },
});

How masking works:

  • Each subscriber receives a payload masked according to their role
  • Roles in the show.roles array see unmasked values
  • All other roles see the masked version
  • Masking is pre-computed per-role (O(roles) not O(subscribers))

Available mask types:

TypeExample Output
emailj***@y*********.com
phone******4567
ssn*****6789
creditCard**** **** **** 1111
nameJ***
redact[REDACTED]

Owner-Based Masking

Show unmasked data to the record owner:

maskingConfig: {
  ssn: {
    type: 'ssn',
    show: { roles: ['admin'], or: 'owner' }  // Admin OR owner sees unmasked
  },
}

Client-Side Subscription

WebSocket Authentication

The Broadcaster supports two authentication methods:

Session Token (Browser/App):

const ws = new WebSocket('wss://api.yourdomain.com/realtime/v1/websocket');

ws.onopen = () => {
  ws.send(JSON.stringify({
    type: 'auth',
    token: sessionToken,  // JWT from Better Auth session
    organizationId: activeOrgId,
  }));
};

ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (msg.type === 'auth_success') {
    console.log('Authenticated:', {
      role: msg.role,
      roles: msg.roles,
      authMethod: msg.authMethod,  // 'session'
    });
  }
};

API Key (Server/CLI):

const ws = new WebSocket('wss://api.yourdomain.com/realtime/v1/websocket');

ws.onopen = () => {
  ws.send(JSON.stringify({
    type: 'auth',
    token: apiKey,  // API key for machine-to-machine auth
    organizationId: orgId,
  }));
};

// authMethod will be 'api_key' on success

Handling Messages

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);

  // Handle CRUD events
  if (msg.type === 'postgres_changes') {
    const { table, eventType, new: newRecord, old: oldRecord } = msg;

    if (eventType === 'INSERT') {
      addRecord(table, newRecord);
    } else if (eventType === 'UPDATE') {
      updateRecord(table, newRecord);
    } else if (eventType === 'DELETE') {
      removeRecord(table, oldRecord.id);
    }
  }

  // Handle custom broadcasts
  if (msg.type === 'broadcast') {
    const { event, payload } = msg;

    if (event === 'processing-complete') {
      refreshMaterial(payload.materialId);
    } else if (event === 'task-assigned') {
      showTaskNotification(payload.taskId);
    }
  }
};

Required Environment Variables

VariableDescription
REALTIME_URLURL of the broadcast/realtime worker
ACCESS_TOKENInternal service-to-service auth token

Example configuration:

# wrangler.toml
[vars]
REALTIME_URL = "https://your-realtime-worker.workers.dev"
ACCESS_TOKEN = "your-internal-secret-token"

Architecture

┌─────────────┐     POST /broadcast      ┌──────────────────┐
│  API Worker │ ───────────────────────► │  Realtime Worker │
│  (Quickback)│                          │  (Durable Object)│
└─────────────┘                          └────────┬─────────┘

                                          WebSocket│

                                         ┌────────▼─────────┐
                                         │  Browser Clients │
                                         │   (WebSocket)    │
                                         └──────────────────┘
  1. API Worker - Your Quickback-generated API. Calls realtime.insert() etc. after CRUD operations.
  2. Realtime Worker - Separate worker with Durable Object for managing WebSocket connections.
  3. Browser Clients - Connect via WebSocket, subscribe to channels.

Best Practices

Broadcast After Commit

Always broadcast after the database operation succeeds:

// Good - broadcast after successful insert
const [newClaim] = await db.insert(claims).values(data).returning();
await realtime.insert('claims', newClaim, ctx.activeOrgId!);

// Bad - don't broadcast before confirming success
await realtime.insert('claims', data, ctx.activeOrgId!);
await db.insert(claims).values(data);  // Could fail!

Minimal Payloads

Only include necessary data in broadcasts:

// Good - minimal payload
await realtime.update('materials',
  { id: record.id, status: 'completed' },
  { id: record.id, status: 'pending' },
  ctx.activeOrgId!
);

// Avoid - sending entire record with large content
await realtime.update('materials', fullRecord, oldRecord, ctx.activeOrgId!);

Use Broadcasts for Complex Events

For events that don't map cleanly to CRUD:

// Processing pipeline completed
await realtime.broadcast('pipeline-complete', {
  materialId: material.id,
  stages: ['fetch', 'extract', 'analyze'],
  claimsCreated: 5,
  quotesExtracted: 3
}, ctx.activeOrgId!);

Custom Event Namespaces

For complex applications with many custom events, you can define typed event namespaces using defineRealtime. This generates strongly-typed helper methods for your custom events.

Defining Event Namespaces

Create a file in services/realtime/:

// services/realtime/extraction.ts
import { defineRealtime } from '@quickback/compiler';

export default defineRealtime({
  name: 'extraction',
  events: ['started', 'progress', 'completed', 'failed'],
  description: 'Material extraction pipeline events',
});

Generated Helper Methods

After compilation, the createRealtime() helper includes your custom namespace:

import { createRealtime } from '../lib/realtime';

export const execute: ActionExecutor = async ({ ctx, input }) => {
  const realtime = createRealtime(ctx.env);

  // Type-safe custom event methods
  await realtime.extraction.started({
    materialId: input.materialId,
  }, ctx.activeOrgId!);

  // Progress updates
  await realtime.extraction.progress({
    materialId: input.materialId,
    percent: 50,
    stage: 'extracting',
  }, ctx.activeOrgId!);

  // Completion
  await realtime.extraction.completed({
    materialId: input.materialId,
    claimsExtracted: 15,
    duration: 1234,
  }, ctx.activeOrgId!);

  return { success: true };
};

Event Format

Custom namespace events use the broadcast type with namespaced event names:

{
  "type": "broadcast",
  "event": "extraction:started",
  "payload": {
    "materialId": "mat_123"
  }
}

Multiple Namespaces

Define multiple namespaces for different parts of your application:

// services/realtime/notifications.ts
export default defineRealtime({
  name: 'notifications',
  events: ['new', 'read', 'dismissed'],
  description: 'User notification events',
});

// services/realtime/presence.ts
export default defineRealtime({
  name: 'presence',
  events: ['joined', 'left', 'typing', 'idle'],
  description: 'User presence events',
});

Usage:

const realtime = createRealtime(ctx.env);

// Notification events
await realtime.notifications.new({ ... }, ctx.activeOrgId!);

// Presence events
await realtime.presence.joined({ userId: ctx.userId }, ctx.activeOrgId!);

Namespace vs Generic Broadcast

Use CaseApproach
One-off custom eventrealtime.broadcast('event-name', payload, orgId)
Repeated event patternsdefineRealtime namespace
Type-safe eventsdefineRealtime namespace
Event discoverydefineRealtime (appears in generated types)

On this page