Files
tower/docs/superpowers/plans/2026-05-27-archive-and-search.md
maaz519 1d6e1fb4da fix(search): tighten mock type cast for better type safety
Replace `as any` cast with `as unknown as ReturnType<typeof createMeiliClient>`
in the mock client factory. This preserves type safety without requiring the mock
to implement the full SDK interface.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 23:53:32 +05:30

39 KiB

Archive & Search Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Index every -approved message into Meilisearch and expose a paginated full-text search endpoint on the API.

Architecture: A shared @tower/search package owns the Meilisearch client factory, index name constant, settings configuration, and document type — consumed by both the worker (indexing) and the API (querying). handleStarReaction is extended to return an ApprovalResult containing both forward jobs and an IndexJobData document. The worker enqueues that document into a BullMQ tower-index queue immediately after approval; a processor converts it to a MeiliDocument (Unix ms timestamp) and calls indexMessage. The NestJS API gets a SearchModule with a GET /search endpoint that filters by sourceGroupId and tags.

Tech Stack: Meilisearch v1.11 (already in docker-compose), meilisearch npm SDK ^0.44.0, BullMQ 5, NestJS 11, @nestjs/config, TypeScript 5, Jest 29


File Map

Create:

  • packages/search/package.json@tower/search package manifest
  • packages/search/tsconfig.json — extends base tsconfig
  • packages/search/jest.config.js — ts-jest preset
  • packages/search/src/index.tsMeiliDocument, MESSAGES_INDEX, createMeiliClient, configureIndex, indexMessage, deleteMessage, re-export MeiliSearch type
  • packages/search/src/index.test.ts — unit tests for all exports
  • apps/worker/src/queues/index.queue.ts — BullMQ Queue for IndexJobData
  • apps/worker/src/queues/index.processor.ts — Worker that calls indexMessage
  • apps/worker/src/queues/index.processor.test.ts — unit tests
  • apps/api/src/modules/search/search.module.ts
  • apps/api/src/modules/search/search.service.ts — wraps Meilisearch, onModuleInit calls configureIndex
  • apps/api/src/modules/search/search.controller.tsGET /search
  • apps/api/src/modules/search/search.controller.spec.ts
  • apps/api/src/modules/search/search.service.spec.ts

Modify:

  • packages/types/src/message.ts — add IndexJobData interface
  • apps/worker/src/core/approval.ts — new ApprovalResult return type; build indexDoc
  • apps/worker/src/core/approval.test.ts — update assertions to result.forwardJobs / result.indexDoc; add missing mock fields
  • apps/worker/src/main.ts — create meiliClient + indexQueue + indexWorker; call configureIndex; enqueue index jobs after approval
  • apps/worker/package.json — add @tower/search: workspace:*
  • apps/api/package.json — add @tower/search: workspace:*
  • apps/api/src/app.module.ts — import SearchModule

Task 1: Add IndexJobData to @tower/types

Files:

  • Modify: packages/types/src/message.ts

  • Step 1: Write the failing test

Add to end of packages/types/src/message.ts — first verify the type compiles by adding a test file. Actually @tower/types has no test runner; type correctness is verified at build time. Skip to Step 2.

  • Step 2: Add IndexJobData to packages/types/src/message.ts

Append after the ForwardJobData interface (keep every existing line intact):

export interface IndexJobData {
  messageId: string;       // DB Message.id (cuid)
  content: string;
  senderName: string | null;
  sourceGroupId: string;   // DB Group.id
  sourceGroupName: string;
  tags: string[];
  platform: string;
  approvedAt: string;      // ISO 8601 — converted to Unix ms in the processor
}
  • Step 3: Build the package to verify no compile errors
cd /path/to/tower
pnpm --filter @tower/types build

Expected: dist/ updated, zero errors.

  • Step 4: Commit
git add packages/types/src/message.ts
git commit -m "feat(types): add IndexJobData for Meilisearch indexing"

Task 2: Create packages/search package

Files:

  • Create: packages/search/package.json

  • Create: packages/search/tsconfig.json

  • Create: packages/search/jest.config.js

  • Create: packages/search/src/index.ts

  • Create: packages/search/src/index.test.ts

  • Step 1: Write the failing tests

Create packages/search/src/index.test.ts:

import {
  MESSAGES_INDEX,
  configureIndex,
  indexMessage,
  deleteMessage,
  MeiliDocument,
} from './index';

function makeMockClient() {
  const mockUpdateSettings = jest.fn().mockResolvedValue({});
  const mockAddDocuments = jest.fn().mockResolvedValue({});
  const mockDeleteDocument = jest.fn().mockResolvedValue({});
  const mockIndex = jest.fn().mockReturnValue({
    updateSettings: mockUpdateSettings,
    addDocuments: mockAddDocuments,
    deleteDocument: mockDeleteDocument,
  });
  return { client: { index: mockIndex } as any, mockIndex, mockUpdateSettings, mockAddDocuments, mockDeleteDocument };
}

