tower/docs/superpowers/plans/2026-05-27-whatsapp-integration.md
maaz519 d33b4e40b8 fix: use type-only Baileys import and raw status code to fix Jest ESM issue
Replaces DisconnectReason enum import with type-only WASocket import and
uses 401 directly instead of DisconnectReason.loggedOut. Baileys is an ES
module that cannot be executed in Jest's CommonJS mode, so removing the
value import (keeping only type imports) prevents ts-jest from trying to
execute the module.

Also updated session-pool.test.ts to verify end() is called with the
expected Boom error object instead of undefined.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 17:40:24 +05:30


WhatsApp Integration Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Connect apps/worker to WhatsApp via Baileys, normalize and tag-detect incoming messages, sync groups to the DB, and persist flagged messages as PENDING records ready for the approval workflow in Plan 3.

Architecture: The worker process holds a long-lived Baileys WebSocket connection. Incoming messages are normalized to a canonical shape, checked for TOWER tags (hashtags, /tower command), and pushed to a BullMQ tower:ingest queue. A BullMQ processor in the same worker process consumes the queue and upserts Message records to PostgreSQL with PENDING status. The NestJS API is not involved — the worker writes directly to the DB via Prisma Client.

Tech Stack: @whiskeysockets/baileys ^6.0.0, bullmq ^5 (already in worker), ioredis ^5 (already in worker), @prisma/client ^6 (shared from apps/api schema), pino (via @tower/logger), Turborepo generate task


File Map

| Action | Path | Purpose |
| --- | --- | --- |
| Modify | packages/config/src/index.ts | Add WHATSAPP_SESSION_PATH, TOWER_ADMIN_JIDS |
| Modify | packages/config/src/index.test.ts | Tests for new config fields |
| Modify | packages/types/src/message.ts | Add IngestJobData interface |
| Create | apps/worker/src/whatsapp/tag-detector.ts | Detect TOWER tags from message text + sender |
| Create | apps/worker/src/whatsapp/tag-detector.test.ts | Unit tests |
| Create | apps/worker/src/whatsapp/normalizer.ts | Convert Baileys proto → NormalizedMessage |
| Create | apps/worker/src/whatsapp/normalizer.test.ts | Unit tests |
| Create | apps/worker/src/whatsapp/group-sync.ts | Upsert WA groups into DB on connection |
| Create | apps/worker/src/whatsapp/group-sync.test.ts | Unit tests with mocked Prisma |
| Create | apps/worker/src/queues/ingest.queue.ts | BullMQ Queue producer factory |
| Create | apps/worker/src/queues/ingest.processor.ts | BullMQ Worker consumer — persists Message to DB |
| Create | apps/worker/src/queues/ingest.processor.test.ts | Unit tests with mocked Prisma |
| Create | apps/worker/src/whatsapp/session.ts | Baileys socket factory |
| Modify | apps/worker/src/main.ts | Wire session → normalizer → tag-detector → queue |
| Modify | apps/worker/package.json | Add @whiskeysockets/baileys, @prisma/client, prisma |
| Modify | apps/worker/jest.config.js | Load .env for Prisma |
| Modify | turbo.json | Add generate task |
| Modify | .env.example | Add WHATSAPP_SESSION_PATH, TOWER_ADMIN_JIDS |
| Modify | .gitignore | Ignore sessions/ directory |

Task 1: Extend @tower/config and @tower/types

Files:

  • Modify: packages/config/src/index.ts

  • Modify: packages/config/src/index.test.ts

  • Modify: packages/types/src/message.ts

  • Step 1: Write failing tests for new config fields

Add these two tests to packages/config/src/index.test.ts inside the existing validateEnv describe block:

it('applies default WHATSAPP_SESSION_PATH of ./sessions when not set', () => {
  const env = {
    DATABASE_URL: 'postgresql://user:pass@localhost:5432/db',
    REDIS_URL: 'redis://localhost:6379',
    JWT_SECRET: 'a'.repeat(32),
  };
  const result = validateEnv(env as NodeJS.ProcessEnv);
  expect(result.WHATSAPP_SESSION_PATH).toBe('./sessions');
});

it('applies default TOWER_ADMIN_JIDS of empty string when not set', () => {
  const env = {
    DATABASE_URL: 'postgresql://user:pass@localhost:5432/db',
    REDIS_URL: 'redis://localhost:6379',
    JWT_SECRET: 'a'.repeat(32),
  };
  const result = validateEnv(env as NodeJS.ProcessEnv);
  expect(result.TOWER_ADMIN_JIDS).toBe('');
});
  • Step 2: Run to verify tests fail
pnpm --filter @tower/config test

Expected: FAIL — Property 'WHATSAPP_SESSION_PATH' does not exist on type 'Env'

  • Step 3: Update packages/config/src/index.ts

Replace the entire file content:

import { z } from 'zod';

const envSchema = z.object({
  NODE_ENV: z.enum(['development', 'test', 'production']).default('development'),
  DATABASE_URL: z.string().url(),
  REDIS_URL: z.string().url(),
  API_PORT: z.coerce.number().default(3001),
  JWT_SECRET: z.string().min(32),
  MEILI_URL: z.string().url().default('http://localhost:7700'),
  MEILI_MASTER_KEY: z.string().default('tower_meili_dev_key'),
  LOG_LEVEL: z.enum(['trace', 'debug', 'info', 'warn', 'error']).default('info'),
  WHATSAPP_SESSION_PATH: z.string().default('./sessions'),
  TOWER_ADMIN_JIDS: z.string().default(''),
});

export type Env = z.infer<typeof envSchema>;

export function validateEnv(env: NodeJS.ProcessEnv = process.env): Env {
  const result = envSchema.safeParse(env);
  if (!result.success) {
    console.error('Invalid environment variables:', result.error.format());
    throw new Error('Invalid environment variables');
  }
  return result.data;
}
  • Step 4: Run tests to verify 7 pass
pnpm --filter @tower/config test

Expected: PASS — 7 tests total (5 existing + 2 new)

  • Step 5: Add IngestJobData to packages/types/src/message.ts

Add at the end of the file:

export interface IngestJobData {
  platformMsgId: string;
  platform: Platform;
  sourceGroupId: string;
  senderJid: string;
  senderName?: string;
  content: string;
  tags: string[];
}
  • Step 6: Build both packages
pnpm --filter @tower/config build
pnpm --filter @tower/types build

Expected: both exit 0 with no errors

  • Step 7: Commit
git add packages/config/src/index.ts packages/config/src/index.test.ts packages/types/src/message.ts
git commit -m "feat: add WhatsApp config fields and IngestJobData type"
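
TOWER_ADMIN_JIDS is validated as a single string, so consumers have to split it into individual JIDs (Task 8 does this inline). A minimal sketch of that parsing, assuming a comma-separated format; the helper name parseAdminJids is hypothetical and not part of the file map:

```typescript
// Hypothetical helper: turns the raw TOWER_ADMIN_JIDS string into a clean list.
// Assumes JIDs are comma-separated; whitespace and empty entries are dropped.
function parseAdminJids(raw: string): string[] {
  return raw
    .split(',')
    .map((jid) => jid.trim())
    .filter((jid) => jid.length > 0);
}

// The schema default is '', which should yield no admins at all.
const noAdmins = parseAdminJids('');

// Stray spaces and a doubled comma are tolerated.
const twoAdmins = parseAdminJids(' 1234567890@s.whatsapp.net, ,0987654321@s.whatsapp.net ');
```

Keeping the variable a plain string in the schema means the default (empty string) needs no special casing: it simply parses to an empty list.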

Task 2: Tag Detector (TDD)

Files:

  • Create: apps/worker/src/whatsapp/tag-detector.ts
  • Create: apps/worker/src/whatsapp/tag-detector.test.ts

The tag detector is a pure function — no I/O, no network, no DB. It takes message text and a sender JID, and returns an array of string tags. A message is "flagged" (should be ingested) if the tags array is non-empty.

Tag rules:

  • Text contains #important (case-insensitive) → tag #important

  • Text contains #upparivar (case-insensitive) → tag #upparivar

  • Text contains #event (case-insensitive) → tag #event

  • Text starts with /tower (leading whitespace is ignored, matching is case-sensitive) → tag #tower-command

  • Sender JID is in the admin list → tag #admin
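
Note that "contains" and "starts with" are plain substring checks, so text such as #eventful or /towering would also satisfy these rules. If that turns out to be undesirable, a boundary-aware variant could look like the sketch below. The lookahead patterns and the name detectTagsStrict are assumptions for illustration only, not part of this plan:

```typescript
// Sketch only: boundary-aware versions of the tag rules.
// `(?![\w-])` rejects a match that continues into a longer word or hyphenated tag.
const STRICT_HASHTAGS: Array<{ pattern: RegExp; tag: string }> = [
  { pattern: /#important(?![\w-])/i, tag: '#important' },
  { pattern: /#upparivar(?![\w-])/i, tag: '#upparivar' },
  { pattern: /#event(?![\w-])/i, tag: '#event' },
];

// `/tower` must be followed by whitespace or the end of the text.
const TOWER_COMMAND = /^\s*\/tower(?:\s|$)/;

function detectTagsStrict(text: string, senderJid: string, adminJids: string[]): string[] {
  const tags: string[] = [];
  for (const { pattern, tag } of STRICT_HASHTAGS) {
    if (pattern.test(text)) tags.push(tag);
  }
  if (TOWER_COMMAND.test(text)) tags.push('#tower-command');
  if (adminJids.includes(senderJid)) tags.push('#admin');
  return tags;
}

const matched = detectTagsStrict('Join our #event on Saturday', 'user@s.whatsapp.net', []);
const rejected = detectTagsStrict('An #eventful day, /towering above', 'user@s.whatsapp.net', []);
```

The patterns deliberately omit the `g` flag: a global regex keeps state in `lastIndex` between `test()` calls, which would make repeated detection calls give inconsistent results.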

  • Step 1: Write failing tests

Create apps/worker/src/whatsapp/tag-detector.test.ts:

import { detectTags, isFlagged } from './tag-detector';

const ADMINS = ['1234567890@s.whatsapp.net', '0987654321@s.whatsapp.net'];

describe('detectTags', () => {
  it('detects #important hashtag (case-insensitive)', () => {
    expect(detectTags('Check this #IMPORTANT update', 'user@s.whatsapp.net', ADMINS))
      .toContain('#important');
  });

  it('detects #upparivar hashtag (case-insensitive)', () => {
    expect(detectTags('Welcome to #UPParivar community', 'user@s.whatsapp.net', ADMINS))
      .toContain('#upparivar');
  });

  it('detects #event hashtag', () => {
    expect(detectTags('Join our #event on Saturday', 'user@s.whatsapp.net', ADMINS))
      .toContain('#event');
  });

  it('detects /tower command prefix', () => {
    expect(detectTags('/tower save this message', 'user@s.whatsapp.net', ADMINS))
      .toContain('#tower-command');
  });

  it('detects multiple tags in one message', () => {
    const tags = detectTags('#important #event happening', 'user@s.whatsapp.net', ADMINS);
    expect(tags).toContain('#important');
    expect(tags).toContain('#event');
  });

  it('detects admin sender', () => {
    expect(detectTags('Regular message', '1234567890@s.whatsapp.net', ADMINS))
      .toContain('#admin');
  });

  it('returns empty array for untagged message from non-admin', () => {
    expect(detectTags('Just a regular chat message', 'nobody@s.whatsapp.net', ADMINS))
      .toEqual([]);
  });

  it('returns empty array for empty text', () => {
    expect(detectTags('', 'nobody@s.whatsapp.net', ADMINS)).toEqual([]);
  });
});

describe('isFlagged', () => {
  it('returns true when tags array is non-empty', () => {
    expect(isFlagged(['#important'])).toBe(true);
  });

  it('returns false when tags array is empty', () => {
    expect(isFlagged([])).toBe(false);
  });
});
  • Step 2: Run to verify tests fail
pnpm --filter @tower/worker test

Expected: FAIL — Cannot find module './tag-detector'

  • Step 3: Implement apps/worker/src/whatsapp/tag-detector.ts
const HASHTAGS: Array<{ pattern: RegExp; tag: string }> = [
  { pattern: /#important/i, tag: '#important' },
  { pattern: /#upparivar/i, tag: '#upparivar' },
  { pattern: /#event/i, tag: '#event' },
];

export function detectTags(text: string, senderJid: string, adminJids: string[]): string[] {
  const tags: string[] = [];

  for (const { pattern, tag } of HASHTAGS) {
    if (pattern.test(text)) tags.push(tag);
  }

  if (text.trimStart().startsWith('/tower')) tags.push('#tower-command');

  if (adminJids.includes(senderJid)) tags.push('#admin');

  return tags;
}

export function isFlagged(tags: string[]): boolean {
  return tags.length > 0;
}
  • Step 4: Run tests to verify all 11 pass
pnpm --filter @tower/worker test

Expected: PASS — 11 tests (1 existing smoke test + 10 new)

  • Step 5: Commit
git add apps/worker/src/whatsapp/
git commit -m "feat: add tag detector for TOWER message flagging"

Task 3: Message Normalizer (TDD)

Files:

  • Create: apps/worker/src/whatsapp/normalizer.ts
  • Create: apps/worker/src/whatsapp/normalizer.test.ts

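The normalizer converts a Baileys proto.IWebMessageInfo object into a plain NormalizedMessage. It returns null for messages that should be ignored: messages without a key, non-group messages (DMs), our own outgoing messages, and messages with no payload. Messages whose payload carries no extractable text are still normalized, with empty content.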
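
The order of the ignore checks can be sketched as a small pure function. This is an illustration under assumptions: MinimalMessage is a hypothetical structural stand-in for the handful of proto.IWebMessageInfo fields the normalizer reads, and skipReason is not a function this plan creates.

```typescript
// Hypothetical stand-in for the fields of proto.IWebMessageInfo used here;
// the real type comes from Baileys.
interface MinimalKey {
  remoteJid?: string | null;
  fromMe?: boolean | null;
  id?: string | null;
  participant?: string | null;
}

interface MinimalMessage {
  key?: MinimalKey | null;
  message?: Record<string, unknown> | null;
}

type SkipReason = 'no-key' | 'not-group' | 'own-message' | 'no-payload';

// Returns why a message would be ignored, or null when it should be normalized.
function skipReason(msg: MinimalMessage): SkipReason | null {
  if (!msg.key) return 'no-key';
  // Group JIDs end with @g.us; individual accounts end with @s.whatsapp.net.
  if (!(msg.key.remoteJid ?? '').endsWith('@g.us')) return 'not-group';
  if (msg.key.fromMe) return 'own-message';
  if (!msg.message) return 'no-payload';
  return null;
}

const groupText: MinimalMessage = {
  key: { remoteJid: '120363043312345678@g.us', fromMe: false, id: 'ABCDEF123456' },
  message: { conversation: 'Hello world' },
};
const directMessage: MinimalMessage = {
  key: { remoteJid: '919876543210@s.whatsapp.net', fromMe: false, id: 'XYZ' },
  message: { conversation: 'hi' },
};
```

Returning a reason rather than a bare null is only a debugging convenience; the normalizer itself just needs the null/non-null distinction.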

  • Step 1: Write failing tests

Create apps/worker/src/whatsapp/normalizer.test.ts:

import type { proto } from '@whiskeysockets/baileys';
import { normalizeMessage } from './normalizer';

function makeMsg(overrides: Partial<proto.IWebMessageInfo> = {}): proto.IWebMessageInfo {
  return {
    key: {
      remoteJid: '120363043312345678@g.us',
      fromMe: false,
      id: 'ABCDEF123456',
      participant: '919876543210@s.whatsapp.net',
    },
    pushName: 'Alice',
    message: { conversation: 'Hello world' },
    ...overrides,
  } as proto.IWebMessageInfo;
}

describe('normalizeMessage', () => {
  it('normalizes a plain text conversation message', () => {
    const result = normalizeMessage(makeMsg());
    expect(result).toMatchObject({
      platformMsgId: 'ABCDEF123456',
      sourceGroupJid: '120363043312345678@g.us',
      senderJid: '919876543210@s.whatsapp.net',
      senderName: 'Alice',
      content: 'Hello world',
    });
  });

  it('normalizes an extendedTextMessage', () => {
    const result = normalizeMessage(makeMsg({
      message: { extendedTextMessage: { text: 'Extended text here' } },
    }));
    expect(result?.content).toBe('Extended text here');
  });

  it('normalizes an imageMessage caption', () => {
    const result = normalizeMessage(makeMsg({
      message: { imageMessage: { caption: 'Photo caption' } },
    }));
    expect(result?.content).toBe('Photo caption');
  });

  it('normalizes a videoMessage caption', () => {
    const result = normalizeMessage(makeMsg({
      message: { videoMessage: { caption: 'Video caption' } },
    }));
    expect(result?.content).toBe('Video caption');
  });

  it('returns null for own messages (fromMe = true)', () => {
    const result = normalizeMessage(makeMsg({ key: { remoteJid: '120363043312345678@g.us', fromMe: true, id: 'XYZ', participant: '919876543210@s.whatsapp.net' } }));
    expect(result).toBeNull();
  });

  it('returns null for non-group messages (DMs)', () => {
    const result = normalizeMessage(makeMsg({
      key: { remoteJid: '919876543210@s.whatsapp.net', fromMe: false, id: 'XYZ' },
    }));
    expect(result).toBeNull();
  });

  it('returns null when message payload is missing', () => {
    const result = normalizeMessage(makeMsg({ message: null }));
    expect(result).toBeNull();
  });

  it('returns null when key is missing', () => {
    const result = normalizeMessage({ message: { conversation: 'hi' } } as proto.IWebMessageInfo);
    expect(result).toBeNull();
  });
});
  • Step 2: Run to verify tests fail
pnpm --filter @tower/worker test

Expected: FAIL — Cannot find module './normalizer'

  • Step 3: Implement apps/worker/src/whatsapp/normalizer.ts
import type { proto } from '@whiskeysockets/baileys';

export interface NormalizedMessage {
  platformMsgId: string;
  sourceGroupJid: string;
  senderJid: string;
  senderName?: string;
  content: string;
}

function extractText(msg: proto.IWebMessageInfo): string {
  const m = msg.message;
  if (!m) return '';
  return (
    m.conversation ||
    m.extendedTextMessage?.text ||
    m.imageMessage?.caption ||
    m.videoMessage?.caption ||
    m.documentMessage?.caption ||
    ''
  );
}

export function normalizeMessage(msg: proto.IWebMessageInfo): NormalizedMessage | null {
  const key = msg.key;
  if (!key) return null;

  const remoteJid = key.remoteJid ?? '';

  // Only process group messages (group JIDs end with @g.us)
  if (!remoteJid.endsWith('@g.us')) return null;

  // Skip our own outgoing messages
  if (key.fromMe) return null;

  if (!msg.message) return null;

  const content = extractText(msg);

  return {
    platformMsgId: key.id ?? '',
    sourceGroupJid: remoteJid,
    senderJid: key.participant ?? '',
    senderName: msg.pushName ?? undefined,
    content,
  };
}
  • Step 4: Install @whiskeysockets/baileys so types resolve
pnpm --filter @tower/worker add @whiskeysockets/baileys

Expected: adds baileys to apps/worker/package.json dependencies

  • Step 5: Run tests to verify 8 pass (plus existing)
pnpm --filter @tower/worker test

Expected: PASS — 19 tests total (11 from Task 2 + 8 new)

  • Step 6: Commit
git add apps/worker/src/whatsapp/normalizer.ts apps/worker/src/whatsapp/normalizer.test.ts apps/worker/package.json pnpm-lock.yaml
git commit -m "feat: add Baileys message normalizer"

Task 4: BullMQ Ingest Queue + Processor (TDD)

Files:

  • Create: apps/worker/src/queues/ingest.queue.ts
  • Create: apps/worker/src/queues/ingest.processor.ts
  • Create: apps/worker/src/queues/ingest.processor.test.ts

The processor receives an IngestJobData job and upserts it into the Message table as PENDING. The sourceGroupId in the job is the DB Group.id (already resolved before enqueueing). It uses upsert so duplicate messages (same platform + platformMsgId) are ignored.
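
Why an upsert with an empty update makes redelivery harmless can be illustrated without a database. The sketch below uses a Map as a hypothetical in-memory stand-in for the Message table, keyed the way the compound unique constraint on (platform, platformMsgId) would key it; none of these names exist in the plan.

```typescript
// In-memory stand-in for the Message table (illustration only).
interface StoredMessage {
  platform: string;
  platformMsgId: string;
  content: string;
  status: 'PENDING';
}

const messageTable = new Map<string, StoredMessage>();

// Mirrors upsert({ where, create, update: {} }):
// insert when the key is absent, otherwise leave the stored row untouched.
function upsertPending(platform: string, platformMsgId: string, content: string): StoredMessage {
  const compoundKey = `${platform}:${platformMsgId}`;
  const existing = messageTable.get(compoundKey);
  if (existing) return existing; // empty update: nothing is overwritten
  const created: StoredMessage = { platform, platformMsgId, content, status: 'PENDING' };
  messageTable.set(compoundKey, created);
  return created;
}

// The same WhatsApp message delivered twice (for example after a job retry).
const firstDelivery = upsertPending('whatsapp', 'WA_MSG_001', '#important update');
const redelivery = upsertPending('whatsapp', 'WA_MSG_001', '#important update (replayed)');
```

Because the update branch changes nothing, a replayed job also cannot reset a record's status back to PENDING once a later workflow has moved it on.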

  • Step 1: Write failing test for the processor logic

Create apps/worker/src/queues/ingest.processor.test.ts:

import { processIngestJob } from './ingest.processor';
import { IngestJobData } from '@tower/types';

const mockPrisma = {
  message: {
    upsert: jest.fn(),
  },
};

const sampleJob: IngestJobData = {
  platformMsgId: 'WA_MSG_001',
  platform: 'whatsapp',
  sourceGroupId: 'clxxxxxx',
  senderJid: '919876543210@s.whatsapp.net',
  senderName: 'Alice',
  content: '#important update from the committee',
  tags: ['#important'],
};

describe('processIngestJob', () => {
  beforeEach(() => jest.clearAllMocks());

  it('upserts a message with PENDING status', async () => {
    mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-db-id' });

    await processIngestJob(sampleJob, mockPrisma as any);

    expect(mockPrisma.message.upsert).toHaveBeenCalledWith({
      where: { platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'WA_MSG_001' } },
      create: {
        platform: 'whatsapp',
        platformMsgId: 'WA_MSG_001',
        sourceGroupId: 'clxxxxxx',
        senderJid: '919876543210@s.whatsapp.net',
        senderName: 'Alice',
        content: '#important update from the committee',
        tags: ['#important'],
        status: 'PENDING',
      },
      update: {},
    });
  });

  it('does not throw when message already exists (idempotent upsert)', async () => {
    mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-db-id' });
    await expect(processIngestJob(sampleJob, mockPrisma as any)).resolves.not.toThrow();
  });

  it('propagates DB errors', async () => {
    mockPrisma.message.upsert.mockRejectedValue(new Error('DB connection lost'));
    await expect(processIngestJob(sampleJob, mockPrisma as any)).rejects.toThrow('DB connection lost');
  });
});
  • Step 2: Run to verify tests fail
pnpm --filter @tower/worker test

Expected: FAIL — Cannot find module './ingest.processor'

  • Step 3: Implement apps/worker/src/queues/ingest.processor.ts
import { Worker } from 'bullmq';
import { PrismaClient } from '@prisma/client';
import IORedis from 'ioredis';
import { IngestJobData } from '@tower/types';

export async function processIngestJob(job: IngestJobData, prisma: PrismaClient): Promise<void> {
  await prisma.message.upsert({
    where: {
      platform_platformMsgId: {
        platform: job.platform,
        platformMsgId: job.platformMsgId,
      },
    },
    create: {
      platform: job.platform,
      platformMsgId: job.platformMsgId,
      sourceGroupId: job.sourceGroupId,
      senderJid: job.senderJid,
      senderName: job.senderName,
      content: job.content,
      tags: job.tags,
      status: 'PENDING',
    },
    update: {},
  });
}

export function createIngestWorker(redisUrl: string, prisma: PrismaClient): Worker<IngestJobData> {
  const connection = new IORedis(redisUrl, { maxRetriesPerRequest: null });
  return new Worker<IngestJobData>(
    'tower:ingest',
    async (job) => processIngestJob(job.data, prisma),
    { connection },
  );
}
  • Step 4: Implement apps/worker/src/queues/ingest.queue.ts
import { Queue } from 'bullmq';
import IORedis from 'ioredis';
import { IngestJobData } from '@tower/types';

export function createIngestQueue(redisUrl: string): Queue<IngestJobData> {
  const connection = new IORedis(redisUrl, { maxRetriesPerRequest: null });
  return new Queue<IngestJobData>('tower:ingest', { connection });
}
  • Step 5: Run tests to verify 3 new pass + existing
pnpm --filter @tower/worker test

Expected: PASS — 22 tests total

  • Step 6: Commit
git add apps/worker/src/queues/
git commit -m "feat: add BullMQ ingest queue and processor"

Task 5: Group Sync (TDD)

Files:

  • Create: apps/worker/src/whatsapp/group-sync.ts
  • Create: apps/worker/src/whatsapp/group-sync.test.ts

On WhatsApp connection, Baileys gives us all groups the bot is in via sock.groupFetchAllParticipating(). We upsert each one to the Group table. The function returns a Map<waGroupJid, dbGroupId> used by the message listener to resolve group IDs.

  • Step 1: Write failing tests

Create apps/worker/src/whatsapp/group-sync.test.ts:

import { syncGroups } from './group-sync';
import type { GroupMetadata } from '@whiskeysockets/baileys';

const mockGroups: Record<string, GroupMetadata> = {
  '120363043312345678@g.us': {
    id: '120363043312345678@g.us',
    subject: 'UP Parivar Dallas',
    desc: 'Main community group',
    participants: [],
    creation: 0,
    owner: undefined,
    restrict: false,
    announce: false,
    subjectOwner: undefined,
    subjectTime: 0,
    size: 0,
    ephemeralDuration: 0,
    inviteCode: undefined,
  },
  '999999999@g.us': {
    id: '999999999@g.us',
    subject: 'Events Committee',
    desc: undefined,
    participants: [],
    creation: 0,
    owner: undefined,
    restrict: false,
    announce: false,
    subjectOwner: undefined,
    subjectTime: 0,
    size: 0,
    ephemeralDuration: 0,
    inviteCode: undefined,
  },
};

const mockPrisma = {
  group: {
    upsert: jest.fn(),
  },
};

describe('syncGroups', () => {
  beforeEach(() => jest.clearAllMocks());

  it('upserts each group and returns jid→id map', async () => {
    mockPrisma.group.upsert
      .mockResolvedValueOnce({ id: 'db-group-1' })
      .mockResolvedValueOnce({ id: 'db-group-2' });

    const result = await syncGroups(mockGroups, mockPrisma as any);

    expect(mockPrisma.group.upsert).toHaveBeenCalledTimes(2);
    expect(result.get('120363043312345678@g.us')).toBe('db-group-1');
    expect(result.get('999999999@g.us')).toBe('db-group-2');
  });

  it('calls upsert with correct create payload', async () => {
    mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-1' });

    await syncGroups(
      { '120363043312345678@g.us': mockGroups['120363043312345678@g.us'] },
      mockPrisma as any,
    );

    expect(mockPrisma.group.upsert).toHaveBeenCalledWith({
      where: { platform_platformId: { platform: 'whatsapp', platformId: '120363043312345678@g.us' } },
      create: {
        platform: 'whatsapp',
        platformId: '120363043312345678@g.us',
        name: 'UP Parivar Dallas',
        description: 'Main community group',
        isActive: true,
      },
      update: { name: 'UP Parivar Dallas', description: 'Main community group' },
    });
  });

  it('handles groups with no description', async () => {
    mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-2' });

    await syncGroups(
      { '999999999@g.us': mockGroups['999999999@g.us'] },
      mockPrisma as any,
    );

    expect(mockPrisma.group.upsert).toHaveBeenCalledWith(
      expect.objectContaining({
        create: expect.objectContaining({ description: undefined }),
      }),
    );
  });

  it('returns an empty map when given empty groups', async () => {
    const result = await syncGroups({}, mockPrisma as any);
    expect(result.size).toBe(0);
    expect(mockPrisma.group.upsert).not.toHaveBeenCalled();
  });
});
  • Step 2: Run to verify tests fail
pnpm --filter @tower/worker test

Expected: FAIL — Cannot find module './group-sync'

  • Step 3: Implement apps/worker/src/whatsapp/group-sync.ts
import type { GroupMetadata } from '@whiskeysockets/baileys';
import { PrismaClient } from '@prisma/client';

export async function syncGroups(
  groups: Record<string, GroupMetadata>,
  prisma: PrismaClient,
): Promise<Map<string, string>> {
  const jidToDbId = new Map<string, string>();

  for (const [jid, meta] of Object.entries(groups)) {
    const group = await prisma.group.upsert({
      where: { platform_platformId: { platform: 'whatsapp', platformId: jid } },
      create: {
        platform: 'whatsapp',
        platformId: jid,
        name: meta.subject,
        description: meta.desc ?? undefined,
        isActive: true,
      },
      update: { name: meta.subject, description: meta.desc ?? undefined },
    });
    jidToDbId.set(jid, group.id);
  }

  return jidToDbId;
}
  • Step 4: Run tests to verify 4 new pass + existing
pnpm --filter @tower/worker test

Expected: PASS — 26 tests total

  • Step 5: Commit
git add apps/worker/src/whatsapp/group-sync.ts apps/worker/src/whatsapp/group-sync.test.ts
git commit -m "feat: add WhatsApp group sync to database"

Task 6: WhatsApp Session

Files:

  • Create: apps/worker/src/whatsapp/session.ts

The session module wraps Baileys' makeWASocket. It manages auth state, reconnection on disconnect, and calls provided callbacks for groups (on connection) and messages (on upsert).

No unit test for the session itself — it wraps a live network connection. The integration is verified end-to-end in Task 8.
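
The reconnect decision, however, is pure logic and could be unit tested if extracted. A minimal sketch, assuming the logged-out condition is reported as status code 401 (the value behind DisconnectReason.loggedOut); the helper name reconnectDelay and the constants are hypothetical:

```typescript
// Status code WhatsApp reports when the session has been logged out.
const LOGGED_OUT_STATUS = 401;

// Fixed delay used by this plan before a reconnect attempt.
const RECONNECT_DELAY_MS = 5000;

// Hypothetical pure helper: returns how long to wait before reconnecting,
// or null when the session was logged out and a new QR scan is required.
function reconnectDelay(statusCode: number | undefined): number | null {
  return statusCode === LOGGED_OUT_STATUS ? null : RECONNECT_DELAY_MS;
}

const afterLogout = reconnectDelay(401);
const afterNetworkDrop = reconnectDelay(undefined);
```

Treating an unknown or missing status code as reconnectable matches the behaviour described above: only an explicit logout stops the retry loop.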

  • Step 1: Create apps/worker/src/whatsapp/session.ts
import makeWASocket, {
  useMultiFileAuthState,
  fetchLatestBaileysVersion,
  DisconnectReason,
  WASocket,
  proto,
  GroupMetadata,
} from '@whiskeysockets/baileys';
import { Boom } from '@hapi/boom';
import { createLogger } from '@tower/logger';

const logger = createLogger('whatsapp-session');

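// Callbacks may be async (main.ts passes async functions), so allow a Promise return
export type OnMessageCallback = (msg: proto.IWebMessageInfo) => void | Promise<void>;
export type OnGroupsCallback = (groups: Record<string, GroupMetadata>) => void | Promise<void>;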

export async function createWhatsAppSession(
  sessionPath: string,
  onMessage: OnMessageCallback,
  onGroups: OnGroupsCallback,
): Promise<WASocket> {
  const { state, saveCreds } = await useMultiFileAuthState(sessionPath);
  const { version } = await fetchLatestBaileysVersion();

  const sock = makeWASocket({
    version,
    auth: state,
    printQRInTerminal: true,
    logger: logger as any,
  });

  sock.ev.on('creds.update', saveCreds);

  sock.ev.on('connection.update', async ({ connection, lastDisconnect }) => {
    if (connection === 'close') {
      // Boom errors carry the status code that explains the disconnect
      const reason = (lastDisconnect?.error as Boom)?.output?.statusCode;
      const shouldReconnect = reason !== DisconnectReason.loggedOut;
      logger.info({ reason, shouldReconnect }, 'Connection closed');
      if (shouldReconnect) {
        logger.info('Reconnecting in 5s...');
        setTimeout(() => {
          // Log a failed reconnect instead of leaving an unhandled rejection
          createWhatsAppSession(sessionPath, onMessage, onGroups).catch((err) =>
            logger.error({ err }, 'Reconnect failed'),
          );
        }, 5000);
      }
    } else if (connection === 'open') {
      logger.info('WhatsApp connected');
      try {
        const groups = await sock.groupFetchAllParticipating();
        await onGroups(groups);
      } catch (err) {
        logger.error({ err }, 'Group sync failed');
      }
    }
  });

  sock.ev.on('messages.upsert', ({ messages, type }) => {
    if (type !== 'notify') return;
    for (const msg of messages) {
      onMessage(msg);
    }
  });

  return sock;
}
  • Step 2: Verify TypeScript compiles
pnpm --filter @tower/worker build

Expected: exit 0 with no type errors

  • Step 3: Commit
git add apps/worker/src/whatsapp/session.ts
git commit -m "feat: add Baileys WhatsApp session with reconnect logic"

Task 7: Add Prisma to Worker + Update .env

Files:

  • Modify: apps/worker/package.json

  • Modify: apps/worker/jest.config.js

  • Modify: turbo.json

  • Modify: .env.example

  • Modify: .gitignore

  • Step 1: Add @prisma/client and prisma to worker

pnpm --filter @tower/worker add @prisma/client
pnpm --filter @tower/worker add --save-dev prisma
  • Step 2: Add generate script to apps/worker/package.json

The generate script tells Prisma where to find the schema (in apps/api). After this step, package.json scripts section should be:

"scripts": {
  "generate": "prisma generate --schema=../api/prisma/schema.prisma",
  "build": "tsc",
  "dev": "ts-node src/main.ts",
  "start": "node dist/main.js",
  "test": "jest"
}
  • Step 3: Add generate task to turbo.json

Replace turbo.json content:

{
  "$schema": "https://turbo.build/schema.json",
  "tasks": {
    "generate": {
      "cache": false
    },
    "build": {
      "dependsOn": ["generate", "^build"],
      "outputs": ["dist/**", ".next/**"]
    },
    "dev": {
      "cache": false,
      "persistent": true
    },
    "test": {
      "dependsOn": ["^build"],
      "outputs": ["coverage/**"]
    },
    "lint": {}
  }
}
  • Step 4: Run prisma generate for worker
pnpm --filter @tower/worker generate

Expected: ✔ Generated Prisma Client with no errors

  • Step 5: Update apps/worker/jest.config.js to load .env

Replace the entire file:

const path = require('path');
require('dotenv').config({ path: path.resolve(__dirname, '../../.env') });

module.exports = {
  preset: 'ts-jest',
  testEnvironment: 'node',
  testMatch: ['**/*.test.ts'],
  rootDir: 'src',
};
  • Step 6: Add dotenv dev dependency to worker
pnpm --filter @tower/worker add --save-dev dotenv
  • Step 7: Update .env.example — add new vars

Add these lines to .env.example in the repository root:

# WhatsApp
WHATSAPP_SESSION_PATH=./sessions
TOWER_ADMIN_JIDS=

Also add to .env (the actual file, NOT committed):

WHATSAPP_SESSION_PATH=./sessions
TOWER_ADMIN_JIDS=
  • Step 8: Add sessions/ to .gitignore

Add this line to .gitignore:

sessions/
  • Step 9: Run worker tests to verify still passing
pnpm --filter @tower/worker test

Expected: PASS — 26 tests passing

  • Step 10: Commit
git add apps/worker/package.json apps/worker/jest.config.js turbo.json .env.example .gitignore pnpm-lock.yaml
git commit -m "chore: add Prisma client to worker, turbo generate task, update env"

Task 8: Wire Worker Bootstrap

Files:

  • Modify: apps/worker/src/main.ts

Wire together session → normalizer → tag-detector → queue so the worker fully processes incoming messages.

  • Step 1: Replace apps/worker/src/main.ts
import { PrismaClient } from '@prisma/client';
import { createLogger } from '@tower/logger';
import { validateEnv } from '@tower/config';
import { IngestJobData } from '@tower/types';
import { createWhatsAppSession } from './whatsapp/session';
import { normalizeMessage } from './whatsapp/normalizer';
import { detectTags, isFlagged } from './whatsapp/tag-detector';
import { syncGroups } from './whatsapp/group-sync';
import { createIngestQueue } from './queues/ingest.queue';
import { createIngestWorker } from './queues/ingest.processor';

const logger = createLogger('tower-worker');

async function bootstrap() {
  const env = validateEnv();
  const prisma = new PrismaClient();
  await prisma.$connect();

  const adminJids = env.TOWER_ADMIN_JIDS
    ? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean)
    : [];

  const ingestQueue = createIngestQueue(env.REDIS_URL);
  const ingestWorker = createIngestWorker(env.REDIS_URL, prisma);

  ingestWorker.on('completed', (job) => {
    logger.info({ jobId: job.id }, 'Ingest job completed');
  });
  ingestWorker.on('failed', (job, err) => {
    logger.error({ jobId: job?.id, err }, 'Ingest job failed');
  });

  // jid→dbId map populated on WA connection
  let groupMap = new Map<string, string>();

  await createWhatsAppSession(
    env.WHATSAPP_SESSION_PATH,
    async (msg) => {
      const normalized = normalizeMessage(msg);
      if (!normalized) return;

      const tags = detectTags(normalized.content, normalized.senderJid, adminJids);
      if (!isFlagged(tags)) return;

      const sourceGroupId = groupMap.get(normalized.sourceGroupJid);
      if (!sourceGroupId) {
        logger.warn({ jid: normalized.sourceGroupJid }, 'Unknown group — skipping message');
        return;
      }

      const jobData: IngestJobData = {
        platformMsgId: normalized.platformMsgId,
        platform: 'whatsapp',
        sourceGroupId,
        senderJid: normalized.senderJid,
        senderName: normalized.senderName,
        content: normalized.content,
        tags,
      };

      await ingestQueue.add('ingest', jobData, {
        attempts: 3,
        backoff: { type: 'exponential', delay: 1000 },
      });

      logger.info({ platformMsgId: normalized.platformMsgId, tags }, 'Message enqueued');
    },
    async (groups) => {
      logger.info({ count: Object.keys(groups).length }, 'Syncing groups');
      groupMap = await syncGroups(groups, prisma);
      logger.info({ count: groupMap.size }, 'Groups synced');
    },
  );

  logger.info('Tower worker ready');

  const shutdown = async () => {
    logger.info('Shutting down...');
    await ingestWorker.close();
    await ingestQueue.close();
    await prisma.$disconnect();
    process.exit(0);
  };

  process.on('SIGTERM', shutdown);
  process.on('SIGINT', shutdown);
}

bootstrap().catch((err) => {
  console.error('Worker failed to start', err);
  process.exit(1);
});
  • Step 2: Build worker to verify no type errors
pnpm --filter @tower/worker build

Expected: exit 0 with no TypeScript errors

  • Step 3: Run worker tests
pnpm --filter @tower/worker test

Expected: PASS — all tests still passing

  • Step 4: Commit
git add apps/worker/src/main.ts
git commit -m "feat: wire worker bootstrap — session → normalizer → queue pipeline"
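
The job options in main.ts (attempts: 3 with exponential backoff and a 1000 ms base delay) bound how long a failing job is retried. The sketch below works out the resulting schedule, assuming BullMQ's built-in exponential strategy computes delay * 2^(attemptsMade - 1); that formula is an assumption to check against the BullMQ documentation for the installed version, and both function names are hypothetical.

```typescript
// Assumed formula for the 'exponential' backoff strategy:
// baseDelay * 2^(attemptsMade - 1).
function exponentialBackoff(baseDelayMs: number, attemptsMade: number): number {
  return Math.round(baseDelayMs * Math.pow(2, attemptsMade - 1));
}

// With `attempts` total tries there are at most (attempts - 1) retries,
// each scheduled after the attempt that just failed.
function retrySchedule(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(exponentialBackoff(baseDelayMs, attemptsMade));
  }
  return delays;
}

// Options used in main.ts: attempts: 3, delay: 1000.
const schedule = retrySchedule(3, 1000);
const worstCaseWaitMs = schedule.reduce((total, delay) => total + delay, 0);
```

Under that assumption a job is retried twice and waits about three seconds in total before it is reported through the worker's failed event.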

Task 9: Turborepo Full Smoke Test

Files: None new — verify the full pipeline

  • Step 1: Ensure Docker is running
docker compose up -d

Verify postgres (port 5433) and redis (port 6379) are healthy:

docker compose ps

Expected: postgres, redis, meilisearch all show Up

  • Step 2: Run full build
pnpm build

Expected:

Tasks:    8 successful, 8 total
  • Step 3: Run full test suite
pnpm test

Expected:

Tasks:    8 successful, 8 total

All packages must pass. If @tower/api#test fails with DATABASE_URL not found, verify .env has the correct DATABASE_URL=postgresql://tower:tower_dev@localhost:5433/tower_dev.

  • Step 4: Commit any fixes, then tag the milestone
git add -A
git commit -m "chore: turborepo smoke test — all 8 packages build and test clean"

Self-Review

Spec coverage:

  • Baileys connection with QR-based auth and session persistence
  • Group discovery on connect → upserted to Group table
  • Message normalization (text, extended text, image/video/document captions)
  • Tag detection: #important, #upparivar, #event, /tower command, admin sender
  • BullMQ tower:ingest queue — durability + retry logic
  • Message persistence as PENDING records (idempotent via upsert)
  • Graceful shutdown (SIGTERM/SIGINT)
  • ⏭️ admin reaction handling — deferred to Plan 3 (requires message store)
  • ⏭️ Media download — deferred to Plan 4 (requires Cloudflare R2 / MinIO)

Type consistency check:

  • NormalizedMessage.sourceGroupJid → used in groupMap.get() in main.ts
  • IngestJobData.sourceGroupId → DB Group.id (resolved from groupMap)
  • syncGroups returns Map<string, string> (jid → db id)
  • processIngestJob uses platform_platformMsgId compound unique key (matches Prisma schema @@unique([platform, platformMsgId]))
  • group.upsert uses platform_platformId compound key (matches @@unique([platform, platformId]))