Workers & Events
This document outlines the worker architecture and event-driven system that powers asynchronous processing throughout the Ivyi platform. Workers handle background tasks while events coordinate communication between different system components.
Overview
The Ivyi platform uses an event-driven architecture with PgBoss-powered workers to handle asynchronous operations. This system ensures reliable processing of background tasks, event coordination, and workflow management through a sophisticated event-driven design.
Worker Architecture
Core Components
Event Handlers
- Purpose: Define business logic for specific events
- Pattern: Static methods in event handler classes
- Integration: Use
processBossJobwrapper for logging and error handling - Queue Communication: Enqueue jobs in other queues for cross-domain coordination
Worker Processors
- Purpose: Listen to queues and process incoming jobs
- Pattern: Handler functions mapped to event types
- Initialization: Created using
createPgBossWorkerfactory - Error Handling: Centralized error logging and retry mechanisms
Event Types
- Configuration: Defined in
*.config.tsfiles - Type Safety: TypeScript constants for event names
- Mapping: Workers map event strings to handler functions
Event-Driven Architecture
Design Principles
Event-First Approach
- Loose Coupling: Components communicate through events rather than direct calls
- Scalability: Event-driven systems scale horizontally by adding more workers
- Resilience: Failed events can be retried without affecting the entire system
- Observability: All events are logged and traceable across the system
Domain Separation
- Feature Isolation: Each domain (users, occasions, gifts) has its own workers and events
- Clear Boundaries: Events define the contract between different domains
- Independent Scaling: Each domain can be scaled independently based on load
- Team Ownership: Different teams can own different domains without coordination overhead
Event Flow Patterns
1. Command Events
// Events that trigger actions
{
event: "occasion-created",
payload: { occasionId, userId },
metadata: { source: "occasion-controller" }
}
2. Domain Events
// Events that communicate state changes
{
event: "options-generated",
payload: { processId, optionsCount },
metadata: { step: "3", status: "OPTIONS_GENERATED" }
}
3. Workflow Events
// Events that coordinate multi-step processes
{
event: "decision-window-open",
payload: { processId, occasionId },
metadata: { workflow: "gifting", step: "4" }
}
Worker Implementation Patterns
Event Handler Pattern
export class DomainEventHandler {
static async handleEvent(params: EventParams) {
return processBossJob(
{ label: "Descriptive Action", entityId: params.id },
async () => {
// 1. Business logic execution
const result = await businessLogic(params);
// 2. Cross-domain communication (if needed)
await otherDomainQueue.enqueue({
event: "next-event",
payload: result,
});
// 3. State updates
await updateState(result);
},
{
metadata: {
// Context for debugging and monitoring
source: "domain-service",
step: "current-step",
event: "event-type",
},
},
);
}
}
Worker Processor Pattern
export const handlers: Record<string, (payload: any) => Promise<void>> = {
"event-type": async (payload) => {
await DomainEventHandler.handleEvent(payload);
},
"another-event": async (payload) => {
await AnotherEventHandler.handleEvent(payload);
},
};
export const initDomainWorker = () => {
return createPgBossWorker<JobPayload>(
boss,
PgBossQueueName.DomainQueue,
async (job) => {
const payload = job.data.payload;
const eventType = job.data.event || "default-event";
const handler = handlers[eventType];
if (handler) {
await handler(payload);
} else {
logger.warn(`Unknown event type: ${eventType}`);
}
},
);
};
Configuration Pattern
// Domain-specific event types
export const DomainEventTypes = {
EventCreated: "event-created",
EventUpdated: "event-updated",
EventProcessed: "event-processed",
} as const;
export type DomainEventType =
(typeof DomainEventTypes)[keyof typeof DomainEventTypes];
Cross-Domain Communication
Event Broadcasting
- Publisher-Subscriber: Domains publish events without knowing who subscribes
- Event Contracts: Event schemas define the communication contract
- Versioning: Events can be versioned to handle breaking changes
- Backwards Compatibility: Multiple event versions can coexist
Workflow Orchestration
- State Machines: Complex workflows implemented as state machines
- Event Chains: Events trigger other events in coordinated sequences
- Compensation: Failed workflows can be compensated with rollback events
- Sagas: Long-running transactions managed through event coordination
Data Consistency
- Eventual Consistency: System maintains eventual consistency across domains
- Event Sourcing: Critical events are stored for audit and replay
- Idempotency: Event handlers are designed to be safe for retries
- Dead Letter Queues: Unprocessable events are isolated for analysis