# Multi-Account Architecture, Adapter Boundary & Approval Workflow Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Seal the WhatsApp adapter so Baileys types never leak outside `src/whatsapp/`; add multi-account bot support via a session pool loaded from the DB; implement the ⭐ star reaction approval workflow with a rate-limited cross-group forward queue.

**Architecture:** `NormalizedMessage` and `NormalizedReaction` move to `@tower/types` (platform-neutral). `session.ts` normalizes raw Baileys events internally before calling any callback — `main.ts` never imports from `@whiskeysockets/baileys`. `WhatsAppSessionPool` manages N sessions indexed by `accountId`, loaded from the `Account` DB table on startup. A ⭐ reaction from an admin JID triggers `handleStarReaction` → writes `Approval` record → enqueues `ForwardJobData` jobs into a rate-limited BullMQ queue (20 forwards/min) to avoid WhatsApp bans.

**Tech Stack:** BullMQ 5, Prisma 6, Baileys 7.0.0-rc13, TypeScript 5, Jest 29, pnpm workspaces

---

## File Map

**Modify:**
- `packages/types/src/message.ts` — add `NormalizedMessage`, `NormalizedReaction`, `ForwardJobData`; add `accountId` to `IngestJobData`
- `apps/api/prisma/schema.prisma` — add `Account` model, `AccountStatus` enum; add optional `accountId` to `Group`
- `apps/worker/src/whatsapp/normalizer.ts` — import `NormalizedMessage`/`NormalizedReaction` from `@tower/types`; accept `accountId` param; add `normalizeReaction`
- `apps/worker/src/whatsapp/normalizer.test.ts` — add reaction tests
- `apps/worker/src/whatsapp/group-sync.ts` — accept `accountId` param, set on group upsert
- `apps/worker/src/whatsapp/session.ts` — normalize + react inside handler; change callback types; accept `accountId` as first param
- `apps/worker/src/main.ts` — use pool, load accounts from DB, wire reactions through approval

**Create:**
- `apps/worker/src/whatsapp/session-pool.ts` — `WhatsAppSessionPool` class
- `apps/worker/src/core/approval.ts` — `handleStarReaction`
- `apps/worker/src/core/approval.test.ts`
- `apps/worker/src/queues/forward.queue.ts`
- `apps/worker/src/queues/forward.processor.ts`
- `apps/worker/src/queues/forward.processor.test.ts`

---

### Task 1: Extend @tower/types

**Files:**
- Modify: `packages/types/src/message.ts`

Add `NormalizedMessage`, `NormalizedReaction`, `ForwardJobData` as platform-neutral shared types. Add `accountId` to `IngestJobData`. These types must contain zero Baileys imports.
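The "zero Baileys imports" rule can be spot-checked mechanically. The sketch below is illustrative only: `findForbiddenImports` is a hypothetical helper that is not part of this plan, and its regex covers only the simple `import ... from '...'` and `require('...')` forms.

```typescript
// Hypothetical helper (not part of the plan): return the source lines that
// import a forbidden package, either directly or via a subpath.
function findForbiddenImports(source: string, forbiddenPkg: string): string[] {
  // Escape regex metacharacters in the package name (such as ".").
  const escaped = forbiddenPkg.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  // Matches `from 'pkg'`, `from "pkg/sub"`, and `require('pkg')`.
  const pattern = new RegExp(`(?:from\\s+|require\\()\\s*['"]${escaped}(?:/[^'"]*)?['"]`);
  return source.split('\n').filter((line) => pattern.test(line));
}

// Sample source text standing in for the contents of a shared types file.
const sample = [
  "import { proto } from '@whiskeysockets/baileys';",
  "import { NormalizedMessage } from '@tower/types';",
].join('\n');

const offenders = findForbiddenImports(sample, '@whiskeysockets/baileys');
console.log(offenders.length); // 1
```

Run against the real contents of `packages/types/src/message.ts`, an empty result would indicate the file is free of Baileys imports, at least for the import forms this regex recognises.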
- [ ] **Step 1: Replace `packages/types/src/message.ts` entirely**

```typescript
export type Platform = 'whatsapp' | 'telegram' | 'discord';

export type MessageStatus =
  | 'PENDING'
  | 'APPROVED'
  | 'REJECTED'
  | 'DISTRIBUTED'
  | 'ARCHIVED';

export type ApprovalDecision = 'APPROVED' | 'REJECTED';

// Platform-neutral normalized message — zero Baileys/Telegram types here
export interface NormalizedMessage {
  platformMsgId: string;
  sourceGroupJid: string;
  senderJid: string;
  senderName?: string;
  content: string;
  accountId: string; // which bot account received this message
}

// Platform-neutral normalized reaction (e.g. WhatsApp emoji reaction)
export interface NormalizedReaction {
  reactorJid: string;
  targetMsgId: string; // platformMsgId of the message being reacted to
  sourceGroupJid: string;
  emoji: string;
  accountId: string; // which bot account received this reaction
}

export interface CanonicalMessage {
  messageId: string;
  platform: Platform;
  platformMsgId: string;
  sourceGroupId: string;
  senderJid: string;
  senderName?: string;
  content: string;
  mediaUrl?: string;
  tags: string[];
  status: MessageStatus;
  createdAt: Date;
}

export interface Group {
  id: string;
  platform: Platform;
  platformId: string;
  name: string;
  description?: string;
  isActive: boolean;
}

export interface SyncRoute {
  id: string;
  sourceGroupId: string;
  targetGroupId: string;
  isActive: boolean;
}

export interface IngestJobData {
  platformMsgId: string;
  platform: Platform;
  accountId: string; // which bot account received this message
  sourceGroupId: string;
  senderJid: string;
  senderName?: string;
  content: string;
  tags: string[];
}

export interface ForwardJobData {
  messageId: string; // DB id of the approved Message record
  content: string;
  sourceGroupName: string;
  senderName?: string;
  toGroupJid: string;
  fromAccountId: string; // which bot account to send the forward from
}
```

- [ ] **Step 2: Confirm types package compiles with zero errors**

```bash
cd /path/to/repo && pnpm --filter @tower/types build
```

Expected: exits 0, no TypeScript errors.

- [ ] **Step 3: Commit**

```bash
git add packages/types/src/message.ts
git commit -m "feat(types): NormalizedMessage, NormalizedReaction, ForwardJobData; accountId on IngestJobData"
```

---

### Task 2: Seal the WhatsApp adapter boundary

**Files:**
- Modify: `apps/worker/src/whatsapp/normalizer.ts`
- Modify: `apps/worker/src/whatsapp/normalizer.test.ts`
- Modify: `apps/worker/src/whatsapp/session.ts`

After this task: `main.ts` will never import anything from `@whiskeysockets/baileys`. All Baileys types stay inside `src/whatsapp/`.

- [ ] **Step 1: Replace `apps/worker/src/whatsapp/normalizer.ts`**

```typescript
import { proto } from '@whiskeysockets/baileys';
import { NormalizedMessage, NormalizedReaction } from '@tower/types';

function extractText(msg: proto.IWebMessageInfo): string {
  const m = msg.message;
  if (!m) return '';
  return (
    m.conversation ||
    m.extendedTextMessage?.text ||
    m.imageMessage?.caption ||
    m.videoMessage?.caption ||
    m.documentMessage?.caption ||
    ''
  );
}

export function normalizeMessage(
  msg: proto.IWebMessageInfo,
  accountId: string,
): NormalizedMessage | null {
  const key = msg.key;
  if (!key) return null;

  const remoteJid = key.remoteJid ?? '';
  if (!remoteJid.endsWith('@g.us')) return null;
  if (key.fromMe) return null;
  if (!msg.message) return null;

  const platformMsgId = key.id;
  if (!platformMsgId) return null;

  const content = extractText(msg);

  return {
    platformMsgId,
    sourceGroupJid: remoteJid,
    senderJid: key.participant ?? '',
    senderName: msg.pushName ?? undefined,
    content,
    accountId,
  };
}

export function normalizeReaction(
  msg: proto.IWebMessageInfo,
  accountId: string,
): NormalizedReaction | null {
  const key = msg.key;
  if (!key) return null;

  const remoteJid = key.remoteJid ?? '';
  if (!remoteJid.endsWith('@g.us')) return null;
  if (key.fromMe) return null;

  const reaction = msg.message?.reactionMessage;
  if (!reaction) return null;

  const targetMsgId = reaction.key?.id;
  if (!targetMsgId) return null;

  return {
    reactorJid: key.participant ?? '',
    targetMsgId,
    sourceGroupJid: remoteJid,
    emoji: reaction.text ?? '',
    accountId,
  };
}
```

- [ ] **Step 2: Write tests for `normalizeReaction`**

Append to `apps/worker/src/whatsapp/normalizer.test.ts` (keep all existing tests, add below):

```typescript
import { normalizeReaction } from './normalizer';

describe('normalizeReaction', () => {
  it('normalizes a star reaction from a group participant', () => {
    const msg = {
      key: {
        remoteJid: '120363043312345678@g.us',
        fromMe: false,
        id: 'REACTION_ID',
        participant: '919876543210@s.whatsapp.net',
      },
      message: {
        reactionMessage: {
          key: { remoteJid: '120363043312345678@g.us', id: 'TARGET_MSG_ID' },
          text: '⭐',
        },
      },
    } as proto.IWebMessageInfo;

    const result = normalizeReaction(msg, 'acc_1');
    expect(result).toMatchObject({
      reactorJid: '919876543210@s.whatsapp.net',
      targetMsgId: 'TARGET_MSG_ID',
      sourceGroupJid: '120363043312345678@g.us',
      emoji: '⭐',
      accountId: 'acc_1',
    });
  });

  it('returns null when reactionMessage is missing (regular message)', () => {
    const msg = {
      key: { remoteJid: '120363043312345678@g.us', fromMe: false, id: 'ID' },
      message: { conversation: 'hello' },
    } as proto.IWebMessageInfo;
    expect(normalizeReaction(msg, 'acc_1')).toBeNull();
  });

  it('returns null for own reactions (fromMe=true)', () => {
    const msg = {
      key: { remoteJid: '120363043312345678@g.us', fromMe: true, id: 'ID' },
      message: { reactionMessage: { key: { id: 'TARGET' }, text: '⭐' } },
    } as proto.IWebMessageInfo;
    expect(normalizeReaction(msg, 'acc_1')).toBeNull();
  });

  it('returns null for DM reactions (non-group jid)', () => {
    const msg = {
      key: { remoteJid: '919876543210@s.whatsapp.net', fromMe: false, id: 'ID' },
      message: { reactionMessage: { key: { id: 'TARGET' }, text: '⭐' } },
    } as proto.IWebMessageInfo;
    expect(normalizeReaction(msg, 'acc_1')).toBeNull();
  });

  it('returns null when targetMsgId is missing', () => {
    const msg = {
      key: {
        remoteJid: '120363043312345678@g.us',
        fromMe: false,
        id: 'ID',
        participant: '91@s.whatsapp.net',
      },
      message: { reactionMessage: { key: {}, text: '⭐' } },
    } as proto.IWebMessageInfo;
    expect(normalizeReaction(msg, 'acc_1')).toBeNull();
  });
});
```

Also update the `normalizeMessage` tests: the function signature changed to accept `accountId` as second param. Update every `normalizeMessage(makeMsg(...))` call to `normalizeMessage(makeMsg(...), 'acc_1')` and add `accountId: 'acc_1'` to the expected `toMatchObject` where relevant.

- [ ] **Step 3: Run normalizer tests to confirm they pass**

```bash
pnpm --filter @tower/worker test -- --testPathPattern=normalizer
```

Expected: all existing tests + 5 new reaction tests pass.

- [ ] **Step 4: Replace `apps/worker/src/whatsapp/session.ts`**

```typescript
import makeWASocket, {
  useMultiFileAuthState,
  fetchLatestBaileysVersion,
  DisconnectReason,
  WASocket,
  GroupMetadata,
} from '@whiskeysockets/baileys';
import { Boom } from '@hapi/boom';
import qrcode from 'qrcode-terminal';
import { NormalizedMessage, NormalizedReaction } from '@tower/types';
import { normalizeMessage, normalizeReaction } from './normalizer';
import { createLogger } from '@tower/logger';

const logger = createLogger('whatsapp-session');

export type OnMessageCallback = (msg: NormalizedMessage) => Promise<void> | void;
export type OnReactionCallback = (reaction: NormalizedReaction) => Promise<void> | void;
export type OnGroupsCallback = (groups: Record<string, GroupMetadata>) => Promise<void> | void;

export async function createWhatsAppSession(
  accountId: string,
  sessionPath: string,
  onMessage: OnMessageCallback,
  onReaction: OnReactionCallback,
  onGroups: OnGroupsCallback,
): Promise<WASocket> {
  const { state, saveCreds } = await useMultiFileAuthState(sessionPath);
  const { version } = await fetchLatestBaileysVersion();

  const sock = makeWASocket({
    version,
    auth: state,
    printQRInTerminal: false,
    logger: logger as any,
  });

  sock.ev.on('creds.update', saveCreds);

  sock.ev.on('connection.update', async ({ connection, qr, lastDisconnect }) => {
    if (qr) {
      qrcode.generate(qr, { small: true });
    }

    if (connection === 'close') {
      const reason = (lastDisconnect?.error as Boom)?.output?.statusCode;
      const shouldReconnect = reason !== DisconnectReason.loggedOut;
      logger.info({ reason, shouldReconnect }, 'Connection closed');
      if (shouldReconnect) {
        logger.info('Reconnecting in 5s...');
        setTimeout(
          () => createWhatsAppSession(accountId, sessionPath, onMessage, onReaction, onGroups),
          5000,
        );
      }
    } else if (connection === 'open') {
      try {
        logger.info({ accountId }, 'WhatsApp connected');
        const groups = await sock.groupFetchAllParticipating();
        await Promise.resolve(onGroups(groups)).catch((err) =>
          logger.error({ err }, 'Group sync error'),
        );
      } catch (err) {
        logger.error({ err }, 'Failed to fetch groups on connect');
      }
    }
  });

  sock.ev.on('messages.upsert', ({ messages, type }) => {
    if (type !== 'notify') return;

    for (const msg of messages) {
      // Reactions arrive as messages carrying a reactionMessage payload
      if (msg.message?.reactionMessage) {
        const reaction = normalizeReaction(msg, accountId);
        if (reaction) {
          void Promise.resolve(onReaction(reaction)).catch((err) =>
            logger.error({ err }, 'Error processing reaction'),
          );
        }
        continue;
      }

      const normalized = normalizeMessage(msg, accountId);
      if (!normalized) continue;

      void Promise.resolve(onMessage(normalized)).catch((err) =>
        logger.error({ err }, 'Error processing message'),
      );
    }
  });

  return sock;
}
```

- [ ] **Step 5: Commit**

```bash
git add apps/worker/src/whatsapp/normalizer.ts \
        apps/worker/src/whatsapp/normalizer.test.ts \
        apps/worker/src/whatsapp/session.ts
git commit -m "feat(worker): seal WhatsApp adapter — normalize inside session, reactions handled internally"
```

---

### Task 3: Prisma — Account model and Group.accountId

**Files:**
- Modify: `apps/api/prisma/schema.prisma`

- [ ] **Step 1: Add `AccountStatus` enum and `Account` model to the schema**

Open `apps/api/prisma/schema.prisma`. Append after the `ConsentRecord` model block:

```prisma
enum AccountStatus {
  ACTIVE
  DISCONNECTED
  BANNED
}

model Account {
  id          String        @id @default(cuid())
  platform    String
  jid         String
  sessionPath String
  displayName String?
  status      AccountStatus @default(ACTIVE)
  groups      Group[]
  createdAt   DateTime      @default(now())
  updatedAt   DateTime      @updatedAt

  @@unique([platform, jid])
}
```

- [ ] **Step 2: Add optional `accountId` to the `Group` model**

The full updated `Group` model (replace the existing one):

```prisma
model Group {
  id             String          @id @default(cuid())
  platform       String
  platformId     String
  name           String
  description    String?
  isActive       Boolean         @default(true)
  accountId      String?
  account        Account?        @relation(fields: [accountId], references: [id])
  createdAt      DateTime        @default(now())
  updatedAt      DateTime        @updatedAt
  messages       Message[]
  syncRoutesFrom SyncRoute[]     @relation("sourceGroup")
  syncRoutesTo   SyncRoute[]     @relation("targetGroup")
  consentRecords ConsentRecord[]

  @@unique([platform, platformId])
}
```

- [ ] **Step 3: Run migration**

```bash
cd apps/api && DATABASE_URL="postgresql://tower:tower@localhost:5433/tower" \
  npx prisma migrate dev --name add-account-model
```

Expected: "Your database is now in sync with your schema."

- [ ] **Step 4: Regenerate Prisma client in worker**

```bash
pnpm --filter @tower/worker generate
```

Expected: "Generated Prisma Client..."

- [ ] **Step 5: Seed the first Account record**

Find your WhatsApp session's JID. It is stored in `creds.json` inside the session directory, i.e. the path you set as `WHATSAPP_SESSION_PATH` in `.env` (for example `sessions/creds.json`). Open the file and look for `"me": { "id": "..." }`; that string is your JID.

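If you would rather not read the file by eye, the lookup can be scripted. This is a minimal sketch under the assumption that `creds.json` is plain JSON with the `me.id` field described above; `CredsFile` and `extractJidFromCreds` are hypothetical names introduced here, and the JID in the example is made up.

```typescript
// Minimal shape of the part of creds.json this step relies on.
// Assumption: every other field in the file can be ignored.
interface CredsFile {
  me?: { id?: string };
}

// Hypothetical helper: pull the bot's own JID out of the creds.json text.
// Returns undefined when the "me" block or its "id" is absent.
function extractJidFromCreds(credsJson: string): string | undefined {
  const creds = JSON.parse(credsJson) as CredsFile;
  return creds.me?.id;
}

// Example input with a made-up JID; a real file contains many more fields.
const exampleCreds = JSON.stringify({ me: { id: '919876543210@s.whatsapp.net' } });
console.log(extractJidFromCreds(exampleCreds));
```

To use it on a real session, pass in the file contents, for example the result of `fs.readFileSync(path, 'utf8')` for your own `creds.json` path.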
Insert the account record (replace `YOUR_JID` and `YOUR_SESSION_PATH`):

```bash
psql "postgresql://tower:tower@localhost:5433/tower" <<'SQL'
INSERT INTO "Account" (id, platform, jid, "sessionPath", "displayName", status, "createdAt", "updatedAt")
VALUES (
  'acc_' || substring(gen_random_uuid()::text, 1, 8),
  'whatsapp',
  'YOUR_JID',
  'YOUR_SESSION_PATH',
  'Bot 1',
  'ACTIVE',
  now(),
  now()
)
ON CONFLICT DO NOTHING;
SQL
```

Verify it was inserted:

```bash
psql "postgresql://tower:tower@localhost:5433/tower" -c 'SELECT id, jid, "sessionPath", status FROM "Account";'
```

Expected: one row with your JID and `ACTIVE` status.

- [ ] **Step 6: Commit**

```bash
git add apps/api/prisma/schema.prisma apps/api/prisma/migrations/
git commit -m "feat(schema): Account model with AccountStatus enum, optional Group.accountId"
```

---

### Task 4: WhatsAppSessionPool and updated group-sync

**Files:**
- Modify: `apps/worker/src/whatsapp/group-sync.ts`
- Create: `apps/worker/src/whatsapp/session-pool.ts`

- [ ] **Step 1: Update `group-sync.ts` to accept `accountId`**

Replace `apps/worker/src/whatsapp/group-sync.ts`:

```typescript
import { GroupMetadata } from '@whiskeysockets/baileys';
import { createLogger } from '@tower/logger';

const logger = createLogger('group-sync');

export async function syncGroups(
  groups: Record<string, GroupMetadata>,
  accountId: string,
  prisma: any,
): Promise<Map<string, string>> {
  // Maps WhatsApp group JID → DB Group id
  const jidToDbId = new Map<string, string>();

  for (const [jid, meta] of Object.entries(groups)) {
    const group = await prisma.group.upsert({
      where: { platform_platformId: { platform: 'whatsapp', platformId: jid } },
      create: {
        platform: 'whatsapp',
        platformId: jid,
        name: meta.subject,
        description: meta.desc ?? undefined,
        isActive: true,
        accountId,
      },
      update: {
        name: meta.subject,
        description: meta.desc ?? undefined,
        accountId,
      },
    });
    jidToDbId.set(jid, group.id);
  }

  logger.info({ count: jidToDbId.size, accountId }, 'Groups synced');
  return jidToDbId;
}
```

- [ ] **Step 2: Create `apps/worker/src/whatsapp/session-pool.ts`**

```typescript
import { WASocket } from '@whiskeysockets/baileys';
import { NormalizedMessage, NormalizedReaction } from '@tower/types';
import { createWhatsAppSession } from './session';
import { createLogger } from '@tower/logger';

const logger = createLogger('session-pool');

// Callbacks the pool exposes to main.ts — accountId injected by the pool
export type PoolMessageCallback = (msg: NormalizedMessage, accountId: string) => Promise<void> | void;
export type PoolReactionCallback = (reaction: NormalizedReaction, accountId: string) => Promise<void> | void;
// groups typed as `any` — GroupMetadata (Baileys type) stays inside whatsapp/ folder
export type PoolGroupsCallback = (groups: any, accountId: string) => Promise<void> | void;

export class WhatsAppSessionPool {
  private sessions = new Map<string, WASocket>();

  async add(
    accountId: string,
    sessionPath: string,
    onMessage: PoolMessageCallback,
    onReaction: PoolReactionCallback,
    onGroups: PoolGroupsCallback,
  ): Promise<void> {
    logger.info({ accountId }, 'Starting session');
    const sock = await createWhatsAppSession(
      accountId,
      sessionPath,
      (msg) => onMessage(msg, accountId),
      (reaction) => onReaction(reaction, accountId),
      (groups) => onGroups(groups, accountId),
    );
    this.sessions.set(accountId, sock);
  }

  get(accountId: string): WASocket | undefined {
    return this.sessions.get(accountId);
  }

  getAll(): Map<string, WASocket> {
    return this.sessions;
  }

  async sendMessage(accountId: string, groupJid: string, text: string): Promise<void> {
    const sock = this.sessions.get(accountId);
    if (!sock) throw new Error(`No active session for account ${accountId}`);
    await sock.sendMessage(groupJid, { text });
  }

  async remove(accountId: string): Promise<void> {
    const sock = this.sessions.get(accountId);
    if (sock) {
      await sock.logout().catch(() => {});
      this.sessions.delete(accountId);
      logger.info({ accountId }, 'Session removed');
    }
  }
}
```

- [ ] **Step 3: Confirm worker compiles (ignoring main.ts errors for now)**

```bash
pnpm --filter @tower/worker build 2>&1 | grep -v "main.ts"
```

Expected: no errors outside `main.ts` (it still uses old signatures — fixed in Task 7).

- [ ] **Step 4: Commit**

```bash
git add apps/worker/src/whatsapp/session-pool.ts apps/worker/src/whatsapp/group-sync.ts
git commit -m "feat(worker): WhatsAppSessionPool + group-sync accepts accountId"
```

---

### Task 5: Approval core logic

**Files:**
- Create: `apps/worker/src/core/approval.test.ts`
- Create: `apps/worker/src/core/approval.ts`

**Context:** When an admin reacts with ⭐ to a group message, `handleStarReaction` finds the message in DB, marks it `APPROVED`, creates an `Approval` record, then returns `ForwardJobData[]` — one entry per active `SyncRoute` from the message's source group. The caller (main.ts) enqueues those as forward jobs.

The Prisma relation graph: `Message.sourceGroup → Group.syncRoutesFrom → SyncRoute.targetGroup → Group`. The query must include `sourceGroup.syncRoutesFrom.targetGroup` — NOT a top-level `syncRoutesFrom` on Message (that relation doesn't exist on Message).
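The relation path above can be pictured as plain TypeScript shapes. This is a sketch, not the generated Prisma types: `TargetGroupRow`, `RouteRow`, `MessageWithRoutes`, and `forwardTargets` are hypothetical names, and only the fields this task reads are modelled.

```typescript
// Hypothetical, hand-written shapes mirroring the include tree
// Message.sourceGroup -> Group.syncRoutesFrom -> SyncRoute.targetGroup.
interface TargetGroupRow {
  platformId: string; // WhatsApp JID of the target group
  accountId: string | null; // owning bot account, if any
}
interface RouteRow {
  targetGroup: TargetGroupRow;
}
interface MessageWithRoutes {
  id: string;
  sourceGroup: { name: string; syncRoutesFrom: RouteRow[] };
}

// Collect [toGroupJid, fromAccountId] pairs, falling back to the reacting
// account when the target group has no owning account.
function forwardTargets(
  message: MessageWithRoutes,
  fallbackAccountId: string,
): Array<[string, string]> {
  return message.sourceGroup.syncRoutesFrom.map((route): [string, string] => [
    route.targetGroup.platformId,
    route.targetGroup.accountId ?? fallbackAccountId,
  ]);
}

// Made-up sample data: one route with an owning account, one without.
const sampleMessage: MessageWithRoutes = {
  id: 'msg_1',
  sourceGroup: {
    name: 'Source Group',
    syncRoutesFrom: [
      { targetGroup: { platformId: '999@g.us', accountId: 'acc_2' } },
      { targetGroup: { platformId: '888@g.us', accountId: null } },
    ],
  },
};

const pairs = forwardTargets(sampleMessage, 'acc_1');
console.log(pairs);
```

Note that the routes hang off `sourceGroup`, so a message whose source group has no active routes yields an empty list rather than an error.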
- [ ] **Step 1: Write the failing tests**

Create `apps/worker/src/core/approval.test.ts`:

```typescript
import { handleStarReaction } from './approval';
import { NormalizedReaction } from '@tower/types';

function makeReaction(overrides: Partial<NormalizedReaction> = {}): NormalizedReaction {
  return {
    reactorJid: '919876543210@s.whatsapp.net',
    targetMsgId: 'TARGET_MSG_123',
    sourceGroupJid: '120363043312345678@g.us',
    emoji: '⭐',
    accountId: 'acc_1',
    ...overrides,
  };
}

const adminJids = ['919876543210@s.whatsapp.net'];

describe('handleStarReaction', () => {
  it('returns null for non-star emoji', async () => {
    const result = await handleStarReaction(makeReaction({ emoji: '👍' }), adminJids, {} as any);
    expect(result).toBeNull();
  });

  it('returns null when reactor is not an admin', async () => {
    const result = await handleStarReaction(
      makeReaction({ reactorJid: 'stranger@s.whatsapp.net' }),
      adminJids,
      {} as any,
    );
    expect(result).toBeNull();
  });

  it('returns null when message not found', async () => {
    const prisma = { message: { findUnique: jest.fn().mockResolvedValue(null) } } as any;
    const result = await handleStarReaction(makeReaction(), adminJids, prisma);
    expect(result).toBeNull();
    expect(prisma.message.findUnique).toHaveBeenCalledWith({
      where: {
        platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'TARGET_MSG_123' },
      },
      include: {
        approval: true,
        sourceGroup: {
          include: {
            syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } },
          },
        },
      },
    });
  });

  it('returns null when message status is not PENDING', async () => {
    const prisma = {
      message: {
        findUnique: jest.fn().mockResolvedValue({
          id: 'msg_1',
          status: 'REJECTED',
          approval: null,
          sourceGroup: { name: 'Test Group', syncRoutesFrom: [] },
        }),
      },
    } as any;
    expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
  });

  it('returns null when message is already approved (approval record exists)', async () => {
    const prisma = {
      message: {
        findUnique: jest.fn().mockResolvedValue({
          id: 'msg_1',
          status: 'APPROVED',
          approval: { id: 'appr_1' },
          sourceGroup: { name: 'Test Group', syncRoutesFrom: [] },
        }),
      },
    } as any;
    expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
  });

  it('approves message and returns empty array when no sync routes', async () => {
    const prisma = {
      message: {
        findUnique: jest.fn().mockResolvedValue({
          id: 'msg_1',
          status: 'PENDING',
          approval: null,
          content: 'hello',
          senderName: 'Alice',
          sourceGroup: { name: 'UP Parivar Dallas', syncRoutesFrom: [] },
        }),
        update: jest.fn().mockResolvedValue({}),
      },
      approval: { create: jest.fn().mockResolvedValue({}) },
    } as any;

    const result = await handleStarReaction(makeReaction(), adminJids, prisma);
    expect(result).toEqual([]);
    expect(prisma.message.update).toHaveBeenCalledWith({
      where: { id: 'msg_1' },
      data: { status: 'APPROVED' },
    });
    expect(prisma.approval.create).toHaveBeenCalledWith({
      data: {
        messageId: 'msg_1',
        adminId: '919876543210@s.whatsapp.net',
        decision: 'APPROVED',
      },
    });
  });

  it('returns ForwardJobData for each active sync route', async () => {
    const prisma = {
      message: {
        findUnique: jest.fn().mockResolvedValue({
          id: 'msg_1',
          status: 'PENDING',
          approval: null,
          content: 'important announcement',
          senderName: 'Bob',
          sourceGroup: {
            name: 'Source Group',
            syncRoutesFrom: [
              { targetGroup: { platformId: '999@g.us', accountId: 'acc_2' } },
              { targetGroup: { platformId: '888@g.us', accountId: null } },
            ],
          },
        }),
        update: jest.fn().mockResolvedValue({}),
      },
      approval: { create: jest.fn().mockResolvedValue({}) },
    } as any;

    const result = await handleStarReaction(makeReaction(), adminJids, prisma);
    expect(result).toHaveLength(2);
    expect(result![0]).toMatchObject({
      messageId: 'msg_1',
      content: 'important announcement',
      sourceGroupName: 'Source Group',
      senderName: 'Bob',
      toGroupJid: '999@g.us',
      fromAccountId: 'acc_2',
    });
    // falls back to reaction.accountId when targetGroup.accountId is null
    expect(result![1]).toMatchObject({
      toGroupJid: '888@g.us',
      fromAccountId: 'acc_1',
    });
  });
});
```

- [ ] **Step 2: Run tests to confirm they fail**

```bash
pnpm --filter @tower/worker test -- --testPathPattern=approval
```

Expected: FAIL — "Cannot find module './approval'"

- [ ] **Step 3: Create `apps/worker/src/core/approval.ts`**

```typescript
import { NormalizedReaction, ForwardJobData } from '@tower/types';

export async function handleStarReaction(
  reaction: NormalizedReaction,
  adminJids: string[],
  prisma: any,
): Promise<ForwardJobData[] | null> {
  // Only a ⭐ from a configured admin counts as an approval
  if (reaction.emoji !== '⭐') return null;
  if (!adminJids.includes(reaction.reactorJid)) return null;

  const message = await prisma.message.findUnique({
    where: {
      platform_platformMsgId: {
        platform: 'whatsapp',
        platformMsgId: reaction.targetMsgId,
      },
    },
    include: {
      approval: true,
      sourceGroup: {
        include: {
          syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } },
        },
      },
    },
  });

  if (!message) return null;
  if (message.status !== 'PENDING') return null;
  if (message.approval) return null;

  await prisma.message.update({
    where: { id: message.id },
    data: { status: 'APPROVED' },
  });

  await prisma.approval.create({
    data: {
      messageId: message.id,
      adminId: reaction.reactorJid,
      decision: 'APPROVED',
    },
  });

  // One forward job per active route out of the source group
  const jobs: ForwardJobData[] = message.sourceGroup.syncRoutesFrom.map((route: any) => ({
    messageId: message.id,
    content: message.content,
    sourceGroupName: message.sourceGroup.name,
    senderName: message.senderName ?? undefined,
    toGroupJid: route.targetGroup.platformId,
    fromAccountId: route.targetGroup.accountId ?? reaction.accountId,
  }));

  return jobs;
}
```

- [ ] **Step 4: Run tests to confirm they pass**

```bash
pnpm --filter @tower/worker test -- --testPathPattern=approval
```

Expected: 7 tests pass.
- [ ] **Step 5: Commit**

```bash
git add apps/worker/src/core/approval.ts apps/worker/src/core/approval.test.ts
git commit -m "feat(worker): handleStarReaction approval core with tests"
```

---

### Task 6: Forward queue and processor

**Files:**
- Create: `apps/worker/src/queues/forward.processor.test.ts`
- Create: `apps/worker/src/queues/forward.processor.ts`
- Create: `apps/worker/src/queues/forward.queue.ts`

The rate limiter (20 forwards/min) goes on the **Worker**, not the Queue — this is the BullMQ v5 pattern.

- [ ] **Step 1: Write the failing processor test**

Create `apps/worker/src/queues/forward.processor.test.ts`:

```typescript
import { processForwardJob } from './forward.processor';
import { ForwardJobData } from '@tower/types';

const mockPool = { sendMessage: jest.fn().mockResolvedValue(undefined) };

const baseJob: ForwardJobData = {
  messageId: 'msg_1',
  content: 'Event this Saturday at the temple',
  sourceGroupName: 'UP Parivar Dallas',
  senderName: 'Rajesh',
  toGroupJid: '120363099999@g.us',
  fromAccountId: 'acc_1',
};

describe('processForwardJob', () => {
  beforeEach(() => jest.clearAllMocks());

  it('sends a formatted message via the pool', async () => {
    await processForwardJob(baseJob, mockPool as any);
    expect(mockPool.sendMessage).toHaveBeenCalledWith(
      'acc_1',
      '120363099999@g.us',
      expect.stringContaining('Event this Saturday at the temple'),
    );
  });

  it('includes source group name in the forwarded text', async () => {
    await processForwardJob(baseJob, mockPool as any);
    const [, , text] = mockPool.sendMessage.mock.calls[0];
    expect(text).toContain('UP Parivar Dallas');
  });

  it('includes sender name in the forwarded text', async () => {
    await processForwardJob(baseJob, mockPool as any);
    const [, , text] = mockPool.sendMessage.mock.calls[0];
    expect(text).toContain('Rajesh');
  });

  it('handles missing senderName without throwing', async () => {
    await processForwardJob({ ...baseJob, senderName: undefined }, mockPool as any);
    expect(mockPool.sendMessage).toHaveBeenCalledTimes(1);
  });

  it('throws when pool has no session for the account', async () => {
    const brokenPool = {
      sendMessage: jest.fn().mockRejectedValue(new Error('No active session for account acc_99')),
    };
    await expect(
      processForwardJob({ ...baseJob, fromAccountId: 'acc_99' }, brokenPool as any),
    ).rejects.toThrow('No active session');
  });
});
```

- [ ] **Step 2: Run test to confirm it fails**

```bash
pnpm --filter @tower/worker test -- --testPathPattern=forward.processor
```

Expected: FAIL — "Cannot find module './forward.processor'"

- [ ] **Step 3: Create `apps/worker/src/queues/forward.processor.ts`**

```typescript
import { Worker } from 'bullmq';
import { ForwardJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';
import { WhatsAppSessionPool } from '../whatsapp/session-pool';

function formatForwardText(job: ForwardJobData): string {
  const sender = job.senderName ? `_${job.senderName}_` : '_Unknown_';
  return `📢 *${job.sourceGroupName}*\n${sender}:\n\n${job.content}\n\n_— Forwarded by TOWER_`;
}

export async function processForwardJob(
  job: ForwardJobData,
  pool: WhatsAppSessionPool,
): Promise<void> {
  await pool.sendMessage(job.fromAccountId, job.toGroupJid, formatForwardText(job));
}

export function createForwardWorker(
  redisUrl: string,
  pool: WhatsAppSessionPool,
): Worker<ForwardJobData> {
  return new Worker<ForwardJobData>(
    'tower-forward',
    async (job) => processForwardJob(job.data, pool),
    {
      connection: parseRedisUrl(redisUrl),
      limiter: { max: 20, duration: 60_000 }, // 20 forwards per minute — avoids WhatsApp bans
    },
  );
}
```

- [ ] **Step 4: Create `apps/worker/src/queues/forward.queue.ts`**

```typescript
import { Queue } from 'bullmq';
import { ForwardJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';

export function createForwardQueue(redisUrl: string): Queue<ForwardJobData> {
  return new Queue<ForwardJobData>('tower-forward', {
    connection: parseRedisUrl(redisUrl),
  });
}
```

- [ ] **Step 5: Run processor tests to confirm they pass**

```bash
pnpm --filter @tower/worker test -- --testPathPattern=forward.processor
```

Expected: 5 tests pass.

- [ ] **Step 6: Commit**

```bash
git add apps/worker/src/queues/forward.queue.ts \
        apps/worker/src/queues/forward.processor.ts \
        apps/worker/src/queues/forward.processor.test.ts
git commit -m "feat(worker): forward queue + processor with 20/min rate limiter"
```

---

### Task 7: Wire main.ts — multi-account pool, reactions, full pipeline

**Files:**
- Modify: `apps/worker/src/main.ts`

This is the final assembly. `main.ts` no longer imports anything from `@whiskeysockets/baileys`. It loads accounts from the DB, creates the pool, and wires: ingest message → tag detect → ingest queue; reaction → approval → forward queue.

- [ ] **Step 1: Replace `apps/worker/src/main.ts` entirely**

```typescript
import { PrismaClient } from '@prisma/client';
import { createLogger } from '@tower/logger';
import { validateEnv } from '@tower/config';
import { createIngestQueue } from './queues/ingest.queue';
import { createIngestWorker } from './queues/ingest.processor';
import { createForwardQueue } from './queues/forward.queue';
import { createForwardWorker } from './queues/forward.processor';
import { WhatsAppSessionPool } from './whatsapp/session-pool';
import { detectTags, isFlagged } from './whatsapp/tag-detector';
import { syncGroups } from './whatsapp/group-sync';
import { handleStarReaction } from './core/approval';

const logger = createLogger('tower-worker');

async function bootstrap() {
  const env = validateEnv();
  const prisma = new PrismaClient();
  await prisma.$connect();

  const adminJids = env.TOWER_ADMIN_JIDS
    ? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean)
    : [];

  const ingestQueue = createIngestQueue(env.REDIS_URL);
  const forwardQueue = createForwardQueue(env.REDIS_URL);
  const pool = new WhatsAppSessionPool();

  const ingestWorker = createIngestWorker(env.REDIS_URL, prisma);
  const forwardWorker = createForwardWorker(env.REDIS_URL, pool);

  ingestWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Ingest job completed'));
  ingestWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Ingest job failed'));
  forwardWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Forward job completed'));
  forwardWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Forward job failed'));

  // Load active accounts from DB — each becomes one WhatsApp session
  const accounts = await prisma.account.findMany({
    where: { status: 'ACTIVE', platform: 'whatsapp' },
  });

  if (accounts.length === 0) {
    logger.warn('No active WhatsApp accounts found — insert an Account row (see Task 3 Step 5)');
  }

  // Per-account map of groupJid → DB Group id
  const groupMaps = new Map<string, Map<string, string>>();

  for (const account of accounts) {
    groupMaps.set(account.id, new Map());

    await pool.add(
      account.id,
      account.sessionPath,

      async (msg, accountId) => {
        const tags = detectTags(msg.content, msg.senderJid, adminJids);
        if (!isFlagged(tags)) return;

        const groupMap = groupMaps.get(accountId) ?? new Map<string, string>();
        const sourceGroupId = groupMap.get(msg.sourceGroupJid);
        if (!sourceGroupId) {
          logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message');
          return;
        }

        await ingestQueue.add(
          'ingest',
          {
            platformMsgId: msg.platformMsgId,
            platform: 'whatsapp',
            accountId,
            sourceGroupId,
            senderJid: msg.senderJid,
            senderName: msg.senderName,
            content: msg.content,
            tags,
          },
          { attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
        );
        logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued');
      },

      async (reaction) => {
        const forwardJobs = await handleStarReaction(reaction, adminJids, prisma);
        if (!forwardJobs || forwardJobs.length === 0) return;

        for (const job of forwardJobs) {
          await forwardQueue.add('forward', job, {
            attempts: 3,
            backoff: { type: 'exponential', delay: 2000 },
          });
        }
        logger.info(
          { count: forwardJobs.length, messageId: forwardJobs[0]?.messageId },
          'Forward jobs enqueued',
        );
      },

      async (groups, accountId) => {
        logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups');
        const map = await syncGroups(groups, accountId, prisma);
        groupMaps.set(accountId, map);
      },
    );
  }

  logger.info({ accountCount: accounts.length }, 'Tower worker ready');

  const shutdown = async () => {
    logger.info('Shutting down...');
    await ingestWorker.close();
    await forwardWorker.close();
    await ingestQueue.close();
    await forwardQueue.close();
    await prisma.$disconnect();
    process.exit(0);
  };

  process.on('SIGTERM', shutdown);
  process.on('SIGINT', shutdown);
}

bootstrap().catch((err) => {
  console.error('Worker failed to start', err);
  process.exit(1);
});
```

- [ ] **Step 2: Build to confirm zero TypeScript errors**

```bash
pnpm --filter @tower/worker build
```

Expected: 0 errors. `dist/` created.

- [ ] **Step 3: Run all worker tests**

```bash
pnpm --filter @tower/worker test
```

Expected: all tests pass (normalizer ×13, approval ×7, forward.processor ×5).
- [ ] **Step 4: Start the worker and verify multi-account startup**

```bash
pnpm --filter @tower/worker dev
```

Expected log output (with one account seeded):

```
INFO (tower-worker): Tower worker ready {"accountCount": 1}
INFO (whatsapp-session): WhatsApp connected {"accountId": "acc_..."}
INFO (group-sync): Groups synced {"count": N, "accountId": "acc_..."}
```

If `accountCount` is 0, the account row wasn't inserted in Task 3 Step 5 — insert it now.

- [ ] **Step 5: End-to-end smoke test**

With the worker running and the WhatsApp session authenticated:

1. Send a message containing `#important` from a non-bot number to a group the bot is in.
2. Confirm the worker logs `"Message enqueued"` with the correct tags.
3. Check the DB: `psql "postgresql://tower:tower@localhost:5433/tower" -c 'SELECT status, tags FROM "Message" ORDER BY "createdAt" DESC LIMIT 3;'` Expected: one row with `status = PENDING` and `tags = {#important}`.
4. React to that message with ⭐ from an admin JID (the JID listed in `TOWER_ADMIN_JIDS` in `.env`).
5. Confirm the worker logs `"Forward jobs enqueued"`.
6. Check DB approvals: `SELECT decision, "adminId" FROM "Approval" ORDER BY "decidedAt" DESC LIMIT 1;` Expected: one row with `decision = APPROVED`.

- [ ] **Step 6: Commit**

```bash
git add apps/worker/src/main.ts
git commit -m "feat(worker): multi-account session pool, reactions → approval → forward pipeline"
```

---

## What this plan does NOT cover (Plan 4+)

- Admin dashboard UI for approving messages (Next.js — Plan 5)
- Meilisearch archive indexing (Plan 4)
- Adding a second bot account at runtime without restart (future: webhook endpoint to trigger `pool.add`)
- `/tower` bot command handling
- Rejection workflow (admin removes ⭐ or uses a different emoji/command)