d33b4e40b8
Replaces DisconnectReason enum import with type-only WASocket import and uses 401 directly instead of DisconnectReason.loggedOut. Baileys is an ES module that cannot be executed in Jest's CommonJS mode, so removing the value import (keeping only type imports) prevents ts-jest from trying to execute the module. Also updated session-pool.test.ts to verify end() is called with the expected Boom error object instead of undefined. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1191 lines
34 KiB
Markdown
1191 lines
34 KiB
Markdown
# WhatsApp Integration Implementation Plan
|
|
|
|
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
|
|
|
**Goal:** Connect `apps/worker` to WhatsApp via Baileys, normalize and tag-detect incoming messages, sync groups to the DB, and persist flagged messages as `PENDING` records ready for the approval workflow in Plan 3.
|
|
|
|
**Architecture:** The worker process holds a long-lived Baileys WebSocket connection. Incoming messages are normalized to a canonical shape, checked for TOWER tags (hashtags, `/tower` command), and pushed to a BullMQ `tower:ingest` queue. A BullMQ processor in the same worker process consumes the queue and upserts `Message` records to PostgreSQL with `PENDING` status. The NestJS API is not involved — the worker writes directly to the DB via Prisma Client.
|
|
|
|
**Tech Stack:** `@whiskeysockets/baileys ^6.0.0`, `bullmq ^5` (already in worker), `ioredis ^5` (already in worker), `@prisma/client ^6` (shared from apps/api schema), `pino` (via @tower/logger), Turborepo `generate` task
|
|
|
|
---
|
|
|
|
## File Map
|
|
|
|
| Action | Path | Purpose |
|
|
|--------|------|---------|
|
|
| Modify | `packages/config/src/index.ts` | Add WHATSAPP_SESSION_PATH, TOWER_ADMIN_JIDS |
|
|
| Modify | `packages/config/src/index.test.ts` | Tests for new config fields |
|
|
| Modify | `packages/types/src/message.ts` | Add IngestJobData interface |
|
|
| Create | `apps/worker/src/whatsapp/tag-detector.ts` | Detect TOWER tags from message text + sender |
|
|
| Create | `apps/worker/src/whatsapp/tag-detector.test.ts` | Unit tests |
|
|
| Create | `apps/worker/src/whatsapp/normalizer.ts` | Convert Baileys proto → NormalizedMessage |
|
|
| Create | `apps/worker/src/whatsapp/normalizer.test.ts` | Unit tests |
|
|
| Create | `apps/worker/src/whatsapp/group-sync.ts` | Upsert WA groups into DB on connection |
|
|
| Create | `apps/worker/src/whatsapp/group-sync.test.ts` | Unit tests with mocked Prisma |
|
|
| Create | `apps/worker/src/queues/ingest.queue.ts` | BullMQ Queue producer factory |
|
|
| Create | `apps/worker/src/queues/ingest.processor.ts` | BullMQ Worker consumer — persists Message to DB |
|
|
| Create | `apps/worker/src/queues/ingest.processor.test.ts` | Unit tests with mocked Prisma |
|
|
| Create | `apps/worker/src/whatsapp/session.ts` | Baileys socket factory |
|
|
| Modify | `apps/worker/src/main.ts` | Wire session → normalizer → tag-detector → queue |
|
|
| Modify | `apps/worker/package.json` | Add @whiskeysockets/baileys, @prisma/client, prisma |
|
|
| Modify | `apps/worker/jest.config.js` | Load .env for Prisma |
|
|
| Modify | `turbo.json` | Add generate task |
|
|
| Modify | `.env.example` | Add WHATSAPP_SESSION_PATH, TOWER_ADMIN_JIDS |
|
|
| Modify | `.gitignore` | Ignore sessions/ directory |
|
|
|
|
---
|
|
|
|
## Task 1: Extend @tower/config and @tower/types
|
|
|
|
**Files:**
|
|
- Modify: `packages/config/src/index.ts`
|
|
- Modify: `packages/config/src/index.test.ts`
|
|
- Modify: `packages/types/src/message.ts`
|
|
|
|
- [ ] **Step 1: Write failing tests for new config fields**
|
|
|
|
Add these two tests to `packages/config/src/index.test.ts` inside the existing `validateEnv` describe block:
|
|
|
|
```typescript
|
|
it('applies default WHATSAPP_SESSION_PATH of ./sessions when not set', () => {
|
|
const env = {
|
|
DATABASE_URL: 'postgresql://user:pass@localhost:5432/db',
|
|
REDIS_URL: 'redis://localhost:6379',
|
|
JWT_SECRET: 'a'.repeat(32),
|
|
};
|
|
const result = validateEnv(env as NodeJS.ProcessEnv);
|
|
expect(result.WHATSAPP_SESSION_PATH).toBe('./sessions');
|
|
});
|
|
|
|
it('applies default TOWER_ADMIN_JIDS of empty string when not set', () => {
|
|
const env = {
|
|
DATABASE_URL: 'postgresql://user:pass@localhost:5432/db',
|
|
REDIS_URL: 'redis://localhost:6379',
|
|
JWT_SECRET: 'a'.repeat(32),
|
|
};
|
|
const result = validateEnv(env as NodeJS.ProcessEnv);
|
|
expect(result.TOWER_ADMIN_JIDS).toBe('');
|
|
});
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify tests fail**
|
|
|
|
```bash
|
|
pnpm --filter @tower/config test
|
|
```
|
|
|
|
Expected: FAIL — `Property 'WHATSAPP_SESSION_PATH' does not exist on type 'Env'`
|
|
|
|
- [ ] **Step 3: Update packages/config/src/index.ts**
|
|
|
|
Replace the entire file content:
|
|
|
|
```typescript
|
|
import { z } from 'zod';
|
|
|
|
const envSchema = z.object({
|
|
NODE_ENV: z.enum(['development', 'test', 'production']).default('development'),
|
|
DATABASE_URL: z.string().url(),
|
|
REDIS_URL: z.string().url(),
|
|
API_PORT: z.coerce.number().default(3001),
|
|
JWT_SECRET: z.string().min(32),
|
|
MEILI_URL: z.string().url().default('http://localhost:7700'),
|
|
MEILI_MASTER_KEY: z.string().default('tower_meili_dev_key'),
|
|
LOG_LEVEL: z.enum(['trace', 'debug', 'info', 'warn', 'error']).default('info'),
|
|
WHATSAPP_SESSION_PATH: z.string().default('./sessions'),
|
|
TOWER_ADMIN_JIDS: z.string().default(''),
|
|
});
|
|
|
|
export type Env = z.infer<typeof envSchema>;
|
|
|
|
export function validateEnv(env: NodeJS.ProcessEnv = process.env): Env {
|
|
const result = envSchema.safeParse(env);
|
|
if (!result.success) {
|
|
console.error('Invalid environment variables:', result.error.format());
|
|
throw new Error('Invalid environment variables');
|
|
}
|
|
return result.data;
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify 7 pass**
|
|
|
|
```bash
|
|
pnpm --filter @tower/config test
|
|
```
|
|
|
|
Expected: PASS — 7 tests total (5 existing + 2 new)
|
|
|
|
- [ ] **Step 5: Add IngestJobData to packages/types/src/message.ts**
|
|
|
|
Add at the end of the file:
|
|
|
|
```typescript
|
|
export interface IngestJobData {
|
|
platformMsgId: string;
|
|
platform: Platform;
|
|
sourceGroupId: string;
|
|
senderJid: string;
|
|
senderName?: string;
|
|
content: string;
|
|
tags: string[];
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 6: Build both packages**
|
|
|
|
```bash
|
|
pnpm --filter @tower/config build
|
|
pnpm --filter @tower/types build
|
|
```
|
|
|
|
Expected: both exit 0 with no errors
|
|
|
|
- [ ] **Step 7: Commit**
|
|
|
|
```bash
|
|
git add packages/config/src/index.ts packages/config/src/index.test.ts packages/types/src/message.ts
|
|
git commit -m "feat: add WhatsApp config fields and IngestJobData type"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 2: Tag Detector (TDD)
|
|
|
|
**Files:**
|
|
- Create: `apps/worker/src/whatsapp/tag-detector.ts`
|
|
- Create: `apps/worker/src/whatsapp/tag-detector.test.ts`
|
|
|
|
The tag detector is a pure function — no I/O, no network, no DB. It takes message text and a sender JID, and returns an array of string tags. A message is "flagged" (should be ingested) if the tags array is non-empty.
|
|
|
|
Tag rules:
|
|
- Text contains `#important` (case-insensitive) → tag `#important`
|
|
- Text contains `#upparivar` (case-insensitive) → tag `#upparivar`
|
|
- Text contains `#event` (case-insensitive) → tag `#event`
|
|
- Text starts with `/tower` → tag `#tower-command`
|
|
- Sender JID is in the admin list → tag `#admin`
|
|
|
|
- [ ] **Step 1: Write failing tests**
|
|
|
|
Create `apps/worker/src/whatsapp/tag-detector.test.ts`:
|
|
|
|
```typescript
|
|
import { detectTags, isFlagged } from './tag-detector';
|
|
|
|
const ADMINS = ['1234567890@s.whatsapp.net', '0987654321@s.whatsapp.net'];
|
|
|
|
describe('detectTags', () => {
|
|
it('detects #important hashtag (case-insensitive)', () => {
|
|
expect(detectTags('Check this #IMPORTANT update', 'user@s.whatsapp.net', ADMINS))
|
|
.toContain('#important');
|
|
});
|
|
|
|
it('detects #upparivar hashtag (case-insensitive)', () => {
|
|
expect(detectTags('Welcome to #UPParivar community', 'user@s.whatsapp.net', ADMINS))
|
|
.toContain('#upparivar');
|
|
});
|
|
|
|
it('detects #event hashtag', () => {
|
|
expect(detectTags('Join our #event on Saturday', 'user@s.whatsapp.net', ADMINS))
|
|
.toContain('#event');
|
|
});
|
|
|
|
it('detects /tower command prefix', () => {
|
|
expect(detectTags('/tower save this message', 'user@s.whatsapp.net', ADMINS))
|
|
.toContain('#tower-command');
|
|
});
|
|
|
|
it('detects multiple tags in one message', () => {
|
|
const tags = detectTags('#important #event happening', 'user@s.whatsapp.net', ADMINS);
|
|
expect(tags).toContain('#important');
|
|
expect(tags).toContain('#event');
|
|
});
|
|
|
|
it('detects admin sender', () => {
|
|
expect(detectTags('Regular message', '1234567890@s.whatsapp.net', ADMINS))
|
|
.toContain('#admin');
|
|
});
|
|
|
|
it('returns empty array for untagged message from non-admin', () => {
|
|
expect(detectTags('Just a regular chat message', 'nobody@s.whatsapp.net', ADMINS))
|
|
.toEqual([]);
|
|
});
|
|
|
|
it('returns empty array for empty text', () => {
|
|
expect(detectTags('', 'nobody@s.whatsapp.net', ADMINS)).toEqual([]);
|
|
});
|
|
});
|
|
|
|
describe('isFlagged', () => {
|
|
it('returns true when tags array is non-empty', () => {
|
|
expect(isFlagged(['#important'])).toBe(true);
|
|
});
|
|
|
|
it('returns false when tags array is empty', () => {
|
|
expect(isFlagged([])).toBe(false);
|
|
});
|
|
});
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify tests fail**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker test
|
|
```
|
|
|
|
Expected: FAIL — `Cannot find module './tag-detector'`
|
|
|
|
- [ ] **Step 3: Implement apps/worker/src/whatsapp/tag-detector.ts**
|
|
|
|
```typescript
|
|
const HASHTAGS: Array<{ pattern: RegExp; tag: string }> = [
|
|
{ pattern: /#important/i, tag: '#important' },
|
|
{ pattern: /#upparivar/i, tag: '#upparivar' },
|
|
{ pattern: /#event/i, tag: '#event' },
|
|
];
|
|
|
|
export function detectTags(text: string, senderJid: string, adminJids: string[]): string[] {
|
|
const tags: string[] = [];
|
|
|
|
for (const { pattern, tag } of HASHTAGS) {
|
|
if (pattern.test(text)) tags.push(tag);
|
|
}
|
|
|
|
if (text.trimStart().startsWith('/tower')) tags.push('#tower-command');
|
|
|
|
if (adminJids.includes(senderJid)) tags.push('#admin');
|
|
|
|
return tags;
|
|
}
|
|
|
|
export function isFlagged(tags: string[]): boolean {
|
|
return tags.length > 0;
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify all 9 pass**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker test
|
|
```
|
|
|
|
Expected: PASS — 9 tests (1 existing smoke test + 8 new)
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add apps/worker/src/whatsapp/
|
|
git commit -m "feat: add tag detector for TOWER message flagging"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 3: Message Normalizer (TDD)
|
|
|
|
**Files:**
|
|
- Create: `apps/worker/src/whatsapp/normalizer.ts`
|
|
- Create: `apps/worker/src/whatsapp/normalizer.test.ts`
|
|
|
|
The normalizer converts a Baileys `proto.IWebMessageInfo` object into a plain `NormalizedMessage`. It returns `null` for messages that should be ignored (protocol messages, own messages, non-group messages).
|
|
|
|
- [ ] **Step 1: Write failing tests**
|
|
|
|
Create `apps/worker/src/whatsapp/normalizer.test.ts`:
|
|
|
|
```typescript
|
|
import { proto } from '@whiskeysockets/baileys';
|
|
import { normalizeMessage } from './normalizer';
|
|
|
|
function makeMsg(overrides: Partial<proto.IWebMessageInfo> = {}): proto.IWebMessageInfo {
|
|
return {
|
|
key: {
|
|
remoteJid: '120363043312345678@g.us',
|
|
fromMe: false,
|
|
id: 'ABCDEF123456',
|
|
participant: '919876543210@s.whatsapp.net',
|
|
},
|
|
pushName: 'Alice',
|
|
message: { conversation: 'Hello world' },
|
|
...overrides,
|
|
} as proto.IWebMessageInfo;
|
|
}
|
|
|
|
describe('normalizeMessage', () => {
|
|
it('normalizes a plain text conversation message', () => {
|
|
const result = normalizeMessage(makeMsg());
|
|
expect(result).toMatchObject({
|
|
platformMsgId: 'ABCDEF123456',
|
|
sourceGroupJid: '120363043312345678@g.us',
|
|
senderJid: '919876543210@s.whatsapp.net',
|
|
senderName: 'Alice',
|
|
content: 'Hello world',
|
|
});
|
|
});
|
|
|
|
it('normalizes an extendedTextMessage', () => {
|
|
const result = normalizeMessage(makeMsg({
|
|
message: { extendedTextMessage: { text: 'Extended text here' } },
|
|
}));
|
|
expect(result?.content).toBe('Extended text here');
|
|
});
|
|
|
|
it('normalizes an imageMessage caption', () => {
|
|
const result = normalizeMessage(makeMsg({
|
|
message: { imageMessage: { caption: 'Photo caption' } },
|
|
}));
|
|
expect(result?.content).toBe('Photo caption');
|
|
});
|
|
|
|
it('normalizes a videoMessage caption', () => {
|
|
const result = normalizeMessage(makeMsg({
|
|
message: { videoMessage: { caption: 'Video caption' } },
|
|
}));
|
|
expect(result?.content).toBe('Video caption');
|
|
});
|
|
|
|
it('returns null for own messages (fromMe = true)', () => {
|
|
const result = normalizeMessage(makeMsg({ key: { remoteJid: '120363043312345678@g.us', fromMe: true, id: 'XYZ', participant: '919876543210@s.whatsapp.net' } }));
|
|
expect(result).toBeNull();
|
|
});
|
|
|
|
it('returns null for non-group messages (DMs)', () => {
|
|
const result = normalizeMessage(makeMsg({
|
|
key: { remoteJid: '919876543210@s.whatsapp.net', fromMe: false, id: 'XYZ' },
|
|
}));
|
|
expect(result).toBeNull();
|
|
});
|
|
|
|
it('returns null when message payload is missing', () => {
|
|
const result = normalizeMessage(makeMsg({ message: null }));
|
|
expect(result).toBeNull();
|
|
});
|
|
|
|
it('returns null when key is missing', () => {
|
|
const result = normalizeMessage({ message: { conversation: 'hi' } } as proto.IWebMessageInfo);
|
|
expect(result).toBeNull();
|
|
});
|
|
});
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify tests fail**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker test
|
|
```
|
|
|
|
Expected: FAIL — `Cannot find module './normalizer'`
|
|
|
|
- [ ] **Step 3: Implement apps/worker/src/whatsapp/normalizer.ts**
|
|
|
|
```typescript
|
|
import { proto } from '@whiskeysockets/baileys';
|
|
|
|
export interface NormalizedMessage {
|
|
platformMsgId: string;
|
|
sourceGroupJid: string;
|
|
senderJid: string;
|
|
senderName?: string;
|
|
content: string;
|
|
}
|
|
|
|
function extractText(msg: proto.IWebMessageInfo): string {
|
|
const m = msg.message;
|
|
if (!m) return '';
|
|
return (
|
|
m.conversation ||
|
|
m.extendedTextMessage?.text ||
|
|
m.imageMessage?.caption ||
|
|
m.videoMessage?.caption ||
|
|
m.documentMessage?.caption ||
|
|
''
|
|
);
|
|
}
|
|
|
|
export function normalizeMessage(msg: proto.IWebMessageInfo): NormalizedMessage | null {
|
|
const key = msg.key;
|
|
if (!key) return null;
|
|
|
|
const remoteJid = key.remoteJid ?? '';
|
|
|
|
// Only process group messages (group JIDs end with @g.us)
|
|
if (!remoteJid.endsWith('@g.us')) return null;
|
|
|
|
// Skip our own outgoing messages
|
|
if (key.fromMe) return null;
|
|
|
|
if (!msg.message) return null;
|
|
|
|
const content = extractText(msg);
|
|
|
|
return {
|
|
platformMsgId: key.id ?? '',
|
|
sourceGroupJid: remoteJid,
|
|
senderJid: key.participant ?? '',
|
|
senderName: msg.pushName ?? undefined,
|
|
content,
|
|
};
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 4: Install @whiskeysockets/baileys so types resolve**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker add @whiskeysockets/baileys
|
|
```
|
|
|
|
Expected: adds baileys to apps/worker/package.json dependencies
|
|
|
|
- [ ] **Step 5: Run tests to verify 8 pass (plus existing)**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker test
|
|
```
|
|
|
|
Expected: PASS — 10 tests total
|
|
|
|
- [ ] **Step 6: Commit**
|
|
|
|
```bash
|
|
git add apps/worker/src/whatsapp/normalizer.ts apps/worker/src/whatsapp/normalizer.test.ts apps/worker/package.json pnpm-lock.yaml
|
|
git commit -m "feat: add Baileys message normalizer"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 4: BullMQ Ingest Queue + Processor (TDD)
|
|
|
|
**Files:**
|
|
- Create: `apps/worker/src/queues/ingest.queue.ts`
|
|
- Create: `apps/worker/src/queues/ingest.processor.ts`
|
|
- Create: `apps/worker/src/queues/ingest.processor.test.ts`
|
|
|
|
The processor receives an `IngestJobData` job and upserts it into the `Message` table as `PENDING`. The `sourceGroupId` in the job is the DB `Group.id` (already resolved before enqueueing). It uses `upsert` so duplicate messages (same platform + platformMsgId) are ignored.
|
|
|
|
- [ ] **Step 1: Write failing test for the processor logic**
|
|
|
|
Create `apps/worker/src/queues/ingest.processor.test.ts`:
|
|
|
|
```typescript
|
|
import { processIngestJob } from './ingest.processor';
|
|
import { IngestJobData } from '@tower/types';
|
|
|
|
const mockPrisma = {
|
|
message: {
|
|
upsert: jest.fn(),
|
|
},
|
|
};
|
|
|
|
const sampleJob: IngestJobData = {
|
|
platformMsgId: 'WA_MSG_001',
|
|
platform: 'whatsapp',
|
|
sourceGroupId: 'clxxxxxx',
|
|
senderJid: '919876543210@s.whatsapp.net',
|
|
senderName: 'Alice',
|
|
content: '#important update from the committee',
|
|
tags: ['#important'],
|
|
};
|
|
|
|
describe('processIngestJob', () => {
|
|
beforeEach(() => jest.clearAllMocks());
|
|
|
|
it('upserts a message with PENDING status', async () => {
|
|
mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-db-id' });
|
|
|
|
await processIngestJob(sampleJob, mockPrisma as any);
|
|
|
|
expect(mockPrisma.message.upsert).toHaveBeenCalledWith({
|
|
where: { platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'WA_MSG_001' } },
|
|
create: {
|
|
platform: 'whatsapp',
|
|
platformMsgId: 'WA_MSG_001',
|
|
sourceGroupId: 'clxxxxxx',
|
|
senderJid: '919876543210@s.whatsapp.net',
|
|
senderName: 'Alice',
|
|
content: '#important update from the committee',
|
|
tags: ['#important'],
|
|
status: 'PENDING',
|
|
},
|
|
update: {},
|
|
});
|
|
});
|
|
|
|
it('does not throw when message already exists (idempotent upsert)', async () => {
|
|
mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-db-id' });
|
|
await expect(processIngestJob(sampleJob, mockPrisma as any)).resolves.not.toThrow();
|
|
});
|
|
|
|
it('propagates DB errors', async () => {
|
|
mockPrisma.message.upsert.mockRejectedValue(new Error('DB connection lost'));
|
|
await expect(processIngestJob(sampleJob, mockPrisma as any)).rejects.toThrow('DB connection lost');
|
|
});
|
|
});
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify tests fail**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker test
|
|
```
|
|
|
|
Expected: FAIL — `Cannot find module './ingest.processor'`
|
|
|
|
- [ ] **Step 3: Implement apps/worker/src/queues/ingest.processor.ts**
|
|
|
|
```typescript
|
|
import { Worker } from 'bullmq';
|
|
import { PrismaClient } from '@prisma/client';
|
|
import IORedis from 'ioredis';
|
|
import { IngestJobData } from '@tower/types';
|
|
|
|
export async function processIngestJob(job: IngestJobData, prisma: PrismaClient): Promise<void> {
|
|
await prisma.message.upsert({
|
|
where: {
|
|
platform_platformMsgId: {
|
|
platform: job.platform,
|
|
platformMsgId: job.platformMsgId,
|
|
},
|
|
},
|
|
create: {
|
|
platform: job.platform,
|
|
platformMsgId: job.platformMsgId,
|
|
sourceGroupId: job.sourceGroupId,
|
|
senderJid: job.senderJid,
|
|
senderName: job.senderName,
|
|
content: job.content,
|
|
tags: job.tags,
|
|
status: 'PENDING',
|
|
},
|
|
update: {},
|
|
});
|
|
}
|
|
|
|
export function createIngestWorker(redisUrl: string, prisma: PrismaClient): Worker<IngestJobData> {
|
|
const connection = new IORedis(redisUrl, { maxRetriesPerRequest: null });
|
|
return new Worker<IngestJobData>(
|
|
'tower:ingest',
|
|
async (job) => processIngestJob(job.data, prisma),
|
|
{ connection },
|
|
);
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 4: Implement apps/worker/src/queues/ingest.queue.ts**
|
|
|
|
```typescript
|
|
import { Queue } from 'bullmq';
|
|
import IORedis from 'ioredis';
|
|
import { IngestJobData } from '@tower/types';
|
|
|
|
export function createIngestQueue(redisUrl: string): Queue<IngestJobData> {
|
|
const connection = new IORedis(redisUrl, { maxRetriesPerRequest: null });
|
|
return new Queue<IngestJobData>('tower:ingest', { connection });
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 5: Run tests to verify 3 new pass + existing**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker test
|
|
```
|
|
|
|
Expected: PASS — 13 tests total
|
|
|
|
- [ ] **Step 6: Commit**
|
|
|
|
```bash
|
|
git add apps/worker/src/queues/
|
|
git commit -m "feat: add BullMQ ingest queue and processor"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 5: Group Sync (TDD)
|
|
|
|
**Files:**
|
|
- Create: `apps/worker/src/whatsapp/group-sync.ts`
|
|
- Create: `apps/worker/src/whatsapp/group-sync.test.ts`
|
|
|
|
On WhatsApp connection, Baileys gives us all groups the bot is in via `sock.groupFetchAllParticipating()`. We upsert each one to the `Group` table. The function returns a `Map<waGroupJid, dbGroupId>` used by the message listener to resolve group IDs.
|
|
|
|
- [ ] **Step 1: Write failing tests**
|
|
|
|
Create `apps/worker/src/whatsapp/group-sync.test.ts`:
|
|
|
|
```typescript
|
|
import { syncGroups } from './group-sync';
|
|
import { GroupMetadata } from '@whiskeysockets/baileys';
|
|
|
|
const mockGroups: Record<string, GroupMetadata> = {
|
|
'120363043312345678@g.us': {
|
|
id: '120363043312345678@g.us',
|
|
subject: 'UP Parivar Dallas',
|
|
desc: 'Main community group',
|
|
participants: [],
|
|
creation: 0,
|
|
owner: undefined,
|
|
restrict: false,
|
|
announce: false,
|
|
subjectOwner: undefined,
|
|
subjectTime: 0,
|
|
size: 0,
|
|
ephemeralDuration: 0,
|
|
inviteCode: undefined,
|
|
},
|
|
'999999999@g.us': {
|
|
id: '999999999@g.us',
|
|
subject: 'Events Committee',
|
|
desc: undefined,
|
|
participants: [],
|
|
creation: 0,
|
|
owner: undefined,
|
|
restrict: false,
|
|
announce: false,
|
|
subjectOwner: undefined,
|
|
subjectTime: 0,
|
|
size: 0,
|
|
ephemeralDuration: 0,
|
|
inviteCode: undefined,
|
|
},
|
|
};
|
|
|
|
const mockPrisma = {
|
|
group: {
|
|
upsert: jest.fn(),
|
|
},
|
|
};
|
|
|
|
describe('syncGroups', () => {
|
|
beforeEach(() => jest.clearAllMocks());
|
|
|
|
it('upserts each group and returns jid→id map', async () => {
|
|
mockPrisma.group.upsert
|
|
.mockResolvedValueOnce({ id: 'db-group-1' })
|
|
.mockResolvedValueOnce({ id: 'db-group-2' });
|
|
|
|
const result = await syncGroups(mockGroups, mockPrisma as any);
|
|
|
|
expect(mockPrisma.group.upsert).toHaveBeenCalledTimes(2);
|
|
expect(result.get('120363043312345678@g.us')).toBe('db-group-1');
|
|
expect(result.get('999999999@g.us')).toBe('db-group-2');
|
|
});
|
|
|
|
it('calls upsert with correct create payload', async () => {
|
|
mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-1' });
|
|
|
|
await syncGroups(
|
|
{ '120363043312345678@g.us': mockGroups['120363043312345678@g.us'] },
|
|
mockPrisma as any,
|
|
);
|
|
|
|
expect(mockPrisma.group.upsert).toHaveBeenCalledWith({
|
|
where: { platform_platformId: { platform: 'whatsapp', platformId: '120363043312345678@g.us' } },
|
|
create: {
|
|
platform: 'whatsapp',
|
|
platformId: '120363043312345678@g.us',
|
|
name: 'UP Parivar Dallas',
|
|
description: 'Main community group',
|
|
isActive: true,
|
|
},
|
|
update: { name: 'UP Parivar Dallas', description: 'Main community group' },
|
|
});
|
|
});
|
|
|
|
it('handles groups with no description', async () => {
|
|
mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-2' });
|
|
|
|
await syncGroups(
|
|
{ '999999999@g.us': mockGroups['999999999@g.us'] },
|
|
mockPrisma as any,
|
|
);
|
|
|
|
expect(mockPrisma.group.upsert).toHaveBeenCalledWith(
|
|
expect.objectContaining({
|
|
create: expect.objectContaining({ description: undefined }),
|
|
}),
|
|
);
|
|
});
|
|
|
|
it('returns an empty map when given empty groups', async () => {
|
|
const result = await syncGroups({}, mockPrisma as any);
|
|
expect(result.size).toBe(0);
|
|
expect(mockPrisma.group.upsert).not.toHaveBeenCalled();
|
|
});
|
|
});
|
|
```
|
|
|
|
- [ ] **Step 2: Run to verify tests fail**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker test
|
|
```
|
|
|
|
Expected: FAIL — `Cannot find module './group-sync'`
|
|
|
|
- [ ] **Step 3: Implement apps/worker/src/whatsapp/group-sync.ts**
|
|
|
|
```typescript
|
|
import { GroupMetadata } from '@whiskeysockets/baileys';
|
|
import { PrismaClient } from '@prisma/client';
|
|
|
|
export async function syncGroups(
|
|
groups: Record<string, GroupMetadata>,
|
|
prisma: PrismaClient,
|
|
): Promise<Map<string, string>> {
|
|
const jidToDbId = new Map<string, string>();
|
|
|
|
for (const [jid, meta] of Object.entries(groups)) {
|
|
const group = await prisma.group.upsert({
|
|
where: { platform_platformId: { platform: 'whatsapp', platformId: jid } },
|
|
create: {
|
|
platform: 'whatsapp',
|
|
platformId: jid,
|
|
name: meta.subject,
|
|
description: meta.desc ?? undefined,
|
|
isActive: true,
|
|
},
|
|
update: { name: meta.subject, description: meta.desc ?? undefined },
|
|
});
|
|
jidToDbId.set(jid, group.id);
|
|
}
|
|
|
|
return jidToDbId;
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify 4 new pass + existing**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker test
|
|
```
|
|
|
|
Expected: PASS — 17 tests total
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add apps/worker/src/whatsapp/group-sync.ts apps/worker/src/whatsapp/group-sync.test.ts
|
|
git commit -m "feat: add WhatsApp group sync to database"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 6: WhatsApp Session
|
|
|
|
**Files:**
|
|
- Create: `apps/worker/src/whatsapp/session.ts`
|
|
|
|
The session module wraps Baileys' `makeWASocket`. It manages auth state, reconnection on disconnect, and calls provided callbacks for groups (on connection) and messages (on upsert).
|
|
|
|
No unit test for the session itself — it wraps a live network connection. The integration is verified end-to-end in Task 8.
|
|
|
|
- [ ] **Step 1: Create apps/worker/src/whatsapp/session.ts**
|
|
|
|
```typescript
|
|
import makeWASocket, {
|
|
useMultiFileAuthState,
|
|
fetchLatestBaileysVersion,
|
|
DisconnectReason,
|
|
WASocket,
|
|
proto,
|
|
GroupMetadata,
|
|
} from '@whiskeysockets/baileys';
|
|
import { Boom } from '@hapi/boom';
|
|
import { createLogger } from '@tower/logger';
|
|
|
|
const logger = createLogger('whatsapp-session');
|
|
|
|
export type OnMessageCallback = (msg: proto.IWebMessageInfo) => void;
|
|
export type OnGroupsCallback = (groups: Record<string, GroupMetadata>) => void;
|
|
|
|
export async function createWhatsAppSession(
|
|
sessionPath: string,
|
|
onMessage: OnMessageCallback,
|
|
onGroups: OnGroupsCallback,
|
|
): Promise<WASocket> {
|
|
const { state, saveCreds } = await useMultiFileAuthState(sessionPath);
|
|
const { version } = await fetchLatestBaileysVersion();
|
|
|
|
const sock = makeWASocket({
|
|
version,
|
|
auth: state,
|
|
printQRInTerminal: true,
|
|
logger: logger as any,
|
|
});
|
|
|
|
sock.ev.on('creds.update', saveCreds);
|
|
|
|
sock.ev.on('connection.update', async ({ connection, lastDisconnect }) => {
|
|
if (connection === 'close') {
|
|
const reason = (lastDisconnect?.error as Boom)?.output?.statusCode;
|
|
const shouldReconnect = reason !== DisconnectReason.loggedOut;
|
|
logger.info({ reason, shouldReconnect }, 'Connection closed');
|
|
if (shouldReconnect) {
|
|
logger.info('Reconnecting in 5s...');
|
|
setTimeout(() => createWhatsAppSession(sessionPath, onMessage, onGroups), 5000);
|
|
}
|
|
} else if (connection === 'open') {
|
|
logger.info('WhatsApp connected');
|
|
const groups = await sock.groupFetchAllParticipating();
|
|
onGroups(groups);
|
|
}
|
|
});
|
|
|
|
sock.ev.on('messages.upsert', ({ messages, type }) => {
|
|
if (type !== 'notify') return;
|
|
for (const msg of messages) {
|
|
onMessage(msg);
|
|
}
|
|
});
|
|
|
|
return sock;
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 2: Verify TypeScript compiles**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker build
|
|
```
|
|
|
|
Expected: exit 0 with no type errors
|
|
|
|
- [ ] **Step 3: Commit**
|
|
|
|
```bash
|
|
git add apps/worker/src/whatsapp/session.ts
|
|
git commit -m "feat: add Baileys WhatsApp session with reconnect logic"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 7: Add Prisma to Worker + Update .env
|
|
|
|
**Files:**
|
|
- Modify: `apps/worker/package.json`
|
|
- Modify: `apps/worker/jest.config.js`
|
|
- Modify: `turbo.json`
|
|
- Modify: `.env.example`
|
|
- Modify: `.gitignore`
|
|
|
|
- [ ] **Step 1: Add @prisma/client and prisma to worker**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker add @prisma/client
|
|
pnpm --filter @tower/worker add --save-dev prisma
|
|
```
|
|
|
|
- [ ] **Step 2: Add generate script to apps/worker/package.json**
|
|
|
|
The `generate` script tells Prisma where to find the schema (in apps/api). After this step, `package.json` scripts section should be:
|
|
|
|
```json
|
|
"scripts": {
|
|
"generate": "prisma generate --schema=../api/prisma/schema.prisma",
|
|
"build": "tsc",
|
|
"dev": "ts-node src/main.ts",
|
|
"start": "node dist/main.js",
|
|
"test": "jest"
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 3: Add generate task to turbo.json**
|
|
|
|
Replace `turbo.json` content:
|
|
|
|
```json
|
|
{
|
|
"$schema": "https://turbo.build/schema.json",
|
|
"tasks": {
|
|
"generate": {
|
|
"cache": false
|
|
},
|
|
"build": {
|
|
"dependsOn": ["generate", "^build"],
|
|
"outputs": ["dist/**", ".next/**"]
|
|
},
|
|
"dev": {
|
|
"cache": false,
|
|
"persistent": true
|
|
},
|
|
"test": {
|
|
"dependsOn": ["^build"],
|
|
"outputs": ["coverage/**"]
|
|
},
|
|
"lint": {}
|
|
}
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 4: Run prisma generate for worker**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker generate
|
|
```
|
|
|
|
Expected: `✔ Generated Prisma Client` with no errors
|
|
|
|
- [ ] **Step 5: Update apps/worker/jest.config.js to load .env**
|
|
|
|
Replace the entire file:
|
|
|
|
```javascript
|
|
const path = require('path');
|
|
require('dotenv').config({ path: path.resolve(__dirname, '../../.env') });
|
|
|
|
module.exports = {
|
|
preset: 'ts-jest',
|
|
testEnvironment: 'node',
|
|
testMatch: ['**/*.test.ts'],
|
|
rootDir: 'src',
|
|
};
|
|
```
|
|
|
|
- [ ] **Step 6: Add dotenv dev dependency to worker**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker add --save-dev dotenv
|
|
```
|
|
|
|
- [ ] **Step 7: Update .env.example — add new vars**
|
|
|
|
Add these lines to `/Users/maaz/Documents/insignia-work/tower/.env.example`:
|
|
|
|
```bash
|
|
# WhatsApp
|
|
WHATSAPP_SESSION_PATH=./sessions
|
|
TOWER_ADMIN_JIDS=
|
|
```
|
|
|
|
Also add to `.env` (the actual file, NOT committed):
|
|
|
|
```bash
|
|
WHATSAPP_SESSION_PATH=./sessions
|
|
TOWER_ADMIN_JIDS=
|
|
```
|
|
|
|
- [ ] **Step 8: Add sessions/ to .gitignore**
|
|
|
|
Add this line to `.gitignore`:
|
|
|
|
```
|
|
sessions/
|
|
```
|
|
|
|
- [ ] **Step 9: Run worker tests to verify still passing**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker test
|
|
```
|
|
|
|
Expected: PASS — 17 tests passing
|
|
|
|
- [ ] **Step 10: Commit**
|
|
|
|
```bash
|
|
git add apps/worker/package.json apps/worker/jest.config.js turbo.json .env.example .gitignore pnpm-lock.yaml
|
|
git commit -m "chore: add Prisma client to worker, turbo generate task, update env"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 8: Wire Worker Bootstrap
|
|
|
|
**Files:**
|
|
- Modify: `apps/worker/src/main.ts`
|
|
|
|
Wire together session → normalizer → tag-detector → queue so the worker fully processes incoming messages.
|
|
|
|
- [ ] **Step 1: Replace apps/worker/src/main.ts**
|
|
|
|
```typescript
|
|
import { PrismaClient } from '@prisma/client';
|
|
import { createLogger } from '@tower/logger';
|
|
import { validateEnv } from '@tower/config';
|
|
import { IngestJobData } from '@tower/types';
|
|
import { createWhatsAppSession } from './whatsapp/session';
|
|
import { normalizeMessage } from './whatsapp/normalizer';
|
|
import { detectTags, isFlagged } from './whatsapp/tag-detector';
|
|
import { syncGroups } from './whatsapp/group-sync';
|
|
import { createIngestQueue } from './queues/ingest.queue';
|
|
import { createIngestWorker } from './queues/ingest.processor';
|
|
|
|
const logger = createLogger('tower-worker');
|
|
|
|
async function bootstrap() {
|
|
const env = validateEnv();
|
|
const prisma = new PrismaClient();
|
|
await prisma.$connect();
|
|
|
|
const adminJids = env.TOWER_ADMIN_JIDS
|
|
? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean)
|
|
: [];
|
|
|
|
const ingestQueue = createIngestQueue(env.REDIS_URL);
|
|
const ingestWorker = createIngestWorker(env.REDIS_URL, prisma);
|
|
|
|
ingestWorker.on('completed', (job) => {
|
|
logger.info({ jobId: job.id }, 'Ingest job completed');
|
|
});
|
|
ingestWorker.on('failed', (job, err) => {
|
|
logger.error({ jobId: job?.id, err }, 'Ingest job failed');
|
|
});
|
|
|
|
// jid→dbId map populated on WA connection
|
|
let groupMap = new Map<string, string>();
|
|
|
|
await createWhatsAppSession(
|
|
env.WHATSAPP_SESSION_PATH,
|
|
async (msg) => {
|
|
const normalized = normalizeMessage(msg);
|
|
if (!normalized) return;
|
|
|
|
const tags = detectTags(normalized.content, normalized.senderJid, adminJids);
|
|
if (!isFlagged(tags)) return;
|
|
|
|
const sourceGroupId = groupMap.get(normalized.sourceGroupJid);
|
|
if (!sourceGroupId) {
|
|
logger.warn({ jid: normalized.sourceGroupJid }, 'Unknown group — skipping message');
|
|
return;
|
|
}
|
|
|
|
const jobData: IngestJobData = {
|
|
platformMsgId: normalized.platformMsgId,
|
|
platform: 'whatsapp',
|
|
sourceGroupId,
|
|
senderJid: normalized.senderJid,
|
|
senderName: normalized.senderName,
|
|
content: normalized.content,
|
|
tags,
|
|
};
|
|
|
|
await ingestQueue.add('ingest', jobData, {
|
|
attempts: 3,
|
|
backoff: { type: 'exponential', delay: 1000 },
|
|
});
|
|
|
|
logger.info({ platformMsgId: normalized.platformMsgId, tags }, 'Message enqueued');
|
|
},
|
|
async (groups) => {
|
|
logger.info({ count: Object.keys(groups).length }, 'Syncing groups');
|
|
groupMap = await syncGroups(groups, prisma);
|
|
logger.info({ count: groupMap.size }, 'Groups synced');
|
|
},
|
|
);
|
|
|
|
logger.info('Tower worker ready');
|
|
|
|
const shutdown = async () => {
|
|
logger.info('Shutting down...');
|
|
await ingestWorker.close();
|
|
await ingestQueue.close();
|
|
await prisma.$disconnect();
|
|
process.exit(0);
|
|
};
|
|
|
|
process.on('SIGTERM', shutdown);
|
|
process.on('SIGINT', shutdown);
|
|
}
|
|
|
|
bootstrap().catch((err) => {
|
|
console.error('Worker failed to start', err);
|
|
process.exit(1);
|
|
});
|
|
```
|
|
|
|
- [ ] **Step 2: Build worker to verify no type errors**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker build
|
|
```
|
|
|
|
Expected: exit 0 with no TypeScript errors
|
|
|
|
- [ ] **Step 3: Run worker tests**
|
|
|
|
```bash
|
|
pnpm --filter @tower/worker test
|
|
```
|
|
|
|
Expected: PASS — all tests still passing
|
|
|
|
- [ ] **Step 4: Commit**
|
|
|
|
```bash
|
|
git add apps/worker/src/main.ts
|
|
git commit -m "feat: wire worker bootstrap — session → normalizer → queue pipeline"
|
|
```
|
|
|
|
---
|
|
|
|
## Task 9: Turborepo Full Smoke Test
|
|
|
|
**Files:** None new — verify the full pipeline
|
|
|
|
- [ ] **Step 1: Ensure Docker is running**
|
|
|
|
```bash
|
|
docker compose up -d
|
|
```
|
|
|
|
Verify postgres (port 5433) and redis (port 6379) are healthy:
|
|
|
|
```bash
|
|
docker compose ps
|
|
```
|
|
|
|
Expected: postgres, redis, meilisearch all show `Up`
|
|
|
|
- [ ] **Step 2: Run full build**
|
|
|
|
```bash
|
|
pnpm build
|
|
```
|
|
|
|
Expected:
|
|
```
|
|
Tasks: 8 successful, 8 total
|
|
```
|
|
|
|
- [ ] **Step 3: Run full test suite**
|
|
|
|
```bash
|
|
pnpm test
|
|
```
|
|
|
|
Expected:
|
|
```
|
|
Tasks: 8 successful, 8 total
|
|
```
|
|
|
|
All packages must pass. If `@tower/api#test` fails with `DATABASE_URL not found`, verify `.env` has the correct `DATABASE_URL=postgresql://tower:tower_dev@localhost:5433/tower_dev`.
|
|
|
|
- [ ] **Step 4: Commit any fixes, then tag the milestone**
|
|
|
|
```bash
|
|
git add -A
|
|
git commit -m "chore: turborepo smoke test — all 8 packages build and test clean"
|
|
```
|
|
|
|
---
|
|
|
|
## Self-Review
|
|
|
|
**Spec coverage:**
|
|
- ✅ Baileys connection with QR-based auth and session persistence
|
|
- ✅ Group discovery on connect → upserted to `Group` table
|
|
- ✅ Message normalization (text, extended text, image/video captions)
|
|
- ✅ Tag detection: `#important`, `#upparivar`, `#event`, `/tower` command, admin sender
|
|
- ✅ BullMQ `tower:ingest` queue — durability + retry logic
|
|
- ✅ Message persistence as `PENDING` records (idempotent via upsert)
|
|
- ✅ Graceful shutdown (SIGTERM/SIGINT)
|
|
- ⏭️ ⭐ admin reaction handling — deferred to Plan 3 (requires message store)
|
|
- ⏭️ Media download — deferred to Plan 4 (requires Cloudflare R2 / MinIO)
|
|
|
|
**Type consistency check:**
|
|
- `NormalizedMessage.sourceGroupJid` → used in `groupMap.get()` in main.ts ✅
|
|
- `IngestJobData.sourceGroupId` → DB Group.id (resolved from groupMap) ✅
|
|
- `syncGroups` returns `Map<string, string>` (jid → db id) ✅
|
|
- `processIngestJob` uses `platform_platformMsgId` compound unique key (matches Prisma schema `@@unique([platform, platformMsgId])`) ✅
|
|
- `group.upsert` uses `platform_platformId` compound key (matches `@@unique([platform, platformId])`) ✅
|