describe('MESSAGES_INDEX', () => {
  it('is the string tower-messages', () => {
    expect(MESSAGES_INDEX).toBe('tower-messages');
  });
});

describe('configureIndex', () => {
  it('sets searchable, filterable, and sortable attributes on the correct index', async () => {
    const { client, mockIndex, mockUpdateSettings } = makeMockClient();
    await configureIndex(client);
    expect(mockIndex).toHaveBeenCalledWith('tower-messages');
    expect(mockUpdateSettings).toHaveBeenCalledWith({
      searchableAttributes: ['content', 'senderName', 'sourceGroupName'],
      filterableAttributes: ['sourceGroupId', 'tags', 'platform'],
      sortableAttributes: ['approvedAt'],
    });
  });
});

describe('indexMessage', () => {
  it('adds the document to the messages index', async () => {
    const { client, mockIndex, mockAddDocuments } = makeMockClient();
    const doc: MeiliDocument = {
      id: 'msg-1',
      content: 'Hello community',
      senderName: 'Alice',
      sourceGroupId: 'grp-1',
      sourceGroupName: 'UP Parivar',
      tags: ['#important'],
      platform: 'whatsapp',
      approvedAt: 1716825600000,
    };
    await indexMessage(client, doc);
    expect(mockIndex).toHaveBeenCalledWith('tower-messages');
    expect(mockAddDocuments).toHaveBeenCalledWith([doc]);
  });
});

describe('deleteMessage', () => {
  it('deletes document by id from the messages index', async () => {
    const { client, mockIndex, mockDeleteDocument } = makeMockClient();
    await deleteMessage(client, 'msg-1');
    expect(mockIndex).toHaveBeenCalledWith('tower-messages');
    expect(mockDeleteDocument).toHaveBeenCalledWith('msg-1');
  });
});
  • Step 2: Create packages/search/src/index.ts
import { MeiliSearch } from 'meilisearch';

export { MeiliSearch } from 'meilisearch';

export interface MeiliDocument {
  id: string;              // DB Message.id
  content: string;
  senderName: string;      // empty string when null
  sourceGroupId: string;
  sourceGroupName: string;
  tags: string[];
  platform: string;
  approvedAt: number;      // Unix ms — Meilisearch sorts numbers, not ISO strings
}

export const MESSAGES_INDEX = 'tower-messages';

export function createMeiliClient(url: string, masterKey: string): MeiliSearch {
  return new MeiliSearch({ host: url, apiKey: masterKey });
}

export async function configureIndex(client: MeiliSearch): Promise<void> {
  await client.index(MESSAGES_INDEX).updateSettings({
    searchableAttributes: ['content', 'senderName', 'sourceGroupName'],
    filterableAttributes: ['sourceGroupId', 'tags', 'platform'],
    sortableAttributes: ['approvedAt'],
  });
}

export async function indexMessage(client: MeiliSearch, doc: MeiliDocument): Promise<void> {
  await client.index(MESSAGES_INDEX).addDocuments([doc]);
}

export async function deleteMessage(client: MeiliSearch, id: string): Promise<void> {
  await client.index(MESSAGES_INDEX).deleteDocument(id);
}
  • Step 3: Create packages/search/package.json
{
  "name": "@tower/search",
  "version": "0.0.1",
  "main": "./dist/index.js",
  "types": "./dist/index.d.ts",
  "exports": {
    ".": {
      "types": "./dist/index.d.ts",
      "default": "./dist/index.js"
    }
  },
  "scripts": {
    "build": "tsc",
    "dev": "tsc --watch",
    "test": "jest"
  },
  "dependencies": {
    "meilisearch": "^0.44.0"
  },
  "devDependencies": {
    "@types/jest": "^29.0.0",
    "@types/node": "^22.0.0",
    "jest": "^29.0.0",
    "ts-jest": "^29.0.0",
    "typescript": "^5.7.0"
  }
}
  • Step 4: Create packages/search/tsconfig.json
{
  "extends": "../../tsconfig.base.json",
  "compilerOptions": {
    "outDir": "./dist",
    "rootDir": "./src"
  },
  "include": ["src"]
}
  • Step 5: Create packages/search/jest.config.js
module.exports = {
  preset: 'ts-jest',
  testEnvironment: 'node',
  testMatch: ['**/*.test.ts'],
  rootDir: 'src',
};
  • Step 6: Install dependencies and run tests
cd /path/to/tower
pnpm install
pnpm --filter @tower/search test

