From 1d6e1fb4daabc958142adf82dc320cfbac846b47 Mon Sep 17 00:00:00 2001 From: maaz519 Date: Wed, 27 May 2026 23:53:32 +0530 Subject: [PATCH] fix(search): tighten mock type cast for better type safety Replace `as any` cast with `as unknown as ReturnType` in the mock client factory. This preserves type safety without requiring the mock to implement the full SDK interface. Co-Authored-By: Claude Sonnet 4.6 --- .claude/settings.local.json | 6 +- .../plans/2026-05-27-archive-and-search.md | 1219 +++++++++++++++++ packages/search/src/index.test.ts | 3 +- pnpm-lock.yaml | 37 +- 4 files changed, 1258 insertions(+), 7 deletions(-) create mode 100644 docs/superpowers/plans/2026-05-27-archive-and-search.md diff --git a/.claude/settings.local.json b/.claude/settings.local.json index ff7ea1b..61369e3 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -69,7 +69,11 @@ "Bash(npx jest *)", "Bash(pnpm --filter @tower/worker test -- --testPathPattern=forward)", "Bash(pnpm --filter @tower/worker test -- forward.processor.test.ts)", - "Bash(npm test *)" + "Bash(npm test *)", + "Bash(grep -v \"^$\")", + "Bash(npm info *)", + "Bash(pnpm --filter @tower/search test)", + "Bash(pnpm --filter @tower/search build)" ] } } diff --git a/docs/superpowers/plans/2026-05-27-archive-and-search.md b/docs/superpowers/plans/2026-05-27-archive-and-search.md new file mode 100644 index 0000000..952668b --- /dev/null +++ b/docs/superpowers/plans/2026-05-27-archive-and-search.md @@ -0,0 +1,1219 @@ +# Archive & Search Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Index every ⭐-approved message into Meilisearch and expose a paginated full-text search endpoint on the API. + +**Architecture:** A shared `@tower/search` package owns the Meilisearch client factory, index name constant, settings configuration, and document type — consumed by both the worker (indexing) and the API (querying). `handleStarReaction` is extended to return an `ApprovalResult` containing both forward jobs and an `IndexJobData` document. The worker enqueues that document into a BullMQ `tower-index` queue immediately after approval; a processor converts it to a `MeiliDocument` (Unix ms timestamp) and calls `indexMessage`. The NestJS API gets a `SearchModule` with a `GET /search` endpoint that filters by `sourceGroupId` and `tags`. + +**Tech Stack:** Meilisearch v1.11 (already in docker-compose), `meilisearch` npm SDK ^0.44.0, BullMQ 5, NestJS 11, `@nestjs/config`, TypeScript 5, Jest 29 + +--- + +## File Map + +**Create:** +- `packages/search/package.json` — `@tower/search` package manifest +- `packages/search/tsconfig.json` — extends base tsconfig +- `packages/search/jest.config.js` — ts-jest preset +- `packages/search/src/index.ts` — `MeiliDocument`, `MESSAGES_INDEX`, `createMeiliClient`, `configureIndex`, `indexMessage`, `deleteMessage`, re-export `MeiliSearch` type +- `packages/search/src/index.test.ts` — unit tests for all exports +- `apps/worker/src/queues/index.queue.ts` — BullMQ Queue for `IndexJobData` +- `apps/worker/src/queues/index.processor.ts` — Worker that calls `indexMessage` +- `apps/worker/src/queues/index.processor.test.ts` — unit tests +- `apps/api/src/modules/search/search.module.ts` +- `apps/api/src/modules/search/search.service.ts` — wraps Meilisearch, `onModuleInit` calls `configureIndex` +- `apps/api/src/modules/search/search.controller.ts` — `GET /search` +- `apps/api/src/modules/search/search.controller.spec.ts` +- `apps/api/src/modules/search/search.service.spec.ts` + +**Modify:** +- `packages/types/src/message.ts` — add `IndexJobData` interface +- `apps/worker/src/core/approval.ts` — new `ApprovalResult` return type; build `indexDoc` +- `apps/worker/src/core/approval.test.ts` — update assertions to `result.forwardJobs` / `result.indexDoc`; add missing mock fields +- `apps/worker/src/main.ts` — create meiliClient + indexQueue + indexWorker; call `configureIndex`; enqueue index jobs after approval +- `apps/worker/package.json` — add `@tower/search: workspace:*` +- `apps/api/package.json` — add `@tower/search: workspace:*` +- `apps/api/src/app.module.ts` — import `SearchModule` + +--- + +### Task 1: Add `IndexJobData` to `@tower/types` + +**Files:** +- Modify: `packages/types/src/message.ts` + +- [ ] **Step 1: Write the failing test** + +Add to end of `packages/types/src/message.ts` — first verify the type compiles by adding a test file. Actually `@tower/types` has no test runner; type correctness is verified at build time. Skip to Step 2. + +- [ ] **Step 2: Add `IndexJobData` to `packages/types/src/message.ts`** + +Append after the `ForwardJobData` interface (keep every existing line intact): + +```typescript +export interface IndexJobData { + messageId: string; // DB Message.id (cuid) + content: string; + senderName: string | null; + sourceGroupId: string; // DB Group.id + sourceGroupName: string; + tags: string[]; + platform: string; + approvedAt: string; // ISO 8601 — converted to Unix ms in the processor +} +``` + +- [ ] **Step 3: Build the package to verify no compile errors** + +```bash +cd /path/to/tower +pnpm --filter @tower/types build +``` + +Expected: `dist/` updated, zero errors. + +- [ ] **Step 4: Commit** + +```bash +git add packages/types/src/message.ts +git commit -m "feat(types): add IndexJobData for Meilisearch indexing" +``` + +--- + +### Task 2: Create `packages/search` package + +**Files:** +- Create: `packages/search/package.json` +- Create: `packages/search/tsconfig.json` +- Create: `packages/search/jest.config.js` +- Create: `packages/search/src/index.ts` +- Create: `packages/search/src/index.test.ts` + +- [ ] **Step 1: Write the failing tests** + +Create `packages/search/src/index.test.ts`: + +```typescript +import { + MESSAGES_INDEX, + configureIndex, + indexMessage, + deleteMessage, + MeiliDocument, +} from './index'; + +function makeMockClient() { + const mockUpdateSettings = jest.fn().mockResolvedValue({}); + const mockAddDocuments = jest.fn().mockResolvedValue({}); + const mockDeleteDocument = jest.fn().mockResolvedValue({}); + const mockIndex = jest.fn().mockReturnValue({ + updateSettings: mockUpdateSettings, + addDocuments: mockAddDocuments, + deleteDocument: mockDeleteDocument, + }); + return { client: { index: mockIndex } as any, mockIndex, mockUpdateSettings, mockAddDocuments, mockDeleteDocument }; +} + +describe('MESSAGES_INDEX', () => { + it('is the string tower-messages', () => { + expect(MESSAGES_INDEX).toBe('tower-messages'); + }); +}); + +describe('configureIndex', () => { + it('sets searchable, filterable, and sortable attributes on the correct index', async () => { + const { client, mockIndex, mockUpdateSettings } = makeMockClient(); + await configureIndex(client); + expect(mockIndex).toHaveBeenCalledWith('tower-messages'); + expect(mockUpdateSettings).toHaveBeenCalledWith({ + searchableAttributes: ['content', 'senderName', 'sourceGroupName'], + filterableAttributes: ['sourceGroupId', 'tags', 'platform'], + sortableAttributes: ['approvedAt'], + }); + }); +}); + +describe('indexMessage', () => { + it('adds the document to the messages index', async () => { + const { client, mockIndex, mockAddDocuments } = makeMockClient(); + const doc: MeiliDocument = { + id: 'msg-1', + content: 'Hello community', + senderName: 'Alice', + sourceGroupId: 'grp-1', + sourceGroupName: 'UP Parivar', + tags: ['#important'], + platform: 'whatsapp', + approvedAt: 1716825600000, + }; + await indexMessage(client, doc); + expect(mockIndex).toHaveBeenCalledWith('tower-messages'); + expect(mockAddDocuments).toHaveBeenCalledWith([doc]); + }); +}); + +describe('deleteMessage', () => { + it('deletes document by id from the messages index', async () => { + const { client, mockIndex, mockDeleteDocument } = makeMockClient(); + await deleteMessage(client, 'msg-1'); + expect(mockIndex).toHaveBeenCalledWith('tower-messages'); + expect(mockDeleteDocument).toHaveBeenCalledWith('msg-1'); + }); +}); +``` + +- [ ] **Step 2: Create `packages/search/src/index.ts`** + +```typescript +import { MeiliSearch } from 'meilisearch'; + +export { MeiliSearch } from 'meilisearch'; + +export interface MeiliDocument { + id: string; // DB Message.id + content: string; + senderName: string; // empty string when null + sourceGroupId: string; + sourceGroupName: string; + tags: string[]; + platform: string; + approvedAt: number; // Unix ms — Meilisearch sorts numbers, not ISO strings +} + +export const MESSAGES_INDEX = 'tower-messages'; + +export function createMeiliClient(url: string, masterKey: string): MeiliSearch { + return new MeiliSearch({ host: url, apiKey: masterKey }); +} + +export async function configureIndex(client: MeiliSearch): Promise { + await client.index(MESSAGES_INDEX).updateSettings({ + searchableAttributes: ['content', 'senderName', 'sourceGroupName'], + filterableAttributes: ['sourceGroupId', 'tags', 'platform'], + sortableAttributes: ['approvedAt'], + }); +} + +export async function indexMessage(client: MeiliSearch, doc: MeiliDocument): Promise { + await client.index(MESSAGES_INDEX).addDocuments([doc]); +} + +export async function deleteMessage(client: MeiliSearch, id: string): Promise { + await client.index(MESSAGES_INDEX).deleteDocument(id); +} +``` + +- [ ] **Step 3: Create `packages/search/package.json`** + +```json +{ + "name": "@tower/search", + "version": "0.0.1", + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + } + }, + "scripts": { + "build": "tsc", + "dev": "tsc --watch", + "test": "jest" + }, + "dependencies": { + "meilisearch": "^0.44.0" + }, + "devDependencies": { + "@types/jest": "^29.0.0", + "@types/node": "^22.0.0", + "jest": "^29.0.0", + "ts-jest": "^29.0.0", + "typescript": "^5.7.0" + } +} +``` + +- [ ] **Step 4: Create `packages/search/tsconfig.json`** + +```json +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "./dist", + "rootDir": "./src" + }, + "include": ["src"] +} +``` + +- [ ] **Step 5: Create `packages/search/jest.config.js`** + +```js +module.exports = { + preset: 'ts-jest', + testEnvironment: 'node', + testMatch: ['**/*.test.ts'], + rootDir: 'src', +}; +``` + +- [ ] **Step 6: Install dependencies and run tests** + +```bash +cd /path/to/tower +pnpm install +pnpm --filter @tower/search test +``` + +Expected: 4 tests pass (MESSAGES_INDEX, configureIndex, indexMessage, deleteMessage). + +- [ ] **Step 7: Build** + +```bash +pnpm --filter @tower/search build +``` + +Expected: `packages/search/dist/` created, zero errors. + +- [ ] **Step 8: Commit** + +```bash +git add packages/search/ +git commit -m "feat(search): add @tower/search package with Meilisearch client and helpers" +``` + +--- + +### Task 3: Update `handleStarReaction` to return `ApprovalResult` + +**Files:** +- Modify: `apps/worker/src/core/approval.ts` +- Modify: `apps/worker/src/core/approval.test.ts` + +The current return type is `ForwardJobData[] | null`. Change it to `ApprovalResult | null` where `ApprovalResult` includes both the forward jobs and the data needed for Meilisearch indexing. This lets `main.ts` index the message without an extra DB query. + +- [ ] **Step 1: Update `approval.test.ts` first (TDD — tests fail before change)** + +Replace `apps/worker/src/core/approval.test.ts` entirely: + +```typescript +import { handleStarReaction } from './approval'; +import { NormalizedReaction } from '@tower/types'; + +function makeReaction(overrides: Partial = {}): NormalizedReaction { + return { + reactorJid: '919876543210@s.whatsapp.net', + targetMsgId: 'TARGET_MSG_123', + sourceGroupJid: '120363043312345678@g.us', + emoji: '⭐', + accountId: 'acc_1', + ...overrides, + }; +} + +const adminJids = ['919876543210@s.whatsapp.net']; + +// Minimal valid message returned by prisma.message.findUnique +function makeMessage(overrides: object = {}) { + return { + id: 'msg_1', + status: 'PENDING', + approval: null, + content: 'hello world', + senderName: 'Alice', + sourceGroupId: 'grp_1', + tags: ['#important'], + platform: 'whatsapp', + sourceGroup: { name: 'UP Parivar Dallas', syncRoutesFrom: [] }, + ...overrides, + }; +} + +function makePrisma(messageOverrides: object = {}, txCount = 1) { + return { + message: { findUnique: jest.fn().mockResolvedValue(makeMessage(messageOverrides)) }, + $transaction: jest.fn().mockImplementation(async (fn: any) => + fn({ + message: { updateMany: jest.fn().mockResolvedValue({ count: txCount }) }, + approval: { create: jest.fn().mockResolvedValue({}) }, + }), + ), + } as any; +} + +describe('handleStarReaction', () => { + it('returns null for non-star emoji', async () => { + expect(await handleStarReaction(makeReaction({ emoji: '👍' }), adminJids, {} as any)).toBeNull(); + }); + + it('returns null when reactor is not an admin', async () => { + expect( + await handleStarReaction(makeReaction({ reactorJid: 'stranger@s.whatsapp.net' }), adminJids, {} as any), + ).toBeNull(); + }); + + it('returns null when message not found', async () => { + const prisma = { message: { findUnique: jest.fn().mockResolvedValue(null) } } as any; + expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull(); + expect(prisma.message.findUnique).toHaveBeenCalledWith({ + where: { platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'TARGET_MSG_123' } }, + include: { + approval: true, + sourceGroup: { + include: { syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } } }, + }, + }, + }); + }); + + it('returns null when message status is not PENDING', async () => { + const prisma = { + message: { findUnique: jest.fn().mockResolvedValue(makeMessage({ status: 'REJECTED' })) }, + } as any; + expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull(); + }); + + it('returns null when approval record already exists', async () => { + const prisma = { + message: { + findUnique: jest.fn().mockResolvedValue(makeMessage({ status: 'APPROVED', approval: { id: 'appr_1' } })), + }, + } as any; + expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull(); + }); + + it('returns null on double-approval race (updateMany count=0)', async () => { + const result = await handleStarReaction(makeReaction(), adminJids, makePrisma({}, 0)); + expect(result).toBeNull(); + }); + + it('returns ApprovalResult with empty forwardJobs and valid indexDoc when no sync routes', async () => { + const result = await handleStarReaction(makeReaction(), adminJids, makePrisma()); + expect(result).not.toBeNull(); + expect(result!.forwardJobs).toEqual([]); + expect(result!.indexDoc).toMatchObject({ + messageId: 'msg_1', + content: 'hello world', + senderName: 'Alice', + sourceGroupId: 'grp_1', + sourceGroupName: 'UP Parivar Dallas', + tags: ['#important'], + platform: 'whatsapp', + }); + expect(result!.indexDoc.approvedAt).toMatch(/^\d{4}-\d{2}-\d{2}T/); + }); + + it('returns ForwardJobData for each active sync route', async () => { + const prisma = makePrisma({ + content: 'important announcement', + senderName: 'Bob', + sourceGroup: { + name: 'Source Group', + syncRoutesFrom: [ + { targetGroup: { platformId: '999@g.us', accountId: 'acc_2' } }, + { targetGroup: { platformId: '888@g.us', accountId: null } }, + ], + }, + }); + + const result = await handleStarReaction(makeReaction(), adminJids, prisma); + expect(result!.forwardJobs).toHaveLength(2); + expect(result!.forwardJobs[0]).toMatchObject({ + messageId: 'msg_1', + content: 'important announcement', + sourceGroupName: 'Source Group', + senderName: 'Bob', + toGroupJid: '999@g.us', + fromAccountId: 'acc_2', + }); + expect(result!.forwardJobs[1]).toMatchObject({ + toGroupJid: '888@g.us', + fromAccountId: 'acc_1', // falls back to reaction.accountId + }); + }); +}); +``` + +- [ ] **Step 2: Run tests — expect 2 failures** (the two tests that check `result.forwardJobs` and `result.indexDoc`) + +```bash +pnpm --filter @tower/worker test -- --testPathPattern=approval +``` + +Expected: `FAIL` on the last two tests. + +- [ ] **Step 3: Update `apps/worker/src/core/approval.ts`** + +```typescript +import { NormalizedReaction, ForwardJobData, IndexJobData } from '@tower/types'; + +export interface ApprovalResult { + forwardJobs: ForwardJobData[]; + indexDoc: IndexJobData; +} + +export async function handleStarReaction( + reaction: NormalizedReaction, + adminJids: string[], + prisma: any, +): Promise { + if (reaction.emoji !== '⭐') return null; + if (!adminJids.includes(reaction.reactorJid)) return null; + + const message = await prisma.message.findUnique({ + where: { + platform_platformMsgId: { + // TODO: derive platform from NormalizedReaction when multi-platform support is added + platform: 'whatsapp', + platformMsgId: reaction.targetMsgId, + }, + }, + include: { + approval: true, + sourceGroup: { + include: { + syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } }, + }, + }, + }, + }); + + if (!message) return null; + if (message.status !== 'PENDING') return null; + if (message.approval) return null; + + let approved = false; + await prisma.$transaction(async (tx: any) => { + const updated = await tx.message.updateMany({ + where: { id: message.id, status: 'PENDING' }, + data: { status: 'APPROVED' }, + }); + if (updated.count === 0) return; + approved = true; + await tx.approval.create({ + data: { + messageId: message.id, + adminId: reaction.reactorJid, + decision: 'APPROVED', + }, + }); + }); + + if (!approved) return null; + + const forwardJobs: ForwardJobData[] = message.sourceGroup.syncRoutesFrom + .filter((route: any) => route.targetGroup != null) + .map((route: any) => ({ + messageId: message.id, + content: message.content, + sourceGroupName: message.sourceGroup.name, + senderName: message.senderName ?? undefined, + toGroupJid: route.targetGroup.platformId, + fromAccountId: route.targetGroup.accountId ?? reaction.accountId, + })); + + const indexDoc: IndexJobData = { + messageId: message.id, + content: message.content, + senderName: message.senderName ?? null, + sourceGroupId: message.sourceGroupId, + sourceGroupName: message.sourceGroup.name, + tags: message.tags, + platform: message.platform, + approvedAt: new Date().toISOString(), + }; + + return { forwardJobs, indexDoc }; +} +``` + +- [ ] **Step 4: Run tests — all pass** + +```bash +pnpm --filter @tower/worker test -- --testPathPattern=approval +``` + +Expected: 8 tests pass. + +- [ ] **Step 5: Commit** + +```bash +git add apps/worker/src/core/approval.ts apps/worker/src/core/approval.test.ts +git commit -m "feat(worker): handleStarReaction returns ApprovalResult with indexDoc" +``` + +--- + +### Task 4: Worker — index queue, processor, and wire `main.ts` + +**Files:** +- Create: `apps/worker/src/queues/index.queue.ts` +- Create: `apps/worker/src/queues/index.processor.ts` +- Create: `apps/worker/src/queues/index.processor.test.ts` +- Modify: `apps/worker/package.json` — add `@tower/search: workspace:*` +- Modify: `apps/worker/src/main.ts` — create meiliClient, configureIndex, indexQueue, indexWorker, update reaction handler, update shutdown + +- [ ] **Step 1: Add `@tower/search` to worker dependencies** + +Edit `apps/worker/package.json` — add to the `"dependencies"` object: + +```json +"@tower/search": "workspace:*" +``` + +Then install: + +```bash +pnpm install +``` + +- [ ] **Step 2: Write the failing processor test** + +Create `apps/worker/src/queues/index.processor.test.ts`: + +```typescript +import { processIndexJob } from './index.processor'; +import { indexMessage } from '@tower/search'; +import { IndexJobData } from '@tower/types'; + +jest.mock('@tower/search', () => ({ + indexMessage: jest.fn().mockResolvedValue(undefined), + MESSAGES_INDEX: 'tower-messages', +})); + +function makeJob(overrides: Partial = {}): IndexJobData { + return { + messageId: 'msg-1', + content: 'hello world', + senderName: 'Alice', + sourceGroupId: 'grp-1', + sourceGroupName: 'UP Parivar', + tags: ['#important'], + platform: 'whatsapp', + approvedAt: '2026-05-27T10:00:00.000Z', + ...overrides, + }; +} + +describe('processIndexJob', () => { + beforeEach(() => jest.clearAllMocks()); + + it('calls indexMessage with MeiliDocument shape', async () => { + const mockClient = {} as any; + await processIndexJob(makeJob(), mockClient); + expect(indexMessage).toHaveBeenCalledWith(mockClient, { + id: 'msg-1', + content: 'hello world', + senderName: 'Alice', + sourceGroupId: 'grp-1', + sourceGroupName: 'UP Parivar', + tags: ['#important'], + platform: 'whatsapp', + approvedAt: new Date('2026-05-27T10:00:00.000Z').getTime(), + }); + }); + + it('converts null senderName to empty string', async () => { + const mockClient = {} as any; + await processIndexJob(makeJob({ senderName: null }), mockClient); + expect(indexMessage).toHaveBeenCalledWith( + mockClient, + expect.objectContaining({ senderName: '' }), + ); + }); + + it('converts approvedAt ISO string to Unix ms number', async () => { + const mockClient = {} as any; + await processIndexJob(makeJob({ approvedAt: '2026-01-01T00:00:00.000Z' }), mockClient); + expect(indexMessage).toHaveBeenCalledWith( + mockClient, + expect.objectContaining({ approvedAt: new Date('2026-01-01T00:00:00.000Z').getTime() }), + ); + }); +}); +``` + +- [ ] **Step 3: Run test — expect failure** + +```bash +pnpm --filter @tower/worker test -- --testPathPattern=index.processor +``` + +Expected: `FAIL` — `Cannot find module './index.processor'`. + +- [ ] **Step 4: Create `apps/worker/src/queues/index.queue.ts`** + +```typescript +import { Queue } from 'bullmq'; +import { IndexJobData } from '@tower/types'; +import { parseRedisUrl } from './redis-connection'; + +export function createIndexQueue(redisUrl: string): Queue { + return new Queue('tower-index', { connection: parseRedisUrl(redisUrl) }); +} +``` + +- [ ] **Step 5: Create `apps/worker/src/queues/index.processor.ts`** + +```typescript +import { Worker } from 'bullmq'; +import { IndexJobData } from '@tower/types'; +import { MeiliSearch, MeiliDocument, indexMessage } from '@tower/search'; +import { parseRedisUrl } from './redis-connection'; + +export async function processIndexJob(job: IndexJobData, meiliClient: MeiliSearch): Promise { + const doc: MeiliDocument = { + id: job.messageId, + content: job.content, + senderName: job.senderName ?? '', + sourceGroupId: job.sourceGroupId, + sourceGroupName: job.sourceGroupName, + tags: job.tags, + platform: job.platform, + approvedAt: new Date(job.approvedAt).getTime(), + }; + await indexMessage(meiliClient, doc); +} + +export function createIndexWorker(redisUrl: string, meiliClient: MeiliSearch): Worker { + return new Worker( + 'tower-index', + async (job) => processIndexJob(job.data, meiliClient), + { connection: parseRedisUrl(redisUrl) }, + ); +} +``` + +- [ ] **Step 6: Run tests — all pass** + +```bash +pnpm --filter @tower/worker test -- --testPathPattern=index.processor +``` + +Expected: 3 tests pass. + +- [ ] **Step 7: Update `apps/worker/src/main.ts`** + +Replace the entire file with: + +```typescript +import { PrismaClient } from '@prisma/client'; +import { createLogger } from '@tower/logger'; +import { validateEnv } from '@tower/config'; +import { createMeiliClient, configureIndex } from '@tower/search'; +import { createIngestQueue } from './queues/ingest.queue'; +import { createIngestWorker } from './queues/ingest.processor'; +import { createForwardQueue } from './queues/forward.queue'; +import { createForwardWorker } from './queues/forward.processor'; +import { createIndexQueue } from './queues/index.queue'; +import { createIndexWorker } from './queues/index.processor'; +import { WhatsAppSessionPool } from './whatsapp/session-pool'; +import { detectTags, isFlagged } from './whatsapp/tag-detector'; +import { syncGroups } from './whatsapp/group-sync'; +import { handleStarReaction } from './core/approval'; + +const logger = createLogger('tower-worker'); + +async function bootstrap() { + const env = validateEnv(); + const prisma = new PrismaClient(); + await prisma.$connect(); + + const adminJids = env.TOWER_ADMIN_JIDS + ? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean) + : []; + + const meiliClient = createMeiliClient(env.MEILI_URL, env.MEILI_MASTER_KEY); + await configureIndex(meiliClient).catch((err) => + logger.warn({ err }, 'Failed to configure Meilisearch index — search may be degraded'), + ); + + const ingestQueue = createIngestQueue(env.REDIS_URL); + const forwardQueue = createForwardQueue(env.REDIS_URL); + const indexQueue = createIndexQueue(env.REDIS_URL); + const pool = new WhatsAppSessionPool(); + + const ingestWorker = createIngestWorker(env.REDIS_URL, prisma); + const forwardWorker = createForwardWorker(env.REDIS_URL, pool); + const indexWorker = createIndexWorker(env.REDIS_URL, meiliClient); + + ingestWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Ingest job completed')); + ingestWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Ingest job failed')); + forwardWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Forward job completed')); + forwardWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Forward job failed')); + indexWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Index job completed')); + indexWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Index job failed')); + + const accounts = await prisma.account.findMany({ + where: { status: 'ACTIVE', platform: 'whatsapp' }, + }); + + if (accounts.length === 0) { + logger.warn('No active WhatsApp accounts found — seed one in the Account table (see docs)'); + } + + const groupMaps = new Map>(); + + for (const account of accounts) { + groupMaps.set(account.id, new Map()); + + try { + await pool.add( + account.id, + account.sessionPath, + async (msg, accountId) => { + const tags = detectTags(msg.content, msg.senderJid, adminJids); + if (!isFlagged(tags)) return; + + const groupMap = groupMaps.get(accountId); + if (!groupMap) { + logger.error({ accountId }, 'No group map for account — message dropped'); + return; + } + const sourceGroupId = groupMap.get(msg.sourceGroupJid); + if (!sourceGroupId) { + logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message'); + return; + } + + await ingestQueue.add( + 'ingest', + { + platformMsgId: msg.platformMsgId, + platform: 'whatsapp', + accountId, + sourceGroupId, + senderJid: msg.senderJid, + senderName: msg.senderName, + content: msg.content, + tags, + }, + { attempts: 3, backoff: { type: 'exponential', delay: 1000 } }, + ); + + logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued'); + }, + async (reaction) => { + const result = await handleStarReaction(reaction, adminJids, prisma); + if (!result) return; + + const { forwardJobs, indexDoc } = result; + + await indexQueue.add('index', indexDoc, { + attempts: 3, + backoff: { type: 'exponential', delay: 1000 }, + }); + + for (const job of forwardJobs) { + await forwardQueue.add('forward', job, { + attempts: 3, + backoff: { type: 'exponential', delay: 2000 }, + }); + } + + logger.info( + { messageId: indexDoc.messageId, forwardCount: forwardJobs.length }, + 'Message approved — indexed and forwarded', + ); + }, + async (groups, accountId) => { + logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups'); + const map = await syncGroups(groups, accountId, prisma); + groupMaps.set(accountId, map); + }, + ); + } catch (err) { + logger.error({ accountId: account.id, err }, 'Failed to start session — skipping account'); + } + } + + logger.info({ accountCount: accounts.length }, 'Tower worker ready'); + + const shutdown = async () => { + logger.info('Shutting down...'); + await pool.closeAll(); + await ingestWorker.close(); + await forwardWorker.close(); + await indexWorker.close(); + await ingestQueue.close(); + await forwardQueue.close(); + await indexQueue.close(); + await prisma.$disconnect(); + process.exit(0); + }; + + process.on('SIGTERM', shutdown); + process.on('SIGINT', shutdown); +} + +bootstrap().catch((err) => { + console.error('Worker failed to start', err); + process.exit(1); +}); +``` + +- [ ] **Step 8: Run all worker tests and build** + +```bash +pnpm --filter @tower/worker test +pnpm --filter @tower/worker build +``` + +Expected: All tests pass (the `approval.test.ts` changes in Task 3 must already be committed), zero build errors. + +- [ ] **Step 9: Commit** + +```bash +git add apps/worker/src/queues/index.queue.ts \ + apps/worker/src/queues/index.processor.ts \ + apps/worker/src/queues/index.processor.test.ts \ + apps/worker/src/main.ts \ + apps/worker/package.json \ + pnpm-lock.yaml +git commit -m "feat(worker): add index queue and wire Meilisearch indexing after approval" +``` + +--- + +### Task 5: API — SearchModule + +**Files:** +- Create: `apps/api/src/modules/search/search.module.ts` +- Create: `apps/api/src/modules/search/search.service.ts` +- Create: `apps/api/src/modules/search/search.controller.ts` +- Create: `apps/api/src/modules/search/search.controller.spec.ts` +- Create: `apps/api/src/modules/search/search.service.spec.ts` +- Modify: `apps/api/package.json` — add `@tower/search: workspace:*` +- Modify: `apps/api/src/app.module.ts` — register `SearchModule` + +- [ ] **Step 1: Add `@tower/search` to API dependencies** + +Edit `apps/api/package.json` — add to `"dependencies"`: + +```json +"@tower/search": "workspace:*" +``` + +Then: + +```bash +pnpm install +``` + +- [ ] **Step 2: Write failing controller spec** + +Create `apps/api/src/modules/search/search.controller.spec.ts`: + +```typescript +import { Test, TestingModule } from '@nestjs/testing'; +import { SearchController } from './search.controller'; +import { SearchService } from './search.service'; + +const mockSearchService = { search: jest.fn() }; + +describe('SearchController', () => { + let controller: SearchController; + + beforeEach(async () => { + jest.clearAllMocks(); + const module: TestingModule = await Test.createTestingModule({ + controllers: [SearchController], + providers: [{ provide: SearchService, useValue: mockSearchService }], + }).compile(); + controller = module.get(SearchController); + }); + + it('calls service with all parsed params', async () => { + mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 2, limit: 10, query: 'hello' }); + await controller.search('hello', 'grp-1', 'important,event', '2', '10'); + expect(mockSearchService.search).toHaveBeenCalledWith('hello', 'grp-1', ['important', 'event'], 2, 10); + }); + + it('defaults page to 1 and limit to 20 when not provided', async () => { + mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 1, limit: 20, query: '' }); + await controller.search(''); + expect(mockSearchService.search).toHaveBeenCalledWith('', undefined, undefined, 1, 20); + }); + + it('returns the service result directly', async () => { + const expected = { hits: [{ id: 'msg-1' }], total: 1, page: 1, limit: 20, query: 'test' }; + mockSearchService.search.mockResolvedValue(expected); + const result = await controller.search('test'); + expect(result).toEqual(expected); + }); + + it('splits tags on comma and trims whitespace', async () => { + mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 1, limit: 20, query: '' }); + await controller.search('', undefined, ' important , event '); + expect(mockSearchService.search).toHaveBeenCalledWith('', undefined, ['important', 'event'], 1, 20); + }); +}); +``` + +- [ ] **Step 3: Write failing service spec** + +Create `apps/api/src/modules/search/search.service.spec.ts`: + +```typescript +import { Test, TestingModule } from '@nestjs/testing'; +import { ConfigService } from '@nestjs/config'; +import { SearchService } from './search.service'; +import * as searchPkg from '@tower/search'; + +jest.mock('@tower/search', () => ({ + createMeiliClient: jest.fn(), + configureIndex: jest.fn().mockResolvedValue(undefined), + MESSAGES_INDEX: 'tower-messages', +})); + +describe('SearchService', () => { + let service: SearchService; + const mockSearch = jest.fn(); + const mockIndex = jest.fn().mockReturnValue({ search: mockSearch }); + const mockClient = { index: mockIndex }; + + beforeEach(async () => { + jest.clearAllMocks(); + (searchPkg.createMeiliClient as jest.Mock).mockReturnValue(mockClient); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + SearchService, + { provide: ConfigService, useValue: { get: jest.fn().mockReturnValue('') } }, + ], + }).compile(); + + service = module.get(SearchService); + await service.onModuleInit(); + }); + + it('calls configureIndex on init', () => { + expect(searchPkg.configureIndex).toHaveBeenCalledWith(mockClient); + }); + + it('returns hits and total', async () => { + mockSearch.mockResolvedValue({ hits: [{ id: 'msg-1', content: 'hello' }], totalHits: 1 }); + const result = await service.search('hello'); + expect(result.hits).toHaveLength(1); + expect(result.total).toBe(1); + expect(result.query).toBe('hello'); + }); + + it('searches with no filter when no groupId or tags', async () => { + mockSearch.mockResolvedValue({ hits: [], totalHits: 0 }); + await service.search('test'); + expect(mockSearch).toHaveBeenCalledWith('test', expect.objectContaining({ filter: undefined })); + }); + + it('applies sourceGroupId filter', async () => { + mockSearch.mockResolvedValue({ hits: [], totalHits: 0 }); + await service.search('hello', 'grp-1'); + expect(mockSearch).toHaveBeenCalledWith( + 'hello', + expect.objectContaining({ filter: 'sourceGroupId = "grp-1"' }), + ); + }); + + it('applies tags filter', async () => { + mockSearch.mockResolvedValue({ hits: [], totalHits: 0 }); + await service.search('hello', undefined, ['#important']); + expect(mockSearch).toHaveBeenCalledWith( + 'hello', + expect.objectContaining({ filter: 'tags = "#important"' }), + ); + }); + + it('combines groupId and tags filters with AND', async () => { + mockSearch.mockResolvedValue({ hits: [], totalHits: 0 }); + await service.search('hello', 'grp-1', ['#important', '#event']); + expect(mockSearch).toHaveBeenCalledWith( + 'hello', + expect.objectContaining({ + filter: 'sourceGroupId = "grp-1" AND tags = "#important" AND tags = "#event"', + }), + ); + }); + + it('defaults page to 1 and hitsPerPage to 20', async () => { + mockSearch.mockResolvedValue({ hits: [], totalHits: 0 }); + await service.search('hello'); + expect(mockSearch).toHaveBeenCalledWith( + 'hello', + expect.objectContaining({ page: 1, hitsPerPage: 20 }), + ); + }); +}); +``` + +- [ ] **Step 4: Run specs — expect failure** + +```bash +pnpm --filter @tower/api test +``` + +Expected: `FAIL` — modules not found. + +- [ ] **Step 5: Create `apps/api/src/modules/search/search.service.ts`** + +```typescript +import { Injectable, OnModuleInit } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { + MeiliSearch, + MeiliDocument, + MESSAGES_INDEX, + createMeiliClient, + configureIndex, +} from '@tower/search'; + +@Injectable() +export class SearchService implements OnModuleInit { + private readonly client: MeiliSearch; + + constructor(private readonly config: ConfigService) { + this.client = createMeiliClient( + this.config.get('MEILI_URL', 'http://localhost:7700'), + this.config.get('MEILI_MASTER_KEY', ''), + ); + } + + async onModuleInit(): Promise { + await configureIndex(this.client).catch((err) => + console.warn('Failed to configure Meilisearch index:', err), + ); + } + + async search( + query: string, + groupId?: string, + tags?: string[], + page = 1, + limit = 20, + ): Promise<{ hits: MeiliDocument[]; total: number; page: number; limit: number; query: string }> { + const filters: string[] = []; + if (groupId) filters.push(`sourceGroupId = "${groupId}"`); + if (tags?.length) filters.push(...tags.map((t) => `tags = "${t}"`)); + + const result = await this.client.index(MESSAGES_INDEX).search(query, { + filter: filters.length ? filters.join(' AND ') : undefined, + page, + hitsPerPage: limit, + sort: ['approvedAt:desc'], + }); + + return { + hits: result.hits as MeiliDocument[], + total: result.totalHits ?? 0, + page, + limit, + query, + }; + } +} +``` + +- [ ] **Step 6: Create `apps/api/src/modules/search/search.controller.ts`** + +```typescript +import { Controller, Get, Query } from '@nestjs/common'; +import { SearchService } from './search.service'; + +@Controller('search') +export class SearchController { + constructor(private readonly searchService: SearchService) {} + + @Get() + search( + @Query('q') q = '', + @Query('groupId') groupId?: string, + @Query('tags') tags?: string, + @Query('page') page = '1', + @Query('limit') limit = '20', + ) { + const tagList = tags + ? tags.split(',').map((t) => t.trim()).filter(Boolean) + : undefined; + return this.searchService.search(q, groupId, tagList, Number(page), Number(limit)); + } +} +``` + +- [ ] **Step 7: Create `apps/api/src/modules/search/search.module.ts`** + +```typescript +import { Module } from '@nestjs/common'; +import { ConfigModule } from '@nestjs/config'; +import { SearchController } from './search.controller'; +import { SearchService } from './search.service'; + +@Module({ + imports: [ConfigModule], + controllers: [SearchController], + providers: [SearchService], +}) +export class SearchModule {} +``` + +- [ ] **Step 8: Register SearchModule in `apps/api/src/app.module.ts`** + +```typescript +import { Module } from '@nestjs/common'; +import { ConfigModule } from '@nestjs/config'; +import { PrismaModule } from './prisma/prisma.module'; +import { HealthModule } from './modules/health/health.module'; +import { SearchModule } from './modules/search/search.module'; + +@Module({ + imports: [ + ConfigModule.forRoot({ isGlobal: true }), + PrismaModule, + HealthModule, + SearchModule, + ], +}) +export class AppModule {} +``` + +- [ ] **Step 9: Run all API tests and build** + +```bash +pnpm --filter @tower/api test +pnpm --filter @tower/api build +``` + +Expected: All specs pass (health controller + search controller + search service), zero build errors. + +- [ ] **Step 10: Commit** + +```bash +git add apps/api/src/modules/search/ \ + apps/api/src/app.module.ts \ + apps/api/package.json \ + pnpm-lock.yaml +git commit -m "feat(api): add SearchModule with GET /search endpoint backed by Meilisearch" +``` + +--- + +## Self-Review + +**Spec coverage:** +- ✅ Approved messages indexed in Meilisearch — Task 4 (index queue + processor) +- ✅ Index stays in sync with approvals — Task 3 wires indexDoc into approval result, Task 4 enqueues it +- ✅ Full-text search endpoint — Task 5 `GET /search` +- ✅ Filterable by group and tags — `SearchService.search` builds Meilisearch filter string +- ✅ Paginated results — `page` + `limit` query params, `hitsPerPage` in Meilisearch query +- ✅ Index configured with correct attributes — `configureIndex` called on both worker startup and API `onModuleInit` +- ✅ Messages indexed even with no sync routes — `main.ts` always enqueues index job regardless of `forwardJobs.length` + +**Placeholder scan:** None found. All code steps contain complete implementations. + +**Type consistency:** +- `IndexJobData.approvedAt` is `string` (ISO) throughout types and approval.ts +- `MeiliDocument.approvedAt` is `number` (Unix ms) — conversion happens in `processIndexJob` +- `handleStarReaction` returns `ApprovalResult | null` consistently across approval.ts and approval.test.ts +- `result.forwardJobs` and `result.indexDoc` used consistently in tests and main.ts diff --git a/packages/search/src/index.test.ts b/packages/search/src/index.test.ts index 7b2546d..f9d41c5 100644 --- a/packages/search/src/index.test.ts +++ b/packages/search/src/index.test.ts @@ -4,6 +4,7 @@ import { indexMessage, deleteMessage, MeiliDocument, + createMeiliClient, } from './index'; function makeMockClient() { @@ -15,7 +16,7 @@ function makeMockClient() { addDocuments: mockAddDocuments, deleteDocument: mockDeleteDocument, }); - return { client: { index: mockIndex } as any, mockIndex, mockUpdateSettings, mockAddDocuments, mockDeleteDocument }; + return { client: { index: mockIndex } as unknown as ReturnType, mockIndex, mockUpdateSettings, mockAddDocuments, mockDeleteDocument }; } describe('MESSAGES_INDEX', () => { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9e90185..8437459 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -80,7 +80,7 @@ importers: version: 6.19.3(typescript@5.9.3) ts-jest: specifier: ^29.0.0 - version: 29.4.11(@babel/core@7.29.7)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.29.7))(jest-util@29.7.0)(jest@29.7.0(@types/node@22.19.19))(typescript@5.9.3) + version: 29.4.11(@babel/core@7.29.7)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.29.7))(jest-util@29.7.0)(jest@29.7.0(@types/node@22.19.19)(ts-node@10.9.2(@types/node@22.19.19)(typescript@5.9.3)))(typescript@5.9.3) typescript: specifier: ^5.7.0 version: 5.9.3 @@ -135,7 +135,7 @@ importers: version: 4.3.0 ts-jest: specifier: ^29.0.0 - version: 29.4.11(@babel/core@7.29.7)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.29.7))(jest-util@29.7.0)(jest@29.7.0(@types/node@22.19.19))(typescript@5.9.3) + version: 29.4.11(@babel/core@7.29.7)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.29.7))(jest-util@29.7.0)(jest@29.7.0(@types/node@22.19.19)(ts-node@10.9.2(@types/node@22.19.19)(typescript@5.9.3)))(typescript@5.9.3) typescript: specifier: ^5.7.0 version: 5.9.3 @@ -190,7 +190,7 @@ importers: version: 6.19.3(typescript@5.9.3) ts-jest: specifier: ^29.0.0 - version: 29.4.11(@babel/core@7.29.7)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.29.7))(jest-util@29.7.0)(jest@29.7.0(@types/node@22.19.19))(typescript@5.9.3) + version: 29.4.11(@babel/core@7.29.7)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.29.7))(jest-util@29.7.0)(jest@29.7.0(@types/node@22.19.19)(ts-node@10.9.2(@types/node@22.19.19)(typescript@5.9.3)))(typescript@5.9.3) ts-node: specifier: ^10.9.0 version: 10.9.2(@types/node@22.19.19)(typescript@5.9.3) @@ -212,7 +212,7 @@ importers: version: 29.7.0(@types/node@22.19.19)(ts-node@10.9.2(@types/node@22.19.19)(typescript@5.9.3)) ts-jest: specifier: ^29.0.0 - version: 29.4.11(@babel/core@7.29.7)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.29.7))(jest-util@29.7.0)(jest@29.7.0(@types/node@22.19.19))(typescript@5.9.3) + version: 29.4.11(@babel/core@7.29.7)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.29.7))(jest-util@29.7.0)(jest@29.7.0(@types/node@22.19.19)(ts-node@10.9.2(@types/node@22.19.19)(typescript@5.9.3)))(typescript@5.9.3) typescript: specifier: ^5.7.0 version: 5.9.3 @@ -239,6 +239,28 @@ importers: specifier: ^5.7.0 version: 5.9.3 + packages/search: + dependencies: + meilisearch: + specifier: ^0.44.0 + version: 0.44.1 + devDependencies: + '@types/jest': + specifier: ^29.0.0 + version: 29.5.14 + '@types/node': + specifier: ^22.0.0 + version: 22.19.19 + jest: + specifier: ^29.0.0 + version: 29.7.0(@types/node@22.19.19)(ts-node@10.9.2(@types/node@22.19.19)(typescript@5.9.3)) + ts-jest: + specifier: ^29.0.0 + version: 29.4.11(@babel/core@7.29.7)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.29.7))(jest-util@29.7.0)(jest@29.7.0(@types/node@22.19.19)(ts-node@10.9.2(@types/node@22.19.19)(typescript@5.9.3)))(typescript@5.9.3) + typescript: + specifier: ^5.7.0 + version: 5.9.3 + packages/types: devDependencies: typescript: @@ -2759,6 +2781,9 @@ packages: resolution: {integrity: sha512-aisnrDP4GNe06UcKFnV5bfMNPBUw4jsLGaWwWfnH3v02GnBuXX2MCVn5RbrWo0j3pczUilYblq7fQ7Nw2t5XKw==} engines: {node: '>= 0.8'} + meilisearch@0.44.1: + resolution: {integrity: sha512-ZTZYBmomtRwjaWbvU8U8ct04g/YnrNOlvchogJOPgHcQIQBfjdbAvMJ8mLhuZEzpioYXIT6Cv+FcE150pc2+nw==} + memfs@3.5.3: resolution: {integrity: sha512-UERzLsxzllchadvbPs5aolHh65ISpKpM+ccLbOJ8/vvpBKmAWf+la7dXFy7Mr0ySHbdHrFv5kGFCUHHe6GFEmw==} engines: {node: '>= 4.0.0'} @@ -6588,6 +6613,8 @@ snapshots: media-typer@1.1.0: {} + meilisearch@0.44.1: {} + memfs@3.5.3: dependencies: fs-monkey: 1.1.0 @@ -7354,7 +7381,7 @@ snapshots: dependencies: punycode: 2.3.1 - ts-jest@29.4.11(@babel/core@7.29.7)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.29.7))(jest-util@29.7.0)(jest@29.7.0(@types/node@22.19.19))(typescript@5.9.3): + ts-jest@29.4.11(@babel/core@7.29.7)(@jest/transform@29.7.0)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.29.7))(jest-util@29.7.0)(jest@29.7.0(@types/node@22.19.19)(ts-node@10.9.2(@types/node@22.19.19)(typescript@5.9.3)))(typescript@5.9.3): dependencies: bs-logger: 0.2.6 fast-json-stable-stringify: 2.1.0