tower/docs/superpowers/plans/2026-05-27-archive-and-search.md
maaz519 1d6e1fb4da fix(search): tighten mock type cast for better type safety
Replace `as any` cast with `as unknown as ReturnType<typeof createMeiliClient>`
in the mock client factory. This preserves type safety without requiring the mock
to implement the full SDK interface.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 23:53:32 +05:30

# Archive & Search Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Index every ⭐-approved message into Meilisearch and expose a paginated full-text search endpoint on the API.

**Architecture:** A shared `@tower/search` package owns the Meilisearch client factory, index name constant, settings configuration, and document type — consumed by both the worker (indexing) and the API (querying). `handleStarReaction` is extended to return an `ApprovalResult` containing both forward jobs and an `IndexJobData` document. The worker enqueues that document into a BullMQ `tower-index` queue immediately after approval; a processor converts it to a `MeiliDocument` (Unix ms timestamp) and calls `indexMessage`. The NestJS API gets a `SearchModule` with a `GET /search` endpoint that filters by `sourceGroupId` and `tags`.

**Tech Stack:** Meilisearch v1.11 (already in docker-compose), `meilisearch` npm SDK ^0.44.0, BullMQ 5, NestJS 11, `@nestjs/config`, TypeScript 5, Jest 29

---
## File Map
**Create:**
- `packages/search/package.json` — `@tower/search` package manifest
- `packages/search/tsconfig.json` — extends base tsconfig
- `packages/search/jest.config.js` — ts-jest preset
- `packages/search/src/index.ts` — `MeiliDocument`, `MESSAGES_INDEX`, `createMeiliClient`, `configureIndex`, `indexMessage`, `deleteMessage`, re-export `MeiliSearch` type
- `packages/search/src/index.test.ts` — unit tests for all exports
- `apps/worker/src/queues/index.queue.ts` — BullMQ Queue for `IndexJobData`
- `apps/worker/src/queues/index.processor.ts` — Worker that calls `indexMessage`
- `apps/worker/src/queues/index.processor.test.ts` — unit tests
- `apps/api/src/modules/search/search.module.ts`
- `apps/api/src/modules/search/search.service.ts` — wraps Meilisearch, `onModuleInit` calls `configureIndex`
- `apps/api/src/modules/search/search.controller.ts` — `GET /search`
- `apps/api/src/modules/search/search.controller.spec.ts`
- `apps/api/src/modules/search/search.service.spec.ts`

**Modify:**
- `packages/types/src/message.ts` — add `IndexJobData` interface
- `apps/worker/src/core/approval.ts` — new `ApprovalResult` return type; build `indexDoc`
- `apps/worker/src/core/approval.test.ts` — update assertions to `result.forwardJobs` / `result.indexDoc`; add missing mock fields
- `apps/worker/src/main.ts` — create meiliClient + indexQueue + indexWorker; call `configureIndex`; enqueue index jobs after approval
- `apps/worker/package.json` — add `@tower/search: workspace:*`
- `apps/api/package.json` — add `@tower/search: workspace:*`
- `apps/api/src/app.module.ts` — import `SearchModule`
---
### Task 1: Add `IndexJobData` to `@tower/types`
**Files:**
- Modify: `packages/types/src/message.ts`
- [ ] **Step 1: Write the failing test**
`@tower/types` has no test runner, so there is no failing test to write for this task; type correctness is verified at build time (Step 3). Skip to Step 2.
- [ ] **Step 2: Add `IndexJobData` to `packages/types/src/message.ts`**
Append after the `ForwardJobData` interface (keep every existing line intact):
```typescript
export interface IndexJobData {
  messageId: string; // DB Message.id (cuid)
  content: string;
  senderName: string | null;
  sourceGroupId: string; // DB Group.id
  sourceGroupName: string;
  tags: string[];
  platform: string;
  approvedAt: string; // ISO 8601 — converted to Unix ms in the processor
}
```
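BullMQ stores job data as JSON, which is one plausible reason `approvedAt` is typed as an ISO string here rather than a `Date`. The sketch below only illustrates that round trip; `IndexJobPayload` is a hypothetical, trimmed-down stand-in for `IndexJobData`, and nothing in it touches Redis or BullMQ.

```typescript
// Hypothetical, trimmed-down stand-in for IndexJobData (illustration only).
interface IndexJobPayload {
  messageId: string;
  approvedAt: string; // ISO 8601
}

// A payload that carries a Date instead of an ISO string.
const withDate = { messageId: 'msg_1', approvedAt: new Date('2026-05-27T10:00:00.000Z') };

// Simulate the JSON round trip that queued job data goes through.
const roundTripped = JSON.parse(JSON.stringify(withDate)) as { messageId: string; approvedAt: unknown };

// The Date did not survive as a Date: it is now a plain string.
const typeAfterRoundTrip = typeof roundTripped.approvedAt;

// Sending the ISO string explicitly keeps the declared type honest on both sides of the queue.
const payload: IndexJobPayload = {
  messageId: 'msg_1',
  approvedAt: new Date('2026-05-27T10:00:00.000Z').toISOString(),
};
const survived = JSON.parse(JSON.stringify(payload)) as IndexJobPayload;
```

Typing the field as `string` means the producer and the processor agree on what actually arrives in the job.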
- [ ] **Step 3: Build the package to verify no compile errors**
```bash
cd /path/to/tower
pnpm --filter @tower/types build
```
Expected: `dist/` updated, zero errors.
- [ ] **Step 4: Commit**
```bash
git add packages/types/src/message.ts
git commit -m "feat(types): add IndexJobData for Meilisearch indexing"
```
---
### Task 2: Create `packages/search` package
**Files:**
- Create: `packages/search/package.json`
- Create: `packages/search/tsconfig.json`
- Create: `packages/search/jest.config.js`
- Create: `packages/search/src/index.ts`
- Create: `packages/search/src/index.test.ts`
- [ ] **Step 1: Write the failing tests**
Create `packages/search/src/index.test.ts`:
```typescript
import {
  MESSAGES_INDEX,
  configureIndex,
  createMeiliClient,
  indexMessage,
  deleteMessage,
  MeiliDocument,
} from './index';

function makeMockClient() {
  const mockUpdateSettings = jest.fn().mockResolvedValue({});
  const mockAddDocuments = jest.fn().mockResolvedValue({});
  const mockDeleteDocument = jest.fn().mockResolvedValue({});
  const mockIndex = jest.fn().mockReturnValue({
    updateSettings: mockUpdateSettings,
    addDocuments: mockAddDocuments,
    deleteDocument: mockDeleteDocument,
  });
  // Cast through `unknown` so the mock is typed as the real client without implementing the full SDK.
  const client = { index: mockIndex } as unknown as ReturnType<typeof createMeiliClient>;
  return { client, mockIndex, mockUpdateSettings, mockAddDocuments, mockDeleteDocument };
}

describe('MESSAGES_INDEX', () => {
  it('is the string tower-messages', () => {
    expect(MESSAGES_INDEX).toBe('tower-messages');
  });
});

describe('configureIndex', () => {
  it('sets searchable, filterable, and sortable attributes on the correct index', async () => {
    const { client, mockIndex, mockUpdateSettings } = makeMockClient();
    await configureIndex(client);
    expect(mockIndex).toHaveBeenCalledWith('tower-messages');
    expect(mockUpdateSettings).toHaveBeenCalledWith({
      searchableAttributes: ['content', 'senderName', 'sourceGroupName'],
      filterableAttributes: ['sourceGroupId', 'tags', 'platform'],
      sortableAttributes: ['approvedAt'],
    });
  });
});

describe('indexMessage', () => {
  it('adds the document to the messages index', async () => {
    const { client, mockIndex, mockAddDocuments } = makeMockClient();
    const doc: MeiliDocument = {
      id: 'msg-1',
      content: 'Hello community',
      senderName: 'Alice',
      sourceGroupId: 'grp-1',
      sourceGroupName: 'UP Parivar',
      tags: ['#important'],
      platform: 'whatsapp',
      approvedAt: 1716825600000,
    };
    await indexMessage(client, doc);
    expect(mockIndex).toHaveBeenCalledWith('tower-messages');
    expect(mockAddDocuments).toHaveBeenCalledWith([doc]);
  });
});

describe('deleteMessage', () => {
  it('deletes document by id from the messages index', async () => {
    const { client, mockIndex, mockDeleteDocument } = makeMockClient();
    await deleteMessage(client, 'msg-1');
    expect(mockIndex).toHaveBeenCalledWith('tower-messages');
    expect(mockDeleteDocument).toHaveBeenCalledWith('msg-1');
  });
});
```
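The commit note attached to this plan mentions casting the mock client with `as unknown as ReturnType<typeof createMeiliClient>` instead of `as any`. A minimal, self-contained sketch of that pattern follows. `BigClient`, `createBigClient`, and `storeDoc` are hypothetical stand-ins, not the `meilisearch` SDK, and the methods are synchronous purely to keep the example short.

```typescript
// Hypothetical stand-in for a large SDK client (not the meilisearch API).
class BigClient {
  index(_uid: string): { addDocuments(docs: object[]): number } {
    return { addDocuments: (docs: object[]) => docs.length };
  }
  health(): string {
    return 'available';
  }
  stats(): object {
    return {};
  }
}

function createBigClient(): BigClient {
  return new BigClient();
}

// Code under test only depends on index(...).addDocuments(...).
function storeDoc(client: BigClient, doc: object): number {
  return client.index('tower-messages').addDocuments([doc]);
}

// The mock implements just the slice that is exercised and records its calls.
const recorded: Array<{ uid: string; docs: object[] }> = [];
const partialMock = {
  index: (uid: string) => ({
    addDocuments: (docs: object[]): number => {
      recorded.push({ uid, docs });
      return docs.length;
    },
  }),
};

// Casting through `unknown` names the target type explicitly. Unlike `as any`,
// later uses of `mockClient` are still checked against BigClient's interface.
const mockClient = partialMock as unknown as ReturnType<typeof createBigClient>;

const stored = storeDoc(mockClient, { id: 'msg-1' });
```

With `as any`, a typo such as `mockClient.indx(...)` would compile; with the typed cast it is rejected at build time, while the mock still only implements what the test needs.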
- [ ] **Step 2: Create `packages/search/src/index.ts`**
```typescript
import { MeiliSearch } from 'meilisearch';
export { MeiliSearch } from 'meilisearch';

export interface MeiliDocument {
  id: string; // DB Message.id
  content: string;
  senderName: string; // empty string when null
  sourceGroupId: string;
  sourceGroupName: string;
  tags: string[];
  platform: string;
  approvedAt: number; // Unix ms — Meilisearch sorts numbers, not ISO strings
}

export const MESSAGES_INDEX = 'tower-messages';

export function createMeiliClient(url: string, masterKey: string): MeiliSearch {
  return new MeiliSearch({ host: url, apiKey: masterKey });
}

export async function configureIndex(client: MeiliSearch): Promise<void> {
  await client.index(MESSAGES_INDEX).updateSettings({
    searchableAttributes: ['content', 'senderName', 'sourceGroupName'],
    filterableAttributes: ['sourceGroupId', 'tags', 'platform'],
    sortableAttributes: ['approvedAt'],
  });
}

export async function indexMessage(client: MeiliSearch, doc: MeiliDocument): Promise<void> {
  await client.index(MESSAGES_INDEX).addDocuments([doc]);
}

export async function deleteMessage(client: MeiliSearch, id: string): Promise<void> {
  await client.index(MESSAGES_INDEX).deleteDocument(id);
}
```
- [ ] **Step 3: Create `packages/search/package.json`**
```json
{
  "name": "@tower/search",
  "version": "0.0.1",
  "main": "./dist/index.js",
  "types": "./dist/index.d.ts",
  "exports": {
    ".": {
      "types": "./dist/index.d.ts",
      "default": "./dist/index.js"
    }
  },
  "scripts": {
    "build": "tsc",
    "dev": "tsc --watch",
    "test": "jest"
  },
  "dependencies": {
    "meilisearch": "^0.44.0"
  },
  "devDependencies": {
    "@types/jest": "^29.0.0",
    "@types/node": "^22.0.0",
    "jest": "^29.0.0",
    "ts-jest": "^29.0.0",
    "typescript": "^5.7.0"
  }
}
```
- [ ] **Step 4: Create `packages/search/tsconfig.json`**
```json
{
  "extends": "../../tsconfig.base.json",
  "compilerOptions": {
    "outDir": "./dist",
    "rootDir": "./src"
  },
  "include": ["src"]
}
```
- [ ] **Step 5: Create `packages/search/jest.config.js`**
```js
module.exports = {
  preset: 'ts-jest',
  testEnvironment: 'node',
  testMatch: ['**/*.test.ts'],
  rootDir: 'src',
};
```
- [ ] **Step 6: Install dependencies and run tests**
```bash
cd /path/to/tower
pnpm install
pnpm --filter @tower/search test
```
Expected: 4 tests pass (MESSAGES_INDEX, configureIndex, indexMessage, deleteMessage).
- [ ] **Step 7: Build**
```bash
pnpm --filter @tower/search build
```
Expected: `packages/search/dist/` created, zero errors.
- [ ] **Step 8: Commit**
```bash
git add packages/search/
git commit -m "feat(search): add @tower/search package with Meilisearch client and helpers"
```
---
### Task 3: Update `handleStarReaction` to return `ApprovalResult`
**Files:**
- Modify: `apps/worker/src/core/approval.ts`
- Modify: `apps/worker/src/core/approval.test.ts`
The current return type is `ForwardJobData[] | null`. Change it to `ApprovalResult | null` where `ApprovalResult` includes both the forward jobs and the data needed for Meilisearch indexing. This lets `main.ts` index the message without an extra DB query.
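As a rough sketch of what the new return shape means for the caller: a `null` result produces no jobs, while a non-null result always produces exactly one index job plus one forward job per route. The types and the `planJobs` helper below are hypothetical simplifications for illustration, not the real `@tower/types` definitions.

```typescript
// Hypothetical, simplified shapes (the real types live in @tower/types).
interface ForwardJob {
  toGroupJid: string;
}
interface IndexDoc {
  messageId: string;
}
interface ApprovalOutcome {
  forwardJobs: ForwardJob[];
  indexDoc: IndexDoc;
}
interface PlannedJob {
  jobName: 'index' | 'forward';
  target: string;
}

// Lists the jobs a caller would enqueue for a given approval outcome.
function planJobs(outcome: ApprovalOutcome | null): PlannedJob[] {
  if (!outcome) return []; // not approved: nothing to index or forward
  const indexJob: PlannedJob = { jobName: 'index', target: outcome.indexDoc.messageId };
  const forwards: PlannedJob[] = outcome.forwardJobs.map((job) => ({
    jobName: 'forward',
    target: job.toGroupJid,
  }));
  return [indexJob, ...forwards];
}

const nothingPlanned = planJobs(null);
const noRoutes = planJobs({ forwardJobs: [], indexDoc: { messageId: 'msg_1' } });
const twoRoutes = planJobs({
  forwardJobs: [{ toGroupJid: '999@g.us' }, { toGroupJid: '888@g.us' }],
  indexDoc: { messageId: 'msg_1' },
});
```

The `noRoutes` case is the important one: an approved message with no active sync routes still needs to be indexed, which the old `ForwardJobData[]` return type had no way to express.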
- [ ] **Step 1: Update `approval.test.ts` first (TDD — tests fail before change)**
Replace `apps/worker/src/core/approval.test.ts` entirely:
```typescript
import { handleStarReaction } from './approval';
import { NormalizedReaction } from '@tower/types';
function makeReaction(overrides: Partial<NormalizedReaction> = {}): NormalizedReaction {
return {
reactorJid: '919876543210@s.whatsapp.net',
targetMsgId: 'TARGET_MSG_123',
sourceGroupJid: '120363043312345678@g.us',
emoji: '⭐',
accountId: 'acc_1',
...overrides,
};
}
const adminJids = ['919876543210@s.whatsapp.net'];
// Minimal valid message returned by prisma.message.findUnique
function makeMessage(overrides: object = {}) {
return {
id: 'msg_1',
status: 'PENDING',
approval: null,
content: 'hello world',
senderName: 'Alice',
sourceGroupId: 'grp_1',
tags: ['#important'],
platform: 'whatsapp',
sourceGroup: { name: 'UP Parivar Dallas', syncRoutesFrom: [] },
...overrides,
};
}
function makePrisma(messageOverrides: object = {}, txCount = 1) {
return {
message: { findUnique: jest.fn().mockResolvedValue(makeMessage(messageOverrides)) },
$transaction: jest.fn().mockImplementation(async (fn: any) =>
fn({
message: { updateMany: jest.fn().mockResolvedValue({ count: txCount }) },
approval: { create: jest.fn().mockResolvedValue({}) },
}),
),
} as any;
}
describe('handleStarReaction', () => {
it('returns null for non-star emoji', async () => {
expect(await handleStarReaction(makeReaction({ emoji: '👍' }), adminJids, {} as any)).toBeNull();
});
it('returns null when reactor is not an admin', async () => {
expect(
await handleStarReaction(makeReaction({ reactorJid: 'stranger@s.whatsapp.net' }), adminJids, {} as any),
).toBeNull();
});
it('returns null when message not found', async () => {
const prisma = { message: { findUnique: jest.fn().mockResolvedValue(null) } } as any;
expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
expect(prisma.message.findUnique).toHaveBeenCalledWith({
where: { platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'TARGET_MSG_123' } },
include: {
approval: true,
sourceGroup: {
include: { syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } } },
},
},
});
});
it('returns null when message status is not PENDING', async () => {
const prisma = {
message: { findUnique: jest.fn().mockResolvedValue(makeMessage({ status: 'REJECTED' })) },
} as any;
expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
});
it('returns null when approval record already exists', async () => {
const prisma = {
message: {
findUnique: jest.fn().mockResolvedValue(makeMessage({ status: 'APPROVED', approval: { id: 'appr_1' } })),
},
} as any;
expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
});
it('returns null on double-approval race (updateMany count=0)', async () => {
const result = await handleStarReaction(makeReaction(), adminJids, makePrisma({}, 0));
expect(result).toBeNull();
});
it('returns ApprovalResult with empty forwardJobs and valid indexDoc when no sync routes', async () => {
const result = await handleStarReaction(makeReaction(), adminJids, makePrisma());
expect(result).not.toBeNull();
expect(result!.forwardJobs).toEqual([]);
expect(result!.indexDoc).toMatchObject({
messageId: 'msg_1',
content: 'hello world',
senderName: 'Alice',
sourceGroupId: 'grp_1',
sourceGroupName: 'UP Parivar Dallas',
tags: ['#important'],
platform: 'whatsapp',
});
expect(result!.indexDoc.approvedAt).toMatch(/^\d{4}-\d{2}-\d{2}T/);
});
it('returns ForwardJobData for each active sync route', async () => {
const prisma = makePrisma({
content: 'important announcement',
senderName: 'Bob',
sourceGroup: {
name: 'Source Group',
syncRoutesFrom: [
{ targetGroup: { platformId: '999@g.us', accountId: 'acc_2' } },
{ targetGroup: { platformId: '888@g.us', accountId: null } },
],
},
});
const result = await handleStarReaction(makeReaction(), adminJids, prisma);
expect(result!.forwardJobs).toHaveLength(2);
expect(result!.forwardJobs[0]).toMatchObject({
messageId: 'msg_1',
content: 'important announcement',
sourceGroupName: 'Source Group',
senderName: 'Bob',
toGroupJid: '999@g.us',
fromAccountId: 'acc_2',
});
expect(result!.forwardJobs[1]).toMatchObject({
toGroupJid: '888@g.us',
fromAccountId: 'acc_1', // falls back to reaction.accountId
});
});
});
```
- [ ] **Step 2: Run tests — expect 2 failures** (the two tests that check `result.forwardJobs` and `result.indexDoc`)
```bash
pnpm --filter @tower/worker test -- --testPathPattern=approval
```
Expected: `FAIL` on the last two tests.
- [ ] **Step 3: Update `apps/worker/src/core/approval.ts`**
```typescript
import { NormalizedReaction, ForwardJobData, IndexJobData } from '@tower/types';

export interface ApprovalResult {
  forwardJobs: ForwardJobData[];
  indexDoc: IndexJobData;
}

export async function handleStarReaction(
  reaction: NormalizedReaction,
  adminJids: string[],
  prisma: any,
): Promise<ApprovalResult | null> {
  if (reaction.emoji !== '⭐') return null;
  if (!adminJids.includes(reaction.reactorJid)) return null;

  const message = await prisma.message.findUnique({
    where: {
      platform_platformMsgId: {
        // TODO: derive platform from NormalizedReaction when multi-platform support is added
        platform: 'whatsapp',
        platformMsgId: reaction.targetMsgId,
      },
    },
    include: {
      approval: true,
      sourceGroup: {
        include: {
          syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } },
        },
      },
    },
  });
  if (!message) return null;
  if (message.status !== 'PENDING') return null;
  if (message.approval) return null;

  let approved = false;
  await prisma.$transaction(async (tx: any) => {
    const updated = await tx.message.updateMany({
      where: { id: message.id, status: 'PENDING' },
      data: { status: 'APPROVED' },
    });
    if (updated.count === 0) return;
    approved = true;
    await tx.approval.create({
      data: {
        messageId: message.id,
        adminId: reaction.reactorJid,
        decision: 'APPROVED',
      },
    });
  });
  if (!approved) return null;

  const forwardJobs: ForwardJobData[] = message.sourceGroup.syncRoutesFrom
    .filter((route: any) => route.targetGroup != null)
    .map((route: any) => ({
      messageId: message.id,
      content: message.content,
      sourceGroupName: message.sourceGroup.name,
      senderName: message.senderName ?? undefined,
      toGroupJid: route.targetGroup.platformId,
      fromAccountId: route.targetGroup.accountId ?? reaction.accountId,
    }));

  const indexDoc: IndexJobData = {
    messageId: message.id,
    content: message.content,
    senderName: message.senderName ?? null,
    sourceGroupId: message.sourceGroupId,
    sourceGroupName: message.sourceGroup.name,
    tags: message.tags,
    platform: message.platform,
    approvedAt: new Date().toISOString(),
  };

  return { forwardJobs, indexDoc };
}
```
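The `updateMany` call with `status: 'PENDING'` in its `where` clause is what guards against the double-approval race: only the first writer changes a row, and everyone else sees `count === 0`. The in-memory sketch below illustrates that compare-and-set idea only; `rows` and `approveIfPending` are hypothetical and stand in for the database, not for Prisma's API.

```typescript
type MsgStatus = 'PENDING' | 'APPROVED';

// Hypothetical in-memory table standing in for the Message table.
const rows = new Map<string, { status: MsgStatus }>([['msg_1', { status: 'PENDING' }]]);

// Mimics an update restricted to rows that are still PENDING:
// returns how many rows were actually changed (0 or 1).
function approveIfPending(id: string): number {
  const row = rows.get(id);
  if (!row || row.status !== 'PENDING') return 0;
  row.status = 'APPROVED';
  return 1;
}

// Two admins star the same message; only the first approval takes effect.
const firstCount = approveIfPending('msg_1');
const secondCount = approveIfPending('msg_1');
```

Because the second call reports zero changed rows, the second reaction would return `null` and no duplicate approval record, forward job, or index job would be produced.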
- [ ] **Step 4: Run tests — all pass**
```bash
pnpm --filter @tower/worker test -- --testPathPattern=approval
```
Expected: 8 tests pass.
- [ ] **Step 5: Commit**
```bash
git add apps/worker/src/core/approval.ts apps/worker/src/core/approval.test.ts
git commit -m "feat(worker): handleStarReaction returns ApprovalResult with indexDoc"
```
---
### Task 4: Worker — index queue, processor, and wire `main.ts`
**Files:**
- Create: `apps/worker/src/queues/index.queue.ts`
- Create: `apps/worker/src/queues/index.processor.ts`
- Create: `apps/worker/src/queues/index.processor.test.ts`
- Modify: `apps/worker/package.json` — add `@tower/search: workspace:*`
- Modify: `apps/worker/src/main.ts` — create meiliClient, configureIndex, indexQueue, indexWorker, update reaction handler, update shutdown
- [ ] **Step 1: Add `@tower/search` to worker dependencies**
Edit `apps/worker/package.json` — add to the `"dependencies"` object:
```json
"@tower/search": "workspace:*"
```
Then install:
```bash
pnpm install
```
- [ ] **Step 2: Write the failing processor test**
Create `apps/worker/src/queues/index.processor.test.ts`:
```typescript
import { processIndexJob } from './index.processor';
import { indexMessage } from '@tower/search';
import { IndexJobData } from '@tower/types';
jest.mock('@tower/search', () => ({
indexMessage: jest.fn().mockResolvedValue(undefined),
MESSAGES_INDEX: 'tower-messages',
}));
function makeJob(overrides: Partial<IndexJobData> = {}): IndexJobData {
return {
messageId: 'msg-1',
content: 'hello world',
senderName: 'Alice',
sourceGroupId: 'grp-1',
sourceGroupName: 'UP Parivar',
tags: ['#important'],
platform: 'whatsapp',
approvedAt: '2026-05-27T10:00:00.000Z',
...overrides,
};
}
describe('processIndexJob', () => {
beforeEach(() => jest.clearAllMocks());
it('calls indexMessage with MeiliDocument shape', async () => {
const mockClient = {} as any;
await processIndexJob(makeJob(), mockClient);
expect(indexMessage).toHaveBeenCalledWith(mockClient, {
id: 'msg-1',
content: 'hello world',
senderName: 'Alice',
sourceGroupId: 'grp-1',
sourceGroupName: 'UP Parivar',
tags: ['#important'],
platform: 'whatsapp',
approvedAt: new Date('2026-05-27T10:00:00.000Z').getTime(),
});
});
it('converts null senderName to empty string', async () => {
const mockClient = {} as any;
await processIndexJob(makeJob({ senderName: null }), mockClient);
expect(indexMessage).toHaveBeenCalledWith(
mockClient,
expect.objectContaining({ senderName: '' }),
);
});
it('converts approvedAt ISO string to Unix ms number', async () => {
const mockClient = {} as any;
await processIndexJob(makeJob({ approvedAt: '2026-01-01T00:00:00.000Z' }), mockClient);
expect(indexMessage).toHaveBeenCalledWith(
mockClient,
expect.objectContaining({ approvedAt: new Date('2026-01-01T00:00:00.000Z').getTime() }),
);
});
});
```
- [ ] **Step 3: Run test — expect failure**
```bash
pnpm --filter @tower/worker test -- --testPathPattern=index.processor
```
Expected: `FAIL` — `Cannot find module './index.processor'`.
- [ ] **Step 4: Create `apps/worker/src/queues/index.queue.ts`**
```typescript
import { Queue } from 'bullmq';
import { IndexJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';

export function createIndexQueue(redisUrl: string): Queue<IndexJobData> {
  return new Queue<IndexJobData>('tower-index', { connection: parseRedisUrl(redisUrl) });
}
```
- [ ] **Step 5: Create `apps/worker/src/queues/index.processor.ts`**
```typescript
import { Worker } from 'bullmq';
import { IndexJobData } from '@tower/types';
import { MeiliSearch, MeiliDocument, indexMessage } from '@tower/search';
import { parseRedisUrl } from './redis-connection';

export async function processIndexJob(job: IndexJobData, meiliClient: MeiliSearch): Promise<void> {
  const doc: MeiliDocument = {
    id: job.messageId,
    content: job.content,
    senderName: job.senderName ?? '',
    sourceGroupId: job.sourceGroupId,
    sourceGroupName: job.sourceGroupName,
    tags: job.tags,
    platform: job.platform,
    approvedAt: new Date(job.approvedAt).getTime(),
  };
  await indexMessage(meiliClient, doc);
}

export function createIndexWorker(redisUrl: string, meiliClient: MeiliSearch): Worker<IndexJobData> {
  return new Worker<IndexJobData>(
    'tower-index',
    async (job) => processIndexJob(job.data, meiliClient),
    { connection: parseRedisUrl(redisUrl) },
  );
}
```
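One edge case in the ISO-to-Unix-ms conversion: `new Date(value).getTime()` returns `NaN` for an unparseable string, and `NaN` serializes to `null` in JSON, so a malformed `approvedAt` would be indexed without a usable sort key. The plan does not call for a guard; the sketch below shows one optional way to add it, with `toUnixMs` as a hypothetical helper.

```typescript
// Hypothetical helper: converts an ISO 8601 string to Unix milliseconds,
// failing loudly instead of letting NaN reach the search index.
function toUnixMs(iso: string): number {
  const ms = new Date(iso).getTime();
  if (Number.isNaN(ms)) {
    throw new Error(`Invalid approvedAt timestamp: ${iso}`);
  }
  return ms;
}

const okMs = toUnixMs('2026-05-27T10:00:00.000Z');

// Capture the failure for a malformed value instead of crashing the example.
let badInputError: string | null = null;
try {
  toUnixMs('not-a-date');
} catch (err) {
  badInputError = err instanceof Error ? err.message : String(err);
}
```

Throwing inside the processor would mark the BullMQ job as failed, which surfaces the bad payload in the `failed` handler rather than silently indexing it.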
- [ ] **Step 6: Run tests — all pass**
```bash
pnpm --filter @tower/worker test -- --testPathPattern=index.processor
```
Expected: 3 tests pass.
- [ ] **Step 7: Update `apps/worker/src/main.ts`**
Replace the entire file with:
```typescript
import { PrismaClient } from '@prisma/client';
import { createLogger } from '@tower/logger';
import { validateEnv } from '@tower/config';
import { createMeiliClient, configureIndex } from '@tower/search';
import { createIngestQueue } from './queues/ingest.queue';
import { createIngestWorker } from './queues/ingest.processor';
import { createForwardQueue } from './queues/forward.queue';
import { createForwardWorker } from './queues/forward.processor';
import { createIndexQueue } from './queues/index.queue';
import { createIndexWorker } from './queues/index.processor';
import { WhatsAppSessionPool } from './whatsapp/session-pool';
import { detectTags, isFlagged } from './whatsapp/tag-detector';
import { syncGroups } from './whatsapp/group-sync';
import { handleStarReaction } from './core/approval';
const logger = createLogger('tower-worker');
async function bootstrap() {
const env = validateEnv();
const prisma = new PrismaClient();
await prisma.$connect();
const adminJids = env.TOWER_ADMIN_JIDS
? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean)
: [];
const meiliClient = createMeiliClient(env.MEILI_URL, env.MEILI_MASTER_KEY);
await configureIndex(meiliClient).catch((err) =>
logger.warn({ err }, 'Failed to configure Meilisearch index — search may be degraded'),
);
const ingestQueue = createIngestQueue(env.REDIS_URL);
const forwardQueue = createForwardQueue(env.REDIS_URL);
const indexQueue = createIndexQueue(env.REDIS_URL);
const pool = new WhatsAppSessionPool();
const ingestWorker = createIngestWorker(env.REDIS_URL, prisma);
const forwardWorker = createForwardWorker(env.REDIS_URL, pool);
const indexWorker = createIndexWorker(env.REDIS_URL, meiliClient);
ingestWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Ingest job completed'));
ingestWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Ingest job failed'));
forwardWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Forward job completed'));
forwardWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Forward job failed'));
indexWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Index job completed'));
indexWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Index job failed'));
const accounts = await prisma.account.findMany({
where: { status: 'ACTIVE', platform: 'whatsapp' },
});
if (accounts.length === 0) {
logger.warn('No active WhatsApp accounts found — seed one in the Account table (see docs)');
}
const groupMaps = new Map<string, Map<string, string>>();
for (const account of accounts) {
groupMaps.set(account.id, new Map());
try {
await pool.add(
account.id,
account.sessionPath,
async (msg, accountId) => {
const tags = detectTags(msg.content, msg.senderJid, adminJids);
if (!isFlagged(tags)) return;
const groupMap = groupMaps.get(accountId);
if (!groupMap) {
logger.error({ accountId }, 'No group map for account — message dropped');
return;
}
const sourceGroupId = groupMap.get(msg.sourceGroupJid);
if (!sourceGroupId) {
logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message');
return;
}
await ingestQueue.add(
'ingest',
{
platformMsgId: msg.platformMsgId,
platform: 'whatsapp',
accountId,
sourceGroupId,
senderJid: msg.senderJid,
senderName: msg.senderName,
content: msg.content,
tags,
},
{ attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
);
logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued');
},
async (reaction) => {
const result = await handleStarReaction(reaction, adminJids, prisma);
if (!result) return;
const { forwardJobs, indexDoc } = result;
await indexQueue.add('index', indexDoc, {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
});
for (const job of forwardJobs) {
await forwardQueue.add('forward', job, {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
});
}
logger.info(
{ messageId: indexDoc.messageId, forwardCount: forwardJobs.length },
'Message approved: index and forward jobs enqueued',
);
},
async (groups, accountId) => {
logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups');
const map = await syncGroups(groups, accountId, prisma);
groupMaps.set(accountId, map);
},
);
} catch (err) {
logger.error({ accountId: account.id, err }, 'Failed to start session — skipping account');
}
}
logger.info({ accountCount: accounts.length }, 'Tower worker ready');
const shutdown = async () => {
logger.info('Shutting down...');
await pool.closeAll();
await ingestWorker.close();
await forwardWorker.close();
await indexWorker.close();
await ingestQueue.close();
await forwardQueue.close();
await indexQueue.close();
await prisma.$disconnect();
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
bootstrap().catch((err) => {
console.error('Worker failed to start', err);
process.exit(1);
});
```
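The retry options passed to `indexQueue.add` and `forwardQueue.add` use `attempts: 3` with exponential backoff. As a rough sketch of the resulting wait times, assuming BullMQ's built-in exponential strategy waits `delay * 2^(attemptsMade - 1)` before each retry (worth confirming against the BullMQ version in use). `retryDelays` is a hypothetical helper for illustration, not part of BullMQ.

```typescript
// Hypothetical helper: lists the waits before each retry, ASSUMING the
// backoff formula delay * 2^(attemptsMade - 1). `attempts` counts the
// first try too, so 3 attempts means at most 2 retries.
function retryDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(baseDelayMs * Math.pow(2, attemptsMade - 1));
  }
  return delays;
}

// Index and ingest jobs: base delay 1000 ms.
const indexDelays = retryDelays(3, 1000);

// Forward jobs: base delay 2000 ms.
const forwardDelays = retryDelays(3, 2000);
```

Under that assumption, an index job that keeps failing is retried after about 1 s and then 2 s before it lands in the failed set, and a forward job after about 2 s and then 4 s.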
- [ ] **Step 8: Run all worker tests and build**
```bash
pnpm --filter @tower/worker test
pnpm --filter @tower/worker build
```
Expected: All tests pass (the `approval.test.ts` changes in Task 3 must already be committed), zero build errors.
- [ ] **Step 9: Commit**
```bash
git add apps/worker/src/queues/index.queue.ts \
apps/worker/src/queues/index.processor.ts \
apps/worker/src/queues/index.processor.test.ts \
apps/worker/src/main.ts \
apps/worker/package.json \
pnpm-lock.yaml
git commit -m "feat(worker): add index queue and wire Meilisearch indexing after approval"
```
---
### Task 5: API — SearchModule
**Files:**
- Create: `apps/api/src/modules/search/search.module.ts`
- Create: `apps/api/src/modules/search/search.service.ts`
- Create: `apps/api/src/modules/search/search.controller.ts`
- Create: `apps/api/src/modules/search/search.controller.spec.ts`
- Create: `apps/api/src/modules/search/search.service.spec.ts`
- Modify: `apps/api/package.json` — add `@tower/search: workspace:*`
- Modify: `apps/api/src/app.module.ts` — register `SearchModule`
- [ ] **Step 1: Add `@tower/search` to API dependencies**
Edit `apps/api/package.json` — add to `"dependencies"`:
```json
"@tower/search": "workspace:*"
```
Then:
```bash
pnpm install
```
- [ ] **Step 2: Write failing controller spec**
Create `apps/api/src/modules/search/search.controller.spec.ts`:
```typescript
import { Test, TestingModule } from '@nestjs/testing';
import { SearchController } from './search.controller';
import { SearchService } from './search.service';
const mockSearchService = { search: jest.fn() };
describe('SearchController', () => {
let controller: SearchController;
beforeEach(async () => {
jest.clearAllMocks();
const module: TestingModule = await Test.createTestingModule({
controllers: [SearchController],
providers: [{ provide: SearchService, useValue: mockSearchService }],
}).compile();
controller = module.get<SearchController>(SearchController);
});
it('calls service with all parsed params', async () => {
mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 2, limit: 10, query: 'hello' });
await controller.search('hello', 'grp-1', 'important,event', '2', '10');
expect(mockSearchService.search).toHaveBeenCalledWith('hello', 'grp-1', ['important', 'event'], 2, 10);
});
it('defaults page to 1 and limit to 20 when not provided', async () => {
mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 1, limit: 20, query: '' });
await controller.search('');
expect(mockSearchService.search).toHaveBeenCalledWith('', undefined, undefined, 1, 20);
});
it('returns the service result directly', async () => {
const expected = { hits: [{ id: 'msg-1' }], total: 1, page: 1, limit: 20, query: 'test' };
mockSearchService.search.mockResolvedValue(expected);
const result = await controller.search('test');
expect(result).toEqual(expected);
});
it('splits tags on comma and trims whitespace', async () => {
mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 1, limit: 20, query: '' });
await controller.search('', undefined, ' important , event ');
expect(mockSearchService.search).toHaveBeenCalledWith('', undefined, ['important', 'event'], 1, 20);
});
});
```
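The spec above pins down the query-string handling: `page` defaults to 1, `limit` defaults to 20, and `tags` is split on commas with whitespace trimmed. One way that parsing might look as a pure function is sketched below. `parseSearchParams` is a hypothetical helper, and falling back to the default on non-numeric input is an assumption the spec does not cover.

```typescript
interface ParsedSearchParams {
  q: string;
  groupId: string | undefined;
  tags: string[] | undefined;
  page: number;
  limit: number;
}

// Hypothetical helper: turns raw query-string values into typed search params.
function parseSearchParams(
  q = '',
  groupId?: string,
  tags?: string,
  page?: string,
  limit?: string,
): ParsedSearchParams {
  // Split on commas, trim each tag, and drop empty entries.
  const tagList = tags
    ? tags.split(',').map((t) => t.trim()).filter((t) => t.length > 0)
    : undefined;

  // ASSUMPTION: anything that is not a positive integer falls back to the default.
  const toPositiveInt = (raw: string | undefined, fallback: number): number => {
    const n = Number.parseInt(raw ?? '', 10);
    return Number.isInteger(n) && n > 0 ? n : fallback;
  };

  return {
    q,
    groupId,
    tags: tagList,
    page: toPositiveInt(page, 1),
    limit: toPositiveInt(limit, 20),
  };
}

const fullParams = parseSearchParams('hello', 'grp-1', 'important,event', '2', '10');
const defaultParams = parseSearchParams('');
const trimmedParams = parseSearchParams('', undefined, ' important , event ');
```

Keeping the parsing in a pure function like this makes the defaults testable without standing up a Nest testing module.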
- [ ] **Step 3: Write failing service spec**
Create `apps/api/src/modules/search/search.service.spec.ts`:
```typescript
import { Test, TestingModule } from '@nestjs/testing';
import { ConfigService } from '@nestjs/config';
import { SearchService } from './search.service';
import * as searchPkg from '@tower/search';
jest.mock('@tower/search', () => ({
createMeiliClient: jest.fn(),
configureIndex: jest.fn().mockResolvedValue(undefined),
MESSAGES_INDEX: 'tower-messages',
}));
describe('SearchService', () => {
let service: SearchService;
const mockSearch = jest.fn();
const mockIndex = jest.fn().mockReturnValue({ search: mockSearch });
const mockClient = { index: mockIndex };
beforeEach(async () => {
jest.clearAllMocks();
(searchPkg.createMeiliClient as jest.Mock).mockReturnValue(mockClient);
const module: TestingModule = await Test.createTestingModule({
providers: [
SearchService,
{ provide: ConfigService, useValue: { get: jest.fn().mockReturnValue('') } },
],
}).compile();
service = module.get<SearchService>(SearchService);
await service.onModuleInit();
});
it('calls configureIndex on init', () => {
expect(searchPkg.configureIndex).toHaveBeenCalledWith(mockClient);
});
it('returns hits and total', async () => {
mockSearch.mockResolvedValue({ hits: [{ id: 'msg-1', content: 'hello' }], totalHits: 1 });
const result = await service.search('hello');
expect(result.hits).toHaveLength(1);
expect(result.total).toBe(1);
expect(result.query).toBe('hello');
});
it('searches with no filter when no groupId or tags', async () => {
mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
await service.search('test');
expect(mockSearch).toHaveBeenCalledWith('test', expect.objectContaining({ filter: undefined }));
});
it('applies sourceGroupId filter', async () => {
mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
await service.search('hello', 'grp-1');
expect(mockSearch).toHaveBeenCalledWith(
'hello',
expect.objectContaining({ filter: 'sourceGroupId = "grp-1"' }),
);
});
it('applies tags filter', async () => {
mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
await service.search('hello', undefined, ['#important']);
expect(mockSearch).toHaveBeenCalledWith(
'hello',
expect.objectContaining({ filter: 'tags = "#important"' }),
);
});
it('combines groupId and tags filters with AND', async () => {
mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
await service.search('hello', 'grp-1', ['#important', '#event']);
expect(mockSearch).toHaveBeenCalledWith(
'hello',
expect.objectContaining({
filter: 'sourceGroupId = "grp-1" AND tags = "#important" AND tags = "#event"',
}),
);
});
it('defaults page to 1 and hitsPerPage to 20', async () => {
mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
await service.search('hello');
expect(mockSearch).toHaveBeenCalledWith(
'hello',
expect.objectContaining({ page: 1, hitsPerPage: 20 }),
);
});
});
```
- [ ] **Step 4: Run specs — expect failure**
```bash
pnpm --filter @tower/api test
```
Expected: `FAIL`, because the specs import `search.service` and `search.controller`, which do not exist yet.
- [ ] **Step 5: Create `apps/api/src/modules/search/search.service.ts`**
```typescript
import { Injectable, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import {
  MeiliSearch,
  MeiliDocument,
  MESSAGES_INDEX,
  createMeiliClient,
  configureIndex,
} from '@tower/search';

// Escape backslashes and double quotes so request input cannot break out of a
// quoted filter value (assumes backslash escaping inside Meilisearch filter strings).
const escapeFilterValue = (value: string): string =>
  value.replace(/\\/g, '\\\\').replace(/"/g, '\\"');

@Injectable()
export class SearchService implements OnModuleInit {
  private readonly client: MeiliSearch;

  constructor(private readonly config: ConfigService) {
    this.client = createMeiliClient(
      this.config.get<string>('MEILI_URL', 'http://localhost:7700'),
      this.config.get<string>('MEILI_MASTER_KEY', ''),
    );
  }

  // Index settings are applied on startup; a failure is logged but does not stop the API.
  async onModuleInit(): Promise<void> {
    await configureIndex(this.client).catch((err) =>
      console.warn('Failed to configure Meilisearch index:', err),
    );
  }

  async search(
    query: string,
    groupId?: string,
    tags?: string[],
    page = 1,
    limit = 20,
  ): Promise<{ hits: MeiliDocument[]; total: number; page: number; limit: number; query: string }> {
    // Every condition is ANDed: a hit must match the group and carry all requested tags.
    const filters: string[] = [];
    if (groupId) filters.push(`sourceGroupId = "${escapeFilterValue(groupId)}"`);
    if (tags?.length) filters.push(...tags.map((t) => `tags = "${escapeFilterValue(t)}"`));

    const result = await this.client.index(MESSAGES_INDEX).search(query, {
      filter: filters.length ? filters.join(' AND ') : undefined,
      page,
      hitsPerPage: limit,
      sort: ['approvedAt:desc'], // newest approvals first
    });

    return {
      hits: result.hits as MeiliDocument[],
      total: result.totalHits ?? 0,
      page,
      limit,
      query,
    };
  }
}
```
- [ ] **Step 6: Create `apps/api/src/modules/search/search.controller.ts`**
```typescript
import { Controller, Get, Query } from '@nestjs/common';
import { SearchService } from './search.service';

@Controller('search')
export class SearchController {
  constructor(private readonly searchService: SearchService) {}

  @Get()
  search(
    @Query('q') q = '',
    @Query('groupId') groupId?: string,
    @Query('tags') tags?: string,
    @Query('page') page = '1',
    @Query('limit') limit = '20',
  ) {
    const tagList = tags
      ? tags.split(',').map((t) => t.trim()).filter(Boolean)
      : undefined;
    return this.searchService.search(q, groupId, tagList, Number(page), Number(limit));
  }
}
```
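The controller above passes `Number(page)` and `Number(limit)` straight through, so a request such as `?page=abc` would hand `NaN` to the service. The following is a minimal sketch of stricter parsing, assuming that falling back to the defaults is acceptable. `toPositiveInt` and `parseTags` are hypothetical helpers for illustration, not part of the plan.

```typescript
// Hypothetical helper: parse a query-string value as a positive integer and
// fall back to the given default when it is missing or invalid.
function toPositiveInt(raw: string | undefined, fallback: number): number {
  const n = Number(raw);
  return Number.isInteger(n) && n > 0 ? n : fallback;
}

// Hypothetical helper: split a comma-separated `tags` parameter the same way
// the controller does, but return undefined when no usable tag remains.
function parseTags(raw: string | undefined): string[] | undefined {
  if (!raw) return undefined;
  const tags = raw
    .split(',')
    .map((t) => t.trim())
    .filter(Boolean);
  return tags.length ? tags : undefined;
}
```

With these helpers the controller body could call `this.searchService.search(q, groupId, parseTags(tags), toPositiveInt(page, 1), toPositiveInt(limit, 20))`. Valid inputs produce the same arguments as before, so only malformed requests would behave differently.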
- [ ] **Step 7: Create `apps/api/src/modules/search/search.module.ts`**
```typescript
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { SearchController } from './search.controller';
import { SearchService } from './search.service';

@Module({
  imports: [ConfigModule],
  controllers: [SearchController],
  providers: [SearchService],
})
export class SearchModule {}
```
- [ ] **Step 8: Register SearchModule in `apps/api/src/app.module.ts`**
```typescript
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { PrismaModule } from './prisma/prisma.module';
import { HealthModule } from './modules/health/health.module';
import { SearchModule } from './modules/search/search.module';

@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    PrismaModule,
    HealthModule,
    SearchModule,
  ],
})
export class AppModule {}
```
- [ ] **Step 9: Run all API tests and build**
```bash
pnpm --filter @tower/api test
pnpm --filter @tower/api build
```
Expected: All specs pass (health controller + search controller + search service), zero build errors.
- [ ] **Step 10: Commit**
```bash
git add apps/api/src/modules/search/ \
apps/api/src/app.module.ts \
apps/api/package.json \
pnpm-lock.yaml
git commit -m "feat(api): add SearchModule with GET /search endpoint backed by Meilisearch"
```
---
## Self-Review
**Spec coverage:**
- ✅ Approved messages indexed in Meilisearch — Task 4 (index queue + processor)
- ✅ Index stays in sync with approvals — Task 3 wires indexDoc into approval result, Task 4 enqueues it
- ✅ Full-text search endpoint — Task 5 `GET /search`
- ✅ Filterable by group and tags — `SearchService.search` builds Meilisearch filter string
- ✅ Paginated results — `page` + `limit` query params, `hitsPerPage` in Meilisearch query
- ✅ Index configured with correct attributes — `configureIndex` called on both worker startup and API `onModuleInit`
- ✅ Messages indexed even with no sync routes — `main.ts` always enqueues index job regardless of `forwardJobs.length`
**Placeholder scan:** None found. All code steps contain complete implementations.
**Type consistency:**
- `IndexJobData.approvedAt` is an ISO 8601 `string` throughout the shared types and `approval.ts`
- `MeiliDocument.approvedAt` is `number` (Unix ms) — conversion happens in `processIndexJob`
- `handleStarReaction` returns `ApprovalResult | null` consistently across `approval.ts` and `approval.test.ts`
- `result.forwardJobs` and `result.indexDoc` are used consistently in the tests and `main.ts`
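
The ISO-to-Unix-ms conversion noted above can be sketched as follows. This is an illustration under assumptions, not the code from `processIndexJob`: the two interfaces are trimmed stand-ins (only `approvedAt` is confirmed by this plan, while `id` and `content` are borrowed from the spec fixtures), and `toMeiliDocument` is a hypothetical name.

```typescript
// Trimmed stand-ins for IndexJobData and MeiliDocument (assumed shapes).
interface IndexJobDataSketch {
  id: string;
  content: string;
  approvedAt: string; // ISO 8601 timestamp
}

interface MeiliDocumentSketch {
  id: string;
  content: string;
  approvedAt: number; // Unix time in milliseconds
}

// Convert the queued job payload into the indexed document shape.
function toMeiliDocument(job: IndexJobDataSketch): MeiliDocumentSketch {
  const approvedAt = Date.parse(job.approvedAt);
  if (Number.isNaN(approvedAt)) {
    // Reject unparseable timestamps rather than indexing NaN.
    throw new Error(`Invalid approvedAt timestamp: ${job.approvedAt}`);
  }
  return { ...job, approvedAt };
}
```

Storing a number keeps `approvedAt` consistent with the `approvedAt:desc` sort used by `SearchService.search`.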