Expected: 4 tests pass (MESSAGES_INDEX, configureIndex, indexMessage, deleteMessage).

  • Step 7: Build
pnpm --filter @tower/search build

Expected: packages/search/dist/ created, zero errors.

  • Step 8: Commit
git add packages/search/
git commit -m "feat(search): add @tower/search package with Meilisearch client and helpers"

Task 3: Update handleStarReaction to return ApprovalResult

Files:

  • Modify: apps/worker/src/core/approval.ts
  • Modify: apps/worker/src/core/approval.test.ts

The current return type is ForwardJobData[] | null. Change it to ApprovalResult | null where ApprovalResult includes both the forward jobs and the data needed for Meilisearch indexing. This lets main.ts index the message without an extra DB query.

  • Step 1: Update approval.test.ts first (TDD — tests fail before change)

Replace apps/worker/src/core/approval.test.ts entirely:

import { handleStarReaction } from './approval';
import { NormalizedReaction } from '@tower/types';

function makeReaction(overrides: Partial<NormalizedReaction> = {}): NormalizedReaction {
  return {
    reactorJid: '919876543210@s.whatsapp.net',
    targetMsgId: 'TARGET_MSG_123',
    sourceGroupJid: '120363043312345678@g.us',
    emoji: '⭐',
    accountId: 'acc_1',
    ...overrides,
  };
}

const adminJids = ['919876543210@s.whatsapp.net'];

// Minimal valid message returned by prisma.message.findUnique
function makeMessage(overrides: object = {}) {
  return {
    id: 'msg_1',
    status: 'PENDING',
    approval: null,
    content: 'hello world',
    senderName: 'Alice',
    sourceGroupId: 'grp_1',
    tags: ['#important'],
    platform: 'whatsapp',
    sourceGroup: { name: 'UP Parivar Dallas', syncRoutesFrom: [] },
    ...overrides,
  };
}

function makePrisma(messageOverrides: object = {}, txCount = 1) {
  return {
    message: { findUnique: jest.fn().mockResolvedValue(makeMessage(messageOverrides)) },
    $transaction: jest.fn().mockImplementation(async (fn: any) =>
      fn({
        message: { updateMany: jest.fn().mockResolvedValue({ count: txCount }) },
        approval: { create: jest.fn().mockResolvedValue({}) },
      }),
    ),
  } as any;
}

describe('handleStarReaction', () => {
  it('returns null for non-star emoji', async () => {
    expect(await handleStarReaction(makeReaction({ emoji: '👍' }), adminJids, {} as any)).toBeNull();
  });

  it('returns null when reactor is not an admin', async () => {
    expect(
      await handleStarReaction(makeReaction({ reactorJid: 'stranger@s.whatsapp.net' }), adminJids, {} as any),
    ).toBeNull();
  });

  it('returns null when message not found', async () => {
    const prisma = { message: { findUnique: jest.fn().mockResolvedValue(null) } } as any;
    expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
    expect(prisma.message.findUnique).toHaveBeenCalledWith({
      where: { platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'TARGET_MSG_123' } },
      include: {
        approval: true,
        sourceGroup: {
          include: { syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } } },
        },
      },
    });
  });

  it('returns null when message status is not PENDING', async () => {
    const prisma = {
      message: { findUnique: jest.fn().mockResolvedValue(makeMessage({ status: 'REJECTED' })) },
    } as any;
    expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
  });

  it('returns null when approval record already exists', async () => {
    const prisma = {
      message: {
        findUnique: jest.fn().mockResolvedValue(makeMessage({ status: 'APPROVED', approval: { id: 'appr_1' } })),
      },
    } as any;
    expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
  });

  it('returns null on double-approval race (updateMany count=0)', async () => {
    const result = await handleStarReaction(makeReaction(), adminJids, makePrisma({}, 0));
    expect(result).toBeNull();
  });

  it('returns ApprovalResult with empty forwardJobs and valid indexDoc when no sync routes', async () => {
    const result = await handleStarReaction(makeReaction(), adminJids, makePrisma());
    expect(result).not.toBeNull();
    expect(result!.forwardJobs).toEqual([]);
    expect(result!.indexDoc).toMatchObject({
      messageId: 'msg_1',
      content: 'hello world',
      senderName: 'Alice',
      sourceGroupId: 'grp_1',
      sourceGroupName: 'UP Parivar Dallas',
      tags: ['#important'],
      platform: 'whatsapp',
    });
    expect(result!.indexDoc.approvedAt).toMatch(/^\d{4}-\d{2}-\d{2}T/);
  });

  it('returns ForwardJobData for each active sync route', async () => {
    const prisma = makePrisma({
      content: 'important announcement',
      senderName: 'Bob',
      sourceGroup: {
        name: 'Source Group',
        syncRoutesFrom: [
          { targetGroup: { platformId: '999@g.us', accountId: 'acc_2' } },
          { targetGroup: { platformId: '888@g.us', accountId: null } },
        ],
      },
    });

    const result = await handleStarReaction(makeReaction(), adminJids, prisma);
    expect(result!.forwardJobs).toHaveLength(2);
    expect(result!.forwardJobs[0]).toMatchObject({
      messageId: 'msg_1',
      content: 'important announcement',
      sourceGroupName: 'Source Group',
      senderName: 'Bob',
      toGroupJid: '999@g.us',
      fromAccountId: 'acc_2',
    });
    expect(result!.forwardJobs[1]).toMatchObject({
      toGroupJid: '888@g.us',
      fromAccountId: 'acc_1', // falls back to reaction.accountId
    });
  });
});
  • Step 2: Run tests — expect 2 failures (the two tests that check result.forwardJobs and result.indexDoc)
