# WhatsApp Integration Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Connect `apps/worker` to WhatsApp via Baileys, normalize and tag-detect incoming messages, sync groups to the DB, and persist flagged messages as `PENDING` records ready for the approval workflow in Plan 3. **Architecture:** The worker process holds a long-lived Baileys WebSocket connection. Incoming messages are normalized to a canonical shape, checked for TOWER tags (hashtags, `/tower` command), and pushed to a BullMQ `tower:ingest` queue. A BullMQ processor in the same worker process consumes the queue and upserts `Message` records to PostgreSQL with `PENDING` status. The NestJS API is not involved — the worker writes directly to the DB via Prisma Client. **Tech Stack:** `@whiskeysockets/baileys ^6.0.0`, `bullmq ^5` (already in worker), `ioredis ^5` (already in worker), `@prisma/client ^6` (shared from apps/api schema), `pino` (via @tower/logger), Turborepo `generate` task --- ## File Map | Action | Path | Purpose | |--------|------|---------| | Modify | `packages/config/src/index.ts` | Add WHATSAPP_SESSION_PATH, TOWER_ADMIN_JIDS | | Modify | `packages/config/src/index.test.ts` | Tests for new config fields | | Modify | `packages/types/src/message.ts` | Add IngestJobData interface | | Create | `apps/worker/src/whatsapp/tag-detector.ts` | Detect TOWER tags from message text + sender | | Create | `apps/worker/src/whatsapp/tag-detector.test.ts` | Unit tests | | Create | `apps/worker/src/whatsapp/normalizer.ts` | Convert Baileys proto → NormalizedMessage | | Create | `apps/worker/src/whatsapp/normalizer.test.ts` | Unit tests | | Create | `apps/worker/src/whatsapp/group-sync.ts` | Upsert WA groups into DB on connection | | Create | `apps/worker/src/whatsapp/group-sync.test.ts` | Unit tests with mocked Prisma | | Create | `apps/worker/src/queues/ingest.queue.ts` | BullMQ Queue producer factory | | Create | `apps/worker/src/queues/ingest.processor.ts` | BullMQ Worker consumer — persists Message to DB | | Create | `apps/worker/src/queues/ingest.processor.test.ts` | Unit tests with mocked Prisma | | Create | `apps/worker/src/whatsapp/session.ts` | Baileys socket factory | | Modify | `apps/worker/src/main.ts` | Wire session → normalizer → tag-detector → queue | | Modify | `apps/worker/package.json` | Add @whiskeysockets/baileys, @prisma/client, prisma | | Modify | `apps/worker/jest.config.js` | Load .env for Prisma | | Modify | `turbo.json` | Add generate task | | Modify | `.env.example` | Add WHATSAPP_SESSION_PATH, TOWER_ADMIN_JIDS | | Modify | `.gitignore` | Ignore sessions/ directory | --- ## Task 1: Extend @tower/config and @tower/types **Files:** - Modify: `packages/config/src/index.ts` - Modify: `packages/config/src/index.test.ts` - Modify: `packages/types/src/message.ts` - [ ] **Step 1: Write failing tests for new config fields** Add these two tests to `packages/config/src/index.test.ts` inside the existing `validateEnv` describe block: ```typescript it('applies default WHATSAPP_SESSION_PATH of ./sessions when not set', () => { const env = { DATABASE_URL: 'postgresql://user:pass@localhost:5432/db', REDIS_URL: 'redis://localhost:6379', JWT_SECRET: 'a'.repeat(32), }; const result = validateEnv(env as NodeJS.ProcessEnv); expect(result.WHATSAPP_SESSION_PATH).toBe('./sessions'); }); it('applies default TOWER_ADMIN_JIDS of empty string when not set', () => { const env = { DATABASE_URL: 'postgresql://user:pass@localhost:5432/db', REDIS_URL: 'redis://localhost:6379', JWT_SECRET: 'a'.repeat(32), }; const result = validateEnv(env as NodeJS.ProcessEnv); expect(result.TOWER_ADMIN_JIDS).toBe(''); }); ``` - [ ] **Step 2: Run to verify tests fail** ```bash pnpm --filter @tower/config test ``` Expected: FAIL — `Property 'WHATSAPP_SESSION_PATH' does not exist on type 'Env'` - [ ] **Step 3: Update packages/config/src/index.ts** Replace the entire file content: ```typescript import { z } from 'zod'; const envSchema = z.object({ NODE_ENV: z.enum(['development', 'test', 'production']).default('development'), DATABASE_URL: z.string().url(), REDIS_URL: z.string().url(), API_PORT: z.coerce.number().default(3001), JWT_SECRET: z.string().min(32), MEILI_URL: z.string().url().default('http://localhost:7700'), MEILI_MASTER_KEY: z.string().default('tower_meili_dev_key'), LOG_LEVEL: z.enum(['trace', 'debug', 'info', 'warn', 'error']).default('info'), WHATSAPP_SESSION_PATH: z.string().default('./sessions'), TOWER_ADMIN_JIDS: z.string().default(''), }); export type Env = z.infer; export function validateEnv(env: NodeJS.ProcessEnv = process.env): Env { const result = envSchema.safeParse(env); if (!result.success) { console.error('Invalid environment variables:', result.error.format()); throw new Error('Invalid environment variables'); } return result.data; } ``` - [ ] **Step 4: Run tests to verify 7 pass** ```bash pnpm --filter @tower/config test ``` Expected: PASS — 7 tests total (5 existing + 2 new) - [ ] **Step 5: Add IngestJobData to packages/types/src/message.ts** Add at the end of the file: ```typescript export interface IngestJobData { platformMsgId: string; platform: Platform; sourceGroupId: string; senderJid: string; senderName?: string; content: string; tags: string[]; } ``` - [ ] **Step 6: Build both packages** ```bash pnpm --filter @tower/config build pnpm --filter @tower/types build ``` Expected: both exit 0 with no errors - [ ] **Step 7: Commit** ```bash git add packages/config/src/index.ts packages/config/src/index.test.ts packages/types/src/message.ts git commit -m "feat: add WhatsApp config fields and IngestJobData type" ``` --- ## Task 2: Tag Detector (TDD) **Files:** - Create: `apps/worker/src/whatsapp/tag-detector.ts` - Create: `apps/worker/src/whatsapp/tag-detector.test.ts` The tag detector is a pure function — no I/O, no network, no DB. It takes message text and a sender JID, and returns an array of string tags. A message is "flagged" (should be ingested) if the tags array is non-empty. Tag rules: - Text contains `#important` (case-insensitive) → tag `#important` - Text contains `#upparivar` (case-insensitive) → tag `#upparivar` - Text contains `#event` (case-insensitive) → tag `#event` - Text starts with `/tower` → tag `#tower-command` - Sender JID is in the admin list → tag `#admin` - [ ] **Step 1: Write failing tests** Create `apps/worker/src/whatsapp/tag-detector.test.ts`: ```typescript import { detectTags, isFlagged } from './tag-detector'; const ADMINS = ['1234567890@s.whatsapp.net', '0987654321@s.whatsapp.net']; describe('detectTags', () => { it('detects #important hashtag (case-insensitive)', () => { expect(detectTags('Check this #IMPORTANT update', 'user@s.whatsapp.net', ADMINS)) .toContain('#important'); }); it('detects #upparivar hashtag (case-insensitive)', () => { expect(detectTags('Welcome to #UPParivar community', 'user@s.whatsapp.net', ADMINS)) .toContain('#upparivar'); }); it('detects #event hashtag', () => { expect(detectTags('Join our #event on Saturday', 'user@s.whatsapp.net', ADMINS)) .toContain('#event'); }); it('detects /tower command prefix', () => { expect(detectTags('/tower save this message', 'user@s.whatsapp.net', ADMINS)) .toContain('#tower-command'); }); it('detects multiple tags in one message', () => { const tags = detectTags('#important #event happening', 'user@s.whatsapp.net', ADMINS); expect(tags).toContain('#important'); expect(tags).toContain('#event'); }); it('detects admin sender', () => { expect(detectTags('Regular message', '1234567890@s.whatsapp.net', ADMINS)) .toContain('#admin'); }); it('returns empty array for untagged message from non-admin', () => { expect(detectTags('Just a regular chat message', 'nobody@s.whatsapp.net', ADMINS)) .toEqual([]); }); it('returns empty array for empty text', () => { expect(detectTags('', 'nobody@s.whatsapp.net', ADMINS)).toEqual([]); }); }); describe('isFlagged', () => { it('returns true when tags array is non-empty', () => { expect(isFlagged(['#important'])).toBe(true); }); it('returns false when tags array is empty', () => { expect(isFlagged([])).toBe(false); }); }); ``` - [ ] **Step 2: Run to verify tests fail** ```bash pnpm --filter @tower/worker test ``` Expected: FAIL — `Cannot find module './tag-detector'` - [ ] **Step 3: Implement apps/worker/src/whatsapp/tag-detector.ts** ```typescript const HASHTAGS: Array<{ pattern: RegExp; tag: string }> = [ { pattern: /#important/i, tag: '#important' }, { pattern: /#upparivar/i, tag: '#upparivar' }, { pattern: /#event/i, tag: '#event' }, ]; export function detectTags(text: string, senderJid: string, adminJids: string[]): string[] { const tags: string[] = []; for (const { pattern, tag } of HASHTAGS) { if (pattern.test(text)) tags.push(tag); } if (text.trimStart().startsWith('/tower')) tags.push('#tower-command'); if (adminJids.includes(senderJid)) tags.push('#admin'); return tags; } export function isFlagged(tags: string[]): boolean { return tags.length > 0; } ``` - [ ] **Step 4: Run tests to verify all 9 pass** ```bash pnpm --filter @tower/worker test ``` Expected: PASS — 9 tests (1 existing smoke test + 8 new) - [ ] **Step 5: Commit** ```bash git add apps/worker/src/whatsapp/ git commit -m "feat: add tag detector for TOWER message flagging" ``` --- ## Task 3: Message Normalizer (TDD) **Files:** - Create: `apps/worker/src/whatsapp/normalizer.ts` - Create: `apps/worker/src/whatsapp/normalizer.test.ts` The normalizer converts a Baileys `proto.IWebMessageInfo` object into a plain `NormalizedMessage`. It returns `null` for messages that should be ignored (protocol messages, own messages, non-group messages). - [ ] **Step 1: Write failing tests** Create `apps/worker/src/whatsapp/normalizer.test.ts`: ```typescript import { proto } from '@whiskeysockets/baileys'; import { normalizeMessage } from './normalizer'; function makeMsg(overrides: Partial = {}): proto.IWebMessageInfo { return { key: { remoteJid: '120363043312345678@g.us', fromMe: false, id: 'ABCDEF123456', participant: '919876543210@s.whatsapp.net', }, pushName: 'Alice', message: { conversation: 'Hello world' }, ...overrides, } as proto.IWebMessageInfo; } describe('normalizeMessage', () => { it('normalizes a plain text conversation message', () => { const result = normalizeMessage(makeMsg()); expect(result).toMatchObject({ platformMsgId: 'ABCDEF123456', sourceGroupJid: '120363043312345678@g.us', senderJid: '919876543210@s.whatsapp.net', senderName: 'Alice', content: 'Hello world', }); }); it('normalizes an extendedTextMessage', () => { const result = normalizeMessage(makeMsg({ message: { extendedTextMessage: { text: 'Extended text here' } }, })); expect(result?.content).toBe('Extended text here'); }); it('normalizes an imageMessage caption', () => { const result = normalizeMessage(makeMsg({ message: { imageMessage: { caption: 'Photo caption' } }, })); expect(result?.content).toBe('Photo caption'); }); it('normalizes a videoMessage caption', () => { const result = normalizeMessage(makeMsg({ message: { videoMessage: { caption: 'Video caption' } }, })); expect(result?.content).toBe('Video caption'); }); it('returns null for own messages (fromMe = true)', () => { const result = normalizeMessage(makeMsg({ key: { remoteJid: '120363043312345678@g.us', fromMe: true, id: 'XYZ', participant: '919876543210@s.whatsapp.net' } })); expect(result).toBeNull(); }); it('returns null for non-group messages (DMs)', () => { const result = normalizeMessage(makeMsg({ key: { remoteJid: '919876543210@s.whatsapp.net', fromMe: false, id: 'XYZ' }, })); expect(result).toBeNull(); }); it('returns null when message payload is missing', () => { const result = normalizeMessage(makeMsg({ message: null })); expect(result).toBeNull(); }); it('returns null when key is missing', () => { const result = normalizeMessage({ message: { conversation: 'hi' } } as proto.IWebMessageInfo); expect(result).toBeNull(); }); }); ``` - [ ] **Step 2: Run to verify tests fail** ```bash pnpm --filter @tower/worker test ``` Expected: FAIL — `Cannot find module './normalizer'` - [ ] **Step 3: Implement apps/worker/src/whatsapp/normalizer.ts** ```typescript import { proto } from '@whiskeysockets/baileys'; export interface NormalizedMessage { platformMsgId: string; sourceGroupJid: string; senderJid: string; senderName?: string; content: string; } function extractText(msg: proto.IWebMessageInfo): string { const m = msg.message; if (!m) return ''; return ( m.conversation || m.extendedTextMessage?.text || m.imageMessage?.caption || m.videoMessage?.caption || m.documentMessage?.caption || '' ); } export function normalizeMessage(msg: proto.IWebMessageInfo): NormalizedMessage | null { const key = msg.key; if (!key) return null; const remoteJid = key.remoteJid ?? ''; // Only process group messages (group JIDs end with @g.us) if (!remoteJid.endsWith('@g.us')) return null; // Skip our own outgoing messages if (key.fromMe) return null; if (!msg.message) return null; const content = extractText(msg); return { platformMsgId: key.id ?? '', sourceGroupJid: remoteJid, senderJid: key.participant ?? '', senderName: msg.pushName ?? undefined, content, }; } ``` - [ ] **Step 4: Install @whiskeysockets/baileys so types resolve** ```bash pnpm --filter @tower/worker add @whiskeysockets/baileys ``` Expected: adds baileys to apps/worker/package.json dependencies - [ ] **Step 5: Run tests to verify 8 pass (plus existing)** ```bash pnpm --filter @tower/worker test ``` Expected: PASS — 10 tests total - [ ] **Step 6: Commit** ```bash git add apps/worker/src/whatsapp/normalizer.ts apps/worker/src/whatsapp/normalizer.test.ts apps/worker/package.json pnpm-lock.yaml git commit -m "feat: add Baileys message normalizer" ``` --- ## Task 4: BullMQ Ingest Queue + Processor (TDD) **Files:** - Create: `apps/worker/src/queues/ingest.queue.ts` - Create: `apps/worker/src/queues/ingest.processor.ts` - Create: `apps/worker/src/queues/ingest.processor.test.ts` The processor receives an `IngestJobData` job and upserts it into the `Message` table as `PENDING`. The `sourceGroupId` in the job is the DB `Group.id` (already resolved before enqueueing). It uses `upsert` so duplicate messages (same platform + platformMsgId) are ignored. - [ ] **Step 1: Write failing test for the processor logic** Create `apps/worker/src/queues/ingest.processor.test.ts`: ```typescript import { processIngestJob } from './ingest.processor'; import { IngestJobData } from '@tower/types'; const mockPrisma = { message: { upsert: jest.fn(), }, }; const sampleJob: IngestJobData = { platformMsgId: 'WA_MSG_001', platform: 'whatsapp', sourceGroupId: 'clxxxxxx', senderJid: '919876543210@s.whatsapp.net', senderName: 'Alice', content: '#important update from the committee', tags: ['#important'], }; describe('processIngestJob', () => { beforeEach(() => jest.clearAllMocks()); it('upserts a message with PENDING status', async () => { mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-db-id' }); await processIngestJob(sampleJob, mockPrisma as any); expect(mockPrisma.message.upsert).toHaveBeenCalledWith({ where: { platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'WA_MSG_001' } }, create: { platform: 'whatsapp', platformMsgId: 'WA_MSG_001', sourceGroupId: 'clxxxxxx', senderJid: '919876543210@s.whatsapp.net', senderName: 'Alice', content: '#important update from the committee', tags: ['#important'], status: 'PENDING', }, update: {}, }); }); it('does not throw when message already exists (idempotent upsert)', async () => { mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-db-id' }); await expect(processIngestJob(sampleJob, mockPrisma as any)).resolves.not.toThrow(); }); it('propagates DB errors', async () => { mockPrisma.message.upsert.mockRejectedValue(new Error('DB connection lost')); await expect(processIngestJob(sampleJob, mockPrisma as any)).rejects.toThrow('DB connection lost'); }); }); ``` - [ ] **Step 2: Run to verify tests fail** ```bash pnpm --filter @tower/worker test ``` Expected: FAIL — `Cannot find module './ingest.processor'` - [ ] **Step 3: Implement apps/worker/src/queues/ingest.processor.ts** ```typescript import { Worker } from 'bullmq'; import { PrismaClient } from '@prisma/client'; import IORedis from 'ioredis'; import { IngestJobData } from '@tower/types'; export async function processIngestJob(job: IngestJobData, prisma: PrismaClient): Promise { await prisma.message.upsert({ where: { platform_platformMsgId: { platform: job.platform, platformMsgId: job.platformMsgId, }, }, create: { platform: job.platform, platformMsgId: job.platformMsgId, sourceGroupId: job.sourceGroupId, senderJid: job.senderJid, senderName: job.senderName, content: job.content, tags: job.tags, status: 'PENDING', }, update: {}, }); } export function createIngestWorker(redisUrl: string, prisma: PrismaClient): Worker { const connection = new IORedis(redisUrl, { maxRetriesPerRequest: null }); return new Worker( 'tower:ingest', async (job) => processIngestJob(job.data, prisma), { connection }, ); } ``` - [ ] **Step 4: Implement apps/worker/src/queues/ingest.queue.ts** ```typescript import { Queue } from 'bullmq'; import IORedis from 'ioredis'; import { IngestJobData } from '@tower/types'; export function createIngestQueue(redisUrl: string): Queue { const connection = new IORedis(redisUrl, { maxRetriesPerRequest: null }); return new Queue('tower:ingest', { connection }); } ``` - [ ] **Step 5: Run tests to verify 3 new pass + existing** ```bash pnpm --filter @tower/worker test ``` Expected: PASS — 13 tests total - [ ] **Step 6: Commit** ```bash git add apps/worker/src/queues/ git commit -m "feat: add BullMQ ingest queue and processor" ``` --- ## Task 5: Group Sync (TDD) **Files:** - Create: `apps/worker/src/whatsapp/group-sync.ts` - Create: `apps/worker/src/whatsapp/group-sync.test.ts` On WhatsApp connection, Baileys gives us all groups the bot is in via `sock.groupFetchAllParticipating()`. We upsert each one to the `Group` table. The function returns a `Map` used by the message listener to resolve group IDs. - [ ] **Step 1: Write failing tests** Create `apps/worker/src/whatsapp/group-sync.test.ts`: ```typescript import { syncGroups } from './group-sync'; import { GroupMetadata } from '@whiskeysockets/baileys'; const mockGroups: Record = { '120363043312345678@g.us': { id: '120363043312345678@g.us', subject: 'UP Parivar Dallas', desc: 'Main community group', participants: [], creation: 0, owner: undefined, restrict: false, announce: false, subjectOwner: undefined, subjectTime: 0, size: 0, ephemeralDuration: 0, inviteCode: undefined, }, '999999999@g.us': { id: '999999999@g.us', subject: 'Events Committee', desc: undefined, participants: [], creation: 0, owner: undefined, restrict: false, announce: false, subjectOwner: undefined, subjectTime: 0, size: 0, ephemeralDuration: 0, inviteCode: undefined, }, }; const mockPrisma = { group: { upsert: jest.fn(), }, }; describe('syncGroups', () => { beforeEach(() => jest.clearAllMocks()); it('upserts each group and returns jid→id map', async () => { mockPrisma.group.upsert .mockResolvedValueOnce({ id: 'db-group-1' }) .mockResolvedValueOnce({ id: 'db-group-2' }); const result = await syncGroups(mockGroups, mockPrisma as any); expect(mockPrisma.group.upsert).toHaveBeenCalledTimes(2); expect(result.get('120363043312345678@g.us')).toBe('db-group-1'); expect(result.get('999999999@g.us')).toBe('db-group-2'); }); it('calls upsert with correct create payload', async () => { mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-1' }); await syncGroups( { '120363043312345678@g.us': mockGroups['120363043312345678@g.us'] }, mockPrisma as any, ); expect(mockPrisma.group.upsert).toHaveBeenCalledWith({ where: { platform_platformId: { platform: 'whatsapp', platformId: '120363043312345678@g.us' } }, create: { platform: 'whatsapp', platformId: '120363043312345678@g.us', name: 'UP Parivar Dallas', description: 'Main community group', isActive: true, }, update: { name: 'UP Parivar Dallas', description: 'Main community group' }, }); }); it('handles groups with no description', async () => { mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-2' }); await syncGroups( { '999999999@g.us': mockGroups['999999999@g.us'] }, mockPrisma as any, ); expect(mockPrisma.group.upsert).toHaveBeenCalledWith( expect.objectContaining({ create: expect.objectContaining({ description: undefined }), }), ); }); it('returns an empty map when given empty groups', async () => { const result = await syncGroups({}, mockPrisma as any); expect(result.size).toBe(0); expect(mockPrisma.group.upsert).not.toHaveBeenCalled(); }); }); ``` - [ ] **Step 2: Run to verify tests fail** ```bash pnpm --filter @tower/worker test ``` Expected: FAIL — `Cannot find module './group-sync'` - [ ] **Step 3: Implement apps/worker/src/whatsapp/group-sync.ts** ```typescript import { GroupMetadata } from '@whiskeysockets/baileys'; import { PrismaClient } from '@prisma/client'; export async function syncGroups( groups: Record, prisma: PrismaClient, ): Promise> { const jidToDbId = new Map(); for (const [jid, meta] of Object.entries(groups)) { const group = await prisma.group.upsert({ where: { platform_platformId: { platform: 'whatsapp', platformId: jid } }, create: { platform: 'whatsapp', platformId: jid, name: meta.subject, description: meta.desc ?? undefined, isActive: true, }, update: { name: meta.subject, description: meta.desc ?? undefined }, }); jidToDbId.set(jid, group.id); } return jidToDbId; } ``` - [ ] **Step 4: Run tests to verify 4 new pass + existing** ```bash pnpm --filter @tower/worker test ``` Expected: PASS — 17 tests total - [ ] **Step 5: Commit** ```bash git add apps/worker/src/whatsapp/group-sync.ts apps/worker/src/whatsapp/group-sync.test.ts git commit -m "feat: add WhatsApp group sync to database" ``` --- ## Task 6: WhatsApp Session **Files:** - Create: `apps/worker/src/whatsapp/session.ts` The session module wraps Baileys' `makeWASocket`. It manages auth state, reconnection on disconnect, and calls provided callbacks for groups (on connection) and messages (on upsert). No unit test for the session itself — it wraps a live network connection. The integration is verified end-to-end in Task 8. - [ ] **Step 1: Create apps/worker/src/whatsapp/session.ts** ```typescript import makeWASocket, { useMultiFileAuthState, fetchLatestBaileysVersion, DisconnectReason, WASocket, proto, GroupMetadata, } from '@whiskeysockets/baileys'; import { Boom } from '@hapi/boom'; import { createLogger } from '@tower/logger'; const logger = createLogger('whatsapp-session'); export type OnMessageCallback = (msg: proto.IWebMessageInfo) => void; export type OnGroupsCallback = (groups: Record) => void; export async function createWhatsAppSession( sessionPath: string, onMessage: OnMessageCallback, onGroups: OnGroupsCallback, ): Promise { const { state, saveCreds } = await useMultiFileAuthState(sessionPath); const { version } = await fetchLatestBaileysVersion(); const sock = makeWASocket({ version, auth: state, printQRInTerminal: true, logger: logger as any, }); sock.ev.on('creds.update', saveCreds); sock.ev.on('connection.update', async ({ connection, lastDisconnect }) => { if (connection === 'close') { const reason = (lastDisconnect?.error as Boom)?.output?.statusCode; const shouldReconnect = reason !== DisconnectReason.loggedOut; logger.info({ reason, shouldReconnect }, 'Connection closed'); if (shouldReconnect) { logger.info('Reconnecting in 5s...'); setTimeout(() => createWhatsAppSession(sessionPath, onMessage, onGroups), 5000); } } else if (connection === 'open') { logger.info('WhatsApp connected'); const groups = await sock.groupFetchAllParticipating(); onGroups(groups); } }); sock.ev.on('messages.upsert', ({ messages, type }) => { if (type !== 'notify') return; for (const msg of messages) { onMessage(msg); } }); return sock; } ``` - [ ] **Step 2: Verify TypeScript compiles** ```bash pnpm --filter @tower/worker build ``` Expected: exit 0 with no type errors - [ ] **Step 3: Commit** ```bash git add apps/worker/src/whatsapp/session.ts git commit -m "feat: add Baileys WhatsApp session with reconnect logic" ``` --- ## Task 7: Add Prisma to Worker + Update .env **Files:** - Modify: `apps/worker/package.json` - Modify: `apps/worker/jest.config.js` - Modify: `turbo.json` - Modify: `.env.example` - Modify: `.gitignore` - [ ] **Step 1: Add @prisma/client and prisma to worker** ```bash pnpm --filter @tower/worker add @prisma/client pnpm --filter @tower/worker add --save-dev prisma ``` - [ ] **Step 2: Add generate script to apps/worker/package.json** The `generate` script tells Prisma where to find the schema (in apps/api). After this step, `package.json` scripts section should be: ```json "scripts": { "generate": "prisma generate --schema=../api/prisma/schema.prisma", "build": "tsc", "dev": "ts-node src/main.ts", "start": "node dist/main.js", "test": "jest" } ``` - [ ] **Step 3: Add generate task to turbo.json** Replace `turbo.json` content: ```json { "$schema": "https://turbo.build/schema.json", "tasks": { "generate": { "cache": false }, "build": { "dependsOn": ["generate", "^build"], "outputs": ["dist/**", ".next/**"] }, "dev": { "cache": false, "persistent": true }, "test": { "dependsOn": ["^build"], "outputs": ["coverage/**"] }, "lint": {} } } ``` - [ ] **Step 4: Run prisma generate for worker** ```bash pnpm --filter @tower/worker generate ``` Expected: `✔ Generated Prisma Client` with no errors - [ ] **Step 5: Update apps/worker/jest.config.js to load .env** Replace the entire file: ```javascript const path = require('path'); require('dotenv').config({ path: path.resolve(__dirname, '../../.env') }); module.exports = { preset: 'ts-jest', testEnvironment: 'node', testMatch: ['**/*.test.ts'], rootDir: 'src', }; ``` - [ ] **Step 6: Add dotenv dev dependency to worker** ```bash pnpm --filter @tower/worker add --save-dev dotenv ``` - [ ] **Step 7: Update .env.example — add new vars** Add these lines to `/Users/maaz/Documents/insignia-work/tower/.env.example`: ```bash # WhatsApp WHATSAPP_SESSION_PATH=./sessions TOWER_ADMIN_JIDS= ``` Also add to `.env` (the actual file, NOT committed): ```bash WHATSAPP_SESSION_PATH=./sessions TOWER_ADMIN_JIDS= ``` - [ ] **Step 8: Add sessions/ to .gitignore** Add this line to `.gitignore`: ``` sessions/ ``` - [ ] **Step 9: Run worker tests to verify still passing** ```bash pnpm --filter @tower/worker test ``` Expected: PASS — 17 tests passing - [ ] **Step 10: Commit** ```bash git add apps/worker/package.json apps/worker/jest.config.js turbo.json .env.example .gitignore pnpm-lock.yaml git commit -m "chore: add Prisma client to worker, turbo generate task, update env" ``` --- ## Task 8: Wire Worker Bootstrap **Files:** - Modify: `apps/worker/src/main.ts` Wire together session → normalizer → tag-detector → queue so the worker fully processes incoming messages. - [ ] **Step 1: Replace apps/worker/src/main.ts** ```typescript import { PrismaClient } from '@prisma/client'; import { createLogger } from '@tower/logger'; import { validateEnv } from '@tower/config'; import { IngestJobData } from '@tower/types'; import { createWhatsAppSession } from './whatsapp/session'; import { normalizeMessage } from './whatsapp/normalizer'; import { detectTags, isFlagged } from './whatsapp/tag-detector'; import { syncGroups } from './whatsapp/group-sync'; import { createIngestQueue } from './queues/ingest.queue'; import { createIngestWorker } from './queues/ingest.processor'; const logger = createLogger('tower-worker'); async function bootstrap() { const env = validateEnv(); const prisma = new PrismaClient(); await prisma.$connect(); const adminJids = env.TOWER_ADMIN_JIDS ? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean) : []; const ingestQueue = createIngestQueue(env.REDIS_URL); const ingestWorker = createIngestWorker(env.REDIS_URL, prisma); ingestWorker.on('completed', (job) => { logger.info({ jobId: job.id }, 'Ingest job completed'); }); ingestWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, err }, 'Ingest job failed'); }); // jid→dbId map populated on WA connection let groupMap = new Map(); await createWhatsAppSession( env.WHATSAPP_SESSION_PATH, async (msg) => { const normalized = normalizeMessage(msg); if (!normalized) return; const tags = detectTags(normalized.content, normalized.senderJid, adminJids); if (!isFlagged(tags)) return; const sourceGroupId = groupMap.get(normalized.sourceGroupJid); if (!sourceGroupId) { logger.warn({ jid: normalized.sourceGroupJid }, 'Unknown group — skipping message'); return; } const jobData: IngestJobData = { platformMsgId: normalized.platformMsgId, platform: 'whatsapp', sourceGroupId, senderJid: normalized.senderJid, senderName: normalized.senderName, content: normalized.content, tags, }; await ingestQueue.add('ingest', jobData, { attempts: 3, backoff: { type: 'exponential', delay: 1000 }, }); logger.info({ platformMsgId: normalized.platformMsgId, tags }, 'Message enqueued'); }, async (groups) => { logger.info({ count: Object.keys(groups).length }, 'Syncing groups'); groupMap = await syncGroups(groups, prisma); logger.info({ count: groupMap.size }, 'Groups synced'); }, ); logger.info('Tower worker ready'); const shutdown = async () => { logger.info('Shutting down...'); await ingestWorker.close(); await ingestQueue.close(); await prisma.$disconnect(); process.exit(0); }; process.on('SIGTERM', shutdown); process.on('SIGINT', shutdown); } bootstrap().catch((err) => { console.error('Worker failed to start', err); process.exit(1); }); ``` - [ ] **Step 2: Build worker to verify no type errors** ```bash pnpm --filter @tower/worker build ``` Expected: exit 0 with no TypeScript errors - [ ] **Step 3: Run worker tests** ```bash pnpm --filter @tower/worker test ``` Expected: PASS — all tests still passing - [ ] **Step 4: Commit** ```bash git add apps/worker/src/main.ts git commit -m "feat: wire worker bootstrap — session → normalizer → queue pipeline" ``` --- ## Task 9: Turborepo Full Smoke Test **Files:** None new — verify the full pipeline - [ ] **Step 1: Ensure Docker is running** ```bash docker compose up -d ``` Verify postgres (port 5433) and redis (port 6379) are healthy: ```bash docker compose ps ``` Expected: postgres, redis, meilisearch all show `Up` - [ ] **Step 2: Run full build** ```bash pnpm build ``` Expected: ``` Tasks: 8 successful, 8 total ``` - [ ] **Step 3: Run full test suite** ```bash pnpm test ``` Expected: ``` Tasks: 8 successful, 8 total ``` All packages must pass. If `@tower/api#test` fails with `DATABASE_URL not found`, verify `.env` has the correct `DATABASE_URL=postgresql://tower:tower_dev@localhost:5433/tower_dev`. - [ ] **Step 4: Commit any fixes, then tag the milestone** ```bash git add -A git commit -m "chore: turborepo smoke test — all 8 packages build and test clean" ``` --- ## Self-Review **Spec coverage:** - ✅ Baileys connection with QR-based auth and session persistence - ✅ Group discovery on connect → upserted to `Group` table - ✅ Message normalization (text, extended text, image/video captions) - ✅ Tag detection: `#important`, `#upparivar`, `#event`, `/tower` command, admin sender - ✅ BullMQ `tower:ingest` queue — durability + retry logic - ✅ Message persistence as `PENDING` records (idempotent via upsert) - ✅ Graceful shutdown (SIGTERM/SIGINT) - ⏭️ ⭐ admin reaction handling — deferred to Plan 3 (requires message store) - ⏭️ Media download — deferred to Plan 4 (requires Cloudflare R2 / MinIO) **Type consistency check:** - `NormalizedMessage.sourceGroupJid` → used in `groupMap.get()` in main.ts ✅ - `IngestJobData.sourceGroupId` → DB Group.id (resolved from groupMap) ✅ - `syncGroups` returns `Map` (jid → db id) ✅ - `processIngestJob` uses `platform_platformMsgId` compound unique key (matches Prisma schema `@@unique([platform, platformMsgId])`) ✅ - `group.upsert` uses `platform_platformId` compound key (matches `@@unique([platform, platformId])`) ✅