Replaces DisconnectReason enum import with type-only WASocket import and uses 401 directly instead of DisconnectReason.loggedOut. Baileys is an ES module that cannot be executed in Jest's CommonJS mode, so removing the value import (keeping only type imports) prevents ts-jest from trying to execute the module. Also updated session-pool.test.ts to verify end() is called with the expected Boom error object instead of undefined. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
38 KiB
Multi-Account Architecture, Adapter Boundary & Approval Workflow Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Seal the WhatsApp adapter so Baileys types never leak outside src/whatsapp/; add multi-account bot support via a session pool loaded from the DB; implement the ⭐ star reaction approval workflow with a rate-limited cross-group forward queue.
Architecture: NormalizedMessage and NormalizedReaction move to @tower/types (platform-neutral). session.ts normalizes raw Baileys events internally before calling any callback — main.ts never imports from @whiskeysockets/baileys. WhatsAppSessionPool manages N sessions indexed by accountId, loaded from the accounts DB table on startup. A ⭐ reaction from an admin JID triggers handleStarReaction → writes Approval record → enqueues ForwardJobData jobs into a rate-limited BullMQ queue (20 forwards/min) to avoid WhatsApp bans.
Tech Stack: BullMQ 5, Prisma 6, Baileys 7.0.0-rc13, TypeScript 5, Jest 29, pnpm workspaces
File Map
Modify:
packages/types/src/message.ts— addNormalizedMessage,NormalizedReaction,ForwardJobData; addaccountIdtoIngestJobDataapps/api/prisma/schema.prisma— addAccountmodel,AccountStatusenum; add optionalaccountIdtoGroupapps/worker/src/whatsapp/normalizer.ts— importNormalizedMessage/NormalizedReactionfrom@tower/types; acceptaccountIdparam; addnormalizeReactionapps/worker/src/whatsapp/normalizer.test.ts— add reaction testsapps/worker/src/whatsapp/group-sync.ts— acceptaccountIdparam, set on group upsertapps/worker/src/whatsapp/session.ts— normalize + react inside handler; change callback types; acceptaccountIdas first paramapps/worker/src/main.ts— use pool, load accounts from DB, wire reactions through approval
Create:
apps/worker/src/whatsapp/session-pool.ts—WhatsAppSessionPoolclassapps/worker/src/core/approval.ts—handleStarReactionapps/worker/src/core/approval.test.tsapps/worker/src/queues/forward.queue.tsapps/worker/src/queues/forward.processor.tsapps/worker/src/queues/forward.processor.test.ts
Task 1: Extend @tower/types
Files:
- Modify:
packages/types/src/message.ts
Add NormalizedMessage, NormalizedReaction, ForwardJobData as platform-neutral shared types. Add accountId to IngestJobData. These types must contain zero Baileys imports.
- Step 1: Replace
packages/types/src/message.tsentirely
export type Platform = 'whatsapp' | 'telegram' | 'discord';
export type MessageStatus =
| 'PENDING'
| 'APPROVED'
| 'REJECTED'
| 'DISTRIBUTED'
| 'ARCHIVED';
export type ApprovalDecision = 'APPROVED' | 'REJECTED';
// Platform-neutral normalized message — zero Baileys/Telegram types here
export interface NormalizedMessage {
platformMsgId: string;
sourceGroupJid: string;
senderJid: string;
senderName?: string;
content: string;
accountId: string; // which bot account received this message
}
// Platform-neutral normalized reaction (e.g. WhatsApp emoji reaction)
export interface NormalizedReaction {
reactorJid: string;
targetMsgId: string; // platformMsgId of the message being reacted to
sourceGroupJid: string;
emoji: string;
accountId: string; // which bot account received this reaction
}
export interface CanonicalMessage {
messageId: string;
platform: Platform;
platformMsgId: string;
sourceGroupId: string;
senderJid: string;
senderName?: string;
content: string;
mediaUrl?: string;
tags: string[];
status: MessageStatus;
createdAt: Date;
}
export interface Group {
id: string;
platform: Platform;
platformId: string;
name: string;
description?: string;
isActive: boolean;
}
export interface SyncRoute {
id: string;
sourceGroupId: string;
targetGroupId: string;
isActive: boolean;
}
export interface IngestJobData {
platformMsgId: string;
platform: Platform;
accountId: string; // which bot account received this message
sourceGroupId: string;
senderJid: string;
senderName?: string;
content: string;
tags: string[];
}
export interface ForwardJobData {
messageId: string; // DB id of the approved Message record
content: string;
sourceGroupName: string;
senderName?: string;
toGroupJid: string;
fromAccountId: string; // which bot account to send the forward from
}
- Step 2: Confirm types package compiles with zero errors
cd /path/to/repo && pnpm --filter @tower/types build
Expected: exits 0, no TypeScript errors.
- Step 3: Commit
git add packages/types/src/message.ts
git commit -m "feat(types): NormalizedMessage, NormalizedReaction, ForwardJobData; accountId on IngestJobData"
Task 2: Seal the WhatsApp adapter boundary
Files:
- Modify:
apps/worker/src/whatsapp/normalizer.ts - Modify:
apps/worker/src/whatsapp/normalizer.test.ts - Modify:
apps/worker/src/whatsapp/session.ts
After this task: main.ts will never import anything from @whiskeysockets/baileys. All Baileys types stay inside src/whatsapp/.
- Step 1: Replace
apps/worker/src/whatsapp/normalizer.ts
import { proto } from '@whiskeysockets/baileys';
import { NormalizedMessage, NormalizedReaction } from '@tower/types';
function extractText(msg: proto.IWebMessageInfo): string {
const m = msg.message;
if (!m) return '';
return (
m.conversation ||
m.extendedTextMessage?.text ||
m.imageMessage?.caption ||
m.videoMessage?.caption ||
m.documentMessage?.caption ||
''
);
}
export function normalizeMessage(
msg: proto.IWebMessageInfo,
accountId: string,
): NormalizedMessage | null {
const key = msg.key;
if (!key) return null;
const remoteJid = key.remoteJid ?? '';
if (!remoteJid.endsWith('@g.us')) return null;
if (key.fromMe) return null;
if (!msg.message) return null;
const platformMsgId = key.id;
if (!platformMsgId) return null;
const content = extractText(msg);
return {
platformMsgId,
sourceGroupJid: remoteJid,
senderJid: key.participant ?? '',
senderName: msg.pushName ?? undefined,
content,
accountId,
};
}
export function normalizeReaction(
msg: proto.IWebMessageInfo,
accountId: string,
): NormalizedReaction | null {
const key = msg.key;
if (!key) return null;
const remoteJid = key.remoteJid ?? '';
if (!remoteJid.endsWith('@g.us')) return null;
if (key.fromMe) return null;
const reaction = msg.message?.reactionMessage;
if (!reaction) return null;
const targetMsgId = reaction.key?.id;
if (!targetMsgId) return null;
return {
reactorJid: key.participant ?? '',
targetMsgId,
sourceGroupJid: remoteJid,
emoji: reaction.text ?? '',
accountId,
};
}
- Step 2: Write failing tests for normalizeReaction
Append to apps/worker/src/whatsapp/normalizer.test.ts (keep all existing tests, add below):
import { normalizeReaction } from './normalizer';
describe('normalizeReaction', () => {
it('normalizes a star reaction from a group participant', () => {
const msg = {
key: {
remoteJid: '120363043312345678@g.us',
fromMe: false,
id: 'REACTION_ID',
participant: '919876543210@s.whatsapp.net',
},
message: {
reactionMessage: {
key: { remoteJid: '120363043312345678@g.us', id: 'TARGET_MSG_ID' },
text: '⭐',
},
},
} as proto.IWebMessageInfo;
const result = normalizeReaction(msg, 'acc_1');
expect(result).toMatchObject({
reactorJid: '919876543210@s.whatsapp.net',
targetMsgId: 'TARGET_MSG_ID',
sourceGroupJid: '120363043312345678@g.us',
emoji: '⭐',
accountId: 'acc_1',
});
});
it('returns null when reactionMessage is missing (regular message)', () => {
const msg = {
key: { remoteJid: '120363043312345678@g.us', fromMe: false, id: 'ID' },
message: { conversation: 'hello' },
} as proto.IWebMessageInfo;
expect(normalizeReaction(msg, 'acc_1')).toBeNull();
});
it('returns null for own reactions (fromMe=true)', () => {
const msg = {
key: { remoteJid: '120363043312345678@g.us', fromMe: true, id: 'ID' },
message: { reactionMessage: { key: { id: 'TARGET' }, text: '⭐' } },
} as proto.IWebMessageInfo;
expect(normalizeReaction(msg, 'acc_1')).toBeNull();
});
it('returns null for DM reactions (non-group jid)', () => {
const msg = {
key: { remoteJid: '919876543210@s.whatsapp.net', fromMe: false, id: 'ID' },
message: { reactionMessage: { key: { id: 'TARGET' }, text: '⭐' } },
} as proto.IWebMessageInfo;
expect(normalizeReaction(msg, 'acc_1')).toBeNull();
});
it('returns null when targetMsgId is missing', () => {
const msg = {
key: {
remoteJid: '120363043312345678@g.us',
fromMe: false,
id: 'ID',
participant: '91@s.whatsapp.net',
},
message: { reactionMessage: { key: {}, text: '⭐' } },
} as proto.IWebMessageInfo;
expect(normalizeReaction(msg, 'acc_1')).toBeNull();
});
});
Also update the normalizeMessage tests: the function signature changed to accept accountId as second param. Update every normalizeMessage(makeMsg(...)) call to normalizeMessage(makeMsg(...), 'acc_1') and add accountId: 'acc_1' to the expected toMatchObject where relevant.
- Step 3: Run normalizer tests to confirm they pass
pnpm --filter @tower/worker test -- --testPathPattern=normalizer
Expected: all existing tests + 5 new reaction tests pass.
- Step 4: Replace
apps/worker/src/whatsapp/session.ts
import makeWASocket, {
useMultiFileAuthState,
fetchLatestBaileysVersion,
DisconnectReason,
WASocket,
GroupMetadata,
} from '@whiskeysockets/baileys';
import { Boom } from '@hapi/boom';
import qrcode from 'qrcode-terminal';
import { NormalizedMessage, NormalizedReaction } from '@tower/types';
import { normalizeMessage, normalizeReaction } from './normalizer';
import { createLogger } from '@tower/logger';
const logger = createLogger('whatsapp-session');
export type OnMessageCallback = (msg: NormalizedMessage) => Promise<void> | void;
export type OnReactionCallback = (reaction: NormalizedReaction) => Promise<void> | void;
export type OnGroupsCallback = (groups: Record<string, GroupMetadata>) => Promise<void> | void;
export async function createWhatsAppSession(
accountId: string,
sessionPath: string,
onMessage: OnMessageCallback,
onReaction: OnReactionCallback,
onGroups: OnGroupsCallback,
): Promise<WASocket> {
const { state, saveCreds } = await useMultiFileAuthState(sessionPath);
const { version } = await fetchLatestBaileysVersion();
const sock = makeWASocket({
version,
auth: state,
printQRInTerminal: false,
logger: logger as any,
});
sock.ev.on('creds.update', saveCreds);
sock.ev.on('connection.update', async ({ connection, qr, lastDisconnect }) => {
if (qr) {
qrcode.generate(qr, { small: true });
}
if (connection === 'close') {
const reason = (lastDisconnect?.error as Boom)?.output?.statusCode;
const shouldReconnect = reason !== DisconnectReason.loggedOut;
logger.info({ reason, shouldReconnect }, 'Connection closed');
if (shouldReconnect) {
logger.info('Reconnecting in 5s...');
setTimeout(
() => createWhatsAppSession(accountId, sessionPath, onMessage, onReaction, onGroups),
5000,
);
}
} else if (connection === 'open') {
try {
logger.info({ accountId }, 'WhatsApp connected');
const groups = await sock.groupFetchAllParticipating();
await Promise.resolve(onGroups(groups)).catch((err) =>
logger.error({ err }, 'Group sync error'),
);
} catch (err) {
logger.error({ err }, 'Failed to fetch groups on connect');
}
}
});
sock.ev.on('messages.upsert', ({ messages, type }) => {
if (type !== 'notify') return;
for (const msg of messages) {
if (msg.message?.reactionMessage) {
const reaction = normalizeReaction(msg, accountId);
if (reaction) {
void Promise.resolve(onReaction(reaction)).catch((err) =>
logger.error({ err }, 'Error processing reaction'),
);
}
continue;
}
const normalized = normalizeMessage(msg, accountId);
if (!normalized) continue;
void Promise.resolve(onMessage(normalized)).catch((err) =>
logger.error({ err }, 'Error processing message'),
);
}
});
return sock;
}
- Step 5: Commit
git add apps/worker/src/whatsapp/normalizer.ts \
apps/worker/src/whatsapp/normalizer.test.ts \
apps/worker/src/whatsapp/session.ts
git commit -m "feat(worker): seal WhatsApp adapter — normalize inside session, reactions handled internally"
Task 3: Prisma — Account model and Group.accountId
Files:
-
Modify:
apps/api/prisma/schema.prisma -
Step 1: Add
AccountStatusenum andAccountmodel to the schema
Open apps/api/prisma/schema.prisma. Append after the ConsentRecord model block:
enum AccountStatus {
ACTIVE
DISCONNECTED
BANNED
}
model Account {
id String @id @default(cuid())
platform String
jid String
sessionPath String
displayName String?
status AccountStatus @default(ACTIVE)
groups Group[]
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([platform, jid])
}
- Step 2: Add optional
accountIdto theGroupmodel
The full updated Group model (replace the existing one):
model Group {
id String @id @default(cuid())
platform String
platformId String
name String
description String?
isActive Boolean @default(true)
accountId String?
account Account? @relation(fields: [accountId], references: [id])
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
messages Message[]
syncRoutesFrom SyncRoute[] @relation("sourceGroup")
syncRoutesTo SyncRoute[] @relation("targetGroup")
consentRecords ConsentRecord[]
@@unique([platform, platformId])
}
- Step 3: Run migration
cd apps/api && DATABASE_URL="postgresql://tower:tower@localhost:5433/tower" \
npx prisma migrate dev --name add-account-model
Expected: "Your database is now in sync with your schema."
- Step 4: Regenerate Prisma client in worker
pnpm --filter @tower/worker generate
Expected: "Generated Prisma Client..."
- Step 5: Seed the first Account record
Find your WhatsApp session's JID. It is stored in sessions/creds.json (the file at the path you set as WHATSAPP_SESSION_PATH in .env). Open it and look for "me": { "id": "..." } — that string is your JID.
Insert the account record (replace YOUR_JID and YOUR_SESSION_PATH):
psql "postgresql://tower:tower@localhost:5433/tower" <<'SQL'
INSERT INTO "Account" (id, platform, jid, "sessionPath", "displayName", status, "createdAt", "updatedAt")
VALUES (
'acc_' || substring(gen_random_uuid()::text, 1, 8),
'whatsapp',
'YOUR_JID',
'YOUR_SESSION_PATH',
'Bot 1',
'ACTIVE',
now(),
now()
)
ON CONFLICT DO NOTHING;
SQL
Verify it was inserted:
psql "postgresql://tower:tower@localhost:5433/tower" -c 'SELECT id, jid, "sessionPath", status FROM "Account";'
Expected: one row with your JID and ACTIVE status.
- Step 6: Commit
git add apps/api/prisma/schema.prisma apps/api/prisma/migrations/
git commit -m "feat(schema): Account model with AccountStatus enum, optional Group.accountId"
Task 4: WhatsAppSessionPool and updated group-sync
Files:
-
Modify:
apps/worker/src/whatsapp/group-sync.ts -
Create:
apps/worker/src/whatsapp/session-pool.ts -
Step 1: Update
group-sync.tsto acceptaccountId
Replace apps/worker/src/whatsapp/group-sync.ts:
import { GroupMetadata } from '@whiskeysockets/baileys';
import { createLogger } from '@tower/logger';
const logger = createLogger('group-sync');
export async function syncGroups(
groups: Record<string, GroupMetadata>,
accountId: string,
prisma: any,
): Promise<Map<string, string>> {
const jidToDbId = new Map<string, string>();
for (const [jid, meta] of Object.entries(groups)) {
const group = await prisma.group.upsert({
where: { platform_platformId: { platform: 'whatsapp', platformId: jid } },
create: {
platform: 'whatsapp',
platformId: jid,
name: meta.subject,
description: meta.desc ?? undefined,
isActive: true,
accountId,
},
update: {
name: meta.subject,
description: meta.desc ?? undefined,
accountId,
},
});
jidToDbId.set(jid, group.id);
}
logger.info({ count: jidToDbId.size, accountId }, 'Groups synced');
return jidToDbId;
}
- Step 2: Create
apps/worker/src/whatsapp/session-pool.ts
import { WASocket } from '@whiskeysockets/baileys';
import { NormalizedMessage, NormalizedReaction } from '@tower/types';
import { createWhatsAppSession } from './session';
import { createLogger } from '@tower/logger';
const logger = createLogger('session-pool');
// Callbacks the pool exposes to main.ts — accountId injected by the pool
export type PoolMessageCallback = (msg: NormalizedMessage, accountId: string) => Promise<void> | void;
export type PoolReactionCallback = (reaction: NormalizedReaction, accountId: string) => Promise<void> | void;
// groups typed as `any` — GroupMetadata (Baileys type) stays inside whatsapp/ folder
export type PoolGroupsCallback = (groups: any, accountId: string) => Promise<void> | void;
export class WhatsAppSessionPool {
private sessions = new Map<string, WASocket>();
async add(
accountId: string,
sessionPath: string,
onMessage: PoolMessageCallback,
onReaction: PoolReactionCallback,
onGroups: PoolGroupsCallback,
): Promise<void> {
logger.info({ accountId }, 'Starting session');
const sock = await createWhatsAppSession(
accountId,
sessionPath,
(msg) => onMessage(msg, accountId),
(reaction) => onReaction(reaction, accountId),
(groups) => onGroups(groups, accountId),
);
this.sessions.set(accountId, sock);
}
get(accountId: string): WASocket | undefined {
return this.sessions.get(accountId);
}
getAll(): Map<string, WASocket> {
return this.sessions;
}
async sendMessage(accountId: string, groupJid: string, text: string): Promise<void> {
const sock = this.sessions.get(accountId);
if (!sock) throw new Error(`No active session for account ${accountId}`);
await sock.sendMessage(groupJid, { text });
}
async remove(accountId: string): Promise<void> {
const sock = this.sessions.get(accountId);
if (sock) {
await sock.logout().catch(() => {});
this.sessions.delete(accountId);
logger.info({ accountId }, 'Session removed');
}
}
}
- Step 3: Confirm worker compiles (ignoring main.ts errors for now)
pnpm --filter @tower/worker build 2>&1 | grep -v "main.ts"
Expected: no errors outside main.ts (it still uses old signatures — fixed in Task 7).
- Step 4: Commit
git add apps/worker/src/whatsapp/session-pool.ts apps/worker/src/whatsapp/group-sync.ts
git commit -m "feat(worker): WhatsAppSessionPool + group-sync accepts accountId"
Task 5: Approval core logic
Files:
- Create:
apps/worker/src/core/approval.test.ts - Create:
apps/worker/src/core/approval.ts
Context: When an admin reacts with ⭐ to a group message, handleStarReaction finds the message in DB, marks it APPROVED, creates an Approval record, then returns ForwardJobData[] — one entry per active SyncRoute from the message's source group. The caller (main.ts) enqueues those as forward jobs.
The Prisma relation graph: Message.sourceGroup → Group.syncRoutesFrom → SyncRoute.targetGroup → Group. The query must include sourceGroup.syncRoutesFrom.targetGroup — NOT a top-level syncRoutesFrom on Message (that relation doesn't exist on Message).
- Step 1: Write the failing tests
Create apps/worker/src/core/approval.test.ts:
import { handleStarReaction } from './approval';
import { NormalizedReaction } from '@tower/types';
function makeReaction(overrides: Partial<NormalizedReaction> = {}): NormalizedReaction {
return {
reactorJid: '919876543210@s.whatsapp.net',
targetMsgId: 'TARGET_MSG_123',
sourceGroupJid: '120363043312345678@g.us',
emoji: '⭐',
accountId: 'acc_1',
...overrides,
};
}
const adminJids = ['919876543210@s.whatsapp.net'];
describe('handleStarReaction', () => {
it('returns null for non-star emoji', async () => {
const result = await handleStarReaction(makeReaction({ emoji: '👍' }), adminJids, {} as any);
expect(result).toBeNull();
});
it('returns null when reactor is not an admin', async () => {
const result = await handleStarReaction(
makeReaction({ reactorJid: 'stranger@s.whatsapp.net' }),
adminJids,
{} as any,
);
expect(result).toBeNull();
});
it('returns null when message not found', async () => {
const prisma = { message: { findUnique: jest.fn().mockResolvedValue(null) } } as any;
const result = await handleStarReaction(makeReaction(), adminJids, prisma);
expect(result).toBeNull();
expect(prisma.message.findUnique).toHaveBeenCalledWith({
where: {
platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'TARGET_MSG_123' },
},
include: {
approval: true,
sourceGroup: {
include: {
syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } },
},
},
},
});
});
it('returns null when message status is not PENDING', async () => {
const prisma = {
message: {
findUnique: jest.fn().mockResolvedValue({
id: 'msg_1',
status: 'REJECTED',
approval: null,
sourceGroup: { name: 'Test Group', syncRoutesFrom: [] },
}),
},
} as any;
expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
});
it('returns null when message is already approved (approval record exists)', async () => {
const prisma = {
message: {
findUnique: jest.fn().mockResolvedValue({
id: 'msg_1',
status: 'APPROVED',
approval: { id: 'appr_1' },
sourceGroup: { name: 'Test Group', syncRoutesFrom: [] },
}),
},
} as any;
expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
});
it('approves message and returns empty array when no sync routes', async () => {
const prisma = {
message: {
findUnique: jest.fn().mockResolvedValue({
id: 'msg_1',
status: 'PENDING',
approval: null,
content: 'hello',
senderName: 'Alice',
sourceGroup: { name: 'UP Parivar Dallas', syncRoutesFrom: [] },
}),
update: jest.fn().mockResolvedValue({}),
},
approval: { create: jest.fn().mockResolvedValue({}) },
} as any;
const result = await handleStarReaction(makeReaction(), adminJids, prisma);
expect(result).toEqual([]);
expect(prisma.message.update).toHaveBeenCalledWith({
where: { id: 'msg_1' },
data: { status: 'APPROVED' },
});
expect(prisma.approval.create).toHaveBeenCalledWith({
data: {
messageId: 'msg_1',
adminId: '919876543210@s.whatsapp.net',
decision: 'APPROVED',
},
});
});
it('returns ForwardJobData for each active sync route', async () => {
const prisma = {
message: {
findUnique: jest.fn().mockResolvedValue({
id: 'msg_1',
status: 'PENDING',
approval: null,
content: 'important announcement',
senderName: 'Bob',
sourceGroup: {
name: 'Source Group',
syncRoutesFrom: [
{ targetGroup: { platformId: '999@g.us', accountId: 'acc_2' } },
{ targetGroup: { platformId: '888@g.us', accountId: null } },
],
},
}),
update: jest.fn().mockResolvedValue({}),
},
approval: { create: jest.fn().mockResolvedValue({}) },
} as any;
const result = await handleStarReaction(makeReaction(), adminJids, prisma);
expect(result).toHaveLength(2);
expect(result![0]).toMatchObject({
messageId: 'msg_1',
content: 'important announcement',
sourceGroupName: 'Source Group',
senderName: 'Bob',
toGroupJid: '999@g.us',
fromAccountId: 'acc_2',
});
// falls back to reaction.accountId when targetGroup.accountId is null
expect(result![1]).toMatchObject({
toGroupJid: '888@g.us',
fromAccountId: 'acc_1',
});
});
});
- Step 2: Run tests to confirm they fail
pnpm --filter @tower/worker test -- --testPathPattern=approval
Expected: FAIL — "Cannot find module './approval'"
- Step 3: Create
apps/worker/src/core/approval.ts
import { NormalizedReaction, ForwardJobData } from '@tower/types';
export async function handleStarReaction(
reaction: NormalizedReaction,
adminJids: string[],
prisma: any,
): Promise<ForwardJobData[] | null> {
if (reaction.emoji !== '⭐') return null;
if (!adminJids.includes(reaction.reactorJid)) return null;
const message = await prisma.message.findUnique({
where: {
platform_platformMsgId: {
platform: 'whatsapp',
platformMsgId: reaction.targetMsgId,
},
},
include: {
approval: true,
sourceGroup: {
include: {
syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } },
},
},
},
});
if (!message) return null;
if (message.status !== 'PENDING') return null;
if (message.approval) return null;
await prisma.message.update({
where: { id: message.id },
data: { status: 'APPROVED' },
});
await prisma.approval.create({
data: {
messageId: message.id,
adminId: reaction.reactorJid,
decision: 'APPROVED',
},
});
const jobs: ForwardJobData[] = message.sourceGroup.syncRoutesFrom.map((route: any) => ({
messageId: message.id,
content: message.content,
sourceGroupName: message.sourceGroup.name,
senderName: message.senderName ?? undefined,
toGroupJid: route.targetGroup.platformId,
fromAccountId: route.targetGroup.accountId ?? reaction.accountId,
}));
return jobs;
}
- Step 4: Run tests to confirm they pass
pnpm --filter @tower/worker test -- --testPathPattern=approval
Expected: 7 tests pass.
- Step 5: Commit
git add apps/worker/src/core/approval.ts apps/worker/src/core/approval.test.ts
git commit -m "feat(worker): handleStarReaction approval core with tests"
Task 6: Forward queue and processor
Files:
- Create:
apps/worker/src/queues/forward.processor.test.ts - Create:
apps/worker/src/queues/forward.processor.ts - Create:
apps/worker/src/queues/forward.queue.ts
The rate limiter (20 forwards/min) goes on the Worker, not the Queue — this is the BullMQ v5 pattern.
- Step 1: Write the failing processor test
Create apps/worker/src/queues/forward.processor.test.ts:
import { processForwardJob } from './forward.processor';
import { ForwardJobData } from '@tower/types';
const mockPool = { sendMessage: jest.fn().mockResolvedValue(undefined) };
const baseJob: ForwardJobData = {
messageId: 'msg_1',
content: 'Event this Saturday at the temple',
sourceGroupName: 'UP Parivar Dallas',
senderName: 'Rajesh',
toGroupJid: '120363099999@g.us',
fromAccountId: 'acc_1',
};
describe('processForwardJob', () => {
beforeEach(() => jest.clearAllMocks());
it('sends a formatted message via the pool', async () => {
await processForwardJob(baseJob, mockPool as any);
expect(mockPool.sendMessage).toHaveBeenCalledWith(
'acc_1',
'120363099999@g.us',
expect.stringContaining('Event this Saturday at the temple'),
);
});
it('includes source group name in the forwarded text', async () => {
await processForwardJob(baseJob, mockPool as any);
const [, , text] = mockPool.sendMessage.mock.calls[0];
expect(text).toContain('UP Parivar Dallas');
});
it('includes sender name in the forwarded text', async () => {
await processForwardJob(baseJob, mockPool as any);
const [, , text] = mockPool.sendMessage.mock.calls[0];
expect(text).toContain('Rajesh');
});
it('handles missing senderName without throwing', async () => {
await processForwardJob({ ...baseJob, senderName: undefined }, mockPool as any);
expect(mockPool.sendMessage).toHaveBeenCalledTimes(1);
});
it('throws when pool has no session for the account', async () => {
const brokenPool = {
sendMessage: jest.fn().mockRejectedValue(new Error('No active session for account acc_99')),
};
await expect(
processForwardJob({ ...baseJob, fromAccountId: 'acc_99' }, brokenPool as any),
).rejects.toThrow('No active session');
});
});
- Step 2: Run test to confirm it fails
pnpm --filter @tower/worker test -- --testPathPattern=forward.processor
Expected: FAIL — "Cannot find module './forward.processor'"
- Step 3: Create
apps/worker/src/queues/forward.processor.ts
import { Worker } from 'bullmq';
import { ForwardJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';
import { WhatsAppSessionPool } from '../whatsapp/session-pool';
function formatForwardText(job: ForwardJobData): string {
const sender = job.senderName ? `_${job.senderName}_` : '_Unknown_';
return `📢 *${job.sourceGroupName}*\n${sender}:\n\n${job.content}\n\n_— Forwarded by TOWER_`;
}
export async function processForwardJob(
job: ForwardJobData,
pool: WhatsAppSessionPool,
): Promise<void> {
await pool.sendMessage(job.fromAccountId, job.toGroupJid, formatForwardText(job));
}
export function createForwardWorker(
redisUrl: string,
pool: WhatsAppSessionPool,
): Worker<ForwardJobData> {
return new Worker<ForwardJobData>(
'tower-forward',
async (job) => processForwardJob(job.data, pool),
{
connection: parseRedisUrl(redisUrl),
limiter: { max: 20, duration: 60_000 }, // 20 forwards per minute — avoids WhatsApp bans
},
);
}
- Step 4: Create
apps/worker/src/queues/forward.queue.ts
import { Queue } from 'bullmq';
import { ForwardJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';
export function createForwardQueue(redisUrl: string): Queue<ForwardJobData> {
return new Queue<ForwardJobData>('tower-forward', {
connection: parseRedisUrl(redisUrl),
});
}
- Step 5: Run processor tests to confirm they pass
pnpm --filter @tower/worker test -- --testPathPattern=forward.processor
Expected: 5 tests pass.
- Step 6: Commit
git add apps/worker/src/queues/forward.queue.ts \
apps/worker/src/queues/forward.processor.ts \
apps/worker/src/queues/forward.processor.test.ts
git commit -m "feat(worker): forward queue + processor with 20/min rate limiter"
Task 7: Wire main.ts — multi-account pool, reactions, full pipeline
Files:
- Modify:
apps/worker/src/main.ts
This is the final assembly. main.ts no longer imports anything from @whiskeysockets/baileys. It loads accounts from the DB, creates the pool, and wires: ingest message → tag detect → ingest queue; reaction → approval → forward queue.
- Step 1: Replace
apps/worker/src/main.tsentirely
import { PrismaClient } from '@prisma/client';
import { createLogger } from '@tower/logger';
import { validateEnv } from '@tower/config';
import { createIngestQueue } from './queues/ingest.queue';
import { createIngestWorker } from './queues/ingest.processor';
import { createForwardQueue } from './queues/forward.queue';
import { createForwardWorker } from './queues/forward.processor';
import { WhatsAppSessionPool } from './whatsapp/session-pool';
import { detectTags, isFlagged } from './whatsapp/tag-detector';
import { syncGroups } from './whatsapp/group-sync';
import { handleStarReaction } from './core/approval';
const logger = createLogger('tower-worker');
async function bootstrap() {
const env = validateEnv();
const prisma = new PrismaClient();
await prisma.$connect();
const adminJids = env.TOWER_ADMIN_JIDS
? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean)
: [];
const ingestQueue = createIngestQueue(env.REDIS_URL);
const forwardQueue = createForwardQueue(env.REDIS_URL);
const pool = new WhatsAppSessionPool();
const ingestWorker = createIngestWorker(env.REDIS_URL, prisma);
const forwardWorker = createForwardWorker(env.REDIS_URL, pool);
ingestWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Ingest job completed'));
ingestWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Ingest job failed'));
forwardWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Forward job completed'));
forwardWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Forward job failed'));
// Load active accounts from DB — each becomes one WhatsApp session
const accounts = await prisma.account.findMany({
where: { status: 'ACTIVE', platform: 'whatsapp' },
});
if (accounts.length === 0) {
logger.warn('No active WhatsApp accounts found — add one via Prisma Studio (see Task 3 Step 5)');
}
// Per-account map of groupJid → DB Group id
const groupMaps = new Map<string, Map<string, string>>();
for (const account of accounts) {
groupMaps.set(account.id, new Map());
await pool.add(
account.id,
account.sessionPath,
async (msg, accountId) => {
const tags = detectTags(msg.content, msg.senderJid, adminJids);
if (!isFlagged(tags)) return;
const groupMap = groupMaps.get(accountId) ?? new Map();
const sourceGroupId = groupMap.get(msg.sourceGroupJid);
if (!sourceGroupId) {
logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message');
return;
}
await ingestQueue.add(
'ingest',
{
platformMsgId: msg.platformMsgId,
platform: 'whatsapp',
accountId,
sourceGroupId,
senderJid: msg.senderJid,
senderName: msg.senderName,
content: msg.content,
tags,
},
{ attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
);
logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued');
},
async (reaction) => {
const forwardJobs = await handleStarReaction(reaction, adminJids, prisma);
if (!forwardJobs || forwardJobs.length === 0) return;
for (const job of forwardJobs) {
await forwardQueue.add('forward', job, {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
});
}
logger.info(
{ count: forwardJobs.length, messageId: forwardJobs[0]?.messageId },
'Forward jobs enqueued',
);
},
async (groups, accountId) => {
logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups');
const map = await syncGroups(groups, accountId, prisma);
groupMaps.set(accountId, map);
},
);
}
logger.info({ accountCount: accounts.length }, 'Tower worker ready');
const shutdown = async () => {
logger.info('Shutting down...');
await ingestWorker.close();
await forwardWorker.close();
await ingestQueue.close();
await forwardQueue.close();
await prisma.$disconnect();
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
bootstrap().catch((err) => {
console.error('Worker failed to start', err);
process.exit(1);
});
- Step 2: Build to confirm zero TypeScript errors
pnpm --filter @tower/worker build
Expected: 0 errors. dist/ created.
- Step 3: Run all worker tests
pnpm --filter @tower/worker test
Expected: all tests pass (normalizer ×13, approval ×7, forward.processor ×5).
- Step 4: Start the worker and verify multi-account startup
pnpm --filter @tower/worker dev
Expected log output (with one account seeded):
INFO (tower-worker): Tower worker ready {"accountCount": 1}
INFO (whatsapp-session): WhatsApp connected {"accountId": "acc_..."}
INFO (group-sync): Groups synced {"count": N, "accountId": "acc_..."}
If accountCount is 0, the account row wasn't inserted in Task 3 Step 5 — insert it now.
- Step 5: End-to-end smoke test
With the worker running and the WhatsApp session authenticated:
- Send a message containing
#importantfrom a non-bot number to a group the bot is in. - Confirm the worker logs
"Message enqueued"with the correct tags. - Check the DB:
psql "postgresql://tower:tower@localhost:5433/tower" -c 'SELECT status, tags FROM "Message" ORDER BY "createdAt" DESC LIMIT 3;'Expected: one row withstatus = PENDINGandtags = {#important}. - React to that message with ⭐ from an admin JID (the JID listed in
TOWER_ADMIN_JIDSin.env). - Confirm the worker logs
"Forward jobs enqueued". - Check DB approvals:
SELECT decision, "adminId" FROM "Approval" ORDER BY "decidedAt" DESC LIMIT 1;Expected: one row withdecision = APPROVED.
- Step 6: Commit
git add apps/worker/src/main.ts
git commit -m "feat(worker): multi-account session pool, reactions → approval → forward pipeline"
What this plan does NOT cover (Plan 4+)
- Admin dashboard UI for approving messages (Next.js — Plan 5)
- Meilisearch archive indexing (Plan 4)
- Adding a second bot account at runtime without restart (future: webhook endpoint to trigger
pool.add) /towerbot command handling- Rejection workflow (admin removes ⭐ or uses a different emoji/command)