pnpm --filter @tower/worker test -- --testPathPattern=approval

Expected: FAIL on the last two tests.

  • Step 3: Update apps/worker/src/core/approval.ts
import { NormalizedReaction, ForwardJobData, IndexJobData } from '@tower/types';

export interface ApprovalResult {
  forwardJobs: ForwardJobData[];
  indexDoc: IndexJobData;
}

export async function handleStarReaction(
  reaction: NormalizedReaction,
  adminJids: string[],
  prisma: any,
): Promise<ApprovalResult | null> {
  if (reaction.emoji !== '⭐') return null;
  if (!adminJids.includes(reaction.reactorJid)) return null;

  const message = await prisma.message.findUnique({
    where: {
      platform_platformMsgId: {
        // TODO: derive platform from NormalizedReaction when multi-platform support is added
        platform: 'whatsapp',
        platformMsgId: reaction.targetMsgId,
      },
    },
    include: {
      approval: true,
      sourceGroup: {
        include: {
          syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } },
        },
      },
    },
  });

  if (!message) return null;
  if (message.status !== 'PENDING') return null;
  if (message.approval) return null;

  let approved = false;
  await prisma.$transaction(async (tx: any) => {
    const updated = await tx.message.updateMany({
      where: { id: message.id, status: 'PENDING' },
      data: { status: 'APPROVED' },
    });
    if (updated.count === 0) return;
    approved = true;
    await tx.approval.create({
      data: {
        messageId: message.id,
        adminId: reaction.reactorJid,
        decision: 'APPROVED',
      },
    });
  });

  if (!approved) return null;

  const forwardJobs: ForwardJobData[] = message.sourceGroup.syncRoutesFrom
    .filter((route: any) => route.targetGroup != null)
    .map((route: any) => ({
      messageId: message.id,
      content: message.content,
      sourceGroupName: message.sourceGroup.name,
      senderName: message.senderName ?? undefined,
      toGroupJid: route.targetGroup.platformId,
      fromAccountId: route.targetGroup.accountId ?? reaction.accountId,
    }));

  const indexDoc: IndexJobData = {
    messageId: message.id,
    content: message.content,
    senderName: message.senderName ?? null,
    sourceGroupId: message.sourceGroupId,
    sourceGroupName: message.sourceGroup.name,
    tags: message.tags,
    platform: message.platform,
    approvedAt: new Date().toISOString(),
  };

  return { forwardJobs, indexDoc };
}
  • Step 4: Run tests — all pass
pnpm --filter @tower/worker test -- --testPathPattern=approval

Expected: 8 tests pass.

  • Step 5: Commit
git add apps/worker/src/core/approval.ts apps/worker/src/core/approval.test.ts
git commit -m "feat(worker): handleStarReaction returns ApprovalResult with indexDoc"

Task 4: Worker — index queue, processor, and wire main.ts

Files:

  • Create: apps/worker/src/queues/index.queue.ts

  • Create: apps/worker/src/queues/index.processor.ts

  • Create: apps/worker/src/queues/index.processor.test.ts

  • Modify: apps/worker/package.json — add @tower/search: workspace:*

  • Modify: apps/worker/src/main.ts — create meiliClient, configureIndex, indexQueue, indexWorker, update reaction handler, update shutdown

  • Step 1: Add @tower/search to worker dependencies

Edit apps/worker/package.json — add to the "dependencies" object:

"@tower/search": "workspace:*"

Then install:

pnpm install
  • Step 2: Write the failing processor test

Create apps/worker/src/queues/index.processor.test.ts:

import { processIndexJob } from './index.processor';
import { indexMessage } from '@tower/search';
import { IndexJobData } from '@tower/types';

jest.mock('@tower/search', () => ({
  indexMessage: jest.fn().mockResolvedValue(undefined),
  MESSAGES_INDEX: 'tower-messages',
}));

