Files
tower/docs/superpowers/plans/2026-05-27-multi-account-approval-workflow.md
T
maaz519 d33b4e40b8 fix: use type-only Baileys import and raw status code to fix Jest ESM issue
Replaces DisconnectReason enum import with type-only WASocket import and
uses 401 directly instead of DisconnectReason.loggedOut. Baileys is an ES
module that cannot be executed in Jest's CommonJS mode, so removing the
value import (keeping only type imports) prevents ts-jest from trying to
execute the module.

Also updated session-pool.test.ts to verify end() is called with the
expected Boom error object instead of undefined.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 17:40:24 +05:30

1244 lines
38 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Multi-Account Architecture, Adapter Boundary & Approval Workflow Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Seal the WhatsApp adapter so Baileys types never leak outside `src/whatsapp/`; add multi-account bot support via a session pool loaded from the DB; implement the ⭐ star reaction approval workflow with a rate-limited cross-group forward queue.
**Architecture:** `NormalizedMessage` and `NormalizedReaction` move to `@tower/types` (platform-neutral). `session.ts` normalizes raw Baileys events internally before calling any callback — `main.ts` never imports from `@whiskeysockets/baileys`. `WhatsAppSessionPool` manages N sessions indexed by `accountId`, loaded from the `accounts` DB table on startup. A ⭐ reaction from an admin JID triggers `handleStarReaction` → writes `Approval` record → enqueues `ForwardJobData` jobs into a rate-limited BullMQ queue (20 forwards/min) to avoid WhatsApp bans.
**Tech Stack:** BullMQ 5, Prisma 6, Baileys 7.0.0-rc13, TypeScript 5, Jest 29, pnpm workspaces
---
## File Map
**Modify:**
- `packages/types/src/message.ts` — add `NormalizedMessage`, `NormalizedReaction`, `ForwardJobData`; add `accountId` to `IngestJobData`
- `apps/api/prisma/schema.prisma` — add `Account` model, `AccountStatus` enum; add optional `accountId` to `Group`
- `apps/worker/src/whatsapp/normalizer.ts` — import `NormalizedMessage`/`NormalizedReaction` from `@tower/types`; accept `accountId` param; add `normalizeReaction`
- `apps/worker/src/whatsapp/normalizer.test.ts` — add reaction tests
- `apps/worker/src/whatsapp/group-sync.ts` — accept `accountId` param, set on group upsert
- `apps/worker/src/whatsapp/session.ts` — normalize + react inside handler; change callback types; accept `accountId` as first param
- `apps/worker/src/main.ts` — use pool, load accounts from DB, wire reactions through approval
**Create:**
- `apps/worker/src/whatsapp/session-pool.ts``WhatsAppSessionPool` class
- `apps/worker/src/core/approval.ts``handleStarReaction`
- `apps/worker/src/core/approval.test.ts`
- `apps/worker/src/queues/forward.queue.ts`
- `apps/worker/src/queues/forward.processor.ts`
- `apps/worker/src/queues/forward.processor.test.ts`
---
### Task 1: Extend @tower/types
**Files:**
- Modify: `packages/types/src/message.ts`
Add `NormalizedMessage`, `NormalizedReaction`, `ForwardJobData` as platform-neutral shared types. Add `accountId` to `IngestJobData`. These types must contain zero Baileys imports.
- [ ] **Step 1: Replace `packages/types/src/message.ts` entirely**
```typescript
export type Platform = 'whatsapp' | 'telegram' | 'discord';
export type MessageStatus =
| 'PENDING'
| 'APPROVED'
| 'REJECTED'
| 'DISTRIBUTED'
| 'ARCHIVED';
export type ApprovalDecision = 'APPROVED' | 'REJECTED';
// Platform-neutral normalized message — zero Baileys/Telegram types here
export interface NormalizedMessage {
platformMsgId: string;
sourceGroupJid: string;
senderJid: string;
senderName?: string;
content: string;
accountId: string; // which bot account received this message
}
// Platform-neutral normalized reaction (e.g. WhatsApp emoji reaction)
export interface NormalizedReaction {
reactorJid: string;
targetMsgId: string; // platformMsgId of the message being reacted to
sourceGroupJid: string;
emoji: string;
accountId: string; // which bot account received this reaction
}
export interface CanonicalMessage {
messageId: string;
platform: Platform;
platformMsgId: string;
sourceGroupId: string;
senderJid: string;
senderName?: string;
content: string;
mediaUrl?: string;
tags: string[];
status: MessageStatus;
createdAt: Date;
}
export interface Group {
id: string;
platform: Platform;
platformId: string;
name: string;
description?: string;
isActive: boolean;
}
export interface SyncRoute {
id: string;
sourceGroupId: string;
targetGroupId: string;
isActive: boolean;
}
export interface IngestJobData {
platformMsgId: string;
platform: Platform;
accountId: string; // which bot account received this message
sourceGroupId: string;
senderJid: string;
senderName?: string;
content: string;
tags: string[];
}
export interface ForwardJobData {
messageId: string; // DB id of the approved Message record
content: string;
sourceGroupName: string;
senderName?: string;
toGroupJid: string;
fromAccountId: string; // which bot account to send the forward from
}
```
- [ ] **Step 2: Confirm types package compiles with zero errors**
```bash
cd /path/to/repo && pnpm --filter @tower/types build
```
Expected: exits 0, no TypeScript errors.
- [ ] **Step 3: Commit**
```bash
git add packages/types/src/message.ts
git commit -m "feat(types): NormalizedMessage, NormalizedReaction, ForwardJobData; accountId on IngestJobData"
```
---
### Task 2: Seal the WhatsApp adapter boundary
**Files:**
- Modify: `apps/worker/src/whatsapp/normalizer.ts`
- Modify: `apps/worker/src/whatsapp/normalizer.test.ts`
- Modify: `apps/worker/src/whatsapp/session.ts`
After this task: `main.ts` will never import anything from `@whiskeysockets/baileys`. All Baileys types stay inside `src/whatsapp/`.
- [ ] **Step 1: Replace `apps/worker/src/whatsapp/normalizer.ts`**
```typescript
import { proto } from '@whiskeysockets/baileys';
import { NormalizedMessage, NormalizedReaction } from '@tower/types';
function extractText(msg: proto.IWebMessageInfo): string {
const m = msg.message;
if (!m) return '';
return (
m.conversation ||
m.extendedTextMessage?.text ||
m.imageMessage?.caption ||
m.videoMessage?.caption ||
m.documentMessage?.caption ||
''
);
}
export function normalizeMessage(
msg: proto.IWebMessageInfo,
accountId: string,
): NormalizedMessage | null {
const key = msg.key;
if (!key) return null;
const remoteJid = key.remoteJid ?? '';
if (!remoteJid.endsWith('@g.us')) return null;
if (key.fromMe) return null;
if (!msg.message) return null;
const platformMsgId = key.id;
if (!platformMsgId) return null;
const content = extractText(msg);
return {
platformMsgId,
sourceGroupJid: remoteJid,
senderJid: key.participant ?? '',
senderName: msg.pushName ?? undefined,
content,
accountId,
};
}
export function normalizeReaction(
msg: proto.IWebMessageInfo,
accountId: string,
): NormalizedReaction | null {
const key = msg.key;
if (!key) return null;
const remoteJid = key.remoteJid ?? '';
if (!remoteJid.endsWith('@g.us')) return null;
if (key.fromMe) return null;
const reaction = msg.message?.reactionMessage;
if (!reaction) return null;
const targetMsgId = reaction.key?.id;
if (!targetMsgId) return null;
return {
reactorJid: key.participant ?? '',
targetMsgId,
sourceGroupJid: remoteJid,
emoji: reaction.text ?? '',
accountId,
};
}
```
- [ ] **Step 2: Write failing tests for normalizeReaction**
Append to `apps/worker/src/whatsapp/normalizer.test.ts` (keep all existing tests, add below):
```typescript
import { normalizeReaction } from './normalizer';
describe('normalizeReaction', () => {
it('normalizes a star reaction from a group participant', () => {
const msg = {
key: {
remoteJid: '120363043312345678@g.us',
fromMe: false,
id: 'REACTION_ID',
participant: '919876543210@s.whatsapp.net',
},
message: {
reactionMessage: {
key: { remoteJid: '120363043312345678@g.us', id: 'TARGET_MSG_ID' },
text: '⭐',
},
},
} as proto.IWebMessageInfo;
const result = normalizeReaction(msg, 'acc_1');
expect(result).toMatchObject({
reactorJid: '919876543210@s.whatsapp.net',
targetMsgId: 'TARGET_MSG_ID',
sourceGroupJid: '120363043312345678@g.us',
emoji: '⭐',
accountId: 'acc_1',
});
});
it('returns null when reactionMessage is missing (regular message)', () => {
const msg = {
key: { remoteJid: '120363043312345678@g.us', fromMe: false, id: 'ID' },
message: { conversation: 'hello' },
} as proto.IWebMessageInfo;
expect(normalizeReaction(msg, 'acc_1')).toBeNull();
});
it('returns null for own reactions (fromMe=true)', () => {
const msg = {
key: { remoteJid: '120363043312345678@g.us', fromMe: true, id: 'ID' },
message: { reactionMessage: { key: { id: 'TARGET' }, text: '⭐' } },
} as proto.IWebMessageInfo;
expect(normalizeReaction(msg, 'acc_1')).toBeNull();
});
it('returns null for DM reactions (non-group jid)', () => {
const msg = {
key: { remoteJid: '919876543210@s.whatsapp.net', fromMe: false, id: 'ID' },
message: { reactionMessage: { key: { id: 'TARGET' }, text: '⭐' } },
} as proto.IWebMessageInfo;
expect(normalizeReaction(msg, 'acc_1')).toBeNull();
});
it('returns null when targetMsgId is missing', () => {
const msg = {
key: {
remoteJid: '120363043312345678@g.us',
fromMe: false,
id: 'ID',
participant: '91@s.whatsapp.net',
},
message: { reactionMessage: { key: {}, text: '⭐' } },
} as proto.IWebMessageInfo;
expect(normalizeReaction(msg, 'acc_1')).toBeNull();
});
});
```
Also update the `normalizeMessage` tests: the function signature changed to accept `accountId` as second param. Update every `normalizeMessage(makeMsg(...))` call to `normalizeMessage(makeMsg(...), 'acc_1')` and add `accountId: 'acc_1'` to the expected `toMatchObject` where relevant.
- [ ] **Step 3: Run normalizer tests to confirm they pass**
```bash
pnpm --filter @tower/worker test -- --testPathPattern=normalizer
```
Expected: all existing tests + 5 new reaction tests pass.
- [ ] **Step 4: Replace `apps/worker/src/whatsapp/session.ts`**
```typescript
import makeWASocket, {
useMultiFileAuthState,
fetchLatestBaileysVersion,
DisconnectReason,
WASocket,
GroupMetadata,
} from '@whiskeysockets/baileys';
import { Boom } from '@hapi/boom';
import qrcode from 'qrcode-terminal';
import { NormalizedMessage, NormalizedReaction } from '@tower/types';
import { normalizeMessage, normalizeReaction } from './normalizer';
import { createLogger } from '@tower/logger';
const logger = createLogger('whatsapp-session');
export type OnMessageCallback = (msg: NormalizedMessage) => Promise<void> | void;
export type OnReactionCallback = (reaction: NormalizedReaction) => Promise<void> | void;
export type OnGroupsCallback = (groups: Record<string, GroupMetadata>) => Promise<void> | void;
export async function createWhatsAppSession(
accountId: string,
sessionPath: string,
onMessage: OnMessageCallback,
onReaction: OnReactionCallback,
onGroups: OnGroupsCallback,
): Promise<WASocket> {
const { state, saveCreds } = await useMultiFileAuthState(sessionPath);
const { version } = await fetchLatestBaileysVersion();
const sock = makeWASocket({
version,
auth: state,
printQRInTerminal: false,
logger: logger as any,
});
sock.ev.on('creds.update', saveCreds);
sock.ev.on('connection.update', async ({ connection, qr, lastDisconnect }) => {
if (qr) {
qrcode.generate(qr, { small: true });
}
if (connection === 'close') {
const reason = (lastDisconnect?.error as Boom)?.output?.statusCode;
const shouldReconnect = reason !== DisconnectReason.loggedOut;
logger.info({ reason, shouldReconnect }, 'Connection closed');
if (shouldReconnect) {
logger.info('Reconnecting in 5s...');
setTimeout(
() => createWhatsAppSession(accountId, sessionPath, onMessage, onReaction, onGroups),
5000,
);
}
} else if (connection === 'open') {
try {
logger.info({ accountId }, 'WhatsApp connected');
const groups = await sock.groupFetchAllParticipating();
await Promise.resolve(onGroups(groups)).catch((err) =>
logger.error({ err }, 'Group sync error'),
);
} catch (err) {
logger.error({ err }, 'Failed to fetch groups on connect');
}
}
});
sock.ev.on('messages.upsert', ({ messages, type }) => {
if (type !== 'notify') return;
for (const msg of messages) {
if (msg.message?.reactionMessage) {
const reaction = normalizeReaction(msg, accountId);
if (reaction) {
void Promise.resolve(onReaction(reaction)).catch((err) =>
logger.error({ err }, 'Error processing reaction'),
);
}
continue;
}
const normalized = normalizeMessage(msg, accountId);
if (!normalized) continue;
void Promise.resolve(onMessage(normalized)).catch((err) =>
logger.error({ err }, 'Error processing message'),
);
}
});
return sock;
}
```
- [ ] **Step 5: Commit**
```bash
git add apps/worker/src/whatsapp/normalizer.ts \
apps/worker/src/whatsapp/normalizer.test.ts \
apps/worker/src/whatsapp/session.ts
git commit -m "feat(worker): seal WhatsApp adapter — normalize inside session, reactions handled internally"
```
---
### Task 3: Prisma — Account model and Group.accountId
**Files:**
- Modify: `apps/api/prisma/schema.prisma`
- [ ] **Step 1: Add `AccountStatus` enum and `Account` model to the schema**
Open `apps/api/prisma/schema.prisma`. Append after the `ConsentRecord` model block:
```prisma
enum AccountStatus {
ACTIVE
DISCONNECTED
BANNED
}
model Account {
id String @id @default(cuid())
platform String
jid String
sessionPath String
displayName String?
status AccountStatus @default(ACTIVE)
groups Group[]
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([platform, jid])
}
```
- [ ] **Step 2: Add optional `accountId` to the `Group` model**
The full updated `Group` model (replace the existing one):
```prisma
model Group {
id String @id @default(cuid())
platform String
platformId String
name String
description String?
isActive Boolean @default(true)
accountId String?
account Account? @relation(fields: [accountId], references: [id])
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
messages Message[]
syncRoutesFrom SyncRoute[] @relation("sourceGroup")
syncRoutesTo SyncRoute[] @relation("targetGroup")
consentRecords ConsentRecord[]
@@unique([platform, platformId])
}
```
- [ ] **Step 3: Run migration**
```bash
cd apps/api && DATABASE_URL="postgresql://tower:tower@localhost:5433/tower" \
npx prisma migrate dev --name add-account-model
```
Expected: "Your database is now in sync with your schema."
- [ ] **Step 4: Regenerate Prisma client in worker**
```bash
pnpm --filter @tower/worker generate
```
Expected: "Generated Prisma Client..."
- [ ] **Step 5: Seed the first Account record**
Find your WhatsApp session's JID. It is stored in `sessions/creds.json` (the file at the path you set as `WHATSAPP_SESSION_PATH` in `.env`). Open it and look for `"me": { "id": "..." }` — that string is your JID.
Insert the account record (replace `YOUR_JID` and `YOUR_SESSION_PATH`):
```bash
psql "postgresql://tower:tower@localhost:5433/tower" <<'SQL'
INSERT INTO "Account" (id, platform, jid, "sessionPath", "displayName", status, "createdAt", "updatedAt")
VALUES (
'acc_' || substring(gen_random_uuid()::text, 1, 8),
'whatsapp',
'YOUR_JID',
'YOUR_SESSION_PATH',
'Bot 1',
'ACTIVE',
now(),
now()
)
ON CONFLICT DO NOTHING;
SQL
```
Verify it was inserted:
```bash
psql "postgresql://tower:tower@localhost:5433/tower" -c 'SELECT id, jid, "sessionPath", status FROM "Account";'
```
Expected: one row with your JID and `ACTIVE` status.
- [ ] **Step 6: Commit**
```bash
git add apps/api/prisma/schema.prisma apps/api/prisma/migrations/
git commit -m "feat(schema): Account model with AccountStatus enum, optional Group.accountId"
```
---
### Task 4: WhatsAppSessionPool and updated group-sync
**Files:**
- Modify: `apps/worker/src/whatsapp/group-sync.ts`
- Create: `apps/worker/src/whatsapp/session-pool.ts`
- [ ] **Step 1: Update `group-sync.ts` to accept `accountId`**
Replace `apps/worker/src/whatsapp/group-sync.ts`:
```typescript
import { GroupMetadata } from '@whiskeysockets/baileys';
import { createLogger } from '@tower/logger';
const logger = createLogger('group-sync');
export async function syncGroups(
groups: Record<string, GroupMetadata>,
accountId: string,
prisma: any,
): Promise<Map<string, string>> {
const jidToDbId = new Map<string, string>();
for (const [jid, meta] of Object.entries(groups)) {
const group = await prisma.group.upsert({
where: { platform_platformId: { platform: 'whatsapp', platformId: jid } },
create: {
platform: 'whatsapp',
platformId: jid,
name: meta.subject,
description: meta.desc ?? undefined,
isActive: true,
accountId,
},
update: {
name: meta.subject,
description: meta.desc ?? undefined,
accountId,
},
});
jidToDbId.set(jid, group.id);
}
logger.info({ count: jidToDbId.size, accountId }, 'Groups synced');
return jidToDbId;
}
```
- [ ] **Step 2: Create `apps/worker/src/whatsapp/session-pool.ts`**
```typescript
import { WASocket } from '@whiskeysockets/baileys';
import { NormalizedMessage, NormalizedReaction } from '@tower/types';
import { createWhatsAppSession } from './session';
import { createLogger } from '@tower/logger';
const logger = createLogger('session-pool');
// Callbacks the pool exposes to main.ts — accountId injected by the pool
export type PoolMessageCallback = (msg: NormalizedMessage, accountId: string) => Promise<void> | void;
export type PoolReactionCallback = (reaction: NormalizedReaction, accountId: string) => Promise<void> | void;
// groups typed as `any` — GroupMetadata (Baileys type) stays inside whatsapp/ folder
export type PoolGroupsCallback = (groups: any, accountId: string) => Promise<void> | void;
export class WhatsAppSessionPool {
private sessions = new Map<string, WASocket>();
async add(
accountId: string,
sessionPath: string,
onMessage: PoolMessageCallback,
onReaction: PoolReactionCallback,
onGroups: PoolGroupsCallback,
): Promise<void> {
logger.info({ accountId }, 'Starting session');
const sock = await createWhatsAppSession(
accountId,
sessionPath,
(msg) => onMessage(msg, accountId),
(reaction) => onReaction(reaction, accountId),
(groups) => onGroups(groups, accountId),
);
this.sessions.set(accountId, sock);
}
get(accountId: string): WASocket | undefined {
return this.sessions.get(accountId);
}
getAll(): Map<string, WASocket> {
return this.sessions;
}
async sendMessage(accountId: string, groupJid: string, text: string): Promise<void> {
const sock = this.sessions.get(accountId);
if (!sock) throw new Error(`No active session for account ${accountId}`);
await sock.sendMessage(groupJid, { text });
}
async remove(accountId: string): Promise<void> {
const sock = this.sessions.get(accountId);
if (sock) {
await sock.logout().catch(() => {});
this.sessions.delete(accountId);
logger.info({ accountId }, 'Session removed');
}
}
}
```
- [ ] **Step 3: Confirm worker compiles (ignoring main.ts errors for now)**
```bash
pnpm --filter @tower/worker build 2>&1 | grep -v "main.ts"
```
Expected: no errors outside `main.ts` (it still uses old signatures — fixed in Task 7).
- [ ] **Step 4: Commit**
```bash
git add apps/worker/src/whatsapp/session-pool.ts apps/worker/src/whatsapp/group-sync.ts
git commit -m "feat(worker): WhatsAppSessionPool + group-sync accepts accountId"
```
---
### Task 5: Approval core logic
**Files:**
- Create: `apps/worker/src/core/approval.test.ts`
- Create: `apps/worker/src/core/approval.ts`
**Context:** When an admin reacts with ⭐ to a group message, `handleStarReaction` finds the message in DB, marks it `APPROVED`, creates an `Approval` record, then returns `ForwardJobData[]` — one entry per active `SyncRoute` from the message's source group. The caller (main.ts) enqueues those as forward jobs.
The Prisma relation graph: `Message.sourceGroup → Group.syncRoutesFrom → SyncRoute.targetGroup → Group`. The query must include `sourceGroup.syncRoutesFrom.targetGroup` — NOT a top-level `syncRoutesFrom` on Message (that relation doesn't exist on Message).
- [ ] **Step 1: Write the failing tests**
Create `apps/worker/src/core/approval.test.ts`:
```typescript
import { handleStarReaction } from './approval';
import { NormalizedReaction } from '@tower/types';
function makeReaction(overrides: Partial<NormalizedReaction> = {}): NormalizedReaction {
return {
reactorJid: '919876543210@s.whatsapp.net',
targetMsgId: 'TARGET_MSG_123',
sourceGroupJid: '120363043312345678@g.us',
emoji: '⭐',
accountId: 'acc_1',
...overrides,
};
}
const adminJids = ['919876543210@s.whatsapp.net'];
describe('handleStarReaction', () => {
it('returns null for non-star emoji', async () => {
const result = await handleStarReaction(makeReaction({ emoji: '👍' }), adminJids, {} as any);
expect(result).toBeNull();
});
it('returns null when reactor is not an admin', async () => {
const result = await handleStarReaction(
makeReaction({ reactorJid: 'stranger@s.whatsapp.net' }),
adminJids,
{} as any,
);
expect(result).toBeNull();
});
it('returns null when message not found', async () => {
const prisma = { message: { findUnique: jest.fn().mockResolvedValue(null) } } as any;
const result = await handleStarReaction(makeReaction(), adminJids, prisma);
expect(result).toBeNull();
expect(prisma.message.findUnique).toHaveBeenCalledWith({
where: {
platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'TARGET_MSG_123' },
},
include: {
approval: true,
sourceGroup: {
include: {
syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } },
},
},
},
});
});
it('returns null when message status is not PENDING', async () => {
const prisma = {
message: {
findUnique: jest.fn().mockResolvedValue({
id: 'msg_1',
status: 'REJECTED',
approval: null,
sourceGroup: { name: 'Test Group', syncRoutesFrom: [] },
}),
},
} as any;
expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
});
it('returns null when message is already approved (approval record exists)', async () => {
const prisma = {
message: {
findUnique: jest.fn().mockResolvedValue({
id: 'msg_1',
status: 'APPROVED',
approval: { id: 'appr_1' },
sourceGroup: { name: 'Test Group', syncRoutesFrom: [] },
}),
},
} as any;
expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
});
it('approves message and returns empty array when no sync routes', async () => {
const prisma = {
message: {
findUnique: jest.fn().mockResolvedValue({
id: 'msg_1',
status: 'PENDING',
approval: null,
content: 'hello',
senderName: 'Alice',
sourceGroup: { name: 'UP Parivar Dallas', syncRoutesFrom: [] },
}),
update: jest.fn().mockResolvedValue({}),
},
approval: { create: jest.fn().mockResolvedValue({}) },
} as any;
const result = await handleStarReaction(makeReaction(), adminJids, prisma);
expect(result).toEqual([]);
expect(prisma.message.update).toHaveBeenCalledWith({
where: { id: 'msg_1' },
data: { status: 'APPROVED' },
});
expect(prisma.approval.create).toHaveBeenCalledWith({
data: {
messageId: 'msg_1',
adminId: '919876543210@s.whatsapp.net',
decision: 'APPROVED',
},
});
});
it('returns ForwardJobData for each active sync route', async () => {
const prisma = {
message: {
findUnique: jest.fn().mockResolvedValue({
id: 'msg_1',
status: 'PENDING',
approval: null,
content: 'important announcement',
senderName: 'Bob',
sourceGroup: {
name: 'Source Group',
syncRoutesFrom: [
{ targetGroup: { platformId: '999@g.us', accountId: 'acc_2' } },
{ targetGroup: { platformId: '888@g.us', accountId: null } },
],
},
}),
update: jest.fn().mockResolvedValue({}),
},
approval: { create: jest.fn().mockResolvedValue({}) },
} as any;
const result = await handleStarReaction(makeReaction(), adminJids, prisma);
expect(result).toHaveLength(2);
expect(result![0]).toMatchObject({
messageId: 'msg_1',
content: 'important announcement',
sourceGroupName: 'Source Group',
senderName: 'Bob',
toGroupJid: '999@g.us',
fromAccountId: 'acc_2',
});
// falls back to reaction.accountId when targetGroup.accountId is null
expect(result![1]).toMatchObject({
toGroupJid: '888@g.us',
fromAccountId: 'acc_1',
});
});
});
```
- [ ] **Step 2: Run tests to confirm they fail**
```bash
pnpm --filter @tower/worker test -- --testPathPattern=approval
```
Expected: FAIL — "Cannot find module './approval'"
- [ ] **Step 3: Create `apps/worker/src/core/approval.ts`**
```typescript
import { NormalizedReaction, ForwardJobData } from '@tower/types';
export async function handleStarReaction(
reaction: NormalizedReaction,
adminJids: string[],
prisma: any,
): Promise<ForwardJobData[] | null> {
if (reaction.emoji !== '⭐') return null;
if (!adminJids.includes(reaction.reactorJid)) return null;
const message = await prisma.message.findUnique({
where: {
platform_platformMsgId: {
platform: 'whatsapp',
platformMsgId: reaction.targetMsgId,
},
},
include: {
approval: true,
sourceGroup: {
include: {
syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } },
},
},
},
});
if (!message) return null;
if (message.status !== 'PENDING') return null;
if (message.approval) return null;
await prisma.message.update({
where: { id: message.id },
data: { status: 'APPROVED' },
});
await prisma.approval.create({
data: {
messageId: message.id,
adminId: reaction.reactorJid,
decision: 'APPROVED',
},
});
const jobs: ForwardJobData[] = message.sourceGroup.syncRoutesFrom.map((route: any) => ({
messageId: message.id,
content: message.content,
sourceGroupName: message.sourceGroup.name,
senderName: message.senderName ?? undefined,
toGroupJid: route.targetGroup.platformId,
fromAccountId: route.targetGroup.accountId ?? reaction.accountId,
}));
return jobs;
}
```
- [ ] **Step 4: Run tests to confirm they pass**
```bash
pnpm --filter @tower/worker test -- --testPathPattern=approval
```
Expected: 7 tests pass.
- [ ] **Step 5: Commit**
```bash
git add apps/worker/src/core/approval.ts apps/worker/src/core/approval.test.ts
git commit -m "feat(worker): handleStarReaction approval core with tests"
```
---
### Task 6: Forward queue and processor
**Files:**
- Create: `apps/worker/src/queues/forward.processor.test.ts`
- Create: `apps/worker/src/queues/forward.processor.ts`
- Create: `apps/worker/src/queues/forward.queue.ts`
The rate limiter (20 forwards/min) goes on the **Worker**, not the Queue — this is the BullMQ v5 pattern.
- [ ] **Step 1: Write the failing processor test**
Create `apps/worker/src/queues/forward.processor.test.ts`:
```typescript
import { processForwardJob } from './forward.processor';
import { ForwardJobData } from '@tower/types';
const mockPool = { sendMessage: jest.fn().mockResolvedValue(undefined) };
const baseJob: ForwardJobData = {
messageId: 'msg_1',
content: 'Event this Saturday at the temple',
sourceGroupName: 'UP Parivar Dallas',
senderName: 'Rajesh',
toGroupJid: '120363099999@g.us',
fromAccountId: 'acc_1',
};
describe('processForwardJob', () => {
beforeEach(() => jest.clearAllMocks());
it('sends a formatted message via the pool', async () => {
await processForwardJob(baseJob, mockPool as any);
expect(mockPool.sendMessage).toHaveBeenCalledWith(
'acc_1',
'120363099999@g.us',
expect.stringContaining('Event this Saturday at the temple'),
);
});
it('includes source group name in the forwarded text', async () => {
await processForwardJob(baseJob, mockPool as any);
const [, , text] = mockPool.sendMessage.mock.calls[0];
expect(text).toContain('UP Parivar Dallas');
});
it('includes sender name in the forwarded text', async () => {
await processForwardJob(baseJob, mockPool as any);
const [, , text] = mockPool.sendMessage.mock.calls[0];
expect(text).toContain('Rajesh');
});
it('handles missing senderName without throwing', async () => {
await processForwardJob({ ...baseJob, senderName: undefined }, mockPool as any);
expect(mockPool.sendMessage).toHaveBeenCalledTimes(1);
});
it('throws when pool has no session for the account', async () => {
const brokenPool = {
sendMessage: jest.fn().mockRejectedValue(new Error('No active session for account acc_99')),
};
await expect(
processForwardJob({ ...baseJob, fromAccountId: 'acc_99' }, brokenPool as any),
).rejects.toThrow('No active session');
});
});
```
- [ ] **Step 2: Run test to confirm it fails**
```bash
pnpm --filter @tower/worker test -- --testPathPattern=forward.processor
```
Expected: FAIL — "Cannot find module './forward.processor'"
- [ ] **Step 3: Create `apps/worker/src/queues/forward.processor.ts`**
```typescript
import { Worker } from 'bullmq';
import { ForwardJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';
import { WhatsAppSessionPool } from '../whatsapp/session-pool';
function formatForwardText(job: ForwardJobData): string {
const sender = job.senderName ? `_${job.senderName}_` : '_Unknown_';
return `📢 *${job.sourceGroupName}*\n${sender}:\n\n${job.content}\n\n_— Forwarded by TOWER_`;
}
export async function processForwardJob(
job: ForwardJobData,
pool: WhatsAppSessionPool,
): Promise<void> {
await pool.sendMessage(job.fromAccountId, job.toGroupJid, formatForwardText(job));
}
export function createForwardWorker(
redisUrl: string,
pool: WhatsAppSessionPool,
): Worker<ForwardJobData> {
return new Worker<ForwardJobData>(
'tower-forward',
async (job) => processForwardJob(job.data, pool),
{
connection: parseRedisUrl(redisUrl),
limiter: { max: 20, duration: 60_000 }, // 20 forwards per minute — avoids WhatsApp bans
},
);
}
```
- [ ] **Step 4: Create `apps/worker/src/queues/forward.queue.ts`**
```typescript
import { Queue } from 'bullmq';
import { ForwardJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';
export function createForwardQueue(redisUrl: string): Queue<ForwardJobData> {
return new Queue<ForwardJobData>('tower-forward', {
connection: parseRedisUrl(redisUrl),
});
}
```
- [ ] **Step 5: Run processor tests to confirm they pass**
```bash
pnpm --filter @tower/worker test -- --testPathPattern=forward.processor
```
Expected: 5 tests pass.
- [ ] **Step 6: Commit**
```bash
git add apps/worker/src/queues/forward.queue.ts \
apps/worker/src/queues/forward.processor.ts \
apps/worker/src/queues/forward.processor.test.ts
git commit -m "feat(worker): forward queue + processor with 20/min rate limiter"
```
---
### Task 7: Wire main.ts — multi-account pool, reactions, full pipeline
**Files:**
- Modify: `apps/worker/src/main.ts`
This is the final assembly. `main.ts` no longer imports anything from `@whiskeysockets/baileys`. It loads accounts from the DB, creates the pool, and wires: ingest message → tag detect → ingest queue; reaction → approval → forward queue.
- [ ] **Step 1: Replace `apps/worker/src/main.ts` entirely**
```typescript
import { PrismaClient } from '@prisma/client';
import { createLogger } from '@tower/logger';
import { validateEnv } from '@tower/config';
import { createIngestQueue } from './queues/ingest.queue';
import { createIngestWorker } from './queues/ingest.processor';
import { createForwardQueue } from './queues/forward.queue';
import { createForwardWorker } from './queues/forward.processor';
import { WhatsAppSessionPool } from './whatsapp/session-pool';
import { detectTags, isFlagged } from './whatsapp/tag-detector';
import { syncGroups } from './whatsapp/group-sync';
import { handleStarReaction } from './core/approval';
const logger = createLogger('tower-worker');
async function bootstrap() {
const env = validateEnv();
const prisma = new PrismaClient();
await prisma.$connect();
const adminJids = env.TOWER_ADMIN_JIDS
? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean)
: [];
const ingestQueue = createIngestQueue(env.REDIS_URL);
const forwardQueue = createForwardQueue(env.REDIS_URL);
const pool = new WhatsAppSessionPool();
const ingestWorker = createIngestWorker(env.REDIS_URL, prisma);
const forwardWorker = createForwardWorker(env.REDIS_URL, pool);
ingestWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Ingest job completed'));
ingestWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Ingest job failed'));
forwardWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Forward job completed'));
forwardWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Forward job failed'));
// Load active accounts from DB — each becomes one WhatsApp session
const accounts = await prisma.account.findMany({
where: { status: 'ACTIVE', platform: 'whatsapp' },
});
if (accounts.length === 0) {
logger.warn('No active WhatsApp accounts found — add one via Prisma Studio (see Task 3 Step 5)');
}
// Per-account map of groupJid → DB Group id
const groupMaps = new Map<string, Map<string, string>>();
for (const account of accounts) {
groupMaps.set(account.id, new Map());
await pool.add(
account.id,
account.sessionPath,
async (msg, accountId) => {
const tags = detectTags(msg.content, msg.senderJid, adminJids);
if (!isFlagged(tags)) return;
const groupMap = groupMaps.get(accountId) ?? new Map();
const sourceGroupId = groupMap.get(msg.sourceGroupJid);
if (!sourceGroupId) {
logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message');
return;
}
await ingestQueue.add(
'ingest',
{
platformMsgId: msg.platformMsgId,
platform: 'whatsapp',
accountId,
sourceGroupId,
senderJid: msg.senderJid,
senderName: msg.senderName,
content: msg.content,
tags,
},
{ attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
);
logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued');
},
async (reaction) => {
const forwardJobs = await handleStarReaction(reaction, adminJids, prisma);
if (!forwardJobs || forwardJobs.length === 0) return;
for (const job of forwardJobs) {
await forwardQueue.add('forward', job, {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
});
}
logger.info(
{ count: forwardJobs.length, messageId: forwardJobs[0]?.messageId },
'Forward jobs enqueued',
);
},
async (groups, accountId) => {
logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups');
const map = await syncGroups(groups, accountId, prisma);
groupMaps.set(accountId, map);
},
);
}
logger.info({ accountCount: accounts.length }, 'Tower worker ready');
const shutdown = async () => {
logger.info('Shutting down...');
await ingestWorker.close();
await forwardWorker.close();
await ingestQueue.close();
await forwardQueue.close();
await prisma.$disconnect();
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
bootstrap().catch((err) => {
console.error('Worker failed to start', err);
process.exit(1);
});
```
- [ ] **Step 2: Build to confirm zero TypeScript errors**
```bash
pnpm --filter @tower/worker build
```
Expected: 0 errors. `dist/` created.
- [ ] **Step 3: Run all worker tests**
```bash
pnpm --filter @tower/worker test
```
Expected: all tests pass (normalizer ×13, approval ×7, forward.processor ×5).
- [ ] **Step 4: Start the worker and verify multi-account startup**
```bash
pnpm --filter @tower/worker dev
```
Expected log output (with one account seeded):
```
INFO (tower-worker): Tower worker ready {"accountCount": 1}
INFO (whatsapp-session): WhatsApp connected {"accountId": "acc_..."}
INFO (group-sync): Groups synced {"count": N, "accountId": "acc_..."}
```
If `accountCount` is 0, the account row wasn't inserted in Task 3 Step 5 — insert it now.
- [ ] **Step 5: End-to-end smoke test**
With the worker running and the WhatsApp session authenticated:
1. Send a message containing `#important` from a non-bot number to a group the bot is in.
2. Confirm the worker logs `"Message enqueued"` with the correct tags.
3. Check the DB: `psql "postgresql://tower:tower@localhost:5433/tower" -c 'SELECT status, tags FROM "Message" ORDER BY "createdAt" DESC LIMIT 3;'`
Expected: one row with `status = PENDING` and `tags = {#important}`.
4. React to that message with ⭐ from an admin JID (the JID listed in `TOWER_ADMIN_JIDS` in `.env`).
5. Confirm the worker logs `"Forward jobs enqueued"`.
6. Check DB approvals: `SELECT decision, "adminId" FROM "Approval" ORDER BY "decidedAt" DESC LIMIT 1;`
Expected: one row with `decision = APPROVED`.
- [ ] **Step 6: Commit**
```bash
git add apps/worker/src/main.ts
git commit -m "feat(worker): multi-account session pool, reactions → approval → forward pipeline"
```
---
## What this plan does NOT cover (Plan 4+)
- Admin dashboard UI for approving messages (Next.js — Plan 5)
- Meilisearch archive indexing (Plan 4)
- Adding a second bot account at runtime without restart (future: webhook endpoint to trigger `pool.add`)
- `/tower` bot command handling
- Rejection workflow (admin removes ⭐ or uses a different emoji/command)