Workflows
Workflows are the orchestration layer that coordinates multiple features to achieve complex business goals. Unlike features, which are scoped to a single domain, workflows are global processes that interact with several features to complete multi-step business operations.
📑 Table of Contents
- 📖 Overview
- 🏗️ Architecture
- 🔄 Workflow vs Features
- 🎯 Gifting Process Workflow
- 📊 State Management
- ⚡ Event-Driven Execution
- 👷 Worker Integration
- 🔧 Implementation Patterns
- 🚀 Best Practices
📖 Overview
Workflows in the Ivyi platform represent complex business processes that span multiple domains. They orchestrate the interaction between different features, manage state transitions, and coordinate asynchronous operations through an event-driven architecture powered by PgBoss queues and workers.
Key Characteristics
- Cross-Domain: Coordinate multiple features (users, gifts, relationships, classifier)
- State Management: Track workflow progress and enforce valid transitions
- Event-Driven: Use queues and workers for asynchronous processing
- Resilient: Built-in retry mechanisms and error handling
- Scalable: Can process multiple workflows concurrently
🏗️ Architecture
Workflow System Architecture
┌─────────────────────────────────────────────────────────┐
│                 Workflow Orchestration                  │
│                                                         │
│   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐   │
│   │    State    │   │   Events    │   │   Workers   │   │
│   │ Management  │   │   System    │   │    Layer    │   │
│   └─────────────┘   └─────────────┘   └─────────────┘   │
│          │                 │                 │          │
│          ▼                 ▼                 ▼          │
│   ┌─────────────────────────────────────────────────┐   │
│   │               Feature Integration               │   │
│   │                                                 │   │
│   │ • Users  • Gifts  • Relationships  • Classifier │   │
│   │ • Occasions  • Decisions  • Options             │   │
│   └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
Directory Structure
workflows/
└── gifting-process/
    ├── controllers/                  # Workflow HTTP endpoints
    ├── gifting-process.schema.ts     # Database schemas
    ├── gifting-process.types.ts      # TypeScript types
    ├── gifting-process.config.ts     # Configuration
    ├── gifting-process.states.ts     # State machine definitions
    ├── operations/                   # Workflow business logic
    │   ├── gifting-process.create.ts
    │   ├── gifting-process.find.ts
    │   └── gifting-process.update.ts
    └── workers/                      # Background processing
        ├── gifting-process.queue.ts
        ├── gifting-process.workers.ts
        └── gifting-process.events.ts
🔄 Workflow vs Features
Fundamental Differences
| Aspect | Features | Workflows |
|---|---|---|
| Scope | Single business domain | Cross-domain processes |
| Purpose | CRUD operations | Business process orchestration |
| State | Entity state | Workflow state |
| Dependencies | Self-contained | Multiple feature dependencies |
| Processing | Synchronous | Asynchronous (queues/workers) |
| Example | Create user, get gifts | Complete gifting process |
Interaction Patterns
Feature-to-Workflow Communication
// Features can trigger workflows
export const createOccasionController = asyncHandler(
  async (req: Request, res: Response) => {
    const occasion = await createOccasion(req.body);

    // Trigger the gifting workflow for the new occasion
    await giftingProcessQueue.enqueue({
      event: "start-workflow",
      payload: { occasionId: occasion.id },
      metadata: { source: "occasion-controller" },
    });

    sendSuccessResponse(req, res, { data: occasion });
  },
);
Workflow-to-Feature Communication
// Workflows coordinate multiple features
// (declared as a class with static methods, matching the "Event Flow Example" section)
export class GiftProcessJobHandler {
  static async startGiftingWorkflow(occasionId: string) {
    // 1. Get occasion data
    const occasion = await findOccasionById(occasionId);

    // 2. Create workflow state
    const process = await createProcess(occasionId, "OCCASION_DETECTED");

    // 3. Load related data from other features
    //    (the full handler under "Event Flow Example" attaches these to the process)
    const recipient = await findUserById(occasion.recipientId);
    const relationship = await findRelationship(occasion.relationshipId);

    // 4. Trigger next step
    await giftingProcessQueue.enqueue({
      event: "occasion-detected",
      payload: { processId: process.id },
    });
  }
}
🎯 Gifting Process Workflow
Workflow Overview
The gifting process is the primary workflow in the Ivyi platform, orchestrating the complete journey from occasion detection to gift selection and delivery.
Workflow States
export const giftProcessTransitions: Record<
  GiftProcessStatus,
  GiftProcessStatus[]
> = {
  CREATED: ["OCCASION_DETECTED"],
  OCCASION_DETECTED: ["OPTIONS_GENERATED"],
  OPTIONS_GENERATED: ["DECISION_WINDOW_OPEN"],
  DECISION_WINDOW_OPEN: ["DEFAULT_SELECTED", "CONFIRMED"],
  DEFAULT_SELECTED: ["CONFIRMED"],
  CONFIRMED: ["ORDERED"],
  ORDERED: ["SHIPPED"],
  SHIPPED: ["COMPLETED"],
  COMPLETED: [], // terminal state
};
State Machine Visualization
┌─────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ CREATED │───▶│  OCCASION   │───▶│   OPTIONS   │───▶│  DECISION   │
│         │    │  DETECTED   │    │  GENERATED  │    │ WINDOW OPEN │
└─────────┘    └─────────────┘    └─────────────┘    └─────────────┘
                                                            │
                                         ┌──────────────────┤
                                         ▼                  │
                                  ┌─────────────┐           │
                                  │   DEFAULT   │           │
                                  │  SELECTED   │           │
                                  └─────────────┘           │
                                         │                  ▼
                                         │           ┌─────────────┐
                                         └──────────▶│  CONFIRMED  │
                                                     └─────────────┘
                                                            │
                                                            ▼
                                                     ┌─────────────┐
                                                     │   ORDERED   │
                                                     └─────────────┘
                                                            │
                                                            ▼
                                                     ┌─────────────┐
                                                     │   SHIPPED   │
                                                     └─────────────┘
                                                            │
                                                            ▼
                                                     ┌─────────────┐
                                                     │  COMPLETED  │
                                                     └─────────────┘
Database Schema
export const giftProcessSchema = pgTable("gift_processes", {
  id: uuid("id").primaryKey().defaultRandom(),
  occasionId: text("occasion_id")
    .references(() => occasionsSchema.id)
    .notNull(),
  // Note: the transition table starts at CREATED, while this column default is
  // OPTIONS_GENERATED. createProcess always sets the status explicitly, so the
  // default only applies to rows inserted some other way.
  status: giftProcessStatusEnum("status")
    .default("OPTIONS_GENERATED")
    .notNull(),
  selectedGiftId: uuid("selected_gift_id").references(() => giftsSchema.id),
  createdAt: timestamp("created_at").defaultNow().notNull(),
  updatedAt: timestamp("updated_at").defaultNow().notNull(),
  recipientId: uuid("recipient_id")
    .references(() => usersSchema.id, { onDelete: "cascade" })
    .notNull(),
  currentStep: giftProcessStepEnum("current_step")
    .default("INTENT_LEVEL_1")
    .notNull(),
  relationshipId: uuid("relationship_id")
    .references(() => relationshipsSchema.id, { onDelete: "cascade" })
    .notNull(),
  intent: giftIntentEnum("intent"),
  relationshipDepthOverride: giftRelationshipDepthEnum(
    "relationship_depth_override",
  ),
  signalIntensity: giftSignalIntensityEnum("signal_intensity"),
});
📊 State Management
State Transition Validation
export function canTransition(
  current: GiftProcessStatus,
  next: GiftProcessStatus,
): boolean {
  return giftProcessTransitions[current].includes(next);
}

// Usage in operations
export const updateProcessStatus = async (
  processId: string,
  newStatus: GiftProcessStatus,
) => {
  const currentProcess = await findProcessById(processId);

  // Reject any transition that the state machine does not allow
  if (!canTransition(currentProcess.status, newStatus)) {
    throw new Error(
      `Invalid transition from ${currentProcess.status} to ${newStatus}`,
    );
  }

  await db
    .update(giftProcessSchema)
    .set({ status: newStatus, updatedAt: new Date() })
    .where(eq(giftProcessSchema.id, processId));
};
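To make the transition rules concrete, the sketch below re-declares the transition table from this document so that it runs on its own, then checks a few transitions. `assertTransition` is a hypothetical helper introduced only for illustration; it is not part of the documented operations.

```typescript
// Self-contained copy of the statuses and transition table shown in this document.
type GiftProcessStatus =
  | "CREATED"
  | "OCCASION_DETECTED"
  | "OPTIONS_GENERATED"
  | "DECISION_WINDOW_OPEN"
  | "DEFAULT_SELECTED"
  | "CONFIRMED"
  | "ORDERED"
  | "SHIPPED"
  | "COMPLETED";

const giftProcessTransitions: Record<GiftProcessStatus, GiftProcessStatus[]> = {
  CREATED: ["OCCASION_DETECTED"],
  OCCASION_DETECTED: ["OPTIONS_GENERATED"],
  OPTIONS_GENERATED: ["DECISION_WINDOW_OPEN"],
  DECISION_WINDOW_OPEN: ["DEFAULT_SELECTED", "CONFIRMED"],
  DEFAULT_SELECTED: ["CONFIRMED"],
  CONFIRMED: ["ORDERED"],
  ORDERED: ["SHIPPED"],
  SHIPPED: ["COMPLETED"],
  COMPLETED: [],
};

function canTransition(current: GiftProcessStatus, next: GiftProcessStatus): boolean {
  // indexOf is equivalent to includes here; it only keeps the sketch portable to older targets.
  return giftProcessTransitions[current].indexOf(next) !== -1;
}

// Hypothetical helper: throws an error that also lists the allowed target states.
function assertTransition(current: GiftProcessStatus, next: GiftProcessStatus): void {
  if (!canTransition(current, next)) {
    const allowed = giftProcessTransitions[current].join(", ") || "none (terminal state)";
    throw new Error(`Invalid transition from ${current} to ${next}; allowed: ${allowed}`);
  }
}

const forwardOk = canTransition("DECISION_WINDOW_OPEN", "CONFIRMED"); // allowed step
const skipOk = canTransition("CREATED", "OPTIONS_GENERATED"); // skips OCCASION_DETECTED

let terminalError = "";
try {
  assertTransition("COMPLETED", "SHIPPED"); // COMPLETED has no outgoing transitions
} catch (error) {
  terminalError = (error as Error).message;
}

console.log(forwardOk, skipOk, terminalError);
```

Listing the allowed targets in the error message is a design choice that tends to make failed jobs easier to diagnose from logs alone.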
State Persistence
- Database State: Workflow state persisted in PostgreSQL
- Audit Trail: All state changes tracked with timestamps
- Rollback Support: Failed transitions can be rolled back
- Concurrent Safety: Database locks prevent race conditions
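The list above does not say how concurrent updates are guarded. One common approach, shown here only as a sketch, is a conditional ("compare-and-set") update that succeeds only if the row still holds the status the caller read earlier; in SQL this corresponds to adding `AND status = <expected>` to the `WHERE` clause of the update. The in-memory `rows` array stands in for the `gift_processes` table, and `compareAndSetStatus` and `auditLog` are hypothetical names.

```typescript
type Status = "OPTIONS_GENERATED" | "DECISION_WINDOW_OPEN" | "CONFIRMED";

interface ProcessRow {
  id: string;
  status: Status;
  updatedAt: Date;
}

interface AuditEntry {
  processId: string;
  from: Status;
  to: Status;
  at: Date;
}

// In-memory stand-ins for the table and an audit trail (assumptions for this sketch).
const rows: ProcessRow[] = [
  { id: "p1", status: "DECISION_WINDOW_OPEN", updatedAt: new Date(0) },
];
const auditLog: AuditEntry[] = [];

// Updates the row only if its status still equals `expected`.
// Returns false when another writer changed the row first.
function compareAndSetStatus(id: string, expected: Status, next: Status): boolean {
  for (let i = 0; i < rows.length; i++) {
    const row = rows[i];
    if (row.id === id && row.status === expected) {
      const now = new Date();
      rows[i] = { id: row.id, status: next, updatedAt: now }; // replace, do not mutate
      auditLog.push({ processId: id, from: expected, to: next, at: now });
      return true;
    }
  }
  return false;
}

// Two writers both read DECISION_WINDOW_OPEN and try to confirm; only the first wins.
const firstWriter = compareAndSetStatus("p1", "DECISION_WINDOW_OPEN", "CONFIRMED");
const secondWriter = compareAndSetStatus("p1", "DECISION_WINDOW_OPEN", "CONFIRMED");

console.log(firstWriter, secondWriter, auditLog.length);
```

With a real database the same check would run as a single statement, so the read and the write cannot be interleaved by another worker.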
⚡ Event-Driven Execution
Event Types
export const GiftingProcessJobs = {
  START_WORKFLOW: "start-workflow",
  OCCASION_DETECTED: "occasion-detected",
  OPTIONS_GENERATED: "options-generated",
  DECISION_WINDOW_OPEN: "decision-window-open",
  OPTION_CONFIRMED: "option-confirmed",
  OPTION_SELECTED: "option-selected",
} as const;
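Because the event names are declared with `as const`, a union type of the literal names can be derived from the object. The sketch below, which is an illustration rather than the platform's actual typing, uses that union as the key type of a lookup table so that the compiler reports any event without an entry. The handler name for `option-selected` is a placeholder, since this document does not show a handler for that event.

```typescript
const GiftingProcessJobs = {
  START_WORKFLOW: "start-workflow",
  OCCASION_DETECTED: "occasion-detected",
  OPTIONS_GENERATED: "options-generated",
  DECISION_WINDOW_OPEN: "decision-window-open",
  OPTION_CONFIRMED: "option-confirmed",
  OPTION_SELECTED: "option-selected",
} as const;

// Union of the literal event names: "start-workflow" | "occasion-detected" | ...
type GiftingProcessJobName =
  (typeof GiftingProcessJobs)[keyof typeof GiftingProcessJobs];

// Record<GiftingProcessJobName, ...> must list every event,
// so removing an entry below is a compile-time error.
const handlerNames: Record<GiftingProcessJobName, string> = {
  "start-workflow": "startGiftingWorkflow",
  "occasion-detected": "processDetected",
  "options-generated": "optionsGenerated",
  "decision-window-open": "decisionWindowOpen",
  "option-confirmed": "optionConfirmed",
  "option-selected": "optionSelected", // placeholder name (hypothetical)
};

// Type guard for event names that arrive as plain strings from the queue.
function isGiftingProcessJobName(value: string): value is GiftingProcessJobName {
  return Object.prototype.hasOwnProperty.call(handlerNames, value);
}

const knownEvent = isGiftingProcessJobName("options-generated");
const unknownEvent = isGiftingProcessJobName("gift-returned");

console.log(knownEvent, unknownEvent);
```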
Event Handlers
export const handlers: Record<
  string,
  (payload: GiftProcessJobPayload) => Promise<void>
> = {
  "start-workflow": async (payload) => {
    await GiftProcessJobHandler.startGiftingWorkflow(payload?.occasionId || "");
  },
  "occasion-detected": async (payload) => {
    await GiftProcessJobHandler.processDetected(payload?.processId || "");
  },
  "options-generated": async (payload) => {
    await GiftProcessJobHandler.optionsGenerated(payload?.processId || "");
  },
  "decision-window-open": async (payload) => {
    await GiftProcessJobHandler.decisionWindowOpen(payload?.processId || "");
  },
  "option-confirmed": async (payload) => {
    await GiftProcessJobHandler.optionConfirmed(
      payload?.processId || "",
      payload?.giftId || "",
    );
  },
  // Note: "option-selected" is declared in GiftingProcessJobs but has no handler here,
  // so the worker treats it as an unknown event type.
};
Event Flow Example
export class GiftProcessJobHandler {
  static async startGiftingWorkflow(occasionId: string) {
    // 1. Create workflow instance
    const process = await createProcess(occasionId, "OCCASION_DETECTED");
    if (!process) {
      // createProcess returns null when the insert was skipped as a duplicate
      return;
    }

    // 2. Get context data from features
    const occasion = await findOccasionById(occasionId);
    const recipient = await findUserById(occasion.recipientId);
    const relationship = await findRelationship(occasion.relationshipId);

    // 3. Update process with context
    await updateProcess(process.id, {
      recipientId: recipient.id,
      relationshipId: relationship.id,
    });

    // 4. Trigger next step
    await giftingProcessQueue.enqueue({
      event: "occasion-detected",
      payload: { processId: process.id },
      metadata: { occasionId, step: "1" },
    });
  }

  static async processDetected(processId: string) {
    // 1. Get process context
    const process = await findProcessById(processId);

    // 2. Generate gift options using multiple features
    const options = await generateGiftOptions({
      recipientId: process.recipientId,
      relationshipId: process.relationshipId,
      occasionId: process.occasionId,
    });

    // 3. Update process state
    await updateProcessStatus(processId, "OPTIONS_GENERATED");

    // 4. Trigger next step
    await giftingProcessQueue.enqueue({
      event: "options-generated",
      payload: { processId, optionsCount: options.length },
    });
  }
}
👷 Worker Integration
Queue Configuration
export const giftingProcessQueue = createPgBossQueue<GiftProcessJobPayload>(
  PgBossQueueName.GiftingProcessQueue,
  boss,
);
Worker Setup
export const initGiftingProcessWorker = () => {
  return createPgBossWorker<GiftProcessJobPayload>(
    boss,
    PgBossQueueName.GiftingProcessQueue,
    async (job) => {
      const payload = job.data;
      // Jobs without an explicit event are treated as workflow starts
      const eventType = job.data.event || "start-workflow";
      const handler = handlers[eventType];

      if (handler) {
        await handler(payload);
      } else {
        logger.info(`Unknown event type: ${eventType}`);
      }
    },
  );
};
Job Processing
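// Standard job structure
// Note: the handlers read processId, occasionId and giftId from the top level of
// job.data, while the enqueue() calls in this document nest them under `payload`.
// This presumably relies on the queue wrapper flattening `payload` into the job data.
interface GiftProcessJobPayload {
  event: string;
  processId?: string;
  occasionId?: string;
  giftId?: string;
  metadata?: {
    source?: string;
    step?: string;
    [key: string]: any;
  };
}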
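Since every identifier in `GiftProcessJobPayload` is optional, handlers have to cope with missing values at call time. One alternative, sketched here as an assumption rather than the platform's actual design, is a discriminated union keyed on `event` plus a small parser that rejects incomplete jobs before they reach a handler. The required fields per event are inferred from the handler signatures in this document; `RawJobData` and `parseJob` are hypothetical names, and `option-selected` is left out because its payload is not documented.

```typescript
// Each event carries exactly the identifiers its handler needs.
type GiftProcessJob =
  | { event: "start-workflow"; occasionId: string }
  | {
      event: "occasion-detected" | "options-generated" | "decision-window-open";
      processId: string;
    }
  | { event: "option-confirmed"; processId: string; giftId: string };

// Loose shape of the data as it comes off the queue.
interface RawJobData {
  event?: string;
  processId?: string;
  occasionId?: string;
  giftId?: string;
}

// Validates raw job data and narrows it to one of the typed variants.
function parseJob(data: RawJobData): GiftProcessJob {
  switch (data.event) {
    case "start-workflow":
      if (!data.occasionId) throw new Error("start-workflow requires occasionId");
      return { event: "start-workflow", occasionId: data.occasionId };
    case "occasion-detected":
    case "options-generated":
    case "decision-window-open":
      if (!data.processId) throw new Error(`${data.event} requires processId`);
      return { event: data.event, processId: data.processId };
    case "option-confirmed":
      if (!data.processId || !data.giftId) {
        throw new Error("option-confirmed requires processId and giftId");
      }
      return { event: "option-confirmed", processId: data.processId, giftId: data.giftId };
    default:
      throw new Error(`Unknown event type: ${String(data.event)}`);
  }
}

const parsed = parseJob({ event: "option-confirmed", processId: "p1", giftId: "g1" });

let missingIdError = "";
try {
  parseJob({ event: "occasion-detected" }); // processId is missing
} catch (error) {
  missingIdError = (error as Error).message;
}

console.log(parsed.event, missingIdError);
```

Failing at parse time means a malformed job surfaces as a clear error on the queue instead of as a lookup for an empty identifier further down.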
🔧 Implementation Patterns
1. Workflow Creation
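// operations/gifting-process.create.ts
export async function createProcess(
  occasionId: string,
  status?: GiftProcessStatus,
): Promise<any | null> {
  // Note: the schema above marks recipientId and relationshipId as NOT NULL without
  // defaults, so a real insert must supply them as well.
  const [process] = await db
    .insert(giftProcessSchema)
    .values({
      occasionId,
      status: status || "OCCASION_DETECTED",
    })
    // Prevent double-processing. This only skips duplicates when a unique
    // constraint (for example on occasion_id) exists on the table.
    .onConflictDoNothing()
    .returning();

  // No row is returned when the insert was skipped
  return process || null;
}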
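Because `createProcess` returns `null` when the insert is skipped, callers need to treat `null` as "already started" rather than as a failure. The sketch below imitates that behaviour with an in-memory map instead of a database. It assumes one process per occasion, which the schema in this document does not explicitly declare; `createProcessOnce` and `startWorkflow` are hypothetical names.

```typescript
interface GiftProcess {
  id: string;
  occasionId: string;
  status: string;
}

// In-memory stand-in for a table with a unique constraint on occasionId (assumption).
const processesByOccasion: { [occasionId: string]: GiftProcess } = {};
let nextId = 1;

// Mirrors insert + onConflictDoNothing: returns null when the occasion already has a process.
function createProcessOnce(occasionId: string): GiftProcess | null {
  if (Object.prototype.hasOwnProperty.call(processesByOccasion, occasionId)) {
    return null;
  }
  const created: GiftProcess = {
    id: `proc-${nextId++}`,
    occasionId,
    status: "OCCASION_DETECTED",
  };
  processesByOccasion[occasionId] = created;
  return created;
}

// A duplicate "start-workflow" job becomes a no-op instead of a second workflow.
function startWorkflow(occasionId: string): "started" | "skipped" {
  const created = createProcessOnce(occasionId);
  if (created === null) {
    return "skipped";
  }
  return "started";
}

const firstDelivery = startWorkflow("occ-1");
const duplicateDelivery = startWorkflow("occ-1");
const otherOccasion = startWorkflow("occ-2");

console.log(firstDelivery, duplicateDelivery, otherOccasion);
```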
2. Workflow Updates
// operations/gifting-process.update.ts
export async function updateProcess(
  processId: string,
  updates: Partial<GiftProcessUpdate>,
) {
  const [updatedProcess] = await db
    .update(giftProcessSchema)
    .set({ ...updates, updatedAt: new Date() })
    .where(eq(giftProcessSchema.id, processId))
    .returning();

  return updatedProcess;
}

export async function updateProcessStatus(
  processId: string,
  newStatus: GiftProcessStatus,
) {
  const currentProcess = await findProcessById(processId);

  // Validate the transition before writing
  if (!canTransition(currentProcess.status, newStatus)) {
    throw new Error(
      `Invalid transition from ${currentProcess.status} to ${newStatus}`,
    );
  }

  return await updateProcess(processId, { status: newStatus });
}
3. Cross-Feature Coordination
// workers/gifting-process.events.ts
export class GiftProcessJobHandler {
  static async generateGiftOptions(processId: string) {
    const process = await findProcessById(processId);

    // Coordinate multiple features (the three lookups are independent, so run them in parallel)
    const [user, relationship, occasion] = await Promise.all([
      findUserById(process.recipientId),
      findRelationship(process.relationshipId),
      findOccasionById(process.occasionId),
    ]);

    // Use classifier feature for personalization
    const preferences = await classifierApi.getUserPreferences(user.id);

    // Use gifts feature for suggestions
    const suggestions = await giftsApi.getGiftSuggestions({
      preferences,
      relationship,
      occasion,
    });

    // Use decisions feature for ranking
    const rankedOptions = await decisionsApi.rankGifts(suggestions);

    return rankedOptions;
  }
}
🚀 Best Practices
1. State Management
- Immutable Updates: Always create new state rather than mutating
- Transition Validation: Enforce valid state transitions
- Audit Logging: Log all state changes for debugging
- Rollback Support: Implement compensation for failed operations
2. Event Design
- Descriptive Names: Use clear, descriptive event names
- Consistent Payloads: Maintain consistent payload structures
- Metadata: Include context metadata for debugging
- Versioning: Version events when breaking changes occur
3. Error Handling
- Retry Logic: Implement exponential backoff for retries
- Dead Letter Queues: Isolate failed jobs for manual inspection
- Monitoring: Track workflow success/failure rates
- Alerting: Alert on critical workflow failures
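As an illustration of the retry guidance above, the sketch below computes an exponential backoff schedule with a cap and decides when a job should be moved aside for inspection. pg-boss provides its own retry options (such as `retryLimit`, `retryDelay` and `retryBackoff`), so in practice this is usually configuration rather than hand-written code; the functions here only show the arithmetic, and their names and parameters are hypothetical.

```typescript
// Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxMs.
function backoffDelayMs(attempt: number, baseMs: number, maxMs: number): number {
  const uncapped = baseMs * Math.pow(2, attempt);
  return Math.min(uncapped, maxMs);
}

// A job is dead-lettered once it has failed more times than the retry limit allows
// (one initial attempt plus `retryLimit` retries).
function shouldDeadLetter(failedAttempts: number, retryLimit: number): boolean {
  return failedAttempts > retryLimit;
}

// Schedule for five retries with a 1 second base and a 10 second cap.
const schedule: number[] = [];
for (let attempt = 0; attempt < 5; attempt++) {
  schedule.push(backoffDelayMs(attempt, 1000, 10000));
}

console.log(schedule.join(", ")); // 1000, 2000, 4000, 8000, 10000
console.log(shouldDeadLetter(3, 3), shouldDeadLetter(4, 3)); // false true
```

The cap keeps a long-failing job from being delayed indefinitely, which matters for time-bound steps such as a decision window.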
4. Performance
- Batch Operations: Batch database operations where possible
- Async Processing: Use queues for long-running operations
- Caching: Cache frequently accessed workflow data
- Optimistic Locking: Use optimistic locking for concurrent updates
5. Testing
- Unit Tests: Test individual workflow steps
- Integration Tests: Test cross-feature coordination
- State Tests: Test state transition logic
- Event Tests: Test event handling and propagation
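A state test can check the transition table as a whole rather than one transition at a time. The sketch below, which copies the table from this document so it runs standalone, verifies that every state is reachable from `CREATED` and that `COMPLETED` is the only state without outgoing transitions. `reachableFrom` is a hypothetical helper; in a real suite the same checks would sit inside the project's test runner.

```typescript
// Copy of the transition table from this document, keyed by plain strings for brevity.
const transitions: Record<string, string[]> = {
  CREATED: ["OCCASION_DETECTED"],
  OCCASION_DETECTED: ["OPTIONS_GENERATED"],
  OPTIONS_GENERATED: ["DECISION_WINDOW_OPEN"],
  DECISION_WINDOW_OPEN: ["DEFAULT_SELECTED", "CONFIRMED"],
  DEFAULT_SELECTED: ["CONFIRMED"],
  CONFIRMED: ["ORDERED"],
  ORDERED: ["SHIPPED"],
  SHIPPED: ["COMPLETED"],
  COMPLETED: [],
};

// Breadth-first search over the transition graph.
function reachableFrom(start: string): string[] {
  const seen: string[] = [start];
  const queue: string[] = [start];
  while (queue.length > 0) {
    const current = queue.shift() as string;
    const nextStates = transitions[current] || [];
    for (let i = 0; i < nextStates.length; i++) {
      const next = nextStates[i];
      if (seen.indexOf(next) === -1) {
        seen.push(next);
        queue.push(next);
      }
    }
  }
  return seen;
}

const allStates = Object.keys(transitions);
const reachable = reachableFrom("CREATED");

// States that no sequence of transitions from CREATED can reach.
const unreachable = allStates.filter((state) => reachable.indexOf(state) === -1);

// States with no way out, other than the intended terminal state.
const deadEnds = allStates.filter(
  (state) => transitions[state].length === 0 && state !== "COMPLETED",
);

console.log(unreachable.length, deadEnds.length);
```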
🔗 Related Documentation
- 🏗️ API Architecture - Complete system design
- 🚀 Queues & Jobs - PgBoss queue system
- 👷 Workers & Events - Background workers and events
- 🎮 Controllers Documentation - HTTP endpoint patterns
- 📋 Operations Documentation - Business logic operations