function makeJob(overrides: Partial<IndexJobData> = {}): IndexJobData {
  return {
    messageId: 'msg-1',
    content: 'hello world',
    senderName: 'Alice',
    sourceGroupId: 'grp-1',
    sourceGroupName: 'UP Parivar',
    tags: ['#important'],
    platform: 'whatsapp',
    approvedAt: '2026-05-27T10:00:00.000Z',
    ...overrides,
  };
}

describe('processIndexJob', () => {
  beforeEach(() => jest.clearAllMocks());

  it('calls indexMessage with MeiliDocument shape', async () => {
    const mockClient = {} as any;
    await processIndexJob(makeJob(), mockClient);
    expect(indexMessage).toHaveBeenCalledWith(mockClient, {
      id: 'msg-1',
      content: 'hello world',
      senderName: 'Alice',
      sourceGroupId: 'grp-1',
      sourceGroupName: 'UP Parivar',
      tags: ['#important'],
      platform: 'whatsapp',
      approvedAt: new Date('2026-05-27T10:00:00.000Z').getTime(),
    });
  });

  it('converts null senderName to empty string', async () => {
    const mockClient = {} as any;
    await processIndexJob(makeJob({ senderName: null }), mockClient);
    expect(indexMessage).toHaveBeenCalledWith(
      mockClient,
      expect.objectContaining({ senderName: '' }),
    );
  });

  it('converts approvedAt ISO string to Unix ms number', async () => {
    const mockClient = {} as any;
    await processIndexJob(makeJob({ approvedAt: '2026-01-01T00:00:00.000Z' }), mockClient);
    expect(indexMessage).toHaveBeenCalledWith(
      mockClient,
      expect.objectContaining({ approvedAt: new Date('2026-01-01T00:00:00.000Z').getTime() }),
    );
  });
});
  • Step 3: Run test — expect failure
pnpm --filter @tower/worker test -- --testPathPattern=index.processor

Expected: FAILCannot find module './index.processor'.

  • Step 4: Create apps/worker/src/queues/index.queue.ts
