Queues
This document describes the queue architecture and the PgBoss-powered asynchronous processing system that handles background jobs and event-driven workflows.
Overview
The Ivyi platform uses PgBoss, a PostgreSQL-based job queue, to handle asynchronous operations, event processing, and background workflows. Because jobs are persisted in PostgreSQL, queued work survives application restarts and inherits the database's durability guarantees.
Queue Architecture
Core Components
PgBoss Configuration
- Database: PostgreSQL with job persistence
- Connection: Single PgBoss instance managing multiple queues
- Error Handling: Centralized error logging and monitoring
- Startup: Automatic queue creation and worker initialization
Queue Types
The system manages multiple queues, organized by business domain. Each queue serves one domain's asynchronous processing needs, which keeps the different parts of the system loosely coupled.
PgBoss Library Architecture
Core Utilities
Job Processing Wrapper
// pgboss.jobs.ts
export async function processBossJob<T>(
  context: BossJobContext,
  action: () => Promise<T>,
  options: BossEventOptions = {},
) {
  const { metadata } = options;
  const { label, entityId } = context;
  logger.info(metadata || {}, `Starting: ${label} | ID: ${entityId}`);
  try {
    const result = await action();
    logger.info(`✅ Finished: ${label}`);
    return result;
  } catch (error) {
    logger.error({ error }, `❌ Failed: ${label} | ID: ${entityId}`);
    throw error;
  }
}
Queue Factory Pattern
// pgboss.utils.ts
export const createPgBossQueue = <T extends object>(
  name: string,
  boss: PgBoss,
) => {
  return {
    enqueue: async (payload: T) => {
      const id = await boss.send(name, payload);
      return { id, data: payload };
    },
  };
};
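As a usage sketch, the factory can be exercised without a database by substituting an in-memory stand-in for PgBoss. The `BossLike` interface, the `InMemoryBoss` class, the `UserCreated` payload, and the `demo` function are hypothetical names introduced for illustration; only the `send` call and the shape of `createPgBossQueue` come from the code above.

```typescript
// Minimal structural stand-in for the part of PgBoss that the factory uses.
// (Assumption: only `send` is needed, resolving to a job ID or null.)
interface BossLike {
  send(name: string, data: object): Promise<string | null>;
}

// Same shape as the factory above, typed against BossLike so it runs standalone.
const createPgBossQueue = <T extends object>(name: string, boss: BossLike) => ({
  enqueue: async (payload: T) => {
    const id = await boss.send(name, payload);
    return { id, data: payload };
  },
});

// Hypothetical payload type for a users-domain queue.
interface UserCreated {
  event: "user.created";
  userId: string;
}

// In-memory stub that records every job instead of writing to PostgreSQL.
class InMemoryBoss implements BossLike {
  sent: Array<{ name: string; data: object }> = [];

  async send(name: string, data: object): Promise<string | null> {
    this.sent.push({ name, data });
    return `job-${this.sent.length}`;
  }
}

async function demo(): Promise<{ id: string | null; queued: number }> {
  const boss = new InMemoryBoss();
  const userQueue = createPgBossQueue<UserCreated>("users", boss);

  // The generic parameter makes a malformed payload a compile-time error.
  const job = await userQueue.enqueue({ event: "user.created", userId: "u1" });
  return { id: job.id, queued: boss.sent.length };
}
```

Typing each queue with its payload keeps producers and workers in agreement about the job shape, which is the main benefit of wrapping `boss.send` in a factory.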
Worker Factory Pattern
export const createPgBossWorker = <T>(
  boss: PgBoss,
  queueName: string,
  processor: (job: { id: string; data: T }) => Promise<void>,
) => {
  const start = async () => {
    // The handler receives an array of jobs; destructuring the first element
    // assumes the worker fetches one job at a time.
    await boss.work(queueName, async ([job]) => {
      try {
        await processor({
          id: job.id,
          data: job.data as T,
        });
      } catch (error) {
        logger.error(
          { error, jobId: job.id, queue: queueName },
          `Processing error`,
        );
        // Rethrow so PgBoss marks the job as failed and applies the retry policy.
        throw error;
      }
    });
  };
  return { start };
};
Initialization Pattern
// pgboss.workers.ts
export const initPgBossWorkers = async () => {
  try {
    // Start every registered worker in parallel
    await Promise.all(workers.map((worker) => worker.start()));
    logger.info("👷 All PgBoss workers have been initialized.");
  } catch (error) {
    // Startup failures are logged but not rethrown, so callers are not notified
    logger.error(error, "Failed to start workers");
  }
};
Queue Design Patterns
Queue Organization
- Domain-Based: Queues organized by business domains (users, occasions, gifts, etc.)
- Event Types: Each queue handles specific event types within its domain
- Priority Levels: Processing priority can differ between queues, and individual jobs carry a numeric priority field that orders them within a queue
- Isolation: Each queue operates independently to prevent cross-contamination
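One way to keep the domain-based organization consistent is to derive queue names from a fixed list of domains. The sketch below is an assumption, not the platform's actual convention: the `DOMAINS` list reuses the domains named above, while `queueNameFor` and the `domain-purpose` naming scheme are hypothetical.

```typescript
// Domains taken from the list above; the naming scheme itself is an assumption.
const DOMAINS = ["users", "occasions", "gifts"] as const;
type Domain = (typeof DOMAINS)[number];

// Builds a queue name such as "gifts-order-placed" from a domain and a purpose.
function queueNameFor(domain: Domain, purpose: string): string {
  const slug = purpose
    .trim()
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, "-") // collapse anything non-alphanumeric into a hyphen
    .replace(/^-+|-+$/g, ""); // drop leading and trailing hyphens
  return `${domain}-${slug}`;
}
```

Restricting `domain` to a union type means a typo in a domain name fails at compile time rather than silently creating a new queue.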
Job Structure
// Standard job structure
{
  id: "unique-job-id",
  data: {
    event: "event-type",
    payload: { /* domain-specific data */ },
    metadata: {
      source: "service-name",
      timestamp: "2024-01-01T00:00:00Z",
      version: "1.0"
    }
  },
  createdOn: "2024-01-01T00:00:00Z",
  priority: 1
}
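The `data` portion of this structure can be captured as a generic type so that every producer builds it the same way. This is a minimal sketch: the field names follow the structure above, but `JobEnvelope`, `JobMetadata`, and `buildEnvelope` are hypothetical names, and the fixed version string is an assumption.

```typescript
// Field names mirror the `data.metadata` block in the job structure above.
interface JobMetadata {
  source: string;
  timestamp: string; // ISO 8601
  version: string;
}

// Generic envelope: `P` is the domain-specific payload type.
interface JobEnvelope<P> {
  event: string;
  payload: P;
  metadata: JobMetadata;
}

// Hypothetical helper that stamps metadata onto a payload.
// `now` is injectable so the timestamp is deterministic in tests.
function buildEnvelope<P>(
  event: string,
  payload: P,
  source: string,
  now: Date = new Date(),
): JobEnvelope<P> {
  return {
    event,
    payload,
    metadata: { source, timestamp: now.toISOString(), version: "1.0" },
  };
}

const envelope = buildEnvelope(
  "event-type",
  { giftId: "g-1" }, // hypothetical domain payload
  "service-name",
  new Date("2024-01-01T00:00:00Z"),
);
```

The remaining top-level fields (`id`, `createdOn`, `priority`) are left out because they describe the stored job rather than the data a producer supplies.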
Queue Configuration
// Queue initialization pattern
export const initPgBoss = async () => {
  await boss.start();
  // Create all queues with appropriate configurations
  await Promise.all([
    boss.createQueue("domain-queue-name", {
      retryLimit: 3,
      retryBackoff: true,
      expireInSeconds: 3600, // 1 hour
    }),
  ]);
  logger.info("🚀 Pg-boss started");
};
Job Processing Patterns
Event-Driven Processing
// Standard event processing pattern
export const handlers: Record<string, (payload: any) => Promise<void>> = {
  "event-type": async (payload) => {
    await processBossJob(
      { label: "Process Event", entityId: payload.id },
      async () => {
        // 1. Validate payload
        const validated = validatePayload(payload);
        // 2. Execute business logic
        const result = await businessLogic(validated);
        // 3. Emit follow-up events if needed
        await emitEvents(result);
      },
    );
  },
};
Worker Implementation
export const initDomainWorker = () => {
  return createPgBossWorker<JobPayload>(boss, "domain-queue", async (job) => {
    const payload = job.data;
    const eventType = payload.event || "default-event";
    const handler = handlers[eventType];
    if (handler) {
      await handler(payload);
    } else {
      logger.warn(`Unknown event type: ${eventType}`);
    }
  });
};
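The lookup step in this worker can be isolated into a small pure function, which makes the dispatch rule easy to test. The sketch below is one possible hardening, not the platform's implementation: `selectHandler` and `exampleHandlers` are hypothetical names. It checks own properties only, because a plain `handlers[eventType]` lookup on an object literal also resolves inherited keys such as `"constructor"`.

```typescript
type Handler = (payload: unknown) => Promise<void>;

// Resolves the handler for an event, falling back to a default event name
// when the job carries none (mirrors `payload.event || "default-event"`).
function selectHandler(
  handlers: Record<string, Handler>,
  event: string | undefined,
  fallbackEvent = "default-event",
): { eventType: string; handler: Handler | undefined } {
  const eventType = event || fallbackEvent;
  // Own-property check: inherited keys like "constructor" must not match.
  const known = Object.prototype.hasOwnProperty.call(handlers, eventType);
  return { eventType, handler: known ? handlers[eventType] : undefined };
}

// Hypothetical handler map with a single registered event.
const exampleHandlers: Record<string, Handler> = {
  "event-type": async () => {
    // domain logic would run here
  },
};
```

A worker built on this helper can decide explicitly what an unknown event means: log and complete the job, as above, or throw so that the job fails and follows the retry policy.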
Queue Management
Job Lifecycle
- Enqueuing: Jobs are added to queues with specific payloads
- Scheduling: Jobs are scheduled based on priority and queue depth
- Processing: Workers pick up and process jobs
- Completion: Jobs are marked as completed or failed
- Cleanup: Completed jobs are archived or cleaned up
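The lifecycle above can be read as a small state machine. The sketch below is an illustrative simplification: the state names follow the job states that pg-boss documents, but the transition table, `canTransition`, and `isTerminal` are assumptions made for this example rather than the library's internal rules.

```typescript
// State names as documented by pg-boss; the transitions are a simplification.
type JobState =
  | "created"
  | "retry"
  | "active"
  | "completed"
  | "cancelled"
  | "failed";

const TRANSITIONS: Record<JobState, readonly JobState[]> = {
  created: ["active", "cancelled"], // enqueued, waiting for a worker
  active: ["completed", "retry", "failed", "cancelled"], // being processed
  retry: ["active", "cancelled"], // failed once, waiting for another attempt
  completed: [], // terminal: eligible for cleanup
  cancelled: [], // terminal
  failed: [], // terminal: retries exhausted
};

function canTransition(from: JobState, to: JobState): boolean {
  return TRANSITIONS[from].indexOf(to) !== -1;
}

function isTerminal(state: JobState): boolean {
  return TRANSITIONS[state].length === 0;
}
```

Only terminal states are candidates for the cleanup step, which is why `isTerminal` is derived from the same table instead of being listed separately.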
Retry Mechanisms
- Automatic Retry: Failed jobs are retried with exponential backoff
- Retry Limits: Configurable maximum retry attempts per job
- Dead Letter Queue: Unrecoverable jobs are moved to error queues
- Manual Recovery: Failed jobs can be manually inspected and retried
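To make the retry rules concrete, the sketch below computes a capped exponential delay and decides when a job stops retrying. It is a generic illustration under stated assumptions: `retryDelaySeconds` and `onFailure` are hypothetical helpers, the one-hour cap is arbitrary, and the schedule pg-boss actually applies may differ (for example by adding jitter).

```typescript
// Delay doubles with each attempt and is capped at `maxDelaySeconds`.
function retryDelaySeconds(
  baseDelaySeconds: number,
  retryCount: number,
  maxDelaySeconds = 3600,
): number {
  return Math.min(maxDelaySeconds, baseDelaySeconds * Math.pow(2, retryCount));
}

type FailureOutcome = "retry" | "dead-letter";

// A job is retried until it has used up `retryLimit` retries,
// after which it is routed to the dead letter queue.
function onFailure(retryCount: number, retryLimit: number): FailureOutcome {
  return retryCount < retryLimit ? "retry" : "dead-letter";
}

// With a 2-second base delay the waits are 2, 4, 8, 16 seconds and so on.
const delays = [0, 1, 2, 3].map((n) => retryDelaySeconds(2, n));
```

With `retryLimit: 3`, as in the queue configuration earlier, a job that keeps failing is retried three times before `onFailure` reports `"dead-letter"`.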