Workflows

Workflows are the orchestration layer that coordinates multiple features to achieve complex business goals. Unlike features, which are each scoped to a single domain, workflows are global processes that call into several features to complete multi-step business operations.

📖 Overview

Workflows in the Ivyi platform represent complex business processes that span multiple domains. They orchestrate the interaction between different features, manage state transitions, and coordinate asynchronous operations through an event-driven architecture powered by PgBoss queues and workers.

Key Characteristics

🏗️ Architecture

Workflow System Architecture

┌─────────────────────────────────────────────────────┐
│               Workflow Orchestration                │
│                                                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │    State    │  │   Events    │  │   Workers   │  │
│  │ Management  │  │   System    │  │    Layer    │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
│         │                │                │         │
│         ▼                ▼                ▼         │
│  ┌───────────────────────────────────────────────┐  │
│  │              Feature Integration              │  │
│  │                                               │  │
│  │ • Users • Gifts • Relationships • Classifier  │  │
│  │ • Occasions • Decisions • Options             │  │
│  └───────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────┘

Directory Structure

workflows/
└── gifting-process/
    ├── controllers/           # Workflow HTTP endpoints
    ├── gifting-process.schema.ts  # Database schemas
    ├── gifting-process.types.ts   # TypeScript types
    ├── gifting-process.config.ts  # Configuration
    ├── gifting-process.states.ts  # State machine definitions
    ├── operations/           # Workflow business logic
    │   ├── gifting-process.create.ts
    │   ├── gifting-process.find.ts
    │   └── gifting-process.update.ts
    └── workers/              # Background processing
        ├── gifting-process.queue.ts
        ├── gifting-process.workers.ts
        └── gifting-process.events.ts

🔄 Workflow vs Features

Fundamental Differences

| Aspect       | Features               | Workflows                      |
|--------------|------------------------|--------------------------------|
| Scope        | Single business domain | Cross-domain processes         |
| Purpose      | CRUD operations        | Business process orchestration |
| State        | Entity state           | Workflow state                 |
| Dependencies | Self-contained         | Multiple feature dependencies  |
| Processing   | Synchronous            | Asynchronous (queues/workers)  |
| Example      | Create user, get gifts | Complete gifting process       |

Interaction Patterns

Feature-to-Workflow Communication

// Features can trigger workflows
export const createOccasionController = asyncHandler(
  async (req: Request, res: Response) => {
    const occasion = await createOccasion(req.body);

    // Trigger the workflow; job fields are flat, matching GiftProcessJobPayload
    await giftingProcessQueue.enqueue({
      event: "start-workflow",
      occasionId: occasion.id,
      metadata: { source: "occasion-controller" },
    });

    sendSuccessResponse(req, res, { data: occasion });
  },
);

Workflow-to-Feature Communication

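// Workflows coordinate multiple features
export const GiftProcessJobHandler = {
  async startGiftingWorkflow(occasionId: string) {
    // 1. Get occasion data
    const occasion = await findOccasionById(occasionId);
    if (!occasion) throw new Error(`Occasion ${occasionId} not found`);

    // 2. Create workflow state (null means a process already exists)
    const process = await createProcess(occasionId, "OCCASION_DETECTED");
    if (!process) return;

    // 3. Coordinate features and store their context on the process
    const recipient = await findUserById(occasion.recipientId);
    const relationship = await findRelationship(occasion.relationshipId);
    await updateProcess(process.id, {
      recipientId: recipient.id,
      relationshipId: relationship.id,
    });

    // 4. Trigger next step (job fields are flat, matching GiftProcessJobPayload)
    await giftingProcessQueue.enqueue({
      event: "occasion-detected",
      processId: process.id,
    });
  },
};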

🎯 Gifting Process Workflow

Workflow Overview

The gifting process is the primary workflow in the Ivyi platform, orchestrating the complete journey from occasion detection to gift selection and delivery.

Workflow States

export const giftProcessTransitions: Record<
  GiftProcessStatus,
  GiftProcessStatus[]
> = {
  CREATED: ["OCCASION_DETECTED"],
  OCCASION_DETECTED: ["OPTIONS_GENERATED"],
  OPTIONS_GENERATED: ["DECISION_WINDOW_OPEN"],
  DECISION_WINDOW_OPEN: ["DEFAULT_SELECTED", "CONFIRMED"],
  DEFAULT_SELECTED: ["CONFIRMED"],
  CONFIRMED: ["ORDERED"],
  ORDERED: ["SHIPPED"],
  SHIPPED: ["COMPLETED"],
  COMPLETED: [],
};
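
A transition table like the one above is easy to get subtly wrong, for example by leaving a status unreachable or by creating more than one dead end. The following is a minimal sketch, not part of the platform code, that checks both properties. The table is copied from above; `reachableFrom` and `terminalStatuses` are hypothetical helper names introduced only for illustration.

```typescript
// Statuses and transitions copied from the table above.
type GiftProcessStatus =
  | "CREATED"
  | "OCCASION_DETECTED"
  | "OPTIONS_GENERATED"
  | "DECISION_WINDOW_OPEN"
  | "DEFAULT_SELECTED"
  | "CONFIRMED"
  | "ORDERED"
  | "SHIPPED"
  | "COMPLETED";

const giftProcessTransitions: Record<GiftProcessStatus, GiftProcessStatus[]> = {
  CREATED: ["OCCASION_DETECTED"],
  OCCASION_DETECTED: ["OPTIONS_GENERATED"],
  OPTIONS_GENERATED: ["DECISION_WINDOW_OPEN"],
  DECISION_WINDOW_OPEN: ["DEFAULT_SELECTED", "CONFIRMED"],
  DEFAULT_SELECTED: ["CONFIRMED"],
  CONFIRMED: ["ORDERED"],
  ORDERED: ["SHIPPED"],
  SHIPPED: ["COMPLETED"],
  COMPLETED: [],
};

// Hypothetical helper: every status reachable from `start` (inclusive),
// found with a breadth-first walk over the table.
function reachableFrom(start: GiftProcessStatus): Set<GiftProcessStatus> {
  const seen = new Set<GiftProcessStatus>([start]);
  const frontier: GiftProcessStatus[] = [start];
  while (frontier.length > 0) {
    const current = frontier.shift()!;
    for (const next of giftProcessTransitions[current]) {
      if (!seen.has(next)) {
        seen.add(next);
        frontier.push(next);
      }
    }
  }
  return seen;
}

// Hypothetical helper: statuses with no outgoing transitions.
function terminalStatuses(): GiftProcessStatus[] {
  return (Object.keys(giftProcessTransitions) as GiftProcessStatus[]).filter(
    (status) => giftProcessTransitions[status].length === 0,
  );
}
```

A check like this could run in a unit test so that a status added to the enum without a matching table entry is caught early.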

State Machine Visualization

┌─────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ CREATED │───▶│ OCCASION    │───▶│ OPTIONS     │───▶│ DECISION    │
│         │    │ DETECTED    │    │ GENERATED   │    │ WINDOW OPEN │
└─────────┘    └─────────────┘    └─────────────┘    └─────────────┘
                                                            │
                                         ┌──────────────────┤
                                         ▼                  ▼
                                  ┌─────────────┐    ┌─────────────┐
                                  │ DEFAULT     │───▶│ CONFIRMED   │
                                  │ SELECTED    │    │             │
                                  └─────────────┘    └─────────────┘
                                                            │
                                                            ▼
                                                     ┌─────────────┐
                                                     │ ORDERED     │
                                                     │             │
                                                     └─────────────┘
                                                            │
                                                            ▼
                                                     ┌─────────────┐
                                                     │ SHIPPED     │
                                                     │             │
                                                     └─────────────┘
                                                            │
                                                            ▼
                                                     ┌─────────────┐
                                                     │ COMPLETED   │
                                                     │             │
                                                     └─────────────┘

Database Schema

export const giftProcessSchema = pgTable("gift_processes", {
  id: uuid("id").primaryKey().defaultRandom(),
  occasionId: text("occasion_id")
    .references(() => occasionsSchema.id)
    .notNull(),
  status: giftProcessStatusEnum("status")
    .default("OPTIONS_GENERATED")
    .notNull(),
  selectedGiftId: uuid("selected_gift_id").references(() => giftsSchema.id),
  createdAt: timestamp("created_at").defaultNow().notNull(),
  updatedAt: timestamp("updated_at").defaultNow().notNull(),
  recipientId: uuid("recipient_id")
    .references(() => usersSchema.id, { onDelete: "cascade" })
    .notNull(),
  currentStep: giftProcessStepEnum("current_step")
    .default("INTENT_LEVEL_1")
    .notNull(),
  relationshipId: uuid("relationship_id")
    .references(() => relationshipsSchema.id, { onDelete: "cascade" })
    .notNull(),
  intent: giftIntentEnum("intent"),
  relationshipDepthOverride: giftRelationshipDepthEnum(
    "relationship_depth_override",
  ),
  signalIntensity: giftSignalIntensityEnum("signal_intensity"),
});

📊 State Management

State Transition Validation

export function canTransition(
  current: GiftProcessStatus,
  next: GiftProcessStatus,
): boolean {
  return giftProcessTransitions[current].includes(next);
}

// Usage in operations
export const updateProcessStatus = async (
  processId: string,
  newStatus: GiftProcessStatus,
) => {
  const currentProcess = await findProcessById(processId);

  // Guard against a missing row before reading its status
  if (!currentProcess) {
    throw new Error(`Gift process ${processId} not found`);
  }

  if (!canTransition(currentProcess.status, newStatus)) {
    throw new Error(
      `Invalid transition from ${currentProcess.status} to ${newStatus}`,
    );
  }

  await db
    .update(giftProcessSchema)
    .set({ status: newStatus, updatedAt: new Date() })
    .where(eq(giftProcessSchema.id, processId));
};
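
The operation above reads the status and writes the new one in two separate steps, so two workers handling the same process at the same time could both pass the check. One common mitigation is to make the write conditional on the status that was read. The sketch below illustrates the idea with an in-memory `Map` standing in for the `gift_processes` table; `statusStore` and `transitionIfCurrent` are hypothetical names, and only a subset of statuses is included. In SQL the equivalent would be adding the expected status to the `WHERE` clause and checking whether a row was updated.

```typescript
type ProcessStatus =
  | "OPTIONS_GENERATED"
  | "DECISION_WINDOW_OPEN"
  | "DEFAULT_SELECTED"
  | "CONFIRMED";

// Subset of the transition table, enough for the illustration.
const allowedNext: Record<ProcessStatus, ProcessStatus[]> = {
  OPTIONS_GENERATED: ["DECISION_WINDOW_OPEN"],
  DECISION_WINDOW_OPEN: ["DEFAULT_SELECTED", "CONFIRMED"],
  DEFAULT_SELECTED: ["CONFIRMED"],
  CONFIRMED: [],
};

// In-memory stand-in for the table (assumption made for this sketch).
const statusStore = new Map<string, ProcessStatus>();

// Moves the process to `next` only if it is still in `expected`.
// Returns false when the transition is invalid or another writer got there first.
function transitionIfCurrent(
  processId: string,
  expected: ProcessStatus,
  next: ProcessStatus,
): boolean {
  if (!allowedNext[expected].includes(next)) return false;
  if (statusStore.get(processId) !== expected) return false;
  statusStore.set(processId, next);
  return true;
}
```

With this shape, the second of two competing callers gets `false` back instead of silently overwriting the first caller's result.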

State Persistence

⚡ Event-Driven Execution

Event Types

export const GiftingProcessJobs = {
  START_WORKFLOW: "start-workflow",
  OCCASION_DETECTED: "occasion-detected",
  OPTIONS_GENERATED: "options-generated",
  DECISION_WINDOW_OPEN: "decision-window-open",
  OPTION_CONFIRMED: "option-confirmed",
  OPTION_SELECTED: "option-selected",
} as const;
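
Because the object is declared `as const`, the set of valid event names can be derived from it rather than maintained by hand. The following sketch shows one way to do that; `GiftingProcessEvent` and `isGiftingProcessEvent` are hypothetical names that do not appear in the original code.

```typescript
// Copied from the constant above.
const GiftingProcessJobs = {
  START_WORKFLOW: "start-workflow",
  OCCASION_DETECTED: "occasion-detected",
  OPTIONS_GENERATED: "options-generated",
  DECISION_WINDOW_OPEN: "decision-window-open",
  OPTION_CONFIRMED: "option-confirmed",
  OPTION_SELECTED: "option-selected",
} as const;

// Union of the literal values: "start-workflow" | "occasion-detected" | ...
type GiftingProcessEvent =
  (typeof GiftingProcessJobs)[keyof typeof GiftingProcessJobs];

const knownEvents = new Set<string>(Object.values(GiftingProcessJobs));

// Runtime check that narrows an untrusted value to a known event name.
function isGiftingProcessEvent(value: unknown): value is GiftingProcessEvent {
  return typeof value === "string" && knownEvents.has(value);
}
```

A guard like this is useful at the worker boundary, where job data arrives as untyped JSON.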

Event Handlers

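// Maps each event name to its handler. Missing ids fall back to "", so
// handlers should validate ids before using them. "option-selected" is
// declared in GiftingProcessJobs but has no handler registered here.
export const handlers: Record<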
  string,
  (payload: GiftProcessJobPayload) => Promise<void>
> = {
  "occasion-detected": async (payload) => {
    await GiftProcessJobHandler.processDetected(payload?.processId || "");
  },
  "options-generated": async (payload) => {
    await GiftProcessJobHandler.optionsGenerated(payload?.processId || "");
  },
  "decision-window-open": async (payload) => {
    await GiftProcessJobHandler.decisionWindowOpen(payload?.processId || "");
  },
  "option-confirmed": async (payload) => {
    await GiftProcessJobHandler.optionConfirmed(
      payload?.processId || "",
      payload?.giftId || "",
    );
  },
  "start-workflow": async (payload) => {
    await GiftProcessJobHandler.startGiftingWorkflow(payload?.occasionId || "");
  },
};
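
The handlers above fall back to an empty string when an id is missing, which pushes the failure further downstream. As an alternative, a job could be checked for its required ids before dispatch. The sketch below is one possible shape under that assumption; `JobData`, `requiredFields`, and `missingFields` are hypothetical names, and the field requirements are read off the handlers shown above.

```typescript
interface JobData {
  event: string;
  processId?: string;
  occasionId?: string;
  giftId?: string;
}

type IdField = "processId" | "occasionId" | "giftId";

// Which identifiers each event needs, based on the handlers above.
const requiredFields: Record<string, IdField[]> = {
  "start-workflow": ["occasionId"],
  "occasion-detected": ["processId"],
  "options-generated": ["processId"],
  "decision-window-open": ["processId"],
  "option-confirmed": ["processId", "giftId"],
};

// Returns the required fields that are absent or empty,
// or null when the event itself is unknown.
function missingFields(job: JobData): IdField[] | null {
  const required: IdField[] | undefined = requiredFields[job.event];
  if (!required) return null;
  return required.filter((field) => !job[field]);
}
```

A worker could reject or dead-letter a job whenever `missingFields` returns a non-empty array, instead of calling a handler with `""`.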

Event Flow Example

export class GiftProcessJobHandler {
  static async startGiftingWorkflow(occasionId: string) {
    // 1. Create workflow instance (null means one already exists)
    const process = await createProcess(occasionId, "OCCASION_DETECTED");
    if (!process) return;

    // 2. Get context data from features
    const occasion = await findOccasionById(occasionId);
    const recipient = await findUserById(occasion.recipientId);
    const relationship = await findRelationship(occasion.relationshipId);

    // 3. Update process with context
    await updateProcess(process.id, {
      recipientId: recipient.id,
      relationshipId: relationship.id,
    });

    // 4. Trigger next step (job fields are flat, matching GiftProcessJobPayload)
    await giftingProcessQueue.enqueue({
      event: "occasion-detected",
      processId: process.id,
      metadata: { occasionId, step: "1" },
    });
  }

  static async processDetected(processId: string) {
    // 1. Get process context
    const process = await findProcessById(processId);
    if (!process) throw new Error(`Gift process ${processId} not found`);

    // 2. Generate gift options using multiple features
    const options = await generateGiftOptions({
      recipientId: process.recipientId,
      relationshipId: process.relationshipId,
      occasionId: process.occasionId,
    });

    // 3. Update process state
    await updateProcessStatus(processId, "OPTIONS_GENERATED");

    // 4. Trigger next step
    await giftingProcessQueue.enqueue({
      event: "options-generated",
      processId,
      metadata: { optionsCount: options.length },
    });
  }
}
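
The chaining in the example above, where each handler finishes by enqueueing the next event, can be traced without a database or queue. The sketch below is a deliberately simplified, synchronous, in-memory model of that chain. It is not how PgBoss executes jobs; `FlowJob`, `flowSteps`, and `drain` are hypothetical names used only for illustration.

```typescript
type FlowEvent = "start-workflow" | "occasion-detected" | "options-generated";

interface FlowJob {
  event: FlowEvent;
  id: string;
}

// Each step returns the follow-up job to enqueue, or null when the chain ends.
const flowSteps: Record<FlowEvent, (job: FlowJob) => FlowJob | null> = {
  "start-workflow": (job) => ({
    event: "occasion-detected",
    id: `process-for-${job.id}`,
  }),
  "occasion-detected": (job) => ({ event: "options-generated", id: job.id }),
  "options-generated": () => null,
};

// Processes jobs in FIFO order until none are pending,
// returning the sequence of events that ran.
function drain(first: FlowJob): FlowEvent[] {
  const pending: FlowJob[] = [first];
  const trace: FlowEvent[] = [];
  while (pending.length > 0) {
    const job = pending.shift()!;
    trace.push(job.event);
    const next = flowSteps[job.event](job);
    if (next) pending.push(next);
  }
  return trace;
}
```

Running `drain({ event: "start-workflow", id: "occ-1" })` yields the three events in the order the real handlers would trigger them.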

👷 Worker Integration

Queue Configuration

export const giftingProcessQueue = createPgBossQueue<GiftProcessJobPayload>(
  PgBossQueueName.GiftingProcessQueue,
  boss,
);
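
The source of `createPgBossQueue` is not shown here, so the following is only a guess at what such a typed wrapper might look like. It is written against a minimal structural interface, `JobSender`, rather than the real client, so it runs with a fake; pg-boss's `send(name, data)` method should fit that shape. `JobSender`, `TypedQueue`, and `createTypedQueue` are hypothetical names.

```typescript
// Minimal view of the client used by the wrapper (assumption for the sketch).
interface JobSender {
  send(queueName: string, data: object): Promise<string | null>;
}

interface TypedQueue<T extends object> {
  enqueue(job: T): Promise<string | null>;
}

// Hypothetical stand-in for the project's createPgBossQueue helper:
// binds a queue name and a payload type to a single enqueue function.
function createTypedQueue<T extends object>(
  queueName: string,
  sender: JobSender,
): TypedQueue<T> {
  return {
    enqueue: (job) => sender.send(queueName, job),
  };
}
```

The benefit of the generic parameter is that a call such as `enqueue({ event: "start-workflow", occasionId })` is checked against the payload type at compile time.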

Worker Setup

export const initGiftingProcessWorker = () => {
  return createPgBossWorker<GiftProcessJobPayload>(
    boss,
    PgBossQueueName.GiftingProcessQueue,
    async (job) => {
      const payload = job.data;
      // Jobs without an explicit event are treated as workflow starts
      const eventType = payload.event || "start-workflow";
      const handler = handlers[eventType];

      if (!handler) {
        // No handler registered: log and finish the job without processing
        logger.warn(`Unknown event type: ${eventType}`);
        return;
      }

      await handler(payload);
    },
  );
};

Job Processing

// Standard job structure: ids sit at the top level next to the event name
interface GiftProcessJobPayload {
  event: string; // one of the GiftingProcessJobs values
  processId?: string;
  occasionId?: string;
  giftId?: string;
  metadata?: {
    source?: string;
    step?: string;
    [key: string]: unknown;
  };
}
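
With every id optional, the interface above cannot express that, say, `option-confirmed` always carries a `giftId`. One alternative, shown here only as a sketch, is a discriminated union keyed on `event`, which lets the compiler check each branch. `GiftJob` and `describeJob` are hypothetical names, and only three events are modelled.

```typescript
// One variant per event, each listing exactly the ids that event needs.
type GiftJob =
  | { event: "start-workflow"; occasionId: string }
  | { event: "occasion-detected"; processId: string }
  | { event: "option-confirmed"; processId: string; giftId: string };

function describeJob(job: GiftJob): string {
  switch (job.event) {
    case "start-workflow":
      return `start workflow for occasion ${job.occasionId}`;
    case "occasion-detected":
      return `generate options for process ${job.processId}`;
    case "option-confirmed":
      return `confirm gift ${job.giftId} for process ${job.processId}`;
    default: {
      // Compile-time exhaustiveness check: adding a new variant
      // without a case makes this assignment a type error.
      const unreachable: never = job;
      throw new Error(`Unhandled job: ${JSON.stringify(unreachable)}`);
    }
  }
}
```

The trade-off is more type boilerplate in exchange for removing the `|| ""` fallbacks at the call sites.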

🔧 Implementation Patterns

1. Workflow Creation

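// operations/gifting-process.create.ts
type GiftProcess = typeof giftProcessSchema.$inferSelect;

export async function createProcess(
  occasionId: string,
  status: GiftProcessStatus = "OCCASION_DETECTED",
): Promise<GiftProcess | null> {
  const [process] = await db
    .insert(giftProcessSchema)
    // NOTE: recipient_id and relationship_id are NOT NULL in the schema
    // shown earlier, so a real insert has to supply them as well
    .values({ occasionId, status })
    // Skips the insert when a unique constraint is violated,
    // which prevents double-processing
    .onConflictDoNothing()
    .returning();

  // `process` is undefined when the insert was skipped
  return process ?? null;
}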
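
`ON CONFLICT DO NOTHING` only skips an insert when a primary key or unique constraint would be violated, and the schema shown earlier does not show a unique constraint on `occasion_id`. Assuming one exists, the behaviour callers need to handle can be modelled in memory as below. `ProcessRow`, `createProcessOnce`, and `startWorkflowOnce` are hypothetical names for this sketch.

```typescript
interface ProcessRow {
  id: string;
  occasionId: string;
  status: string;
}

// In-memory stand-in for a table with a unique key on occasionId.
const rowsByOccasion = new Map<string, ProcessRow>();
let nextRowId = 1;

// Mimics INSERT ... ON CONFLICT DO NOTHING RETURNING *:
// returns the new row, or null when a row for the occasion already exists.
function createProcessOnce(occasionId: string): ProcessRow | null {
  if (rowsByOccasion.has(occasionId)) return null;
  const row: ProcessRow = {
    id: `process-${nextRowId++}`,
    occasionId,
    status: "OCCASION_DETECTED",
  };
  rowsByOccasion.set(occasionId, row);
  return row;
}

// Caller side: a null result means the workflow was already started,
// so the duplicate trigger is ignored rather than treated as an error.
function startWorkflowOnce(occasionId: string): "started" | "skipped" {
  return createProcessOnce(occasionId) ? "started" : "skipped";
}
```

This matters for queue-driven code because the same job may be delivered more than once, and the second delivery should be a no-op.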

2. Workflow Updates

// operations/gifting-process.update.ts
export async function updateProcess(
  processId: string,
  updates: Partial<GiftProcessUpdate>,
) {
  const [updatedProcess] = await db
    .update(giftProcessSchema)
    .set({ ...updates, updatedAt: new Date() })
    .where(eq(giftProcessSchema.id, processId))
    .returning();

  return updatedProcess;
}

export async function updateProcessStatus(
  processId: string,
  newStatus: GiftProcessStatus,
) {
  const currentProcess = await findProcessById(processId);

  // Guard against a missing row before reading its status
  if (!currentProcess) {
    throw new Error(`Gift process ${processId} not found`);
  }

  if (!canTransition(currentProcess.status, newStatus)) {
    throw new Error(
      `Invalid transition from ${currentProcess.status} to ${newStatus}`,
    );
  }

  return await updateProcess(processId, { status: newStatus });
}

3. Cross-Feature Coordination

// workers/gifting-process.events.ts
export class GiftProcessJobHandler {
  static async generateGiftOptions(processId: string) {
    const process = await findProcessById(processId);
    if (!process) throw new Error(`Gift process ${processId} not found`);

    // Coordinate multiple features
    const [user, relationship, occasion] = await Promise.all([
      findUserById(process.recipientId),
      findRelationship(process.relationshipId),
      findOccasionById(process.occasionId),
    ]);

    // Use classifier feature for personalization
    const preferences = await classifierApi.getUserPreferences(user.id);

    // Use gifts feature for suggestions
    const suggestions = await giftsApi.getGiftSuggestions({
      preferences,
      relationship,
      occasion,
    });

    // Use decisions feature for ranking
    const rankedOptions = await decisionsApi.rankGifts(suggestions);

    return rankedOptions;
  }
}
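
`Promise.all` in the example above runs the three lookups in parallel, but if a finder resolves to `null` the failure only surfaces later, for instance at `user.id`. The sketch below shows one way to load several records in parallel and fail with a clear message when any is missing. It assumes finders resolve to `null` for unknown ids, which the original code does not confirm; `Finder` and `loadAll` are hypothetical names.

```typescript
interface Named {
  id: string;
}

type Finder = (id: string) => Promise<Named | null>;

// Runs all lookups in parallel, then checks each result by label so the
// error names the record that was missing.
async function loadAll(
  lookups: Record<string, { find: Finder; id: string }>,
): Promise<Record<string, Named>> {
  const labels = Object.keys(lookups);
  const results = await Promise.all(
    labels.map((label) => lookups[label].find(lookups[label].id)),
  );

  const loaded: Record<string, Named> = {};
  labels.forEach((label, index) => {
    const value = results[index];
    if (!value) {
      throw new Error(`${label} ${lookups[label].id} not found`);
    }
    loaded[label] = value;
  });
  return loaded;
}
```

In the workflow this would replace the destructured `Promise.all` call, with `findUserById`, `findRelationship`, and `findOccasionById` passed in as the finders.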

🚀 Best Practices

1. State Management

2. Event Design

3. Error Handling

4. Performance

5. Testing


🔗 Related Documentation