import { Queue } from 'bullmq';
import { IndexJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';

export function createIndexQueue(redisUrl: string): Queue<IndexJobData> {
  return new Queue<IndexJobData>('tower-index', { connection: parseRedisUrl(redisUrl) });
}
  • Step 5: Create apps/worker/src/queues/index.processor.ts
import { Worker } from 'bullmq';
import { IndexJobData } from '@tower/types';
import { MeiliSearch, MeiliDocument, indexMessage } from '@tower/search';
import { parseRedisUrl } from './redis-connection';

export async function processIndexJob(job: IndexJobData, meiliClient: MeiliSearch): Promise<void> {
  const doc: MeiliDocument = {
    id: job.messageId,
    content: job.content,
    senderName: job.senderName ?? '',
    sourceGroupId: job.sourceGroupId,
    sourceGroupName: job.sourceGroupName,
    tags: job.tags,
    platform: job.platform,
    approvedAt: new Date(job.approvedAt).getTime(),
  };
  await indexMessage(meiliClient, doc);
}

export function createIndexWorker(redisUrl: string, meiliClient: MeiliSearch): Worker<IndexJobData> {
  return new Worker<IndexJobData>(
    'tower-index',
    async (job) => processIndexJob(job.data, meiliClient),
    { connection: parseRedisUrl(redisUrl) },
  );
}
  • Step 6: Run tests — all pass
pnpm --filter @tower/worker test -- --testPathPattern=index.processor

Expected: 3 tests pass.

  • Step 7: Update apps/worker/src/main.ts

Replace the entire file with:

import { PrismaClient } from '@prisma/client';
import { createLogger } from '@tower/logger';
import { validateEnv } from '@tower/config';
import { createMeiliClient, configureIndex } from '@tower/search';
import { createIngestQueue } from './queues/ingest.queue';
import { createIngestWorker } from './queues/ingest.processor';
import { createForwardQueue } from './queues/forward.queue';
import { createForwardWorker } from './queues/forward.processor';
import { createIndexQueue } from './queues/index.queue';
import { createIndexWorker } from './queues/index.processor';
import { WhatsAppSessionPool } from './whatsapp/session-pool';
import { detectTags, isFlagged } from './whatsapp/tag-detector';
import { syncGroups } from './whatsapp/group-sync';
import { handleStarReaction } from './core/approval';

const logger = createLogger('tower-worker');

async function bootstrap() {
  const env = validateEnv();
  const prisma = new PrismaClient();
  await prisma.$connect();

  const adminJids = env.TOWER_ADMIN_JIDS
    ? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean)
    : [];

  const meiliClient = createMeiliClient(env.MEILI_URL, env.MEILI_MASTER_KEY);
  await configureIndex(meiliClient).catch((err) =>
    logger.warn({ err }, 'Failed to configure Meilisearch index — search may be degraded'),
  );

  const ingestQueue = createIngestQueue(env.REDIS_URL);
  const forwardQueue = createForwardQueue(env.REDIS_URL);
  const indexQueue = createIndexQueue(env.REDIS_URL);
  const pool = new WhatsAppSessionPool();

  const ingestWorker = createIngestWorker(env.REDIS_URL, prisma);
  const forwardWorker = createForwardWorker(env.REDIS_URL, pool);
  const indexWorker = createIndexWorker(env.REDIS_URL, meiliClient);

  ingestWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Ingest job completed'));
  ingestWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Ingest job failed'));
  forwardWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Forward job completed'));
  forwardWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Forward job failed'));
  indexWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Index job completed'));
  indexWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Index job failed'));

  const accounts = await prisma.account.findMany({
    where: { status: 'ACTIVE', platform: 'whatsapp' },
  });

  if (accounts.length === 0) {
    logger.warn('No active WhatsApp accounts found — seed one in the Account table (see docs)');
  }

  const groupMaps = new Map<string, Map<string, string>>();

  for (const account of accounts) {
    groupMaps.set(account.id, new Map());

    try {
      await pool.add(
        account.id,
        account.sessionPath,
        async (msg, accountId) => {
          const tags = detectTags(msg.content, msg.senderJid, adminJids);
          if (!isFlagged(tags)) return;

          const groupMap = groupMaps.get(accountId);
          if (!groupMap) {
            logger.error({ accountId }, 'No group map for account — message dropped');
            return;
          }
          const sourceGroupId = groupMap.get(msg.sourceGroupJid);
          if (!sourceGroupId) {
            logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message');
            return;
          }

          await ingestQueue.add(
            'ingest',
            {
              platformMsgId: msg.platformMsgId,
              platform: 'whatsapp',
              accountId,
              sourceGroupId,
              senderJid: msg.senderJid,
              senderName: msg.senderName,
              content: msg.content,
              tags,
            },
            { attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
          );

          logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued');
        },
        async (reaction) => {
          const result = await handleStarReaction(reaction, adminJids, prisma);
          if (!result) return;

          const { forwardJobs, indexDoc } = result;

          await indexQueue.add('index', indexDoc, {
            attempts: 3,
            backoff: { type: 'exponential', delay: 1000 },
          });

          for (const job of forwardJobs) {
            await forwardQueue.add('forward', job, {
              attempts: 3,
              backoff: { type: 'exponential', delay: 2000 },
            });
          }

          logger.info(
            { messageId: indexDoc.messageId, forwardCount: forwardJobs.length },
            'Message approved — indexed and forwarded',
          );
        },
        async (groups, accountId) => {
          logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups');
          const map = await syncGroups(groups, accountId, prisma);
          groupMaps.set(accountId, map);
        },
      );
    } catch (err) {
      logger.error({ accountId: account.id, err }, 'Failed to start session — skipping account');
    }
  }

  logger.info({ accountCount: accounts.length }, 'Tower worker ready');

  const shutdown = async () => {
    logger.info('Shutting down...');
    await pool.closeAll();
    await ingestWorker.close();
    await forwardWorker.close();
    await indexWorker.close();
    await ingestQueue.close();
    await forwardQueue.close();
    await indexQueue.close();
    await prisma.$disconnect();
    process.exit(0);
  };

  process.on('SIGTERM', shutdown);
  process.on('SIGINT', shutdown);
}

bootstrap().catch((err) => {
  console.error('Worker failed to start', err);
  process.exit(1);
});
  • Step 8: Run all worker tests and build
pnpm --filter @tower/worker test
pnpm --filter @tower/worker build

Expected: All tests pass (the approval.test.ts changes in Task 3 must already be committed), zero build errors.

  • Step 9: Commit
git add apps/worker/src/queues/index.queue.ts \
        apps/worker/src/queues/index.processor.ts \
        apps/worker/src/queues/index.processor.test.ts \
        apps/worker/src/main.ts \
        apps/worker/package.json \
        pnpm-lock.yaml
git commit -m "feat(worker): add index queue and wire Meilisearch indexing after approval"

Task 5: API — SearchModule

Files:

  • Create: apps/api/src/modules/search/search.module.ts

  • Create: apps/api/src/modules/search/search.service.ts

  • Create: apps/api/src/modules/search/search.controller.ts

  • Create: apps/api/src/modules/search/search.controller.spec.ts

  • Create: apps/api/src/modules/search/search.service.spec.ts

  • Modify: apps/api/package.json — add @tower/search: workspace:*

  • Modify: apps/api/src/app.module.ts — register SearchModule

  • Step 1: Add @tower/search to API dependencies

Edit apps/api/package.json — add to "dependencies":

"@tower/search": "workspace:*"

Then:

pnpm install
  • Step 2: Write failing controller spec

Create apps/api/src/modules/search/search.controller.spec.ts:

import { Test, TestingModule } from '@nestjs/testing';
import { SearchController } from './search.controller';
import { SearchService } from './search.service';

const mockSearchService = { search: jest.fn() };

describe('SearchController', () => {
  let controller: SearchController;

  beforeEach(async () => {
    jest.clearAllMocks();
    const module: TestingModule = await Test.createTestingModule({
      controllers: [SearchController],
      providers: [{ provide: SearchService, useValue: mockSearchService }],
    }).compile();
    controller = module.get<SearchController>(SearchController);
  });

  it('calls service with all parsed params', async () => {
    mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 2, limit: 10, query: 'hello' });
    await controller.search('hello', 'grp-1', 'important,event', '2', '10');
    expect(mockSearchService.search).toHaveBeenCalledWith('hello', 'grp-1', ['important', 'event'], 2, 10);
  });

  it('defaults page to 1 and limit to 20 when not provided', async () => {
    mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 1, limit: 20, query: '' });
    await controller.search('');
    expect(mockSearchService.search).toHaveBeenCalledWith('', undefined, undefined, 1, 20);
  });

  it('returns the service result directly', async () => {
    const expected = { hits: [{ id: 'msg-1' }], total: 1, page: 1, limit: 20, query: 'test' };
    mockSearchService.search.mockResolvedValue(expected);
    const result = await controller.search('test');
    expect(result).toEqual(expected);
  });

  it('splits tags on comma and trims whitespace', async () => {
    mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 1, limit: 20, query: '' });
    await controller.search('', undefined, ' important , event ');
    expect(mockSearchService.search).toHaveBeenCalledWith('', undefined, ['important', 'event'], 1, 20);
  });
});
  • Step 3: Write failing service spec

