# Archive & Search Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Index every ⭐-approved message into Meilisearch and expose a paginated full-text search endpoint on the API. **Architecture:** A shared `@tower/search` package owns the Meilisearch client factory, index name constant, settings configuration, and document type — consumed by both the worker (indexing) and the API (querying). `handleStarReaction` is extended to return an `ApprovalResult` containing both forward jobs and an `IndexJobData` document. The worker enqueues that document into a BullMQ `tower-index` queue immediately after approval; a processor converts it to a `MeiliDocument` (Unix ms timestamp) and calls `indexMessage`. The NestJS API gets a `SearchModule` with a `GET /search` endpoint that filters by `sourceGroupId` and `tags`. **Tech Stack:** Meilisearch v1.11 (already in docker-compose), `meilisearch` npm SDK ^0.44.0, BullMQ 5, NestJS 11, `@nestjs/config`, TypeScript 5, Jest 29 --- ## File Map **Create:** - `packages/search/package.json` — `@tower/search` package manifest - `packages/search/tsconfig.json` — extends base tsconfig - `packages/search/jest.config.js` — ts-jest preset - `packages/search/src/index.ts` — `MeiliDocument`, `MESSAGES_INDEX`, `createMeiliClient`, `configureIndex`, `indexMessage`, `deleteMessage`, re-export `MeiliSearch` type - `packages/search/src/index.test.ts` — unit tests for all exports - `apps/worker/src/queues/index.queue.ts` — BullMQ Queue for `IndexJobData` - `apps/worker/src/queues/index.processor.ts` — Worker that calls `indexMessage` - `apps/worker/src/queues/index.processor.test.ts` — unit tests - `apps/api/src/modules/search/search.module.ts` - `apps/api/src/modules/search/search.service.ts` — wraps Meilisearch, `onModuleInit` calls `configureIndex` - `apps/api/src/modules/search/search.controller.ts` — `GET /search` - `apps/api/src/modules/search/search.controller.spec.ts` - `apps/api/src/modules/search/search.service.spec.ts` **Modify:** - `packages/types/src/message.ts` — add `IndexJobData` interface - `apps/worker/src/core/approval.ts` — new `ApprovalResult` return type; build `indexDoc` - `apps/worker/src/core/approval.test.ts` — update assertions to `result.forwardJobs` / `result.indexDoc`; add missing mock fields - `apps/worker/src/main.ts` — create meiliClient + indexQueue + indexWorker; call `configureIndex`; enqueue index jobs after approval - `apps/worker/package.json` — add `@tower/search: workspace:*` - `apps/api/package.json` — add `@tower/search: workspace:*` - `apps/api/src/app.module.ts` — import `SearchModule` --- ### Task 1: Add `IndexJobData` to `@tower/types` **Files:** - Modify: `packages/types/src/message.ts` - [ ] **Step 1: Write the failing test** Add to end of `packages/types/src/message.ts` — first verify the type compiles by adding a test file. Actually `@tower/types` has no test runner; type correctness is verified at build time. Skip to Step 2. - [ ] **Step 2: Add `IndexJobData` to `packages/types/src/message.ts`** Append after the `ForwardJobData` interface (keep every existing line intact): ```typescript export interface IndexJobData { messageId: string; // DB Message.id (cuid) content: string; senderName: string | null; sourceGroupId: string; // DB Group.id sourceGroupName: string; tags: string[]; platform: string; approvedAt: string; // ISO 8601 — converted to Unix ms in the processor } ``` - [ ] **Step 3: Build the package to verify no compile errors** ```bash cd /path/to/tower pnpm --filter @tower/types build ``` Expected: `dist/` updated, zero errors. - [ ] **Step 4: Commit** ```bash git add packages/types/src/message.ts git commit -m "feat(types): add IndexJobData for Meilisearch indexing" ``` --- ### Task 2: Create `packages/search` package **Files:** - Create: `packages/search/package.json` - Create: `packages/search/tsconfig.json` - Create: `packages/search/jest.config.js` - Create: `packages/search/src/index.ts` - Create: `packages/search/src/index.test.ts` - [ ] **Step 1: Write the failing tests** Create `packages/search/src/index.test.ts`: ```typescript import { MESSAGES_INDEX, configureIndex, indexMessage, deleteMessage, MeiliDocument, } from './index'; function makeMockClient() { const mockUpdateSettings = jest.fn().mockResolvedValue({}); const mockAddDocuments = jest.fn().mockResolvedValue({}); const mockDeleteDocument = jest.fn().mockResolvedValue({}); const mockIndex = jest.fn().mockReturnValue({ updateSettings: mockUpdateSettings, addDocuments: mockAddDocuments, deleteDocument: mockDeleteDocument, }); return { client: { index: mockIndex } as any, mockIndex, mockUpdateSettings, mockAddDocuments, mockDeleteDocument }; } describe('MESSAGES_INDEX', () => { it('is the string tower-messages', () => { expect(MESSAGES_INDEX).toBe('tower-messages'); }); }); describe('configureIndex', () => { it('sets searchable, filterable, and sortable attributes on the correct index', async () => { const { client, mockIndex, mockUpdateSettings } = makeMockClient(); await configureIndex(client); expect(mockIndex).toHaveBeenCalledWith('tower-messages'); expect(mockUpdateSettings).toHaveBeenCalledWith({ searchableAttributes: ['content', 'senderName', 'sourceGroupName'], filterableAttributes: ['sourceGroupId', 'tags', 'platform'], sortableAttributes: ['approvedAt'], }); }); }); describe('indexMessage', () => { it('adds the document to the messages index', async () => { const { client, mockIndex, mockAddDocuments } = makeMockClient(); const doc: MeiliDocument = { id: 'msg-1', content: 'Hello community', senderName: 'Alice', sourceGroupId: 'grp-1', sourceGroupName: 'UP Parivar', tags: ['#important'], platform: 'whatsapp', approvedAt: 1716825600000, }; await indexMessage(client, doc); expect(mockIndex).toHaveBeenCalledWith('tower-messages'); expect(mockAddDocuments).toHaveBeenCalledWith([doc]); }); }); describe('deleteMessage', () => { it('deletes document by id from the messages index', async () => { const { client, mockIndex, mockDeleteDocument } = makeMockClient(); await deleteMessage(client, 'msg-1'); expect(mockIndex).toHaveBeenCalledWith('tower-messages'); expect(mockDeleteDocument).toHaveBeenCalledWith('msg-1'); }); }); ``` - [ ] **Step 2: Create `packages/search/src/index.ts`** ```typescript import { MeiliSearch } from 'meilisearch'; export { MeiliSearch } from 'meilisearch'; export interface MeiliDocument { id: string; // DB Message.id content: string; senderName: string; // empty string when null sourceGroupId: string; sourceGroupName: string; tags: string[]; platform: string; approvedAt: number; // Unix ms — Meilisearch sorts numbers, not ISO strings } export const MESSAGES_INDEX = 'tower-messages'; export function createMeiliClient(url: string, masterKey: string): MeiliSearch { return new MeiliSearch({ host: url, apiKey: masterKey }); } export async function configureIndex(client: MeiliSearch): Promise { await client.index(MESSAGES_INDEX).updateSettings({ searchableAttributes: ['content', 'senderName', 'sourceGroupName'], filterableAttributes: ['sourceGroupId', 'tags', 'platform'], sortableAttributes: ['approvedAt'], }); } export async function indexMessage(client: MeiliSearch, doc: MeiliDocument): Promise { await client.index(MESSAGES_INDEX).addDocuments([doc]); } export async function deleteMessage(client: MeiliSearch, id: string): Promise { await client.index(MESSAGES_INDEX).deleteDocument(id); } ``` - [ ] **Step 3: Create `packages/search/package.json`** ```json { "name": "@tower/search", "version": "0.0.1", "main": "./dist/index.js", "types": "./dist/index.d.ts", "exports": { ".": { "types": "./dist/index.d.ts", "default": "./dist/index.js" } }, "scripts": { "build": "tsc", "dev": "tsc --watch", "test": "jest" }, "dependencies": { "meilisearch": "^0.44.0" }, "devDependencies": { "@types/jest": "^29.0.0", "@types/node": "^22.0.0", "jest": "^29.0.0", "ts-jest": "^29.0.0", "typescript": "^5.7.0" } } ``` - [ ] **Step 4: Create `packages/search/tsconfig.json`** ```json { "extends": "../../tsconfig.base.json", "compilerOptions": { "outDir": "./dist", "rootDir": "./src" }, "include": ["src"] } ``` - [ ] **Step 5: Create `packages/search/jest.config.js`** ```js module.exports = { preset: 'ts-jest', testEnvironment: 'node', testMatch: ['**/*.test.ts'], rootDir: 'src', }; ``` - [ ] **Step 6: Install dependencies and run tests** ```bash cd /path/to/tower pnpm install pnpm --filter @tower/search test ``` Expected: 4 tests pass (MESSAGES_INDEX, configureIndex, indexMessage, deleteMessage). - [ ] **Step 7: Build** ```bash pnpm --filter @tower/search build ``` Expected: `packages/search/dist/` created, zero errors. - [ ] **Step 8: Commit** ```bash git add packages/search/ git commit -m "feat(search): add @tower/search package with Meilisearch client and helpers" ``` --- ### Task 3: Update `handleStarReaction` to return `ApprovalResult` **Files:** - Modify: `apps/worker/src/core/approval.ts` - Modify: `apps/worker/src/core/approval.test.ts` The current return type is `ForwardJobData[] | null`. Change it to `ApprovalResult | null` where `ApprovalResult` includes both the forward jobs and the data needed for Meilisearch indexing. This lets `main.ts` index the message without an extra DB query. - [ ] **Step 1: Update `approval.test.ts` first (TDD — tests fail before change)** Replace `apps/worker/src/core/approval.test.ts` entirely: ```typescript import { handleStarReaction } from './approval'; import { NormalizedReaction } from '@tower/types'; function makeReaction(overrides: Partial = {}): NormalizedReaction { return { reactorJid: '919876543210@s.whatsapp.net', targetMsgId: 'TARGET_MSG_123', sourceGroupJid: '120363043312345678@g.us', emoji: '⭐', accountId: 'acc_1', ...overrides, }; } const adminJids = ['919876543210@s.whatsapp.net']; // Minimal valid message returned by prisma.message.findUnique function makeMessage(overrides: object = {}) { return { id: 'msg_1', status: 'PENDING', approval: null, content: 'hello world', senderName: 'Alice', sourceGroupId: 'grp_1', tags: ['#important'], platform: 'whatsapp', sourceGroup: { name: 'UP Parivar Dallas', syncRoutesFrom: [] }, ...overrides, }; } function makePrisma(messageOverrides: object = {}, txCount = 1) { return { message: { findUnique: jest.fn().mockResolvedValue(makeMessage(messageOverrides)) }, $transaction: jest.fn().mockImplementation(async (fn: any) => fn({ message: { updateMany: jest.fn().mockResolvedValue({ count: txCount }) }, approval: { create: jest.fn().mockResolvedValue({}) }, }), ), } as any; } describe('handleStarReaction', () => { it('returns null for non-star emoji', async () => { expect(await handleStarReaction(makeReaction({ emoji: '👍' }), adminJids, {} as any)).toBeNull(); }); it('returns null when reactor is not an admin', async () => { expect( await handleStarReaction(makeReaction({ reactorJid: 'stranger@s.whatsapp.net' }), adminJids, {} as any), ).toBeNull(); }); it('returns null when message not found', async () => { const prisma = { message: { findUnique: jest.fn().mockResolvedValue(null) } } as any; expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull(); expect(prisma.message.findUnique).toHaveBeenCalledWith({ where: { platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'TARGET_MSG_123' } }, include: { approval: true, sourceGroup: { include: { syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } } }, }, }, }); }); it('returns null when message status is not PENDING', async () => { const prisma = { message: { findUnique: jest.fn().mockResolvedValue(makeMessage({ status: 'REJECTED' })) }, } as any; expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull(); }); it('returns null when approval record already exists', async () => { const prisma = { message: { findUnique: jest.fn().mockResolvedValue(makeMessage({ status: 'APPROVED', approval: { id: 'appr_1' } })), }, } as any; expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull(); }); it('returns null on double-approval race (updateMany count=0)', async () => { const result = await handleStarReaction(makeReaction(), adminJids, makePrisma({}, 0)); expect(result).toBeNull(); }); it('returns ApprovalResult with empty forwardJobs and valid indexDoc when no sync routes', async () => { const result = await handleStarReaction(makeReaction(), adminJids, makePrisma()); expect(result).not.toBeNull(); expect(result!.forwardJobs).toEqual([]); expect(result!.indexDoc).toMatchObject({ messageId: 'msg_1', content: 'hello world', senderName: 'Alice', sourceGroupId: 'grp_1', sourceGroupName: 'UP Parivar Dallas', tags: ['#important'], platform: 'whatsapp', }); expect(result!.indexDoc.approvedAt).toMatch(/^\d{4}-\d{2}-\d{2}T/); }); it('returns ForwardJobData for each active sync route', async () => { const prisma = makePrisma({ content: 'important announcement', senderName: 'Bob', sourceGroup: { name: 'Source Group', syncRoutesFrom: [ { targetGroup: { platformId: '999@g.us', accountId: 'acc_2' } }, { targetGroup: { platformId: '888@g.us', accountId: null } }, ], }, }); const result = await handleStarReaction(makeReaction(), adminJids, prisma); expect(result!.forwardJobs).toHaveLength(2); expect(result!.forwardJobs[0]).toMatchObject({ messageId: 'msg_1', content: 'important announcement', sourceGroupName: 'Source Group', senderName: 'Bob', toGroupJid: '999@g.us', fromAccountId: 'acc_2', }); expect(result!.forwardJobs[1]).toMatchObject({ toGroupJid: '888@g.us', fromAccountId: 'acc_1', // falls back to reaction.accountId }); }); }); ``` - [ ] **Step 2: Run tests — expect 2 failures** (the two tests that check `result.forwardJobs` and `result.indexDoc`) ```bash pnpm --filter @tower/worker test -- --testPathPattern=approval ``` Expected: `FAIL` on the last two tests. - [ ] **Step 3: Update `apps/worker/src/core/approval.ts`** ```typescript import { NormalizedReaction, ForwardJobData, IndexJobData } from '@tower/types'; export interface ApprovalResult { forwardJobs: ForwardJobData[]; indexDoc: IndexJobData; } export async function handleStarReaction( reaction: NormalizedReaction, adminJids: string[], prisma: any, ): Promise { if (reaction.emoji !== '⭐') return null; if (!adminJids.includes(reaction.reactorJid)) return null; const message = await prisma.message.findUnique({ where: { platform_platformMsgId: { // TODO: derive platform from NormalizedReaction when multi-platform support is added platform: 'whatsapp', platformMsgId: reaction.targetMsgId, }, }, include: { approval: true, sourceGroup: { include: { syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } }, }, }, }, }); if (!message) return null; if (message.status !== 'PENDING') return null; if (message.approval) return null; let approved = false; await prisma.$transaction(async (tx: any) => { const updated = await tx.message.updateMany({ where: { id: message.id, status: 'PENDING' }, data: { status: 'APPROVED' }, }); if (updated.count === 0) return; approved = true; await tx.approval.create({ data: { messageId: message.id, adminId: reaction.reactorJid, decision: 'APPROVED', }, }); }); if (!approved) return null; const forwardJobs: ForwardJobData[] = message.sourceGroup.syncRoutesFrom .filter((route: any) => route.targetGroup != null) .map((route: any) => ({ messageId: message.id, content: message.content, sourceGroupName: message.sourceGroup.name, senderName: message.senderName ?? undefined, toGroupJid: route.targetGroup.platformId, fromAccountId: route.targetGroup.accountId ?? reaction.accountId, })); const indexDoc: IndexJobData = { messageId: message.id, content: message.content, senderName: message.senderName ?? null, sourceGroupId: message.sourceGroupId, sourceGroupName: message.sourceGroup.name, tags: message.tags, platform: message.platform, approvedAt: new Date().toISOString(), }; return { forwardJobs, indexDoc }; } ``` - [ ] **Step 4: Run tests — all pass** ```bash pnpm --filter @tower/worker test -- --testPathPattern=approval ``` Expected: 8 tests pass. - [ ] **Step 5: Commit** ```bash git add apps/worker/src/core/approval.ts apps/worker/src/core/approval.test.ts git commit -m "feat(worker): handleStarReaction returns ApprovalResult with indexDoc" ``` --- ### Task 4: Worker — index queue, processor, and wire `main.ts` **Files:** - Create: `apps/worker/src/queues/index.queue.ts` - Create: `apps/worker/src/queues/index.processor.ts` - Create: `apps/worker/src/queues/index.processor.test.ts` - Modify: `apps/worker/package.json` — add `@tower/search: workspace:*` - Modify: `apps/worker/src/main.ts` — create meiliClient, configureIndex, indexQueue, indexWorker, update reaction handler, update shutdown - [ ] **Step 1: Add `@tower/search` to worker dependencies** Edit `apps/worker/package.json` — add to the `"dependencies"` object: ```json "@tower/search": "workspace:*" ``` Then install: ```bash pnpm install ``` - [ ] **Step 2: Write the failing processor test** Create `apps/worker/src/queues/index.processor.test.ts`: ```typescript import { processIndexJob } from './index.processor'; import { indexMessage } from '@tower/search'; import { IndexJobData } from '@tower/types'; jest.mock('@tower/search', () => ({ indexMessage: jest.fn().mockResolvedValue(undefined), MESSAGES_INDEX: 'tower-messages', })); function makeJob(overrides: Partial = {}): IndexJobData { return { messageId: 'msg-1', content: 'hello world', senderName: 'Alice', sourceGroupId: 'grp-1', sourceGroupName: 'UP Parivar', tags: ['#important'], platform: 'whatsapp', approvedAt: '2026-05-27T10:00:00.000Z', ...overrides, }; } describe('processIndexJob', () => { beforeEach(() => jest.clearAllMocks()); it('calls indexMessage with MeiliDocument shape', async () => { const mockClient = {} as any; await processIndexJob(makeJob(), mockClient); expect(indexMessage).toHaveBeenCalledWith(mockClient, { id: 'msg-1', content: 'hello world', senderName: 'Alice', sourceGroupId: 'grp-1', sourceGroupName: 'UP Parivar', tags: ['#important'], platform: 'whatsapp', approvedAt: new Date('2026-05-27T10:00:00.000Z').getTime(), }); }); it('converts null senderName to empty string', async () => { const mockClient = {} as any; await processIndexJob(makeJob({ senderName: null }), mockClient); expect(indexMessage).toHaveBeenCalledWith( mockClient, expect.objectContaining({ senderName: '' }), ); }); it('converts approvedAt ISO string to Unix ms number', async () => { const mockClient = {} as any; await processIndexJob(makeJob({ approvedAt: '2026-01-01T00:00:00.000Z' }), mockClient); expect(indexMessage).toHaveBeenCalledWith( mockClient, expect.objectContaining({ approvedAt: new Date('2026-01-01T00:00:00.000Z').getTime() }), ); }); }); ``` - [ ] **Step 3: Run test — expect failure** ```bash pnpm --filter @tower/worker test -- --testPathPattern=index.processor ``` Expected: `FAIL` — `Cannot find module './index.processor'`. - [ ] **Step 4: Create `apps/worker/src/queues/index.queue.ts`** ```typescript import { Queue } from 'bullmq'; import { IndexJobData } from '@tower/types'; import { parseRedisUrl } from './redis-connection'; export function createIndexQueue(redisUrl: string): Queue { return new Queue('tower-index', { connection: parseRedisUrl(redisUrl) }); } ``` - [ ] **Step 5: Create `apps/worker/src/queues/index.processor.ts`** ```typescript import { Worker } from 'bullmq'; import { IndexJobData } from '@tower/types'; import { MeiliSearch, MeiliDocument, indexMessage } from '@tower/search'; import { parseRedisUrl } from './redis-connection'; export async function processIndexJob(job: IndexJobData, meiliClient: MeiliSearch): Promise { const doc: MeiliDocument = { id: job.messageId, content: job.content, senderName: job.senderName ?? '', sourceGroupId: job.sourceGroupId, sourceGroupName: job.sourceGroupName, tags: job.tags, platform: job.platform, approvedAt: new Date(job.approvedAt).getTime(), }; await indexMessage(meiliClient, doc); } export function createIndexWorker(redisUrl: string, meiliClient: MeiliSearch): Worker { return new Worker( 'tower-index', async (job) => processIndexJob(job.data, meiliClient), { connection: parseRedisUrl(redisUrl) }, ); } ``` - [ ] **Step 6: Run tests — all pass** ```bash pnpm --filter @tower/worker test -- --testPathPattern=index.processor ``` Expected: 3 tests pass. - [ ] **Step 7: Update `apps/worker/src/main.ts`** Replace the entire file with: ```typescript import { PrismaClient } from '@prisma/client'; import { createLogger } from '@tower/logger'; import { validateEnv } from '@tower/config'; import { createMeiliClient, configureIndex } from '@tower/search'; import { createIngestQueue } from './queues/ingest.queue'; import { createIngestWorker } from './queues/ingest.processor'; import { createForwardQueue } from './queues/forward.queue'; import { createForwardWorker } from './queues/forward.processor'; import { createIndexQueue } from './queues/index.queue'; import { createIndexWorker } from './queues/index.processor'; import { WhatsAppSessionPool } from './whatsapp/session-pool'; import { detectTags, isFlagged } from './whatsapp/tag-detector'; import { syncGroups } from './whatsapp/group-sync'; import { handleStarReaction } from './core/approval'; const logger = createLogger('tower-worker'); async function bootstrap() { const env = validateEnv(); const prisma = new PrismaClient(); await prisma.$connect(); const adminJids = env.TOWER_ADMIN_JIDS ? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean) : []; const meiliClient = createMeiliClient(env.MEILI_URL, env.MEILI_MASTER_KEY); await configureIndex(meiliClient).catch((err) => logger.warn({ err }, 'Failed to configure Meilisearch index — search may be degraded'), ); const ingestQueue = createIngestQueue(env.REDIS_URL); const forwardQueue = createForwardQueue(env.REDIS_URL); const indexQueue = createIndexQueue(env.REDIS_URL); const pool = new WhatsAppSessionPool(); const ingestWorker = createIngestWorker(env.REDIS_URL, prisma); const forwardWorker = createForwardWorker(env.REDIS_URL, pool); const indexWorker = createIndexWorker(env.REDIS_URL, meiliClient); ingestWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Ingest job completed')); ingestWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Ingest job failed')); forwardWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Forward job completed')); forwardWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Forward job failed')); indexWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Index job completed')); indexWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Index job failed')); const accounts = await prisma.account.findMany({ where: { status: 'ACTIVE', platform: 'whatsapp' }, }); if (accounts.length === 0) { logger.warn('No active WhatsApp accounts found — seed one in the Account table (see docs)'); } const groupMaps = new Map>(); for (const account of accounts) { groupMaps.set(account.id, new Map()); try { await pool.add( account.id, account.sessionPath, async (msg, accountId) => { const tags = detectTags(msg.content, msg.senderJid, adminJids); if (!isFlagged(tags)) return; const groupMap = groupMaps.get(accountId); if (!groupMap) { logger.error({ accountId }, 'No group map for account — message dropped'); return; } const sourceGroupId = groupMap.get(msg.sourceGroupJid); if (!sourceGroupId) { logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message'); return; } await ingestQueue.add( 'ingest', { platformMsgId: msg.platformMsgId, platform: 'whatsapp', accountId, sourceGroupId, senderJid: msg.senderJid, senderName: msg.senderName, content: msg.content, tags, }, { attempts: 3, backoff: { type: 'exponential', delay: 1000 } }, ); logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued'); }, async (reaction) => { const result = await handleStarReaction(reaction, adminJids, prisma); if (!result) return; const { forwardJobs, indexDoc } = result; await indexQueue.add('index', indexDoc, { attempts: 3, backoff: { type: 'exponential', delay: 1000 }, }); for (const job of forwardJobs) { await forwardQueue.add('forward', job, { attempts: 3, backoff: { type: 'exponential', delay: 2000 }, }); } logger.info( { messageId: indexDoc.messageId, forwardCount: forwardJobs.length }, 'Message approved — indexed and forwarded', ); }, async (groups, accountId) => { logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups'); const map = await syncGroups(groups, accountId, prisma); groupMaps.set(accountId, map); }, ); } catch (err) { logger.error({ accountId: account.id, err }, 'Failed to start session — skipping account'); } } logger.info({ accountCount: accounts.length }, 'Tower worker ready'); const shutdown = async () => { logger.info('Shutting down...'); await pool.closeAll(); await ingestWorker.close(); await forwardWorker.close(); await indexWorker.close(); await ingestQueue.close(); await forwardQueue.close(); await indexQueue.close(); await prisma.$disconnect(); process.exit(0); }; process.on('SIGTERM', shutdown); process.on('SIGINT', shutdown); } bootstrap().catch((err) => { console.error('Worker failed to start', err); process.exit(1); }); ``` - [ ] **Step 8: Run all worker tests and build** ```bash pnpm --filter @tower/worker test pnpm --filter @tower/worker build ``` Expected: All tests pass (the `approval.test.ts` changes in Task 3 must already be committed), zero build errors. - [ ] **Step 9: Commit** ```bash git add apps/worker/src/queues/index.queue.ts \ apps/worker/src/queues/index.processor.ts \ apps/worker/src/queues/index.processor.test.ts \ apps/worker/src/main.ts \ apps/worker/package.json \ pnpm-lock.yaml git commit -m "feat(worker): add index queue and wire Meilisearch indexing after approval" ``` --- ### Task 5: API — SearchModule **Files:** - Create: `apps/api/src/modules/search/search.module.ts` - Create: `apps/api/src/modules/search/search.service.ts` - Create: `apps/api/src/modules/search/search.controller.ts` - Create: `apps/api/src/modules/search/search.controller.spec.ts` - Create: `apps/api/src/modules/search/search.service.spec.ts` - Modify: `apps/api/package.json` — add `@tower/search: workspace:*` - Modify: `apps/api/src/app.module.ts` — register `SearchModule` - [ ] **Step 1: Add `@tower/search` to API dependencies** Edit `apps/api/package.json` — add to `"dependencies"`: ```json "@tower/search": "workspace:*" ``` Then: ```bash pnpm install ``` - [ ] **Step 2: Write failing controller spec** Create `apps/api/src/modules/search/search.controller.spec.ts`: ```typescript import { Test, TestingModule } from '@nestjs/testing'; import { SearchController } from './search.controller'; import { SearchService } from './search.service'; const mockSearchService = { search: jest.fn() }; describe('SearchController', () => { let controller: SearchController; beforeEach(async () => { jest.clearAllMocks(); const module: TestingModule = await Test.createTestingModule({ controllers: [SearchController], providers: [{ provide: SearchService, useValue: mockSearchService }], }).compile(); controller = module.get(SearchController); }); it('calls service with all parsed params', async () => { mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 2, limit: 10, query: 'hello' }); await controller.search('hello', 'grp-1', 'important,event', '2', '10'); expect(mockSearchService.search).toHaveBeenCalledWith('hello', 'grp-1', ['important', 'event'], 2, 10); }); it('defaults page to 1 and limit to 20 when not provided', async () => { mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 1, limit: 20, query: '' }); await controller.search(''); expect(mockSearchService.search).toHaveBeenCalledWith('', undefined, undefined, 1, 20); }); it('returns the service result directly', async () => { const expected = { hits: [{ id: 'msg-1' }], total: 1, page: 1, limit: 20, query: 'test' }; mockSearchService.search.mockResolvedValue(expected); const result = await controller.search('test'); expect(result).toEqual(expected); }); it('splits tags on comma and trims whitespace', async () => { mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 1, limit: 20, query: '' }); await controller.search('', undefined, ' important , event '); expect(mockSearchService.search).toHaveBeenCalledWith('', undefined, ['important', 'event'], 1, 20); }); }); ``` - [ ] **Step 3: Write failing service spec** Create `apps/api/src/modules/search/search.service.spec.ts`: ```typescript import { Test, TestingModule } from '@nestjs/testing'; import { ConfigService } from '@nestjs/config'; import { SearchService } from './search.service'; import * as searchPkg from '@tower/search'; jest.mock('@tower/search', () => ({ createMeiliClient: jest.fn(), configureIndex: jest.fn().mockResolvedValue(undefined), MESSAGES_INDEX: 'tower-messages', })); describe('SearchService', () => { let service: SearchService; const mockSearch = jest.fn(); const mockIndex = jest.fn().mockReturnValue({ search: mockSearch }); const mockClient = { index: mockIndex }; beforeEach(async () => { jest.clearAllMocks(); (searchPkg.createMeiliClient as jest.Mock).mockReturnValue(mockClient); const module: TestingModule = await Test.createTestingModule({ providers: [ SearchService, { provide: ConfigService, useValue: { get: jest.fn().mockReturnValue('') } }, ], }).compile(); service = module.get(SearchService); await service.onModuleInit(); }); it('calls configureIndex on init', () => { expect(searchPkg.configureIndex).toHaveBeenCalledWith(mockClient); }); it('returns hits and total', async () => { mockSearch.mockResolvedValue({ hits: [{ id: 'msg-1', content: 'hello' }], totalHits: 1 }); const result = await service.search('hello'); expect(result.hits).toHaveLength(1); expect(result.total).toBe(1); expect(result.query).toBe('hello'); }); it('searches with no filter when no groupId or tags', async () => { mockSearch.mockResolvedValue({ hits: [], totalHits: 0 }); await service.search('test'); expect(mockSearch).toHaveBeenCalledWith('test', expect.objectContaining({ filter: undefined })); }); it('applies sourceGroupId filter', async () => { mockSearch.mockResolvedValue({ hits: [], totalHits: 0 }); await service.search('hello', 'grp-1'); expect(mockSearch).toHaveBeenCalledWith( 'hello', expect.objectContaining({ filter: 'sourceGroupId = "grp-1"' }), ); }); it('applies tags filter', async () => { mockSearch.mockResolvedValue({ hits: [], totalHits: 0 }); await service.search('hello', undefined, ['#important']); expect(mockSearch).toHaveBeenCalledWith( 'hello', expect.objectContaining({ filter: 'tags = "#important"' }), ); }); it('combines groupId and tags filters with AND', async () => { mockSearch.mockResolvedValue({ hits: [], totalHits: 0 }); await service.search('hello', 'grp-1', ['#important', '#event']); expect(mockSearch).toHaveBeenCalledWith( 'hello', expect.objectContaining({ filter: 'sourceGroupId = "grp-1" AND tags = "#important" AND tags = "#event"', }), ); }); it('defaults page to 1 and hitsPerPage to 20', async () => { mockSearch.mockResolvedValue({ hits: [], totalHits: 0 }); await service.search('hello'); expect(mockSearch).toHaveBeenCalledWith( 'hello', expect.objectContaining({ page: 1, hitsPerPage: 20 }), ); }); }); ``` - [ ] **Step 4: Run specs — expect failure** ```bash pnpm --filter @tower/api test ``` Expected: `FAIL` — modules not found. - [ ] **Step 5: Create `apps/api/src/modules/search/search.service.ts`** ```typescript import { Injectable, OnModuleInit } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { MeiliSearch, MeiliDocument, MESSAGES_INDEX, createMeiliClient, configureIndex, } from '@tower/search'; @Injectable() export class SearchService implements OnModuleInit { private readonly client: MeiliSearch; constructor(private readonly config: ConfigService) { this.client = createMeiliClient( this.config.get('MEILI_URL', 'http://localhost:7700'), this.config.get('MEILI_MASTER_KEY', ''), ); } async onModuleInit(): Promise { await configureIndex(this.client).catch((err) => console.warn('Failed to configure Meilisearch index:', err), ); } async search( query: string, groupId?: string, tags?: string[], page = 1, limit = 20, ): Promise<{ hits: MeiliDocument[]; total: number; page: number; limit: number; query: string }> { const filters: string[] = []; if (groupId) filters.push(`sourceGroupId = "${groupId}"`); if (tags?.length) filters.push(...tags.map((t) => `tags = "${t}"`)); const result = await this.client.index(MESSAGES_INDEX).search(query, { filter: filters.length ? filters.join(' AND ') : undefined, page, hitsPerPage: limit, sort: ['approvedAt:desc'], }); return { hits: result.hits as MeiliDocument[], total: result.totalHits ?? 0, page, limit, query, }; } } ``` - [ ] **Step 6: Create `apps/api/src/modules/search/search.controller.ts`** ```typescript import { Controller, Get, Query } from '@nestjs/common'; import { SearchService } from './search.service'; @Controller('search') export class SearchController { constructor(private readonly searchService: SearchService) {} @Get() search( @Query('q') q = '', @Query('groupId') groupId?: string, @Query('tags') tags?: string, @Query('page') page = '1', @Query('limit') limit = '20', ) { const tagList = tags ? tags.split(',').map((t) => t.trim()).filter(Boolean) : undefined; return this.searchService.search(q, groupId, tagList, Number(page), Number(limit)); } } ``` - [ ] **Step 7: Create `apps/api/src/modules/search/search.module.ts`** ```typescript import { Module } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; import { SearchController } from './search.controller'; import { SearchService } from './search.service'; @Module({ imports: [ConfigModule], controllers: [SearchController], providers: [SearchService], }) export class SearchModule {} ``` - [ ] **Step 8: Register SearchModule in `apps/api/src/app.module.ts`** ```typescript import { Module } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; import { PrismaModule } from './prisma/prisma.module'; import { HealthModule } from './modules/health/health.module'; import { SearchModule } from './modules/search/search.module'; @Module({ imports: [ ConfigModule.forRoot({ isGlobal: true }), PrismaModule, HealthModule, SearchModule, ], }) export class AppModule {} ``` - [ ] **Step 9: Run all API tests and build** ```bash pnpm --filter @tower/api test pnpm --filter @tower/api build ``` Expected: All specs pass (health controller + search controller + search service), zero build errors. - [ ] **Step 10: Commit** ```bash git add apps/api/src/modules/search/ \ apps/api/src/app.module.ts \ apps/api/package.json \ pnpm-lock.yaml git commit -m "feat(api): add SearchModule with GET /search endpoint backed by Meilisearch" ``` --- ## Self-Review **Spec coverage:** - ✅ Approved messages indexed in Meilisearch — Task 4 (index queue + processor) - ✅ Index stays in sync with approvals — Task 3 wires indexDoc into approval result, Task 4 enqueues it - ✅ Full-text search endpoint — Task 5 `GET /search` - ✅ Filterable by group and tags — `SearchService.search` builds Meilisearch filter string - ✅ Paginated results — `page` + `limit` query params, `hitsPerPage` in Meilisearch query - ✅ Index configured with correct attributes — `configureIndex` called on both worker startup and API `onModuleInit` - ✅ Messages indexed even with no sync routes — `main.ts` always enqueues index job regardless of `forwardJobs.length` **Placeholder scan:** None found. All code steps contain complete implementations. **Type consistency:** - `IndexJobData.approvedAt` is `string` (ISO) throughout types and approval.ts - `MeiliDocument.approvedAt` is `number` (Unix ms) — conversion happens in `processIndexJob` - `handleStarReaction` returns `ApprovalResult | null` consistently across approval.ts and approval.test.ts - `result.forwardJobs` and `result.indexDoc` used consistently in tests and main.ts