Queues

This document outlines the queue architecture and the asynchronous processing system, powered by PgBoss, that handles background jobs and event-driven workflows.

Overview

The Ivyi platform uses PgBoss, a PostgreSQL-based job queue system, to handle asynchronous operations, event processing, and background workflows. Because jobs are persisted in Postgres tables, this provides reliable job processing with database-level durability.

Queue Architecture

Core Components

PgBoss Configuration

Queue Types

The system manages multiple queues organized by business domains, each serving specific asynchronous processing needs while maintaining loose coupling between different parts of the system.
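As a sketch, the domain-scoped queue names can be centralized in a single module so producers and workers stay loosely coupled; the queue names below are hypothetical:

```typescript
// pgboss.queues.ts (sketch; these queue names are hypothetical)
// Both producers and workers import these constants instead of
// hard-coding queue-name strings, keeping the domains decoupled.
export const QUEUES = {
  notifications: "notifications-events",
  billing: "billing-events",
  search: "search-index-events",
} as const;

export type QueueName = (typeof QUEUES)[keyof typeof QUEUES];
```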

PgBoss Library Architecture

Core Utilities

Job Processing Wrapper

// pgboss.jobs.ts
export async function processBossJob<T>(
  context: BossJobContext,
  action: () => Promise<T>,
  options: BossEventOptions = {},
): Promise<T> {
  const { metadata } = options;
  const { label, entityId } = context;
  logger.info(metadata || {}, `Starting: ${label} | ID: ${entityId}`);
  try {
    const result = await action();
    logger.info(`✅ Finished: ${label} | ID: ${entityId}`);
    return result;
  } catch (error) {
    logger.error({ error }, `❌ Failed: ${label} | ID: ${entityId}`);
    throw error;
  }
}

Queue Factory Pattern

// pgboss.utils.ts
export const createPgBossQueue = <T extends object>(
  name: string,
  boss: PgBoss,
) => {
  return {
    enqueue: async (payload: T) => {
      const id = await boss.send(name, payload);
      return { id, data: payload };
    },
  };
};
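Used with a typed payload, enqueueing looks like the sketch below. Since `boss.send` needs a live Postgres connection, an in-memory stub with the same shape stands in here; the `WelcomeEmailJob` type and `email-events` queue name are made up for illustration:

```typescript
// Minimal stand-in for the one PgBoss method the factory uses.
interface BossLike {
  send(name: string, data: object): Promise<string | null>;
}

// Same factory as above, typed against the stub interface.
const createPgBossQueue = <T extends object>(name: string, boss: BossLike) => ({
  enqueue: async (payload: T) => {
    const id = await boss.send(name, payload);
    return { id, data: payload };
  },
});

// Hypothetical payload for a hypothetical "email-events" queue.
interface WelcomeEmailJob {
  userId: string;
  template: "welcome";
}

// In-memory stub so the example runs without Postgres.
const sent: Array<{ name: string; data: object }> = [];
const stubBoss: BossLike = {
  send: async (name, data) => {
    sent.push({ name, data });
    return `job-${sent.length}`;
  },
};

export async function demoEnqueue() {
  const emailQueue = createPgBossQueue<WelcomeEmailJob>("email-events", stubBoss);
  // The compiler now rejects payloads that don't match WelcomeEmailJob.
  return emailQueue.enqueue({ userId: "u-123", template: "welcome" });
}
```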

Worker Factory Pattern

export const createPgBossWorker = <T>(
  boss: PgBoss,
  queueName: string,
  processor: (job: { id: string; data: T }) => Promise<void>,
) => {
  const start = async () => {
    await boss.work(queueName, async ([job]) => {
      try {
        await processor({
          id: job.id,
          data: job.data as T,
        });
      } catch (error) {
        logger.error(
          { error, jobId: job.id, queue: queueName },
          `Processing error`,
        );
        throw error;
      }
    });
  };
  return { start };
};
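A worker wired with the factory can be exercised the same way; the stub below "delivers" one canned job as soon as `start` is called, standing in for PgBoss polling the queue table (queue and event names are invented):

```typescript
// Stand-in for the one PgBoss method the worker factory uses.
interface BossLike {
  work(
    queueName: string,
    handler: (jobs: Array<{ id: string; data: unknown }>) => Promise<void>,
  ): Promise<void>;
}

// Same worker factory as above, typed against the stub interface.
const createPgBossWorker = <T>(
  boss: BossLike,
  queueName: string,
  processor: (job: { id: string; data: T }) => Promise<void>,
) => ({
  start: () =>
    boss.work(queueName, async ([job]) => {
      await processor({ id: job.id, data: job.data as T });
    }),
});

// Stub that hands the handler one canned job immediately.
const stubBoss: BossLike = {
  work: async (_queue, handler) => {
    await handler([{ id: "job-1", data: { event: "user-created" } }]);
  },
};

const seen: string[] = [];

export async function demoWorker() {
  const worker = createPgBossWorker<{ event: string }>(
    stubBoss,
    "domain-queue",
    async (job) => {
      seen.push(`${job.id}:${job.data.event}`);
    },
  );
  await worker.start();
  return seen;
}
```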

Initialization Pattern

// pgboss.workers.ts
export const initPgBossWorkers = async () => {
  try {
    // Start every registered worker; each one subscribes to its queue
    await Promise.all(workers.map((worker) => worker.start()));
    logger.info("👷 All PgBoss workers have been initialized.");
  } catch (error) {
    logger.error(error, "Failed to start workers");
  }
};

Queue Design Patterns

Queue Organization

Job Structure

// Standard job structure
{
  id: "unique-job-id",
  data: {
    event: "event-type",
    payload: { /* domain-specific data */ },
    metadata: {
      source: "service-name",
      timestamp: "2024-01-01T00:00:00Z",
      version: "1.0"
    }
  },
  createdOn: "2024-01-01T00:00:00Z",
  priority: 1
}
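The `data` envelope above can also be captured as a type; `id`, `createdOn`, and `priority` are managed by PgBoss itself, so only the envelope is modeled here (the `order-created` event and payload are hypothetical):

```typescript
// Typed version of the standard job envelope shown above.
interface JobMetadata {
  source: string;    // originating service name
  timestamp: string; // ISO-8601
  version: string;
}

interface JobEnvelope<TPayload> {
  event: string;     // event type, used by workers to pick a handler
  payload: TPayload; // domain-specific data
  metadata: JobMetadata;
}

// Example instance (hypothetical event and payload).
export const exampleJob: JobEnvelope<{ orderId: string }> = {
  event: "order-created",
  payload: { orderId: "o-1" },
  metadata: {
    source: "orders-service",
    timestamp: "2024-01-01T00:00:00Z",
    version: "1.0",
  },
};
```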

Queue Configuration

// Queue initialization pattern
export const initPgBoss = async () => {
  await boss.start();

  // Create all queues with appropriate configurations
  await Promise.all([
    boss.createQueue("domain-queue-name", {
      retryLimit: 3,
      retryBackoff: true,
      expireInSeconds: 3600, // 1 hour
    }),
  ]);

  logger.info("🚀 Pg-boss started");
};

Job Processing Patterns

Event-Driven Processing

// Standard event processing pattern
export const handlers: Record<string, (payload: any) => Promise<void>> = {
  "event-type": async (payload) => {
    await processBossJob(
      { label: "Process Event", entityId: payload.id },
      async () => {
        // 1. Validate payload
        const validated = validatePayload(payload);

        // 2. Execute business logic
        const result = await businessLogic(validated);

        // 3. Emit follow-up events if needed
        await emitEvents(result);
      },
    );
  },
};

Worker Implementation

export const initDomainWorker = () => {
  return createPgBossWorker<JobPayload>(boss, "domain-queue", async (job) => {
    const payload = job.data;
    const eventType = payload.event || "default-event";

    const handler = handlers[eventType];
    if (handler) {
      await handler(payload);
    } else {
      logger.warn(`Unknown event type: ${eventType}`);
    }
  });
};

Queue Management

Job Lifecycle

  1. Enqueuing: Jobs are added to queues with specific payloads
  2. Scheduling: Jobs are scheduled based on priority and queue depth
  3. Processing: Workers pick up and process jobs
  4. Completion: Jobs are marked as completed or failed
  5. Cleanup: Completed jobs are archived or cleaned up
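
The five stages can be sketched as a small state machine; this is in-memory only, since the real state lives in PgBoss's Postgres job table:

```typescript
// In-memory sketch of the job lifecycle; PgBoss persists these
// states in its Postgres job table rather than in memory.
type JobState = "created" | "active" | "completed" | "failed" | "archived";

interface TrackedJob {
  id: string;
  state: JobState;
  history: JobState[];
}

const transition = (job: TrackedJob, next: JobState): TrackedJob => ({
  ...job,
  state: next,
  history: [...job.history, next],
});

export function runLifecycle(succeeds: boolean): TrackedJob {
  // 1. Enqueuing
  let job: TrackedJob = { id: "job-1", state: "created", history: ["created"] };
  // 2–3. Scheduling and processing
  job = transition(job, "active");
  // 4. Completion
  job = transition(job, succeeds ? "completed" : "failed");
  // 5. Cleanup: only successful jobs are archived in this sketch
  if (job.state === "completed") job = transition(job, "archived");
  return job;
}
```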

Retry Mechanisms