Create apps/api/src/modules/search/search.service.spec.ts:

import { Test, TestingModule } from '@nestjs/testing';
import { ConfigService } from '@nestjs/config';
import { SearchService } from './search.service';
import * as searchPkg from '@tower/search';

jest.mock('@tower/search', () => ({
  createMeiliClient: jest.fn(),
  configureIndex: jest.fn().mockResolvedValue(undefined),
  MESSAGES_INDEX: 'tower-messages',
}));

describe('SearchService', () => {
  let service: SearchService;
  const mockSearch = jest.fn();
  const mockIndex = jest.fn().mockReturnValue({ search: mockSearch });
  const mockClient = { index: mockIndex };

  beforeEach(async () => {
    jest.clearAllMocks();
    (searchPkg.createMeiliClient as jest.Mock).mockReturnValue(mockClient);

    const module: TestingModule = await Test.createTestingModule({
      providers: [
        SearchService,
        { provide: ConfigService, useValue: { get: jest.fn().mockReturnValue('') } },
      ],
    }).compile();

    service = module.get<SearchService>(SearchService);
    await service.onModuleInit();
  });

  it('calls configureIndex on init', () => {
    expect(searchPkg.configureIndex).toHaveBeenCalledWith(mockClient);
  });

  it('returns hits and total', async () => {
    mockSearch.mockResolvedValue({ hits: [{ id: 'msg-1', content: 'hello' }], totalHits: 1 });
    const result = await service.search('hello');
    expect(result.hits).toHaveLength(1);
    expect(result.total).toBe(1);
    expect(result.query).toBe('hello');
  });

  it('searches with no filter when no groupId or tags', async () => {
    mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
    await service.search('test');
    expect(mockSearch).toHaveBeenCalledWith('test', expect.objectContaining({ filter: undefined }));
  });

  it('applies sourceGroupId filter', async () => {
    mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
    await service.search('hello', 'grp-1');
    expect(mockSearch).toHaveBeenCalledWith(
      'hello',
      expect.objectContaining({ filter: 'sourceGroupId = "grp-1"' }),
    );
  });

  it('applies tags filter', async () => {
    mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
    await service.search('hello', undefined, ['#important']);
    expect(mockSearch).toHaveBeenCalledWith(
      'hello',
      expect.objectContaining({ filter: 'tags = "#important"' }),
    );
  });

  it('combines groupId and tags filters with AND', async () => {
    mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
    await service.search('hello', 'grp-1', ['#important', '#event']);
    expect(mockSearch).toHaveBeenCalledWith(
      'hello',
      expect.objectContaining({
        filter: 'sourceGroupId = "grp-1" AND tags = "#important" AND tags = "#event"',
      }),
    );
  });

  it('defaults page to 1 and hitsPerPage to 20', async () => {
    mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
    await service.search('hello');
    expect(mockSearch).toHaveBeenCalledWith(
      'hello',
      expect.objectContaining({ page: 1, hitsPerPage: 20 }),
    );
  });
});
  • Step 4: Run specs — expect failure
