tower/docs/superpowers/plans/2026-05-27-multi-account-approval-workflow.md


Multi-Account Architecture, Adapter Boundary & Approval Workflow Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Seal the WhatsApp adapter so Baileys types never leak outside src/whatsapp/; add multi-account bot support via a session pool loaded from the DB; implement the star reaction approval workflow with a rate-limited cross-group forward queue.

Architecture: NormalizedMessage and NormalizedReaction move to @tower/types (platform-neutral). session.ts normalizes raw Baileys events internally before calling any callback — main.ts never imports from @whiskeysockets/baileys. WhatsAppSessionPool manages N sessions indexed by accountId, loaded from the accounts DB table on startup. A reaction from an admin JID triggers handleStarReaction → writes Approval record → enqueues ForwardJobData jobs into a rate-limited BullMQ queue (20 forwards/min) to avoid WhatsApp bans.

Tech Stack: BullMQ 5, Prisma 6, Baileys 7.0.0-rc13, TypeScript 5, Jest 29, pnpm workspaces


File Map

Modify:

  • packages/types/src/message.ts — add NormalizedMessage, NormalizedReaction, ForwardJobData; add accountId to IngestJobData
  • apps/api/prisma/schema.prisma — add Account model, AccountStatus enum; add optional accountId to Group
  • apps/worker/src/whatsapp/normalizer.ts — import NormalizedMessage/NormalizedReaction from @tower/types; accept accountId param; add normalizeReaction
  • apps/worker/src/whatsapp/normalizer.test.ts — add reaction tests
  • apps/worker/src/whatsapp/group-sync.ts — accept accountId param, set on group upsert
  • apps/worker/src/whatsapp/session.ts — normalize + react inside handler; change callback types; accept accountId as first param
  • apps/worker/src/main.ts — use pool, load accounts from DB, wire reactions through approval

Create:

  • apps/worker/src/whatsapp/session-pool.ts — WhatsAppSessionPool class
  • apps/worker/src/core/approval.ts — handleStarReaction
  • apps/worker/src/core/approval.test.ts
  • apps/worker/src/queues/forward.queue.ts
  • apps/worker/src/queues/forward.processor.ts
  • apps/worker/src/queues/forward.processor.test.ts

Task 1: Extend @tower/types

Files:

  • Modify: packages/types/src/message.ts

Add NormalizedMessage, NormalizedReaction, ForwardJobData as platform-neutral shared types. Add accountId to IngestJobData. These types must contain zero Baileys imports.

  • Step 1: Replace packages/types/src/message.ts entirely
export type Platform = 'whatsapp' | 'telegram' | 'discord';

export type MessageStatus =
  | 'PENDING'
  | 'APPROVED'
  | 'REJECTED'
  | 'DISTRIBUTED'
  | 'ARCHIVED';

export type ApprovalDecision = 'APPROVED' | 'REJECTED';

// Platform-neutral normalized message — zero Baileys/Telegram types here
export interface NormalizedMessage {
  platformMsgId: string;
  sourceGroupJid: string;
  senderJid: string;
  senderName?: string;
  content: string;
  accountId: string;  // which bot account received this message
}

// Platform-neutral normalized reaction (e.g. WhatsApp emoji reaction)
export interface NormalizedReaction {
  reactorJid: string;
  targetMsgId: string;    // platformMsgId of the message being reacted to
  sourceGroupJid: string;
  emoji: string;
  accountId: string;      // which bot account received this reaction
}

export interface CanonicalMessage {
  messageId: string;
  platform: Platform;
  platformMsgId: string;
  sourceGroupId: string;
  senderJid: string;
  senderName?: string;
  content: string;
  mediaUrl?: string;
  tags: string[];
  status: MessageStatus;
  createdAt: Date;
}

export interface Group {
  id: string;
  platform: Platform;
  platformId: string;
  name: string;
  description?: string;
  isActive: boolean;
}

export interface SyncRoute {
  id: string;
  sourceGroupId: string;
  targetGroupId: string;
  isActive: boolean;
}

export interface IngestJobData {
  platformMsgId: string;
  platform: Platform;
  accountId: string;          // which bot account received this message
  sourceGroupId: string;
  senderJid: string;
  senderName?: string;
  content: string;
  tags: string[];
}

export interface ForwardJobData {
  messageId: string;          // DB id of the approved Message record
  content: string;
  sourceGroupName: string;
  senderName?: string;
  toGroupJid: string;
  fromAccountId: string;      // which bot account to send the forward from
}
  • Step 2: Confirm types package compiles with zero errors
cd /path/to/repo && pnpm --filter @tower/types build

Expected: exits 0, no TypeScript errors.

  • Step 3: Commit
git add packages/types/src/message.ts
git commit -m "feat(types): NormalizedMessage, NormalizedReaction, ForwardJobData; accountId on IngestJobData"
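
To make the relationship between the adapter-facing and queue-facing types concrete, here is a minimal sketch of how a NormalizedMessage could be turned into an IngestJobData. The helper name toIngestJob and its jidToDbId and tags arguments are assumptions for illustration only; the plan wires the real mapping in main.ts (Task 7). The two interfaces are repeated locally so the snippet stands alone.

```typescript
// Local copies of the shared types from Step 1 so the sketch is self-contained.
type Platform = 'whatsapp' | 'telegram' | 'discord';

interface NormalizedMessage {
  platformMsgId: string;
  sourceGroupJid: string;
  senderJid: string;
  senderName?: string;
  content: string;
  accountId: string;
}

interface IngestJobData {
  platformMsgId: string;
  platform: Platform;
  accountId: string;
  sourceGroupId: string;
  senderJid: string;
  senderName?: string;
  content: string;
  tags: string[];
}

// Hypothetical helper (not part of the plan): resolves the group JID to the
// DB Group id and returns null when the group has not been synced yet.
function toIngestJob(
  msg: NormalizedMessage,
  jidToDbId: Map<string, string>,
  tags: string[],
): IngestJobData | null {
  const sourceGroupId = jidToDbId.get(msg.sourceGroupJid);
  if (!sourceGroupId) return null;
  return {
    platformMsgId: msg.platformMsgId,
    platform: 'whatsapp',
    accountId: msg.accountId,
    sourceGroupId,
    senderJid: msg.senderJid,
    senderName: msg.senderName,
    content: msg.content,
    tags,
  };
}
```

Note that accountId travels unchanged from the normalized message into the job, which is what lets later stages know which bot account saw the message.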

Task 2: Seal the WhatsApp adapter boundary

Files:

  • Modify: apps/worker/src/whatsapp/normalizer.ts
  • Modify: apps/worker/src/whatsapp/normalizer.test.ts
  • Modify: apps/worker/src/whatsapp/session.ts

After this task: main.ts will never import anything from @whiskeysockets/baileys. All Baileys types stay inside src/whatsapp/.
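
The boundary stated above is easy to break by accident later. One possible way to keep it checked is a small guard like the sketch below. The function name violatesAdapterBoundary and the idea of running it over source files are assumptions, not part of the plan; walking the directory tree is left out.

```typescript
// Matches any quoted occurrence of the Baileys package specifier, which covers
// `import ... from`, `require(...)` and dynamic `import(...)`.
const BAILEYS_SPECIFIER = /['"]@whiskeysockets\/baileys['"]/;

// Hypothetical guard (not part of the plan): given a file path and its source
// text, report whether it references Baileys from outside src/whatsapp/.
function violatesAdapterBoundary(filePath: string, source: string): boolean {
  const normalizedPath = filePath.replace(/\\/g, '/');
  const insideAdapter = normalizedPath.includes('src/whatsapp/');
  return !insideAdapter && BAILEYS_SPECIFIER.test(source);
}
```

A test could feed every file under apps/worker/src through this function and fail on the first true result. Because the check is textual, a quoted package name inside a comment or string would also be flagged.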

  • Step 1: Replace apps/worker/src/whatsapp/normalizer.ts
import { proto } from '@whiskeysockets/baileys';
import { NormalizedMessage, NormalizedReaction } from '@tower/types';

function extractText(msg: proto.IWebMessageInfo): string {
  const m = msg.message;
  if (!m) return '';
  return (
    m.conversation ||
    m.extendedTextMessage?.text ||
    m.imageMessage?.caption ||
    m.videoMessage?.caption ||
    m.documentMessage?.caption ||
    ''
  );
}

export function normalizeMessage(
  msg: proto.IWebMessageInfo,
  accountId: string,
): NormalizedMessage | null {
  const key = msg.key;
  if (!key) return null;

  const remoteJid = key.remoteJid ?? '';
  if (!remoteJid.endsWith('@g.us')) return null;
  if (key.fromMe) return null;
  if (!msg.message) return null;

  const platformMsgId = key.id;
  if (!platformMsgId) return null;

  const content = extractText(msg);

  return {
    platformMsgId,
    sourceGroupJid: remoteJid,
    senderJid: key.participant ?? '',
    senderName: msg.pushName ?? undefined,
    content,
    accountId,
  };
}

export function normalizeReaction(
  msg: proto.IWebMessageInfo,
  accountId: string,
): NormalizedReaction | null {
  const key = msg.key;
  if (!key) return null;

  const remoteJid = key.remoteJid ?? '';
  if (!remoteJid.endsWith('@g.us')) return null;
  if (key.fromMe) return null;

  const reaction = msg.message?.reactionMessage;
  if (!reaction) return null;

  const targetMsgId = reaction.key?.id;
  if (!targetMsgId) return null;

  return {
    reactorJid: key.participant ?? '',
    targetMsgId,
    sourceGroupJid: remoteJid,
    emoji: reaction.text ?? '',
    accountId,
  };
}
  • Step 2: Write tests for normalizeReaction

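Append to apps/worker/src/whatsapp/normalizer.test.ts (keep all existing tests, add below). The casts use proto.IWebMessageInfo, so this assumes the file already imports proto from @whiskeysockets/baileys: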

import { normalizeReaction } from './normalizer';

describe('normalizeReaction', () => {
  it('normalizes a star reaction from a group participant', () => {
    const msg = {
      key: {
        remoteJid: '120363043312345678@g.us',
        fromMe: false,
        id: 'REACTION_ID',
        participant: '919876543210@s.whatsapp.net',
      },
      message: {
        reactionMessage: {
          key: { remoteJid: '120363043312345678@g.us', id: 'TARGET_MSG_ID' },
          text: '⭐',
        },
      },
    } as proto.IWebMessageInfo;

    const result = normalizeReaction(msg, 'acc_1');
    expect(result).toMatchObject({
      reactorJid: '919876543210@s.whatsapp.net',
      targetMsgId: 'TARGET_MSG_ID',
      sourceGroupJid: '120363043312345678@g.us',
      emoji: '⭐',
      accountId: 'acc_1',
    });
  });

  it('returns null when reactionMessage is missing (regular message)', () => {
    const msg = {
      key: { remoteJid: '120363043312345678@g.us', fromMe: false, id: 'ID' },
      message: { conversation: 'hello' },
    } as proto.IWebMessageInfo;
    expect(normalizeReaction(msg, 'acc_1')).toBeNull();
  });

  it('returns null for own reactions (fromMe=true)', () => {
    const msg = {
      key: { remoteJid: '120363043312345678@g.us', fromMe: true, id: 'ID' },
      message: { reactionMessage: { key: { id: 'TARGET' }, text: '⭐' } },
    } as proto.IWebMessageInfo;
    expect(normalizeReaction(msg, 'acc_1')).toBeNull();
  });

  it('returns null for DM reactions (non-group jid)', () => {
    const msg = {
      key: { remoteJid: '919876543210@s.whatsapp.net', fromMe: false, id: 'ID' },
      message: { reactionMessage: { key: { id: 'TARGET' }, text: '⭐' } },
    } as proto.IWebMessageInfo;
    expect(normalizeReaction(msg, 'acc_1')).toBeNull();
  });

  it('returns null when targetMsgId is missing', () => {
    const msg = {
      key: {
        remoteJid: '120363043312345678@g.us',
        fromMe: false,
        id: 'ID',
        participant: '91@s.whatsapp.net',
      },
      message: { reactionMessage: { key: {}, text: '⭐' } },
    } as proto.IWebMessageInfo;
    expect(normalizeReaction(msg, 'acc_1')).toBeNull();
  });
});

Also update the normalizeMessage tests: the function signature changed to accept accountId as second param. Update every normalizeMessage(makeMsg(...)) call to normalizeMessage(makeMsg(...), 'acc_1') and add accountId: 'acc_1' to the expected toMatchObject where relevant.

  • Step 3: Run normalizer tests to confirm they pass
pnpm --filter @tower/worker test -- --testPathPattern=normalizer

Expected: all existing tests + 5 new reaction tests pass.

  • Step 4: Replace apps/worker/src/whatsapp/session.ts
import makeWASocket, {
  useMultiFileAuthState,
  fetchLatestBaileysVersion,
  DisconnectReason,
  WASocket,
  GroupMetadata,
} from '@whiskeysockets/baileys';
import { Boom } from '@hapi/boom';
import qrcode from 'qrcode-terminal';
import { NormalizedMessage, NormalizedReaction } from '@tower/types';
import { normalizeMessage, normalizeReaction } from './normalizer';
import { createLogger } from '@tower/logger';

const logger = createLogger('whatsapp-session');

export type OnMessageCallback = (msg: NormalizedMessage) => Promise<void> | void;
export type OnReactionCallback = (reaction: NormalizedReaction) => Promise<void> | void;
export type OnGroupsCallback = (groups: Record<string, GroupMetadata>) => Promise<void> | void;

export async function createWhatsAppSession(
  accountId: string,
  sessionPath: string,
  onMessage: OnMessageCallback,
  onReaction: OnReactionCallback,
  onGroups: OnGroupsCallback,
): Promise<WASocket> {
  const { state, saveCreds } = await useMultiFileAuthState(sessionPath);
  const { version } = await fetchLatestBaileysVersion();

  const sock = makeWASocket({
    version,
    auth: state,
    printQRInTerminal: false,
    logger: logger as any,
  });

  sock.ev.on('creds.update', saveCreds);

  sock.ev.on('connection.update', async ({ connection, qr, lastDisconnect }) => {
    if (qr) {
      qrcode.generate(qr, { small: true });
    }
    if (connection === 'close') {
      const reason = (lastDisconnect?.error as Boom)?.output?.statusCode;
      const shouldReconnect = reason !== DisconnectReason.loggedOut;
      logger.info({ reason, shouldReconnect }, 'Connection closed');
      if (shouldReconnect) {
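        // Note: the socket created here is not handed back to the caller, so
        // any stored reference to the old socket (e.g. in a pool) goes stale.
        logger.info('Reconnecting in 5s...');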
        setTimeout(
          () => createWhatsAppSession(accountId, sessionPath, onMessage, onReaction, onGroups),
          5000,
        );
      }
    } else if (connection === 'open') {
      try {
        logger.info({ accountId }, 'WhatsApp connected');
        const groups = await sock.groupFetchAllParticipating();
        await Promise.resolve(onGroups(groups)).catch((err) =>
          logger.error({ err }, 'Group sync error'),
        );
      } catch (err) {
        logger.error({ err }, 'Failed to fetch groups on connect');
      }
    }
  });

  sock.ev.on('messages.upsert', ({ messages, type }) => {
    if (type !== 'notify') return;
    for (const msg of messages) {
      if (msg.message?.reactionMessage) {
        const reaction = normalizeReaction(msg, accountId);
        if (reaction) {
          void Promise.resolve(onReaction(reaction)).catch((err) =>
            logger.error({ err }, 'Error processing reaction'),
          );
        }
        continue;
      }
      const normalized = normalizeMessage(msg, accountId);
      if (!normalized) continue;
      void Promise.resolve(onMessage(normalized)).catch((err) =>
        logger.error({ err }, 'Error processing message'),
      );
    }
  });

  return sock;
}
  • Step 5: Commit
git add apps/worker/src/whatsapp/normalizer.ts \
        apps/worker/src/whatsapp/normalizer.test.ts \
        apps/worker/src/whatsapp/session.ts
git commit -m "feat(worker): seal WhatsApp adapter — normalize inside session, reactions handled internally"
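
session.ts invokes each callback as void Promise.resolve(cb(x)).catch(...). That catches rejected promises, but a callback that throws synchronously would throw before Promise.resolve is reached, and the callback types allow plain void-returning functions. The sketch below shows one way to cover both cases. The helper name safeInvoke is hypothetical and not part of the plan.

```typescript
type Callback<T> = (value: T) => Promise<void> | void;

// Hypothetical helper (not in the plan): runs a callback and routes both
// synchronous throws and rejected promises to the same error handler.
function safeInvoke<T>(
  cb: Callback<T>,
  value: T,
  onError: (err: unknown) => void,
): void {
  try {
    // Rejections from async callbacks land in onError via .catch.
    void Promise.resolve(cb(value)).catch(onError);
  } catch (err) {
    // Synchronous throws from non-async callbacks land here.
    onError(err);
  }
}
```

In session.ts this would replace the inline Promise.resolve(...).catch(...) expressions, with onError logging through the existing logger.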

Task 3: Prisma — Account model and Group.accountId

Files:

  • Modify: apps/api/prisma/schema.prisma

  • Step 1: Add AccountStatus enum and Account model to the schema

Open apps/api/prisma/schema.prisma. Append after the ConsentRecord model block:

enum AccountStatus {
  ACTIVE
  DISCONNECTED
  BANNED
}

model Account {
  id          String        @id @default(cuid())
  platform    String
  jid         String
  sessionPath String
  displayName String?
  status      AccountStatus @default(ACTIVE)
  groups      Group[]
  createdAt   DateTime      @default(now())
  updatedAt   DateTime      @updatedAt

  @@unique([platform, jid])
}
  • Step 2: Add optional accountId to the Group model

The full updated Group model (replace the existing one):

model Group {
  id          String   @id @default(cuid())
  platform    String
  platformId  String
  name        String
  description String?
  isActive    Boolean  @default(true)
  accountId   String?
  account     Account? @relation(fields: [accountId], references: [id])
  createdAt   DateTime @default(now())
  updatedAt   DateTime @updatedAt

  messages       Message[]
  syncRoutesFrom SyncRoute[]     @relation("sourceGroup")
  syncRoutesTo   SyncRoute[]     @relation("targetGroup")
  consentRecords ConsentRecord[]

  @@unique([platform, platformId])
}
  • Step 3: Run migration
cd apps/api && DATABASE_URL="postgresql://tower:tower@localhost:5433/tower" \
  npx prisma migrate dev --name add-account-model

Expected: "Your database is now in sync with your schema."

  • Step 4: Regenerate Prisma client in worker
pnpm --filter @tower/worker generate

Expected: "Generated Prisma Client..."

  • Step 5: Seed the first Account record

Find your WhatsApp session's JID. It is stored in creds.json inside the session directory (the directory you set as WHATSAPP_SESSION_PATH in .env, for example sessions/creds.json). Open it and look for "me": { "id": "..." }; that string is your JID.

Insert the account record (replace YOUR_JID and YOUR_SESSION_PATH):

psql "postgresql://tower:tower@localhost:5433/tower" <<'SQL'
-- gen_random_uuid() is built in from PostgreSQL 13; older servers need the pgcrypto extension.
INSERT INTO "Account" (id, platform, jid, "sessionPath", "displayName", status, "createdAt", "updatedAt")
VALUES (
  'acc_' || substring(gen_random_uuid()::text, 1, 8),
  'whatsapp',
  'YOUR_JID',
  'YOUR_SESSION_PATH',
  'Bot 1',
  'ACTIVE',
  now(),
  now()
)
ON CONFLICT DO NOTHING;
SQL

Verify it was inserted:

psql "postgresql://tower:tower@localhost:5433/tower" -c 'SELECT id, jid, "sessionPath", status FROM "Account";'

Expected: one row with your JID and ACTIVE status.

  • Step 6: Commit
git add apps/api/prisma/schema.prisma apps/api/prisma/migrations/
git commit -m "feat(schema): Account model with AccountStatus enum, optional Group.accountId"
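
For Step 5, the JID lookup in creds.json can also be done programmatically. The sketch below works on the file's text and reads only the me.id field described above. The function name extractJidFromCreds is hypothetical, and reading the file from disk is left to the caller.

```typescript
// Hypothetical helper: pulls me.id out of the text of a creds.json file.
// Returns undefined when the JSON is invalid or the field is absent.
function extractJidFromCreds(credsJson: string): string | undefined {
  try {
    const parsed: unknown = JSON.parse(credsJson);
    if (typeof parsed !== 'object' || parsed === null) return undefined;
    const id = (parsed as { me?: { id?: unknown } }).me?.id;
    return typeof id === 'string' ? id : undefined;
  } catch {
    // JSON.parse failed: treat as "no JID available".
    return undefined;
  }
}
```

The returned string is the value to substitute for YOUR_JID in the INSERT statement.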

Task 4: WhatsAppSessionPool and updated group-sync

Files:

  • Modify: apps/worker/src/whatsapp/group-sync.ts

  • Create: apps/worker/src/whatsapp/session-pool.ts

  • Step 1: Update group-sync.ts to accept accountId

Replace apps/worker/src/whatsapp/group-sync.ts:

import { GroupMetadata } from '@whiskeysockets/baileys';
import { createLogger } from '@tower/logger';

const logger = createLogger('group-sync');

export async function syncGroups(
  groups: Record<string, GroupMetadata>,
  accountId: string,
  prisma: any,
): Promise<Map<string, string>> {
  const jidToDbId = new Map<string, string>();

  for (const [jid, meta] of Object.entries(groups)) {
    const group = await prisma.group.upsert({
      where: { platform_platformId: { platform: 'whatsapp', platformId: jid } },
      create: {
        platform: 'whatsapp',
        platformId: jid,
        name: meta.subject,
        description: meta.desc ?? undefined,
        isActive: true,
        accountId,
      },
      update: {
        name: meta.subject,
        description: meta.desc ?? undefined,
        accountId,
      },
    });
    jidToDbId.set(jid, group.id);
  }

  logger.info({ count: jidToDbId.size, accountId }, 'Groups synced');
  return jidToDbId;
}
  • Step 2: Create apps/worker/src/whatsapp/session-pool.ts
import { WASocket } from '@whiskeysockets/baileys';
import { NormalizedMessage, NormalizedReaction } from '@tower/types';
import { createWhatsAppSession } from './session';
import { createLogger } from '@tower/logger';

const logger = createLogger('session-pool');

// Callbacks the pool exposes to main.ts — accountId injected by the pool
export type PoolMessageCallback = (msg: NormalizedMessage, accountId: string) => Promise<void> | void;
export type PoolReactionCallback = (reaction: NormalizedReaction, accountId: string) => Promise<void> | void;
// groups typed as `any` — GroupMetadata (Baileys type) stays inside whatsapp/ folder
export type PoolGroupsCallback = (groups: any, accountId: string) => Promise<void> | void;

export class WhatsAppSessionPool {
  private sessions = new Map<string, WASocket>();

  async add(
    accountId: string,
    sessionPath: string,
    onMessage: PoolMessageCallback,
    onReaction: PoolReactionCallback,
    onGroups: PoolGroupsCallback,
  ): Promise<void> {
    logger.info({ accountId }, 'Starting session');
    const sock = await createWhatsAppSession(
      accountId,
      sessionPath,
      (msg) => onMessage(msg, accountId),
      (reaction) => onReaction(reaction, accountId),
      (groups) => onGroups(groups, accountId),
    );
    this.sessions.set(accountId, sock);
  }

  get(accountId: string): WASocket | undefined {
    return this.sessions.get(accountId);
  }

  getAll(): Map<string, WASocket> {
    return this.sessions;
  }

  async sendMessage(accountId: string, groupJid: string, text: string): Promise<void> {
    const sock = this.sessions.get(accountId);
    if (!sock) throw new Error(`No active session for account ${accountId}`);
    await sock.sendMessage(groupJid, { text });
  }

  async remove(accountId: string): Promise<void> {
    const sock = this.sessions.get(accountId);
    if (sock) {
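      // logout() also unlinks this device on WhatsApp's side, so the saved
      // credentials cannot be reused; re-adding the account needs a new QR scan.
      await sock.logout().catch(() => {});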
      this.sessions.delete(accountId);
      logger.info({ accountId }, 'Session removed');
    }
  }
}
  • Step 3: Confirm worker compiles (ignoring main.ts errors for now)
pnpm --filter @tower/worker build 2>&1 | grep -v "main.ts"

Expected: no errors outside main.ts (it still uses old signatures — fixed in Task 7).

  • Step 4: Commit
git add apps/worker/src/whatsapp/session-pool.ts apps/worker/src/whatsapp/group-sync.ts
git commit -m "feat(worker): WhatsAppSessionPool + group-sync accepts accountId"
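
One gap worth noting in the pool as written: add() stores the socket returned by createWhatsAppSession once, but session.ts builds a new socket on every reconnect and does not hand it back, so sendMessage could end up using a closed socket. The sketch below shows one possible way to keep the map current. It assumes a hypothetical extra callback that the session factory would call with each socket it creates; the names SocketLike, SocketRegistry and trackerFor are illustrative and not part of the plan.

```typescript
// Stand-in for the one WASocket method the pool needs; an assumption that
// keeps this sketch free of Baileys imports.
interface SocketLike {
  sendMessage(jid: string, content: { text: string }): Promise<unknown>;
}

// Hypothetical variant of the pool's bookkeeping: the session factory calls
// the tracker for the first socket and again after every reconnect.
class SocketRegistry {
  private sockets = new Map<string, SocketLike>();

  // Returns the callback to hand to the session factory for one account.
  trackerFor(accountId: string): (sock: SocketLike) => void {
    return (sock) => {
      this.sockets.set(accountId, sock); // replaces any stale socket
    };
  }

  get(accountId: string): SocketLike | undefined {
    return this.sockets.get(accountId);
  }

  async sendMessage(accountId: string, groupJid: string, text: string): Promise<void> {
    const sock = this.sockets.get(accountId);
    if (!sock) throw new Error(`No active session for account ${accountId}`);
    await sock.sendMessage(groupJid, { text });
  }
}
```

Adopting this would mean adding one parameter to createWhatsAppSession and calling it right after makeWASocket; the rest of the pool API could stay as it is.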

Task 5: Approval core logic

Files:

  • Create: apps/worker/src/core/approval.test.ts
  • Create: apps/worker/src/core/approval.ts

Context: When an admin reacts with ⭐ to a group message, handleStarReaction finds the message in the DB, marks it APPROVED, creates an Approval record, and returns ForwardJobData[] with one entry per active SyncRoute from the message's source group. The caller (main.ts) enqueues those as forward jobs.

The Prisma relation graph: Message.sourceGroup → Group.syncRoutesFrom → SyncRoute.targetGroup → Group. The query must include sourceGroup.syncRoutesFrom.targetGroup — NOT a top-level syncRoutesFrom on Message (that relation doesn't exist on Message).
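
The traversal described above can be sketched with explicit types in place of any. The interfaces below are an assumed shape of the findUnique result with that include; the field names follow the plan's schema, while the type names and the helper forwardTargets are illustrative only.

```typescript
// Assumed shape of the included relations (only the fields used here).
interface TargetGroup {
  platformId: string;
  accountId: string | null;
}

interface RouteWithTarget {
  targetGroup: TargetGroup;
}

interface MessageWithRoutes {
  sourceGroup: { name: string; syncRoutesFrom: RouteWithTarget[] };
}

interface ForwardTarget {
  toGroupJid: string;
  fromAccountId: string;
}

// Walks Message -> sourceGroup -> syncRoutesFrom -> targetGroup and falls
// back to the reacting account when the target group has no account set.
function forwardTargets(
  message: MessageWithRoutes,
  reactingAccountId: string,
): ForwardTarget[] {
  return message.sourceGroup.syncRoutesFrom.map((route) => ({
    toGroupJid: route.targetGroup.platformId,
    fromAccountId: route.targetGroup.accountId ?? reactingAccountId,
  }));
}
```

The fallback matters because Group.accountId is optional in the schema, so a target group synced before accounts existed has no sending account of its own.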

  • Step 1: Write the failing tests

Create apps/worker/src/core/approval.test.ts:

import { handleStarReaction } from './approval';
import { NormalizedReaction } from '@tower/types';

function makeReaction(overrides: Partial<NormalizedReaction> = {}): NormalizedReaction {
  return {
    reactorJid: '919876543210@s.whatsapp.net',
    targetMsgId: 'TARGET_MSG_123',
    sourceGroupJid: '120363043312345678@g.us',
    emoji: '⭐',
    accountId: 'acc_1',
    ...overrides,
  };
}

const adminJids = ['919876543210@s.whatsapp.net'];

describe('handleStarReaction', () => {
  it('returns null for non-star emoji', async () => {
    const result = await handleStarReaction(makeReaction({ emoji: '👍' }), adminJids, {} as any);
    expect(result).toBeNull();
  });

  it('returns null when reactor is not an admin', async () => {
    const result = await handleStarReaction(
      makeReaction({ reactorJid: 'stranger@s.whatsapp.net' }),
      adminJids,
      {} as any,
    );
    expect(result).toBeNull();
  });

  it('returns null when message not found', async () => {
    const prisma = { message: { findUnique: jest.fn().mockResolvedValue(null) } } as any;
    const result = await handleStarReaction(makeReaction(), adminJids, prisma);
    expect(result).toBeNull();
    expect(prisma.message.findUnique).toHaveBeenCalledWith({
      where: {
        platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'TARGET_MSG_123' },
      },
      include: {
        approval: true,
        sourceGroup: {
          include: {
            syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } },
          },
        },
      },
    });
  });

  it('returns null when message status is not PENDING', async () => {
    const prisma = {
      message: {
        findUnique: jest.fn().mockResolvedValue({
          id: 'msg_1',
          status: 'REJECTED',
          approval: null,
          sourceGroup: { name: 'Test Group', syncRoutesFrom: [] },
        }),
      },
    } as any;
    expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
  });

  it('returns null when message is already approved (approval record exists)', async () => {
    const prisma = {
      message: {
        findUnique: jest.fn().mockResolvedValue({
          id: 'msg_1',
          status: 'APPROVED',
          approval: { id: 'appr_1' },
          sourceGroup: { name: 'Test Group', syncRoutesFrom: [] },
        }),
      },
    } as any;
    expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
  });

  it('approves message and returns empty array when no sync routes', async () => {
    const prisma = {
      message: {
        findUnique: jest.fn().mockResolvedValue({
          id: 'msg_1',
          status: 'PENDING',
          approval: null,
          content: 'hello',
          senderName: 'Alice',
          sourceGroup: { name: 'UP Parivar Dallas', syncRoutesFrom: [] },
        }),
        update: jest.fn().mockResolvedValue({}),
      },
      approval: { create: jest.fn().mockResolvedValue({}) },
    } as any;

    const result = await handleStarReaction(makeReaction(), adminJids, prisma);
    expect(result).toEqual([]);
    expect(prisma.message.update).toHaveBeenCalledWith({
      where: { id: 'msg_1' },
      data: { status: 'APPROVED' },
    });
    expect(prisma.approval.create).toHaveBeenCalledWith({
      data: {
        messageId: 'msg_1',
        adminId: '919876543210@s.whatsapp.net',
        decision: 'APPROVED',
      },
    });
  });

  it('returns ForwardJobData for each active sync route', async () => {
    const prisma = {
      message: {
        findUnique: jest.fn().mockResolvedValue({
          id: 'msg_1',
          status: 'PENDING',
          approval: null,
          content: 'important announcement',
          senderName: 'Bob',
          sourceGroup: {
            name: 'Source Group',
            syncRoutesFrom: [
              { targetGroup: { platformId: '999@g.us', accountId: 'acc_2' } },
              { targetGroup: { platformId: '888@g.us', accountId: null } },
            ],
          },
        }),
        update: jest.fn().mockResolvedValue({}),
      },
      approval: { create: jest.fn().mockResolvedValue({}) },
    } as any;

    const result = await handleStarReaction(makeReaction(), adminJids, prisma);
    expect(result).toHaveLength(2);
    expect(result![0]).toMatchObject({
      messageId: 'msg_1',
      content: 'important announcement',
      sourceGroupName: 'Source Group',
      senderName: 'Bob',
      toGroupJid: '999@g.us',
      fromAccountId: 'acc_2',
    });
    // falls back to reaction.accountId when targetGroup.accountId is null
    expect(result![1]).toMatchObject({
      toGroupJid: '888@g.us',
      fromAccountId: 'acc_1',
    });
  });
});
  • Step 2: Run tests to confirm they fail
pnpm --filter @tower/worker test -- --testPathPattern=approval

Expected: FAIL — "Cannot find module './approval'"

  • Step 3: Create apps/worker/src/core/approval.ts
import { NormalizedReaction, ForwardJobData } from '@tower/types';

export async function handleStarReaction(
  reaction: NormalizedReaction,
  adminJids: string[],
  prisma: any,
): Promise<ForwardJobData[] | null> {
  if (reaction.emoji !== '⭐') return null;
  if (!adminJids.includes(reaction.reactorJid)) return null;

  const message = await prisma.message.findUnique({
    where: {
      platform_platformMsgId: {
        platform: 'whatsapp',
        platformMsgId: reaction.targetMsgId,
      },
    },
    include: {
      approval: true,
      sourceGroup: {
        include: {
          syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } },
        },
      },
    },
  });

  if (!message) return null;
  if (message.status !== 'PENDING') return null;
  if (message.approval) return null;

  await prisma.message.update({
    where: { id: message.id },
    data: { status: 'APPROVED' },
  });

  await prisma.approval.create({
    data: {
      messageId: message.id,
      adminId: reaction.reactorJid,
      decision: 'APPROVED',
    },
  });

  const jobs: ForwardJobData[] = message.sourceGroup.syncRoutesFrom.map((route: any) => ({
    messageId: message.id,
    content: message.content,
    sourceGroupName: message.sourceGroup.name,
    senderName: message.senderName ?? undefined,
    toGroupJid: route.targetGroup.platformId,
    fromAccountId: route.targetGroup.accountId ?? reaction.accountId,
  }));

  return jobs;
}
  • Step 4: Run tests to confirm they pass
pnpm --filter @tower/worker test -- --testPathPattern=approval

Expected: 7 tests pass.

  • Step 5: Commit
git add apps/worker/src/core/approval.ts apps/worker/src/core/approval.test.ts
git commit -m "feat(worker): handleStarReaction approval core with tests"

Task 6: Forward queue and processor

Files:

  • Create: apps/worker/src/queues/forward.processor.test.ts
  • Create: apps/worker/src/queues/forward.processor.ts
  • Create: apps/worker/src/queues/forward.queue.ts

The rate limiter (20 forwards/min) goes on the Worker, not the Queue — this is the BullMQ v5 pattern.
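
To get a feel for what 20 forwards per minute means for a large fan-out, here is a simplified fixed-window model of a limiter with max and duration settings. It is an approximation for back-of-the-envelope estimates, not a description of how BullMQ schedules jobs internally; both function names are made up for this sketch.

```typescript
// Simplified model: at most `max` jobs start per `durationMs` window, and the
// queue is drained in order. Returns the earliest start offset for a job.
function earliestStartMs(jobIndex: number, max: number, durationMs: number): number {
  return Math.floor(jobIndex / max) * durationMs;
}

// Offset at which the last of `jobCount` queued jobs may start under the model.
function drainStartMs(jobCount: number, max: number, durationMs: number): number {
  return jobCount === 0 ? 0 : earliestStartMs(jobCount - 1, max, durationMs);
}
```

Under this model, one approval that fans out to 45 target groups releases jobs 0 to 19 immediately, jobs 20 to 39 after one minute, and the remaining five after two minutes.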

  • Step 1: Write the failing processor test

Create apps/worker/src/queues/forward.processor.test.ts:

import { processForwardJob } from './forward.processor';
import { ForwardJobData } from '@tower/types';

const mockPool = { sendMessage: jest.fn().mockResolvedValue(undefined) };

const baseJob: ForwardJobData = {
  messageId: 'msg_1',
  content: 'Event this Saturday at the temple',
  sourceGroupName: 'UP Parivar Dallas',
  senderName: 'Rajesh',
  toGroupJid: '120363099999@g.us',
  fromAccountId: 'acc_1',
};

describe('processForwardJob', () => {
  beforeEach(() => jest.clearAllMocks());

  it('sends a formatted message via the pool', async () => {
    await processForwardJob(baseJob, mockPool as any);
    expect(mockPool.sendMessage).toHaveBeenCalledWith(
      'acc_1',
      '120363099999@g.us',
      expect.stringContaining('Event this Saturday at the temple'),
    );
  });

  it('includes source group name in the forwarded text', async () => {
    await processForwardJob(baseJob, mockPool as any);
    const [, , text] = mockPool.sendMessage.mock.calls[0];
    expect(text).toContain('UP Parivar Dallas');
  });

  it('includes sender name in the forwarded text', async () => {
    await processForwardJob(baseJob, mockPool as any);
    const [, , text] = mockPool.sendMessage.mock.calls[0];
    expect(text).toContain('Rajesh');
  });

  it('handles missing senderName without throwing', async () => {
    await processForwardJob({ ...baseJob, senderName: undefined }, mockPool as any);
    expect(mockPool.sendMessage).toHaveBeenCalledTimes(1);
  });

  it('throws when pool has no session for the account', async () => {
    const brokenPool = {
      sendMessage: jest.fn().mockRejectedValue(new Error('No active session for account acc_99')),
    };
    await expect(
      processForwardJob({ ...baseJob, fromAccountId: 'acc_99' }, brokenPool as any),
    ).rejects.toThrow('No active session');
  });
});
  • Step 2: Run test to confirm it fails
pnpm --filter @tower/worker test -- --testPathPattern=forward.processor

Expected: FAIL — "Cannot find module './forward.processor'"

  • Step 3: Create apps/worker/src/queues/forward.processor.ts
import { Worker } from 'bullmq';
import { ForwardJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';
import { WhatsAppSessionPool } from '../whatsapp/session-pool';

function formatForwardText(job: ForwardJobData): string {
  const sender = job.senderName ? `_${job.senderName}_` : '_Unknown_';
  return `📢 *${job.sourceGroupName}*\n${sender}:\n\n${job.content}\n\n_— Forwarded by TOWER_`;
}

export async function processForwardJob(
  job: ForwardJobData,
  pool: WhatsAppSessionPool,
): Promise<void> {
  await pool.sendMessage(job.fromAccountId, job.toGroupJid, formatForwardText(job));
}

export function createForwardWorker(
  redisUrl: string,
  pool: WhatsAppSessionPool,
): Worker<ForwardJobData> {
  return new Worker<ForwardJobData>(
    'tower-forward',
    async (job) => processForwardJob(job.data, pool),
    {
      connection: parseRedisUrl(redisUrl),
      limiter: { max: 20, duration: 60_000 }, // 20 forwards per minute — avoids WhatsApp bans
    },
  );
}
  • Step 4: Create apps/worker/src/queues/forward.queue.ts
import { Queue } from 'bullmq';
import { ForwardJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';

export function createForwardQueue(redisUrl: string): Queue<ForwardJobData> {
  return new Queue<ForwardJobData>('tower-forward', {
    connection: parseRedisUrl(redisUrl),
  });
}
  • Step 5: Run processor tests to confirm they pass
pnpm --filter @tower/worker test -- --testPathPattern=forward.processor

Expected: 5 tests pass.

  • Step 6: Commit
git add apps/worker/src/queues/forward.queue.ts \
        apps/worker/src/queues/forward.processor.ts \
        apps/worker/src/queues/forward.processor.test.ts
git commit -m "feat(worker): forward queue + processor with 20/min rate limiter"

Task 7: Wire main.ts — multi-account pool, reactions, full pipeline

Files:

  • Modify: apps/worker/src/main.ts

This is the final assembly. main.ts no longer imports anything from @whiskeysockets/baileys. It loads accounts from the DB, creates the pool, and wires: ingest message → tag detect → ingest queue; reaction → approval → forward queue.

  • Step 1: Replace apps/worker/src/main.ts entirely
import { PrismaClient } from '@prisma/client';
import { createLogger } from '@tower/logger';
import { validateEnv } from '@tower/config';
import { createIngestQueue } from './queues/ingest.queue';
import { createIngestWorker } from './queues/ingest.processor';
import { createForwardQueue } from './queues/forward.queue';
import { createForwardWorker } from './queues/forward.processor';
import { WhatsAppSessionPool } from './whatsapp/session-pool';
import { detectTags, isFlagged } from './whatsapp/tag-detector';
import { syncGroups } from './whatsapp/group-sync';
import { handleStarReaction } from './core/approval';

const logger = createLogger('tower-worker');

async function bootstrap() {
  const env = validateEnv();
  const prisma = new PrismaClient();
  await prisma.$connect();

  const adminJids = env.TOWER_ADMIN_JIDS
    ? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean)
    : [];

  const ingestQueue = createIngestQueue(env.REDIS_URL);
  const forwardQueue = createForwardQueue(env.REDIS_URL);
  const pool = new WhatsAppSessionPool();

  const ingestWorker = createIngestWorker(env.REDIS_URL, prisma);
  const forwardWorker = createForwardWorker(env.REDIS_URL, pool);

  ingestWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Ingest job completed'));
  ingestWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Ingest job failed'));
  forwardWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Forward job completed'));
  forwardWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Forward job failed'));

  // Load active accounts from DB — each becomes one WhatsApp session
  const accounts = await prisma.account.findMany({
    where: { status: 'ACTIVE', platform: 'whatsapp' },
  });

  if (accounts.length === 0) {
    logger.warn('No active WhatsApp accounts found — add one via Prisma Studio (see Task 3 Step 5)');
  }

  // Per-account map of groupJid → DB Group id
  const groupMaps = new Map<string, Map<string, string>>();

  for (const account of accounts) {
    groupMaps.set(account.id, new Map());

    await pool.add(
      account.id,
      account.sessionPath,
      async (msg, accountId) => {
        const tags = detectTags(msg.content, msg.senderJid, adminJids);
        if (!isFlagged(tags)) return;

        const groupMap = groupMaps.get(accountId) ?? new Map<string, string>();
        const sourceGroupId = groupMap.get(msg.sourceGroupJid);
        if (!sourceGroupId) {
          logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message');
          return;
        }

        await ingestQueue.add(
          'ingest',
          {
            platformMsgId: msg.platformMsgId,
            platform: 'whatsapp',
            accountId,
            sourceGroupId,
            senderJid: msg.senderJid,
            senderName: msg.senderName,
            content: msg.content,
            tags,
          },
          { attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
        );

        logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued');
      },
      async (reaction) => {
        const forwardJobs = await handleStarReaction(reaction, adminJids, prisma);
        if (!forwardJobs || forwardJobs.length === 0) return;

        for (const job of forwardJobs) {
          await forwardQueue.add('forward', job, {
            attempts: 3,
            backoff: { type: 'exponential', delay: 2000 },
          });
        }

        logger.info(
          { count: forwardJobs.length, messageId: forwardJobs[0]?.messageId },
          'Forward jobs enqueued',
        );
      },
      async (groups, accountId) => {
        logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups');
        const map = await syncGroups(groups, accountId, prisma);
        groupMaps.set(accountId, map);
      },
    );
  }

  logger.info({ accountCount: accounts.length }, 'Tower worker ready');

  const shutdown = async () => {
    logger.info('Shutting down...');
    await ingestWorker.close();
    await forwardWorker.close();
    await ingestQueue.close();
    await forwardQueue.close();
    await prisma.$disconnect();
    process.exit(0);
  };

  process.on('SIGTERM', shutdown);
  process.on('SIGINT', shutdown);
}

bootstrap().catch((err) => {
  console.error('Worker failed to start', err);
  process.exit(1);
});
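The `attempts` and `backoff` options passed to `ingestQueue.add` and `forwardQueue.add` determine how long a failing job lingers before it reaches the `failed` handler. Below is a minimal sketch of the resulting retry delays, assuming BullMQ's built-in exponential strategy computes `2^(attemptsMade - 1) * delay`. `retryDelaysMs` and `RetryOptions` are hypothetical names for illustration only.

```typescript
// Hypothetical helper (not part of the plan): delays before each retry,
// ASSUMING exponential backoff of 2^(attemptsMade - 1) * delay.
interface RetryOptions {
  attempts: number; // total tries, including the first one
  delay: number;    // base backoff delay in milliseconds
}

function retryDelaysMs({ attempts, delay }: RetryOptions): number[] {
  const delays: number[] = [];
  // attemptsMade counts tries that have already failed; the last failure is not retried.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(Math.pow(2, attemptsMade - 1) * delay));
  }
  return delays;
}

console.log(retryDelaysMs({ attempts: 3, delay: 1000 })); // ingest:  [ 1000, 2000 ]
console.log(retryDelaysMs({ attempts: 3, delay: 2000 })); // forward: [ 2000, 4000 ]
```

If that assumption holds, a forward that keeps failing is retried after roughly 2 s and 4 s and then marked failed, so a session that stays disconnected for longer than that will probably drop forwards rather than hold them.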
  • Step 2: Build to confirm zero TypeScript errors
pnpm --filter @tower/worker build

Expected: 0 errors. dist/ created.

  • Step 3: Run all worker tests
pnpm --filter @tower/worker test

Expected: all tests pass (normalizer ×13, approval ×7, forward.processor ×5).

  • Step 4: Start the worker and verify multi-account startup
pnpm --filter @tower/worker dev

Expected log output (with one account seeded):

INFO (tower-worker): Tower worker ready {"accountCount": 1}
INFO (whatsapp-session): WhatsApp connected {"accountId": "acc_..."}
INFO (group-sync): Groups synced {"count": N, "accountId": "acc_..."}

If accountCount is 0, no row in the accounts table matches status = ACTIVE and platform = whatsapp. Most likely the row from Task 3 Step 5 was never inserted, so insert it now.

  • Step 5: End-to-end smoke test

With the worker running and the WhatsApp session authenticated:

  1. Send a message containing #important from a non-bot number to a group the bot is in.
  2. Confirm the worker logs "Message enqueued" with the correct tags.
  3. Check the DB: psql "postgresql://tower:tower@localhost:5433/tower" -c 'SELECT status, tags FROM "Message" ORDER BY "createdAt" DESC LIMIT 3;' Expected: one row with status = PENDING and tags = {#important}.
  4. React to that message with a star (⭐) from an admin JID (a JID listed in TOWER_ADMIN_JIDS in .env).
  5. Confirm the worker logs "Forward jobs enqueued".
  6. Check DB approvals: SELECT decision, "adminId" FROM "Approval" ORDER BY "decidedAt" DESC LIMIT 1; Expected: one row with decision = APPROVED.
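If the reaction in step 4 produces no "Forward jobs enqueued" log, a malformed `TOWER_ADMIN_JIDS` value is one thing to rule out. The parsing that `bootstrap()` applies can be exercised in isolation. In this sketch `parseAdminJids` is a hypothetical name for that inline expression, and the JIDs are made-up placeholders.

```typescript
// Hypothetical wrapper around the inline expression used in bootstrap():
// split on commas, trim whitespace, drop empty entries.
function parseAdminJids(raw: string | undefined): string[] {
  return raw ? raw.split(',').map((j) => j.trim()).filter(Boolean) : [];
}

// Placeholder JIDs; note the stray spaces and the empty entry.
const parsed = parseAdminJids(' 15550001111@s.whatsapp.net, ,15550002222@s.whatsapp.net ');
console.log(parsed.length); // 2
console.log(parseAdminJids(undefined).length); // 0
```

Whitespace and empty entries are tolerated. How `handleStarReaction` compares these values against the reacting JID is not shown in this section, so whether a differently formatted JID would still match is an open question to check there.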
  • Step 6: Commit
git add apps/worker/src/main.ts
git commit -m "feat(worker): multi-account session pool, reactions → approval → forward pipeline"

What this plan does NOT cover (Plan 4+)

  • Admin dashboard UI for approving messages (Next.js — Plan 5)
  • Meilisearch archive indexing (Plan 4)
  • Adding a second bot account at runtime without restart (future: webhook endpoint to trigger pool.add)
  • /tower bot command handling
  • Rejection workflow (admin removes the star reaction or uses a different emoji/command)