tower/docs/superpowers/plans/2026-05-27-whatsapp-integration.md
maaz519 d33b4e40b8 fix: use type-only Baileys import and raw status code to fix Jest ESM issue
Replaces DisconnectReason enum import with type-only WASocket import and
uses 401 directly instead of DisconnectReason.loggedOut. Baileys is an ES
module that cannot be executed in Jest's CommonJS mode, so removing the
value import (keeping only type imports) prevents ts-jest from trying to
execute the module.

Also updated session-pool.test.ts to verify end() is called with the
expected Boom error object instead of undefined.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 17:40:24 +05:30


# WhatsApp Integration Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Connect `apps/worker` to WhatsApp via Baileys, normalize and tag-detect incoming messages, sync groups to the DB, and persist flagged messages as `PENDING` records ready for the approval workflow in Plan 3.
**Architecture:** The worker process holds a long-lived Baileys WebSocket connection. Incoming messages are normalized to a canonical shape, checked for TOWER tags (hashtags, the `/tower` command, or an admin sender), and pushed to a BullMQ `tower:ingest` queue. A BullMQ processor in the same worker process consumes the queue and upserts `Message` records to PostgreSQL with `PENDING` status. The NestJS API is not involved: the worker writes directly to the DB via Prisma Client.
**Tech Stack:** `@whiskeysockets/baileys ^6.0.0`, `bullmq ^5` (already in worker), `ioredis ^5` (already in worker), `@prisma/client ^6` (shared from apps/api schema), `pino` (via @tower/logger), Turborepo `generate` task
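
As a rough mental model of the architecture above, the flow can be sketched with in-memory stand-ins. Everything here is illustrative: `Incoming`, `tagsFor`, `onIncoming`, `drain`, the array queue, and the `Map` store are hypothetical names that replace Baileys, BullMQ, and Prisma, and the tag list is shortened.

```typescript
// In-memory sketch of the worker pipeline. All names are illustrative only.
type Incoming = { id: string; groupJid: string; text: string };
type Job = { platformMsgId: string; sourceGroupId: string; content: string; tags: string[] };
type StoredMessage = Job & { status: 'PENDING' };

const pendingJobs: Job[] = [];                        // stands in for the BullMQ queue
const messageStore = new Map<string, StoredMessage>(); // stands in for the Message table

function tagsFor(text: string): string[] {
  const lower = text.toLowerCase();
  return ['#important', '#event'].filter((tag) => lower.includes(tag));
}

function onIncoming(msg: Incoming, groupMap: Map<string, string>): void {
  const tags = tagsFor(msg.text);
  if (tags.length === 0) return;                      // unflagged messages are dropped
  const sourceGroupId = groupMap.get(msg.groupJid);
  if (!sourceGroupId) return;                         // unknown group: skip
  pendingJobs.push({ platformMsgId: msg.id, sourceGroupId, content: msg.text, tags });
}

function drain(): void {
  for (const job of pendingJobs.splice(0)) {
    const key = `whatsapp:${job.platformMsgId}`;
    // Insert once; a repeated message id leaves the stored row untouched.
    if (!messageStore.has(key)) messageStore.set(key, { ...job, status: 'PENDING' });
  }
}
```

The producer (`onIncoming`) and the consumer (`drain`) only share the job shape, which is the same separation the real queue provides.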
---
## File Map
| Action | Path | Purpose |
|--------|------|---------|
| Modify | `packages/config/src/index.ts` | Add WHATSAPP_SESSION_PATH, TOWER_ADMIN_JIDS |
| Modify | `packages/config/src/index.test.ts` | Tests for new config fields |
| Modify | `packages/types/src/message.ts` | Add IngestJobData interface |
| Create | `apps/worker/src/whatsapp/tag-detector.ts` | Detect TOWER tags from message text + sender |
| Create | `apps/worker/src/whatsapp/tag-detector.test.ts` | Unit tests |
| Create | `apps/worker/src/whatsapp/normalizer.ts` | Convert Baileys proto → NormalizedMessage |
| Create | `apps/worker/src/whatsapp/normalizer.test.ts` | Unit tests |
| Create | `apps/worker/src/whatsapp/group-sync.ts` | Upsert WA groups into DB on connection |
| Create | `apps/worker/src/whatsapp/group-sync.test.ts` | Unit tests with mocked Prisma |
| Create | `apps/worker/src/queues/ingest.queue.ts` | BullMQ Queue producer factory |
| Create | `apps/worker/src/queues/ingest.processor.ts` | BullMQ Worker consumer — persists Message to DB |
| Create | `apps/worker/src/queues/ingest.processor.test.ts` | Unit tests with mocked Prisma |
| Create | `apps/worker/src/whatsapp/session.ts` | Baileys socket factory |
| Modify | `apps/worker/src/main.ts` | Wire session → normalizer → tag-detector → queue |
| Modify | `apps/worker/package.json` | Add @whiskeysockets/baileys, @prisma/client, prisma |
| Modify | `apps/worker/jest.config.js` | Load .env for Prisma |
| Modify | `turbo.json` | Add generate task |
| Modify | `.env.example` | Add WHATSAPP_SESSION_PATH, TOWER_ADMIN_JIDS |
| Modify | `.gitignore` | Ignore sessions/ directory |
---
## Task 1: Extend @tower/config and @tower/types
**Files:**
- Modify: `packages/config/src/index.ts`
- Modify: `packages/config/src/index.test.ts`
- Modify: `packages/types/src/message.ts`
- [ ] **Step 1: Write failing tests for new config fields**
Add these two tests to `packages/config/src/index.test.ts` inside the existing `validateEnv` describe block:
```typescript
it('applies default WHATSAPP_SESSION_PATH of ./sessions when not set', () => {
  const env = {
    DATABASE_URL: 'postgresql://user:pass@localhost:5432/db',
    REDIS_URL: 'redis://localhost:6379',
    JWT_SECRET: 'a'.repeat(32),
  };
  const result = validateEnv(env as NodeJS.ProcessEnv);
  expect(result.WHATSAPP_SESSION_PATH).toBe('./sessions');
});

it('applies default TOWER_ADMIN_JIDS of empty string when not set', () => {
  const env = {
    DATABASE_URL: 'postgresql://user:pass@localhost:5432/db',
    REDIS_URL: 'redis://localhost:6379',
    JWT_SECRET: 'a'.repeat(32),
  };
  const result = validateEnv(env as NodeJS.ProcessEnv);
  expect(result.TOWER_ADMIN_JIDS).toBe('');
});
```
- [ ] **Step 2: Run to verify tests fail**
```bash
pnpm --filter @tower/config test
```
Expected: FAIL — `Property 'WHATSAPP_SESSION_PATH' does not exist on type 'Env'`
- [ ] **Step 3: Update packages/config/src/index.ts**
Replace the entire file content:
```typescript
import { z } from 'zod';

const envSchema = z.object({
  NODE_ENV: z.enum(['development', 'test', 'production']).default('development'),
  DATABASE_URL: z.string().url(),
  REDIS_URL: z.string().url(),
  API_PORT: z.coerce.number().default(3001),
  JWT_SECRET: z.string().min(32),
  MEILI_URL: z.string().url().default('http://localhost:7700'),
  MEILI_MASTER_KEY: z.string().default('tower_meili_dev_key'),
  LOG_LEVEL: z.enum(['trace', 'debug', 'info', 'warn', 'error']).default('info'),
  // Directory where Baileys persists its multi-file auth state
  WHATSAPP_SESSION_PATH: z.string().default('./sessions'),
  // Comma-separated list of admin JIDs; empty means no admins
  TOWER_ADMIN_JIDS: z.string().default(''),
});

export type Env = z.infer<typeof envSchema>;

export function validateEnv(env: NodeJS.ProcessEnv = process.env): Env {
  const result = envSchema.safeParse(env);
  if (!result.success) {
    console.error('Invalid environment variables:', result.error.format());
    throw new Error('Invalid environment variables');
  }
  return result.data;
}
```
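
`TOWER_ADMIN_JIDS` is validated as a single string, and the worker bootstrap in Task 8 splits it on commas. A minimal sketch of that parsing as a standalone, unit-testable function follows; `parseAdminJids` is a hypothetical name, not something the plan defines.

```typescript
// Hypothetical helper: turns the raw TOWER_ADMIN_JIDS value into a clean list.
function parseAdminJids(raw: string): string[] {
  return raw
    .split(',')
    .map((jid) => jid.trim())   // tolerate spaces around commas
    .filter(Boolean);           // drop empty entries, including the all-empty default
}
```

With the default empty string this yields an empty list, so no sender is treated as an admin until the variable is set.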
- [ ] **Step 4: Run tests to verify 7 pass**
```bash
pnpm --filter @tower/config test
```
Expected: PASS — 7 tests total (5 existing + 2 new)
- [ ] **Step 5: Add IngestJobData to packages/types/src/message.ts**
Add at the end of the file:
```typescript
export interface IngestJobData {
  platformMsgId: string;
  platform: Platform;
  sourceGroupId: string;
  senderJid: string;
  senderName?: string;
  content: string;
  tags: string[];
}
```
- [ ] **Step 6: Build both packages**
```bash
pnpm --filter @tower/config build
pnpm --filter @tower/types build
```
Expected: both exit 0 with no errors
- [ ] **Step 7: Commit**
```bash
git add packages/config/src/index.ts packages/config/src/index.test.ts packages/types/src/message.ts
git commit -m "feat: add WhatsApp config fields and IngestJobData type"
```
---
## Task 2: Tag Detector (TDD)
**Files:**
- Create: `apps/worker/src/whatsapp/tag-detector.ts`
- Create: `apps/worker/src/whatsapp/tag-detector.test.ts`
The tag detector is a pure function — no I/O, no network, no DB. It takes message text and a sender JID, and returns an array of string tags. A message is "flagged" (should be ingested) if the tags array is non-empty.
Tag rules:
- Text contains `#important` (case-insensitive) → tag `#important`
- Text contains `#upparivar` (case-insensitive) → tag `#upparivar`
- Text contains `#event` (case-insensitive) → tag `#event`
- Text starts with `/tower` → tag `#tower-command`
- Sender JID is in the admin list → tag `#admin`
- [ ] **Step 1: Write failing tests**
Create `apps/worker/src/whatsapp/tag-detector.test.ts`:
```typescript
import { detectTags, isFlagged } from './tag-detector';

const ADMINS = ['1234567890@s.whatsapp.net', '0987654321@s.whatsapp.net'];

describe('detectTags', () => {
  it('detects #important hashtag (case-insensitive)', () => {
    expect(detectTags('Check this #IMPORTANT update', 'user@s.whatsapp.net', ADMINS))
      .toContain('#important');
  });

  it('detects #upparivar hashtag (case-insensitive)', () => {
    expect(detectTags('Welcome to #UPParivar community', 'user@s.whatsapp.net', ADMINS))
      .toContain('#upparivar');
  });

  it('detects #event hashtag', () => {
    expect(detectTags('Join our #event on Saturday', 'user@s.whatsapp.net', ADMINS))
      .toContain('#event');
  });

  it('detects /tower command prefix', () => {
    expect(detectTags('/tower save this message', 'user@s.whatsapp.net', ADMINS))
      .toContain('#tower-command');
  });

  it('detects multiple tags in one message', () => {
    const tags = detectTags('#important #event happening', 'user@s.whatsapp.net', ADMINS);
    expect(tags).toContain('#important');
    expect(tags).toContain('#event');
  });

  it('detects admin sender', () => {
    expect(detectTags('Regular message', '1234567890@s.whatsapp.net', ADMINS))
      .toContain('#admin');
  });

  it('returns empty array for untagged message from non-admin', () => {
    expect(detectTags('Just a regular chat message', 'nobody@s.whatsapp.net', ADMINS))
      .toEqual([]);
  });

  it('returns empty array for empty text', () => {
    expect(detectTags('', 'nobody@s.whatsapp.net', ADMINS)).toEqual([]);
  });
});

describe('isFlagged', () => {
  it('returns true when tags array is non-empty', () => {
    expect(isFlagged(['#important'])).toBe(true);
  });

  it('returns false when tags array is empty', () => {
    expect(isFlagged([])).toBe(false);
  });
});
```
- [ ] **Step 2: Run to verify tests fail**
```bash
pnpm --filter @tower/worker test
```
Expected: FAIL — `Cannot find module './tag-detector'`
- [ ] **Step 3: Implement apps/worker/src/whatsapp/tag-detector.ts**
```typescript
// Hashtag patterns are case-insensitive and deliberately not global,
// so RegExp.test() carries no lastIndex state between calls.
const HASHTAGS: Array<{ pattern: RegExp; tag: string }> = [
  { pattern: /#important/i, tag: '#important' },
  { pattern: /#upparivar/i, tag: '#upparivar' },
  { pattern: /#event/i, tag: '#event' },
];

export function detectTags(text: string, senderJid: string, adminJids: string[]): string[] {
  const tags: string[] = [];
  for (const { pattern, tag } of HASHTAGS) {
    if (pattern.test(text)) tags.push(tag);
  }
  // Leading whitespace is ignored when checking for the /tower command
  if (text.trimStart().startsWith('/tower')) tags.push('#tower-command');
  if (adminJids.includes(senderJid)) tags.push('#admin');
  return tags;
}

export function isFlagged(tags: string[]): boolean {
  return tags.length > 0;
}
```
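
One behavior of the implementation above is worth knowing: the hashtag patterns match substrings, so `#eventful` also yields `#event`, and `/towering` counts as a `/tower` command. If whole-token matching is wanted, a stricter variant could look like the sketch below. `detectStrictTags` and `isTowerCommand` are hypothetical names and this is an alternative, not part of the plan; note that it also requires the hashtag to start the text or follow whitespace.

```typescript
// Sketch of a stricter matcher: a hashtag only counts as a whole token.
const STRICT_TAGS = ['#important', '#upparivar', '#event'];

function detectStrictTags(text: string): string[] {
  const lower = text.toLowerCase();
  return STRICT_TAGS.filter((tag) =>
    // (^|\s) requires start of text or whitespace before the tag;
    // (?![\w-]) rejects continuations such as "#eventful" or "#event-x".
    new RegExp(`(^|\\s)${tag}(?![\\w-])`).test(lower),
  );
}

function isTowerCommand(text: string): boolean {
  // Accepts "/tower" alone or followed by whitespace, but not "/towering"
  return /^\s*\/tower(\s|$)/.test(text);
}
```

All ten tests from Step 1 describe inputs that this variant would classify the same way, so switching would be a behavior change only for the edge cases listed above.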
- [ ] **Step 4: Run tests to verify all 11 pass**
```bash
pnpm --filter @tower/worker test
```
Expected: PASS — 11 tests (1 existing smoke test + 10 new: 8 for `detectTags`, 2 for `isFlagged`)
- [ ] **Step 5: Commit**
```bash
git add apps/worker/src/whatsapp/
git commit -m "feat: add tag detector for TOWER message flagging"
```
---
## Task 3: Message Normalizer (TDD)
**Files:**
- Create: `apps/worker/src/whatsapp/normalizer.ts`
- Create: `apps/worker/src/whatsapp/normalizer.test.ts`
The normalizer converts a Baileys `proto.IWebMessageInfo` object into a plain `NormalizedMessage`. It returns `null` for messages that should be ignored (protocol messages, own messages, non-group messages).
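
The ignore rules can be pictured as an ordered checklist. The sketch below uses a hypothetical structural type (`KeyLike`) and a hypothetical `classify` function instead of the Baileys types, purely to show the order of checks and to make the skip reason visible (for example for logging).

```typescript
// Hypothetical structural type: only the key fields the filter rules look at.
type KeyLike = { remoteJid?: string | null; fromMe?: boolean | null };

type Verdict = 'accept' | 'missing-key' | 'not-a-group' | 'own-message' | 'no-payload';

function classify(key: KeyLike | null | undefined, hasPayload: boolean): Verdict {
  if (!key) return 'missing-key';
  // Group JIDs end with @g.us; anything else (such as a DM) is ignored
  if (!(key.remoteJid ?? '').endsWith('@g.us')) return 'not-a-group';
  if (key.fromMe) return 'own-message';
  if (!hasPayload) return 'no-payload';
  return 'accept';
}
```

Every verdict other than `accept` corresponds to a case where the normalizer returns `null`.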
- [ ] **Step 1: Write failing tests**
Create `apps/worker/src/whatsapp/normalizer.test.ts`:
```typescript
// Type-only import: nothing from Baileys is executed when Jest loads this file
import type { proto } from '@whiskeysockets/baileys';
import { normalizeMessage } from './normalizer';

function makeMsg(overrides: Partial<proto.IWebMessageInfo> = {}): proto.IWebMessageInfo {
  return {
    key: {
      remoteJid: '120363043312345678@g.us',
      fromMe: false,
      id: 'ABCDEF123456',
      participant: '919876543210@s.whatsapp.net',
    },
    pushName: 'Alice',
    message: { conversation: 'Hello world' },
    ...overrides,
  } as proto.IWebMessageInfo;
}

describe('normalizeMessage', () => {
  it('normalizes a plain text conversation message', () => {
    const result = normalizeMessage(makeMsg());
    expect(result).toMatchObject({
      platformMsgId: 'ABCDEF123456',
      sourceGroupJid: '120363043312345678@g.us',
      senderJid: '919876543210@s.whatsapp.net',
      senderName: 'Alice',
      content: 'Hello world',
    });
  });

  it('normalizes an extendedTextMessage', () => {
    const result = normalizeMessage(makeMsg({
      message: { extendedTextMessage: { text: 'Extended text here' } },
    }));
    expect(result?.content).toBe('Extended text here');
  });

  it('normalizes an imageMessage caption', () => {
    const result = normalizeMessage(makeMsg({
      message: { imageMessage: { caption: 'Photo caption' } },
    }));
    expect(result?.content).toBe('Photo caption');
  });

  it('normalizes a videoMessage caption', () => {
    const result = normalizeMessage(makeMsg({
      message: { videoMessage: { caption: 'Video caption' } },
    }));
    expect(result?.content).toBe('Video caption');
  });

  it('returns null for own messages (fromMe = true)', () => {
    const result = normalizeMessage(makeMsg({
      key: {
        remoteJid: '120363043312345678@g.us',
        fromMe: true,
        id: 'XYZ',
        participant: '919876543210@s.whatsapp.net',
      },
    }));
    expect(result).toBeNull();
  });

  it('returns null for non-group messages (DMs)', () => {
    const result = normalizeMessage(makeMsg({
      key: { remoteJid: '919876543210@s.whatsapp.net', fromMe: false, id: 'XYZ' },
    }));
    expect(result).toBeNull();
  });

  it('returns null when message payload is missing', () => {
    const result = normalizeMessage(makeMsg({ message: null }));
    expect(result).toBeNull();
  });

  it('returns null when key is missing', () => {
    const result = normalizeMessage({ message: { conversation: 'hi' } } as proto.IWebMessageInfo);
    expect(result).toBeNull();
  });
});
```
- [ ] **Step 2: Run to verify tests fail**
```bash
pnpm --filter @tower/worker test
```
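Expected: FAIL — `Cannot find module './normalizer'` (a `Cannot find module '@whiskeysockets/baileys'` error may appear as well, since the package is not installed until Step 4)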
- [ ] **Step 3: Implement apps/worker/src/whatsapp/normalizer.ts**
```typescript
// Type-only import keeps Baileys out of the runtime module graph for this file
import type { proto } from '@whiskeysockets/baileys';

export interface NormalizedMessage {
  platformMsgId: string;
  sourceGroupJid: string;
  senderJid: string;
  senderName?: string;
  content: string;
}

// Returns the first non-empty text field, or '' when the message has no text
function extractText(msg: proto.IWebMessageInfo): string {
  const m = msg.message;
  if (!m) return '';
  return (
    m.conversation ||
    m.extendedTextMessage?.text ||
    m.imageMessage?.caption ||
    m.videoMessage?.caption ||
    m.documentMessage?.caption ||
    ''
  );
}

export function normalizeMessage(msg: proto.IWebMessageInfo): NormalizedMessage | null {
  const key = msg.key;
  if (!key) return null;

  const remoteJid = key.remoteJid ?? '';
  // Only process group messages (group JIDs end with @g.us)
  if (!remoteJid.endsWith('@g.us')) return null;
  // Skip our own outgoing messages
  if (key.fromMe) return null;
  if (!msg.message) return null;

  const content = extractText(msg);
  return {
    platformMsgId: key.id ?? '',
    sourceGroupJid: remoteJid,
    senderJid: key.participant ?? '',
    senderName: msg.pushName ?? undefined,
    content,
  };
}
```
- [ ] **Step 4: Install @whiskeysockets/baileys so types resolve**
```bash
pnpm --filter @tower/worker add @whiskeysockets/baileys
```
Expected: adds baileys to apps/worker/package.json dependencies
- [ ] **Step 5: Run tests to verify 8 new pass (plus existing)**
```bash
pnpm --filter @tower/worker test
```
Expected: PASS — 19 tests total (11 from Task 2 + 8 new)
- [ ] **Step 6: Commit**
```bash
git add apps/worker/src/whatsapp/normalizer.ts apps/worker/src/whatsapp/normalizer.test.ts apps/worker/package.json pnpm-lock.yaml
git commit -m "feat: add Baileys message normalizer"
```
---
## Task 4: BullMQ Ingest Queue + Processor (TDD)
**Files:**
- Create: `apps/worker/src/queues/ingest.queue.ts`
- Create: `apps/worker/src/queues/ingest.processor.ts`
- Create: `apps/worker/src/queues/ingest.processor.test.ts`
The processor receives an `IngestJobData` job and upserts it into the `Message` table with `PENDING` status. The job's `sourceGroupId` is the DB `Group.id`, which is resolved before enqueueing. Because the write is an `upsert` with an empty `update`, a duplicate message (same `platform` + `platformMsgId`) leaves the existing row unchanged.
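
The idempotency rule can be illustrated without a database. In this sketch a `Map` keyed by the compound `(platform, platformMsgId)` pair stands in for the `Message` table; `Row`, `messageTable`, and `upsertPending` are hypothetical names, and the real processor relies on Prisma's `upsert` instead.

```typescript
type Row = { platform: string; platformMsgId: string; content: string; status: 'PENDING' };

// Stand-in for the Message table, keyed by the compound unique key
const messageTable = new Map<string, Row>();

function upsertPending(input: Omit<Row, 'status'>): Row {
  const key = `${input.platform}|${input.platformMsgId}`;
  const existing = messageTable.get(key);
  if (existing) return existing;          // mirrors `update: {}`: nothing is changed
  const row: Row = { ...input, status: 'PENDING' };
  messageTable.set(key, row);
  return row;
}
```

A retried or re-delivered job therefore cannot create a second record or overwrite the first one.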
- [ ] **Step 1: Write failing test for the processor logic**
Create `apps/worker/src/queues/ingest.processor.test.ts`:
```typescript
import { processIngestJob } from './ingest.processor';
import { IngestJobData } from '@tower/types';

const mockPrisma = {
  message: {
    upsert: jest.fn(),
  },
};

const sampleJob: IngestJobData = {
  platformMsgId: 'WA_MSG_001',
  platform: 'whatsapp',
  sourceGroupId: 'clxxxxxx',
  senderJid: '919876543210@s.whatsapp.net',
  senderName: 'Alice',
  content: '#important update from the committee',
  tags: ['#important'],
};

describe('processIngestJob', () => {
  beforeEach(() => jest.clearAllMocks());

  it('upserts a message with PENDING status', async () => {
    mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-db-id' });
    await processIngestJob(sampleJob, mockPrisma as any);
    expect(mockPrisma.message.upsert).toHaveBeenCalledWith({
      where: { platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'WA_MSG_001' } },
      create: {
        platform: 'whatsapp',
        platformMsgId: 'WA_MSG_001',
        sourceGroupId: 'clxxxxxx',
        senderJid: '919876543210@s.whatsapp.net',
        senderName: 'Alice',
        content: '#important update from the committee',
        tags: ['#important'],
        status: 'PENDING',
      },
      update: {},
    });
  });

  it('does not throw when message already exists (idempotent upsert)', async () => {
    mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-db-id' });
    await expect(processIngestJob(sampleJob, mockPrisma as any)).resolves.not.toThrow();
  });

  it('propagates DB errors', async () => {
    mockPrisma.message.upsert.mockRejectedValue(new Error('DB connection lost'));
    await expect(processIngestJob(sampleJob, mockPrisma as any)).rejects.toThrow('DB connection lost');
  });
});
```
- [ ] **Step 2: Run to verify tests fail**
```bash
pnpm --filter @tower/worker test
```
Expected: FAIL — `Cannot find module './ingest.processor'`
- [ ] **Step 3: Implement apps/worker/src/queues/ingest.processor.ts**
```typescript
import { Worker } from 'bullmq';
import { PrismaClient } from '@prisma/client';
import IORedis from 'ioredis';
import { IngestJobData } from '@tower/types';

// Pure job handler, kept separate from the Worker so it can be unit-tested
export async function processIngestJob(job: IngestJobData, prisma: PrismaClient): Promise<void> {
  await prisma.message.upsert({
    where: {
      platform_platformMsgId: {
        platform: job.platform,
        platformMsgId: job.platformMsgId,
      },
    },
    create: {
      platform: job.platform,
      platformMsgId: job.platformMsgId,
      sourceGroupId: job.sourceGroupId,
      senderJid: job.senderJid,
      senderName: job.senderName,
      content: job.content,
      tags: job.tags,
      status: 'PENDING',
    },
    // Empty update: an existing row is left untouched
    update: {},
  });
}

export function createIngestWorker(redisUrl: string, prisma: PrismaClient): Worker<IngestJobData> {
  // BullMQ workers require maxRetriesPerRequest: null on their Redis connection
  const connection = new IORedis(redisUrl, { maxRetriesPerRequest: null });
  return new Worker<IngestJobData>(
    'tower:ingest',
    async (job) => processIngestJob(job.data, prisma),
    { connection },
  );
}
```
- [ ] **Step 4: Implement apps/worker/src/queues/ingest.queue.ts**
```typescript
import { Queue } from 'bullmq';
import IORedis from 'ioredis';
import { IngestJobData } from '@tower/types';

export function createIngestQueue(redisUrl: string): Queue<IngestJobData> {
  const connection = new IORedis(redisUrl, { maxRetriesPerRequest: null });
  return new Queue<IngestJobData>('tower:ingest', { connection });
}
```
Note: newer BullMQ 5.x releases reject queue names that contain `:` with the error `Queue name cannot contain :`. If the `Queue` or `Worker` constructor throws on `tower:ingest`, rename the queue consistently in both `ingest.queue.ts` and `ingest.processor.ts` (for example to `tower-ingest`).
- [ ] **Step 5: Run tests to verify 3 new pass + existing**
```bash
pnpm --filter @tower/worker test
```
Expected: PASS — 22 tests total (19 from Tasks 2 and 3 + 3 new)
- [ ] **Step 6: Commit**
```bash
git add apps/worker/src/queues/
git commit -m "feat: add BullMQ ingest queue and processor"
```
---
## Task 5: Group Sync (TDD)
**Files:**
- Create: `apps/worker/src/whatsapp/group-sync.ts`
- Create: `apps/worker/src/whatsapp/group-sync.test.ts`
On WhatsApp connection, Baileys gives us all groups the bot is in via `sock.groupFetchAllParticipating()`. We upsert each one to the `Group` table. The function returns a `Map<waGroupJid, dbGroupId>` used by the message listener to resolve group IDs.
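
How the returned map is consumed can be shown with plain data. In this sketch `buildGroupMap` and `resolveGroupId` are hypothetical helpers, and the input rows stand in for the records returned by the upserts.

```typescript
// Builds the jid → db id lookup from already-upserted rows (illustrative input shape)
function buildGroupMap(rows: Array<{ platformId: string; id: string }>): Map<string, string> {
  return new Map(rows.map((row): [string, string] => [row.platformId, row.id]));
}

// The message listener resolves the DB id for an incoming group JID;
// undefined means the group has not been synced yet and the message is skipped.
function resolveGroupId(groupMap: Map<string, string>, jid: string): string | undefined {
  return groupMap.get(jid);
}
```

Because the map is only filled once the connection opens and the sync finishes, a message that arrives earlier resolves to `undefined`.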
- [ ] **Step 1: Write failing tests**
Create `apps/worker/src/whatsapp/group-sync.test.ts`:
```typescript
import { syncGroups } from './group-sync';
// Type-only import: nothing from Baileys is executed when Jest loads this file
import type { GroupMetadata } from '@whiskeysockets/baileys';

const mockGroups: Record<string, GroupMetadata> = {
  '120363043312345678@g.us': {
    id: '120363043312345678@g.us',
    subject: 'UP Parivar Dallas',
    desc: 'Main community group',
    participants: [],
    creation: 0,
    owner: undefined,
    restrict: false,
    announce: false,
    subjectOwner: undefined,
    subjectTime: 0,
    size: 0,
    ephemeralDuration: 0,
    inviteCode: undefined,
  },
  '999999999@g.us': {
    id: '999999999@g.us',
    subject: 'Events Committee',
    desc: undefined,
    participants: [],
    creation: 0,
    owner: undefined,
    restrict: false,
    announce: false,
    subjectOwner: undefined,
    subjectTime: 0,
    size: 0,
    ephemeralDuration: 0,
    inviteCode: undefined,
  },
};

const mockPrisma = {
  group: {
    upsert: jest.fn(),
  },
};

describe('syncGroups', () => {
  beforeEach(() => jest.clearAllMocks());

  it('upserts each group and returns jid→id map', async () => {
    mockPrisma.group.upsert
      .mockResolvedValueOnce({ id: 'db-group-1' })
      .mockResolvedValueOnce({ id: 'db-group-2' });
    const result = await syncGroups(mockGroups, mockPrisma as any);
    expect(mockPrisma.group.upsert).toHaveBeenCalledTimes(2);
    expect(result.get('120363043312345678@g.us')).toBe('db-group-1');
    expect(result.get('999999999@g.us')).toBe('db-group-2');
  });

  it('calls upsert with correct create payload', async () => {
    mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-1' });
    await syncGroups(
      { '120363043312345678@g.us': mockGroups['120363043312345678@g.us'] },
      mockPrisma as any,
    );
    expect(mockPrisma.group.upsert).toHaveBeenCalledWith({
      where: { platform_platformId: { platform: 'whatsapp', platformId: '120363043312345678@g.us' } },
      create: {
        platform: 'whatsapp',
        platformId: '120363043312345678@g.us',
        name: 'UP Parivar Dallas',
        description: 'Main community group',
        isActive: true,
      },
      update: { name: 'UP Parivar Dallas', description: 'Main community group' },
    });
  });

  it('handles groups with no description', async () => {
    mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-2' });
    await syncGroups(
      { '999999999@g.us': mockGroups['999999999@g.us'] },
      mockPrisma as any,
    );
    expect(mockPrisma.group.upsert).toHaveBeenCalledWith(
      expect.objectContaining({
        create: expect.objectContaining({ description: undefined }),
      }),
    );
  });

  it('returns an empty map when given empty groups', async () => {
    const result = await syncGroups({}, mockPrisma as any);
    expect(result.size).toBe(0);
    expect(mockPrisma.group.upsert).not.toHaveBeenCalled();
  });
});
```
- [ ] **Step 2: Run to verify tests fail**
```bash
pnpm --filter @tower/worker test
```
Expected: FAIL — `Cannot find module './group-sync'`
- [ ] **Step 3: Implement apps/worker/src/whatsapp/group-sync.ts**
```typescript
// Type-only import keeps Baileys out of the runtime module graph for this file
import type { GroupMetadata } from '@whiskeysockets/baileys';
import { PrismaClient } from '@prisma/client';

export async function syncGroups(
  groups: Record<string, GroupMetadata>,
  prisma: PrismaClient,
): Promise<Map<string, string>> {
  const jidToDbId = new Map<string, string>();
  // Upserts run one at a time, in the iteration order of the groups record
  for (const [jid, meta] of Object.entries(groups)) {
    const group = await prisma.group.upsert({
      where: { platform_platformId: { platform: 'whatsapp', platformId: jid } },
      create: {
        platform: 'whatsapp',
        platformId: jid,
        name: meta.subject,
        description: meta.desc ?? undefined,
        isActive: true,
      },
      update: { name: meta.subject, description: meta.desc ?? undefined },
    });
    jidToDbId.set(jid, group.id);
  }
  return jidToDbId;
}
```
- [ ] **Step 4: Run tests to verify 4 new pass + existing**
```bash
pnpm --filter @tower/worker test
```
Expected: PASS — 26 tests total (22 from Tasks 2 to 4 + 4 new)
- [ ] **Step 5: Commit**
```bash
git add apps/worker/src/whatsapp/group-sync.ts apps/worker/src/whatsapp/group-sync.test.ts
git commit -m "feat: add WhatsApp group sync to database"
```
---
## Task 6: WhatsApp Session
**Files:**
- Create: `apps/worker/src/whatsapp/session.ts`
The session module wraps Baileys' `makeWASocket`. It manages auth state, reconnection on disconnect, and calls provided callbacks for groups (on connection) and messages (on upsert).
No unit test for the session itself — it wraps a live network connection. The integration is verified end-to-end in Task 8.
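
Although the socket wrapper itself is not unit-tested, the reconnect decision is a pure rule and can be. The sketch below assumes that 401 is the status code behind `DisconnectReason.loggedOut`, as the commit note at the top of this file states. `shouldReconnect` and `reconnectDelayMs` are hypothetical names, and the growing delay is an optional extension; the plan itself uses a fixed 5 second delay.

```typescript
// 401 is the status code reported for a logged-out session (DisconnectReason.loggedOut)
const LOGGED_OUT_STATUS = 401;

// Reconnect on every close reason except an explicit logout
function shouldReconnect(statusCode: number | undefined): boolean {
  return statusCode !== LOGGED_OUT_STATUS;
}

// Hypothetical extension, not part of the plan: double the 5s delay per attempt, capped at 60s
function reconnectDelayMs(attempt: number, baseMs = 5000, capMs = 60000): number {
  const exponent = Math.max(0, attempt - 1);
  return Math.min(capMs, baseMs * Math.pow(2, exponent));
}
```

An unknown reason (`undefined`) is treated as recoverable, which matches the comparison used in the session code.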
- [ ] **Step 1: Create apps/worker/src/whatsapp/session.ts**
```typescript
import makeWASocket, {
  useMultiFileAuthState,
  fetchLatestBaileysVersion,
  DisconnectReason,
  WASocket,
  proto,
  GroupMetadata,
} from '@whiskeysockets/baileys';
import { createLogger } from '@tower/logger';

const logger = createLogger('whatsapp-session');

// Callbacks may be async; rejections are caught and logged by the session
export type OnMessageCallback = (msg: proto.IWebMessageInfo) => void | Promise<void>;
export type OnGroupsCallback = (groups: Record<string, GroupMetadata>) => void | Promise<void>;

export async function createWhatsAppSession(
  sessionPath: string,
  onMessage: OnMessageCallback,
  onGroups: OnGroupsCallback,
): Promise<WASocket> {
  const { state, saveCreds } = await useMultiFileAuthState(sessionPath);
  const { version } = await fetchLatestBaileysVersion();

  const sock = makeWASocket({
    version,
    auth: state,
    printQRInTerminal: true,
    logger: logger as any,
  });

  sock.ev.on('creds.update', saveCreds);

  sock.ev.on('connection.update', async ({ connection, lastDisconnect }) => {
    if (connection === 'close') {
      // Read the status code structurally so the worker does not need
      // a direct @hapi/boom dependency just for the Boom type.
      const error = lastDisconnect?.error as { output?: { statusCode?: number } } | undefined;
      const reason = error?.output?.statusCode;
      const shouldReconnect = reason !== DisconnectReason.loggedOut;
      logger.info({ reason, shouldReconnect }, 'Connection closed');
      if (shouldReconnect) {
        logger.info('Reconnecting in 5s...');
        setTimeout(() => {
          // Catch here: a rejection inside setTimeout would otherwise be unhandled
          createWhatsAppSession(sessionPath, onMessage, onGroups).catch((err) =>
            logger.error({ err }, 'Reconnect failed'),
          );
        }, 5000);
      }
    } else if (connection === 'open') {
      logger.info('WhatsApp connected');
      try {
        const groups = await sock.groupFetchAllParticipating();
        await onGroups(groups);
      } catch (err) {
        logger.error({ err }, 'Group sync failed');
      }
    }
  });

  sock.ev.on('messages.upsert', async ({ messages, type }) => {
    if (type !== 'notify') return;
    for (const msg of messages) {
      try {
        await onMessage(msg);
      } catch (err) {
        logger.error({ err }, 'Message handler failed');
      }
    }
  });

  return sock;
}
```
- [ ] **Step 2: Verify TypeScript compiles**
```bash
pnpm --filter @tower/worker build
```
Expected: exit 0 with no type errors
- [ ] **Step 3: Commit**
```bash
git add apps/worker/src/whatsapp/session.ts
git commit -m "feat: add Baileys WhatsApp session with reconnect logic"
```
---
## Task 7: Add Prisma to Worker + Update .env
**Files:**
- Modify: `apps/worker/package.json`
- Modify: `apps/worker/jest.config.js`
- Modify: `turbo.json`
- Modify: `.env.example`
- Modify: `.gitignore`
- [ ] **Step 1: Add @prisma/client and prisma to worker**
```bash
pnpm --filter @tower/worker add @prisma/client
pnpm --filter @tower/worker add --save-dev prisma
```
- [ ] **Step 2: Add generate script to apps/worker/package.json**
The `generate` script tells Prisma where to find the schema (in apps/api). After this step, `package.json` scripts section should be:
```json
"scripts": {
  "generate": "prisma generate --schema=../api/prisma/schema.prisma",
  "build": "tsc",
  "dev": "ts-node src/main.ts",
  "start": "node dist/main.js",
  "test": "jest"
}
```
- [ ] **Step 3: Add generate task to turbo.json**
Replace `turbo.json` content:
```json
{
  "$schema": "https://turbo.build/schema.json",
  "tasks": {
    "generate": {
      "cache": false
    },
    "build": {
      "dependsOn": ["generate", "^build"],
      "outputs": ["dist/**", ".next/**"]
    },
    "dev": {
      "cache": false,
      "persistent": true
    },
    "test": {
      "dependsOn": ["^build"],
      "outputs": ["coverage/**"]
    },
    "lint": {}
  }
}
```
- [ ] **Step 4: Run prisma generate for worker**
```bash
pnpm --filter @tower/worker generate
```
Expected: `✔ Generated Prisma Client` with no errors
- [ ] **Step 5: Update apps/worker/jest.config.js to load .env**
Replace the entire file:
```javascript
const path = require('path');

// Load the repo-root .env so Prisma can read DATABASE_URL during tests
require('dotenv').config({ path: path.resolve(__dirname, '../../.env') });

module.exports = {
  preset: 'ts-jest',
  testEnvironment: 'node',
  testMatch: ['**/*.test.ts'],
  rootDir: 'src',
};
```
- [ ] **Step 6: Add dotenv dev dependency to worker**
```bash
pnpm --filter @tower/worker add --save-dev dotenv
```
- [ ] **Step 7: Update .env.example — add new vars**
Add these lines to `.env.example` at the repository root:
```bash
# WhatsApp
WHATSAPP_SESSION_PATH=./sessions
TOWER_ADMIN_JIDS=
```
Also add to `.env` (the actual file, NOT committed):
```bash
WHATSAPP_SESSION_PATH=./sessions
TOWER_ADMIN_JIDS=
```
- [ ] **Step 8: Add sessions/ to .gitignore**
Add this line to `.gitignore`:
```
sessions/
```
- [ ] **Step 9: Run worker tests to verify still passing**
```bash
pnpm --filter @tower/worker test
```
Expected: PASS — 26 tests passing
- [ ] **Step 10: Commit**
```bash
git add apps/worker/package.json apps/worker/jest.config.js turbo.json .env.example .gitignore pnpm-lock.yaml
git commit -m "chore: add Prisma client to worker, turbo generate task, update env"
```
---
## Task 8: Wire Worker Bootstrap
**Files:**
- Modify: `apps/worker/src/main.ts`
Wire together session → normalizer → tag-detector → queue so the worker fully processes incoming messages.
- [ ] **Step 1: Replace apps/worker/src/main.ts**
```typescript
import { PrismaClient } from '@prisma/client';
import { createLogger } from '@tower/logger';
import { validateEnv } from '@tower/config';
import { IngestJobData } from '@tower/types';
import { createWhatsAppSession } from './whatsapp/session';
import { normalizeMessage } from './whatsapp/normalizer';
import { detectTags, isFlagged } from './whatsapp/tag-detector';
import { syncGroups } from './whatsapp/group-sync';
import { createIngestQueue } from './queues/ingest.queue';
import { createIngestWorker } from './queues/ingest.processor';

const logger = createLogger('tower-worker');

async function bootstrap() {
  const env = validateEnv();
  const prisma = new PrismaClient();
  await prisma.$connect();

  const adminJids = env.TOWER_ADMIN_JIDS
    ? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean)
    : [];

  const ingestQueue = createIngestQueue(env.REDIS_URL);
  const ingestWorker = createIngestWorker(env.REDIS_URL, prisma);

  ingestWorker.on('completed', (job) => {
    logger.info({ jobId: job.id }, 'Ingest job completed');
  });
  ingestWorker.on('failed', (job, err) => {
    logger.error({ jobId: job?.id, err }, 'Ingest job failed');
  });

  // jid→dbId map populated on WA connection
  let groupMap = new Map<string, string>();

  await createWhatsAppSession(
    env.WHATSAPP_SESSION_PATH,
    async (msg) => {
      const normalized = normalizeMessage(msg);
      if (!normalized) return;

      const tags = detectTags(normalized.content, normalized.senderJid, adminJids);
      if (!isFlagged(tags)) return;

      const sourceGroupId = groupMap.get(normalized.sourceGroupJid);
      if (!sourceGroupId) {
        logger.warn({ jid: normalized.sourceGroupJid }, 'Unknown group — skipping message');
        return;
      }

      const jobData: IngestJobData = {
        platformMsgId: normalized.platformMsgId,
        platform: 'whatsapp',
        sourceGroupId,
        senderJid: normalized.senderJid,
        senderName: normalized.senderName,
        content: normalized.content,
        tags,
      };
      await ingestQueue.add('ingest', jobData, {
        attempts: 3,
        backoff: { type: 'exponential', delay: 1000 },
      });
      logger.info({ platformMsgId: normalized.platformMsgId, tags }, 'Message enqueued');
    },
    async (groups) => {
      logger.info({ count: Object.keys(groups).length }, 'Syncing groups');
      groupMap = await syncGroups(groups, prisma);
      logger.info({ count: groupMap.size }, 'Groups synced');
    },
  );

  logger.info('Tower worker ready');

  const shutdown = async () => {
    logger.info('Shutting down...');
    await ingestWorker.close();
    await ingestQueue.close();
    await prisma.$disconnect();
    process.exit(0);
  };
  process.on('SIGTERM', shutdown);
  process.on('SIGINT', shutdown);
}

bootstrap().catch((err) => {
  console.error('Worker failed to start', err);
  process.exit(1);
});
```
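
The job options above (`attempts: 3`, exponential backoff with `delay: 1000`) translate into concrete wait times. BullMQ documents its built-in exponential strategy as waiting `2 ^ (attempts - 1) * delay` milliseconds; the sketch below just evaluates that formula, assuming no jitter is configured. `retryDelays` is a hypothetical helper, not a BullMQ API.

```typescript
// Evaluates the documented exponential backoff formula: 2^(attemptsMade - 1) * delay
function retryDelays(attempts: number, delayMs: number): number[] {
  const delays: number[] = [];
  // `attempts` includes the first try, so there are attempts - 1 retries
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.pow(2, attemptsMade - 1) * delayMs);
  }
  return delays;
}

// retryDelays(3, 1000) → [1000, 2000]
```

So a job that keeps failing is retried after about 1 second and then about 2 seconds before it is marked as failed.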
- [ ] **Step 2: Build worker to verify no type errors**
```bash
pnpm --filter @tower/worker build
```
Expected: exit 0 with no TypeScript errors
- [ ] **Step 3: Run worker tests**
```bash
pnpm --filter @tower/worker test
```
Expected: PASS — all tests still passing
- [ ] **Step 4: Commit**
```bash
git add apps/worker/src/main.ts
git commit -m "feat: wire worker bootstrap — session → normalizer → queue pipeline"
```
---
## Task 9: Turborepo Full Smoke Test
**Files:** None new — verify the full pipeline
- [ ] **Step 1: Ensure Docker is running**
```bash
docker compose up -d
```
Verify postgres (port 5433) and redis (port 6379) are healthy:
```bash
docker compose ps
```
Expected: postgres, redis, meilisearch all show `Up`
- [ ] **Step 2: Run full build**
```bash
pnpm build
```
Expected:
```
Tasks: 8 successful, 8 total
```
- [ ] **Step 3: Run full test suite**
```bash
pnpm test
```
Expected:
```
Tasks: 8 successful, 8 total
```
All packages must pass. If `@tower/api#test` fails with `DATABASE_URL not found`, verify `.env` has the correct `DATABASE_URL=postgresql://tower:tower_dev@localhost:5433/tower_dev`.
- [ ] **Step 4: Commit any fixes to mark the milestone**
```bash
git add -A
git commit -m "chore: turborepo smoke test — all 8 packages build and test clean"
```
---
## Self-Review
**Spec coverage:**
- ✅ Baileys connection with QR-based auth and session persistence
- ✅ Group discovery on connect → upserted to `Group` table
- ✅ Message normalization (text, extended text, image/video captions)
- ✅ Tag detection: `#important`, `#upparivar`, `#event`, `/tower` command, admin sender
- ✅ BullMQ `tower:ingest` queue — durability + retry logic
- ✅ Message persistence as `PENDING` records (idempotent via upsert)
- ✅ Graceful shutdown (SIGTERM/SIGINT)
- ⏭️ ⭐ admin reaction handling — deferred to Plan 3 (requires message store)
- ⏭️ Media download — deferred to Plan 4 (requires Cloudflare R2 / MinIO)
**Type consistency check:**
- `NormalizedMessage.sourceGroupJid` → used in `groupMap.get()` in main.ts ✅
- `IngestJobData.sourceGroupId` → DB Group.id (resolved from groupMap) ✅
- `syncGroups` returns `Map<string, string>` (jid → db id) ✅
- `processIngestJob` uses `platform_platformMsgId` compound unique key (matches Prisma schema `@@unique([platform, platformMsgId])`) ✅
- `group.upsert` uses `platform_platformId` compound key (matches `@@unique([platform, platformId])`) ✅