pnpm --filter @tower/api test

Expected: FAIL — modules not found.

  • Step 5: Create apps/api/src/modules/search/search.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import {
  MeiliSearch,
  MeiliDocument,
  MESSAGES_INDEX,
  createMeiliClient,
  configureIndex,
} from '@tower/search';

@Injectable()
export class SearchService implements OnModuleInit {
  private readonly client: MeiliSearch;

  constructor(private readonly config: ConfigService) {
    this.client = createMeiliClient(
      this.config.get('MEILI_URL', 'http://localhost:7700'),
      this.config.get('MEILI_MASTER_KEY', ''),
    );
  }

  async onModuleInit(): Promise<void> {
    await configureIndex(this.client).catch((err) =>
      console.warn('Failed to configure Meilisearch index:', err),
    );
  }

  async search(
    query: string,
    groupId?: string,
    tags?: string[],
    page = 1,
    limit = 20,
  ): Promise<{ hits: MeiliDocument[]; total: number; page: number; limit: number; query: string }> {
    const filters: string[] = [];
    if (groupId) filters.push(`sourceGroupId = "${groupId}"`);
    if (tags?.length) filters.push(...tags.map((t) => `tags = "${t}"`));

    const result = await this.client.index(MESSAGES_INDEX).search(query, {
      filter: filters.length ? filters.join(' AND ') : undefined,
      page,
      hitsPerPage: limit,
      sort: ['approvedAt:desc'],
    });

    return {
      hits: result.hits as MeiliDocument[],
      total: result.totalHits ?? 0,
      page,
      limit,
      query,
    };
  }
}
  • Step 6: Create apps/api/src/modules/search/search.controller.ts
import { Controller, Get, Query } from '@nestjs/common';
import { SearchService } from './search.service';

@Controller('search')
export class SearchController {
  constructor(private readonly searchService: SearchService) {}

  @Get()
  search(
    @Query('q') q = '',
    @Query('groupId') groupId?: string,
    @Query('tags') tags?: string,
    @Query('page') page = '1',
    @Query('limit') limit = '20',
  ) {
    const tagList = tags
      ? tags.split(',').map((t) => t.trim()).filter(Boolean)
      : undefined;
    return this.searchService.search(q, groupId, tagList, Number(page), Number(limit));
  }
}
  • Step 7: Create apps/api/src/modules/search/search.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { SearchController } from './search.controller';
import { SearchService } from './search.service';

@Module({
  imports: [ConfigModule],
  controllers: [SearchController],
  providers: [SearchService],
})
export class SearchModule {}
  • Step 8: Register SearchModule in apps/api/src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { PrismaModule } from './prisma/prisma.module';
import { HealthModule } from './modules/health/health.module';
import { SearchModule } from './modules/search/search.module';

@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    PrismaModule,
    HealthModule,
    SearchModule,
  ],
})
export class AppModule {}
  • Step 9: Run all API tests and build
pnpm --filter @tower/api test
pnpm --filter @tower/api build

Expected: All specs pass (health controller + search controller + search service), zero build errors.

  • Step 10: Commit
git add apps/api/src/modules/search/ \
        apps/api/src/app.module.ts \
        apps/api/package.json \
        pnpm-lock.yaml
git commit -m "feat(api): add SearchModule with GET /search endpoint backed by Meilisearch"

Self-Review

Spec coverage:

  • Approved messages indexed in Meilisearch — Task 4 (index queue + processor)
  • Index stays in sync with approvals — Task 3 wires indexDoc into approval result, Task 4 enqueues it
  • Full-text search endpoint — Task 5 GET /search
  • Filterable by group and tags — SearchService.search builds Meilisearch filter string
  • Paginated results — page + limit query params, hitsPerPage in Meilisearch query
  • Index configured with correct attributes — configureIndex called on both worker startup and API onModuleInit
  • Messages indexed even with no sync routes — main.ts always enqueues index job regardless of forwardJobs.length

Placeholder scan: None found. All code steps contain complete implementations.

Type consistency:

  • IndexJobData.approvedAt is string (ISO) throughout types and approval.ts
  • MeiliDocument.approvedAt is number (Unix ms) — conversion happens in processIndexJob
  • handleStarReaction returns ApprovalResult | null consistently across approval.ts and approval.test.ts
  • result.forwardJobs and result.indexDoc used consistently in tests and main.ts