Replace `as any` cast with `as unknown as ReturnType<typeof createMeiliClient>` in the mock client factory. This preserves type safety without requiring the mock to implement the full SDK interface. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
39 KiB
Archive & Search Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Index every ⭐-approved message into Meilisearch and expose a paginated full-text search endpoint on the API.
Architecture: A shared @tower/search package owns the Meilisearch client factory, index name constant, settings configuration, and document type — consumed by both the worker (indexing) and the API (querying). handleStarReaction is extended to return an ApprovalResult containing both forward jobs and an IndexJobData document. The worker enqueues that document into a BullMQ tower-index queue immediately after approval; a processor converts it to a MeiliDocument (Unix ms timestamp) and calls indexMessage. The NestJS API gets a SearchModule with a GET /search endpoint that filters by sourceGroupId and tags.
Tech Stack: Meilisearch v1.11 (already in docker-compose), meilisearch npm SDK ^0.44.0, BullMQ 5, NestJS 11, @nestjs/config, TypeScript 5, Jest 29
File Map
Create:
packages/search/package.json—@tower/searchpackage manifestpackages/search/tsconfig.json— extends base tsconfigpackages/search/jest.config.js— ts-jest presetpackages/search/src/index.ts—MeiliDocument,MESSAGES_INDEX,createMeiliClient,configureIndex,indexMessage,deleteMessage, re-exportMeiliSearchtypepackages/search/src/index.test.ts— unit tests for all exportsapps/worker/src/queues/index.queue.ts— BullMQ Queue forIndexJobDataapps/worker/src/queues/index.processor.ts— Worker that callsindexMessageapps/worker/src/queues/index.processor.test.ts— unit testsapps/api/src/modules/search/search.module.tsapps/api/src/modules/search/search.service.ts— wraps Meilisearch,onModuleInitcallsconfigureIndexapps/api/src/modules/search/search.controller.ts—GET /searchapps/api/src/modules/search/search.controller.spec.tsapps/api/src/modules/search/search.service.spec.ts
Modify:
packages/types/src/message.ts— addIndexJobDatainterfaceapps/worker/src/core/approval.ts— newApprovalResultreturn type; buildindexDocapps/worker/src/core/approval.test.ts— update assertions toresult.forwardJobs/result.indexDoc; add missing mock fieldsapps/worker/src/main.ts— create meiliClient + indexQueue + indexWorker; callconfigureIndex; enqueue index jobs after approvalapps/worker/package.json— add@tower/search: workspace:*apps/api/package.json— add@tower/search: workspace:*apps/api/src/app.module.ts— importSearchModule
Task 1: Add IndexJobData to @tower/types
Files:
-
Modify:
packages/types/src/message.ts -
Step 1: Write the failing test
Add to end of packages/types/src/message.ts — first verify the type compiles by adding a test file. Actually @tower/types has no test runner; type correctness is verified at build time. Skip to Step 2.
- Step 2: Add
IndexJobDatatopackages/types/src/message.ts
Append after the ForwardJobData interface (keep every existing line intact):
export interface IndexJobData {
messageId: string; // DB Message.id (cuid)
content: string;
senderName: string | null;
sourceGroupId: string; // DB Group.id
sourceGroupName: string;
tags: string[];
platform: string;
approvedAt: string; // ISO 8601 — converted to Unix ms in the processor
}
- Step 3: Build the package to verify no compile errors
cd /path/to/tower
pnpm --filter @tower/types build
Expected: dist/ updated, zero errors.
- Step 4: Commit
git add packages/types/src/message.ts
git commit -m "feat(types): add IndexJobData for Meilisearch indexing"
Task 2: Create packages/search package
Files:
-
Create:
packages/search/package.json -
Create:
packages/search/tsconfig.json -
Create:
packages/search/jest.config.js -
Create:
packages/search/src/index.ts -
Create:
packages/search/src/index.test.ts -
Step 1: Write the failing tests
Create packages/search/src/index.test.ts:
import {
MESSAGES_INDEX,
configureIndex,
indexMessage,
deleteMessage,
MeiliDocument,
} from './index';
function makeMockClient() {
const mockUpdateSettings = jest.fn().mockResolvedValue({});
const mockAddDocuments = jest.fn().mockResolvedValue({});
const mockDeleteDocument = jest.fn().mockResolvedValue({});
const mockIndex = jest.fn().mockReturnValue({
updateSettings: mockUpdateSettings,
addDocuments: mockAddDocuments,
deleteDocument: mockDeleteDocument,
});
return { client: { index: mockIndex } as any, mockIndex, mockUpdateSettings, mockAddDocuments, mockDeleteDocument };
}
describe('MESSAGES_INDEX', () => {
it('is the string tower-messages', () => {
expect(MESSAGES_INDEX).toBe('tower-messages');
});
});
describe('configureIndex', () => {
it('sets searchable, filterable, and sortable attributes on the correct index', async () => {
const { client, mockIndex, mockUpdateSettings } = makeMockClient();
await configureIndex(client);
expect(mockIndex).toHaveBeenCalledWith('tower-messages');
expect(mockUpdateSettings).toHaveBeenCalledWith({
searchableAttributes: ['content', 'senderName', 'sourceGroupName'],
filterableAttributes: ['sourceGroupId', 'tags', 'platform'],
sortableAttributes: ['approvedAt'],
});
});
});
describe('indexMessage', () => {
it('adds the document to the messages index', async () => {
const { client, mockIndex, mockAddDocuments } = makeMockClient();
const doc: MeiliDocument = {
id: 'msg-1',
content: 'Hello community',
senderName: 'Alice',
sourceGroupId: 'grp-1',
sourceGroupName: 'UP Parivar',
tags: ['#important'],
platform: 'whatsapp',
approvedAt: 1716825600000,
};
await indexMessage(client, doc);
expect(mockIndex).toHaveBeenCalledWith('tower-messages');
expect(mockAddDocuments).toHaveBeenCalledWith([doc]);
});
});
describe('deleteMessage', () => {
it('deletes document by id from the messages index', async () => {
const { client, mockIndex, mockDeleteDocument } = makeMockClient();
await deleteMessage(client, 'msg-1');
expect(mockIndex).toHaveBeenCalledWith('tower-messages');
expect(mockDeleteDocument).toHaveBeenCalledWith('msg-1');
});
});
- Step 2: Create
packages/search/src/index.ts
import { MeiliSearch } from 'meilisearch';
export { MeiliSearch } from 'meilisearch';
export interface MeiliDocument {
id: string; // DB Message.id
content: string;
senderName: string; // empty string when null
sourceGroupId: string;
sourceGroupName: string;
tags: string[];
platform: string;
approvedAt: number; // Unix ms — Meilisearch sorts numbers, not ISO strings
}
export const MESSAGES_INDEX = 'tower-messages';
export function createMeiliClient(url: string, masterKey: string): MeiliSearch {
return new MeiliSearch({ host: url, apiKey: masterKey });
}
export async function configureIndex(client: MeiliSearch): Promise<void> {
await client.index(MESSAGES_INDEX).updateSettings({
searchableAttributes: ['content', 'senderName', 'sourceGroupName'],
filterableAttributes: ['sourceGroupId', 'tags', 'platform'],
sortableAttributes: ['approvedAt'],
});
}
export async function indexMessage(client: MeiliSearch, doc: MeiliDocument): Promise<void> {
await client.index(MESSAGES_INDEX).addDocuments([doc]);
}
export async function deleteMessage(client: MeiliSearch, id: string): Promise<void> {
await client.index(MESSAGES_INDEX).deleteDocument(id);
}
- Step 3: Create
packages/search/package.json
{
"name": "@tower/search",
"version": "0.0.1",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"exports": {
".": {
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
}
},
"scripts": {
"build": "tsc",
"dev": "tsc --watch",
"test": "jest"
},
"dependencies": {
"meilisearch": "^0.44.0"
},
"devDependencies": {
"@types/jest": "^29.0.0",
"@types/node": "^22.0.0",
"jest": "^29.0.0",
"ts-jest": "^29.0.0",
"typescript": "^5.7.0"
}
}
- Step 4: Create
packages/search/tsconfig.json
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"outDir": "./dist",
"rootDir": "./src"
},
"include": ["src"]
}
- Step 5: Create
packages/search/jest.config.js
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
testMatch: ['**/*.test.ts'],
rootDir: 'src',
};
- Step 6: Install dependencies and run tests
cd /path/to/tower
pnpm install
pnpm --filter @tower/search test
Expected: 4 tests pass (MESSAGES_INDEX, configureIndex, indexMessage, deleteMessage).
- Step 7: Build
pnpm --filter @tower/search build
Expected: packages/search/dist/ created, zero errors.
- Step 8: Commit
git add packages/search/
git commit -m "feat(search): add @tower/search package with Meilisearch client and helpers"
Task 3: Update handleStarReaction to return ApprovalResult
Files:
- Modify:
apps/worker/src/core/approval.ts - Modify:
apps/worker/src/core/approval.test.ts
The current return type is ForwardJobData[] | null. Change it to ApprovalResult | null where ApprovalResult includes both the forward jobs and the data needed for Meilisearch indexing. This lets main.ts index the message without an extra DB query.
- Step 1: Update
approval.test.tsfirst (TDD — tests fail before change)
Replace apps/worker/src/core/approval.test.ts entirely:
import { handleStarReaction } from './approval';
import { NormalizedReaction } from '@tower/types';
function makeReaction(overrides: Partial<NormalizedReaction> = {}): NormalizedReaction {
return {
reactorJid: '919876543210@s.whatsapp.net',
targetMsgId: 'TARGET_MSG_123',
sourceGroupJid: '120363043312345678@g.us',
emoji: '⭐',
accountId: 'acc_1',
...overrides,
};
}
const adminJids = ['919876543210@s.whatsapp.net'];
// Minimal valid message returned by prisma.message.findUnique
function makeMessage(overrides: object = {}) {
return {
id: 'msg_1',
status: 'PENDING',
approval: null,
content: 'hello world',
senderName: 'Alice',
sourceGroupId: 'grp_1',
tags: ['#important'],
platform: 'whatsapp',
sourceGroup: { name: 'UP Parivar Dallas', syncRoutesFrom: [] },
...overrides,
};
}
function makePrisma(messageOverrides: object = {}, txCount = 1) {
return {
message: { findUnique: jest.fn().mockResolvedValue(makeMessage(messageOverrides)) },
$transaction: jest.fn().mockImplementation(async (fn: any) =>
fn({
message: { updateMany: jest.fn().mockResolvedValue({ count: txCount }) },
approval: { create: jest.fn().mockResolvedValue({}) },
}),
),
} as any;
}
describe('handleStarReaction', () => {
it('returns null for non-star emoji', async () => {
expect(await handleStarReaction(makeReaction({ emoji: '👍' }), adminJids, {} as any)).toBeNull();
});
it('returns null when reactor is not an admin', async () => {
expect(
await handleStarReaction(makeReaction({ reactorJid: 'stranger@s.whatsapp.net' }), adminJids, {} as any),
).toBeNull();
});
it('returns null when message not found', async () => {
const prisma = { message: { findUnique: jest.fn().mockResolvedValue(null) } } as any;
expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
expect(prisma.message.findUnique).toHaveBeenCalledWith({
where: { platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'TARGET_MSG_123' } },
include: {
approval: true,
sourceGroup: {
include: { syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } } },
},
},
});
});
it('returns null when message status is not PENDING', async () => {
const prisma = {
message: { findUnique: jest.fn().mockResolvedValue(makeMessage({ status: 'REJECTED' })) },
} as any;
expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
});
it('returns null when approval record already exists', async () => {
const prisma = {
message: {
findUnique: jest.fn().mockResolvedValue(makeMessage({ status: 'APPROVED', approval: { id: 'appr_1' } })),
},
} as any;
expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
});
it('returns null on double-approval race (updateMany count=0)', async () => {
const result = await handleStarReaction(makeReaction(), adminJids, makePrisma({}, 0));
expect(result).toBeNull();
});
it('returns ApprovalResult with empty forwardJobs and valid indexDoc when no sync routes', async () => {
const result = await handleStarReaction(makeReaction(), adminJids, makePrisma());
expect(result).not.toBeNull();
expect(result!.forwardJobs).toEqual([]);
expect(result!.indexDoc).toMatchObject({
messageId: 'msg_1',
content: 'hello world',
senderName: 'Alice',
sourceGroupId: 'grp_1',
sourceGroupName: 'UP Parivar Dallas',
tags: ['#important'],
platform: 'whatsapp',
});
expect(result!.indexDoc.approvedAt).toMatch(/^\d{4}-\d{2}-\d{2}T/);
});
it('returns ForwardJobData for each active sync route', async () => {
const prisma = makePrisma({
content: 'important announcement',
senderName: 'Bob',
sourceGroup: {
name: 'Source Group',
syncRoutesFrom: [
{ targetGroup: { platformId: '999@g.us', accountId: 'acc_2' } },
{ targetGroup: { platformId: '888@g.us', accountId: null } },
],
},
});
const result = await handleStarReaction(makeReaction(), adminJids, prisma);
expect(result!.forwardJobs).toHaveLength(2);
expect(result!.forwardJobs[0]).toMatchObject({
messageId: 'msg_1',
content: 'important announcement',
sourceGroupName: 'Source Group',
senderName: 'Bob',
toGroupJid: '999@g.us',
fromAccountId: 'acc_2',
});
expect(result!.forwardJobs[1]).toMatchObject({
toGroupJid: '888@g.us',
fromAccountId: 'acc_1', // falls back to reaction.accountId
});
});
});
- Step 2: Run tests — expect 2 failures (the two tests that check
result.forwardJobsandresult.indexDoc)
pnpm --filter @tower/worker test -- --testPathPattern=approval
Expected: FAIL on the last two tests.
- Step 3: Update
apps/worker/src/core/approval.ts
import { NormalizedReaction, ForwardJobData, IndexJobData } from '@tower/types';
export interface ApprovalResult {
forwardJobs: ForwardJobData[];
indexDoc: IndexJobData;
}
export async function handleStarReaction(
reaction: NormalizedReaction,
adminJids: string[],
prisma: any,
): Promise<ApprovalResult | null> {
if (reaction.emoji !== '⭐') return null;
if (!adminJids.includes(reaction.reactorJid)) return null;
const message = await prisma.message.findUnique({
where: {
platform_platformMsgId: {
// TODO: derive platform from NormalizedReaction when multi-platform support is added
platform: 'whatsapp',
platformMsgId: reaction.targetMsgId,
},
},
include: {
approval: true,
sourceGroup: {
include: {
syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } },
},
},
},
});
if (!message) return null;
if (message.status !== 'PENDING') return null;
if (message.approval) return null;
let approved = false;
await prisma.$transaction(async (tx: any) => {
const updated = await tx.message.updateMany({
where: { id: message.id, status: 'PENDING' },
data: { status: 'APPROVED' },
});
if (updated.count === 0) return;
approved = true;
await tx.approval.create({
data: {
messageId: message.id,
adminId: reaction.reactorJid,
decision: 'APPROVED',
},
});
});
if (!approved) return null;
const forwardJobs: ForwardJobData[] = message.sourceGroup.syncRoutesFrom
.filter((route: any) => route.targetGroup != null)
.map((route: any) => ({
messageId: message.id,
content: message.content,
sourceGroupName: message.sourceGroup.name,
senderName: message.senderName ?? undefined,
toGroupJid: route.targetGroup.platformId,
fromAccountId: route.targetGroup.accountId ?? reaction.accountId,
}));
const indexDoc: IndexJobData = {
messageId: message.id,
content: message.content,
senderName: message.senderName ?? null,
sourceGroupId: message.sourceGroupId,
sourceGroupName: message.sourceGroup.name,
tags: message.tags,
platform: message.platform,
approvedAt: new Date().toISOString(),
};
return { forwardJobs, indexDoc };
}
- Step 4: Run tests — all pass
pnpm --filter @tower/worker test -- --testPathPattern=approval
Expected: 8 tests pass.
- Step 5: Commit
git add apps/worker/src/core/approval.ts apps/worker/src/core/approval.test.ts
git commit -m "feat(worker): handleStarReaction returns ApprovalResult with indexDoc"
Task 4: Worker — index queue, processor, and wire main.ts
Files:
-
Create:
apps/worker/src/queues/index.queue.ts -
Create:
apps/worker/src/queues/index.processor.ts -
Create:
apps/worker/src/queues/index.processor.test.ts -
Modify:
apps/worker/package.json— add@tower/search: workspace:* -
Modify:
apps/worker/src/main.ts— create meiliClient, configureIndex, indexQueue, indexWorker, update reaction handler, update shutdown -
Step 1: Add
@tower/searchto worker dependencies
Edit apps/worker/package.json — add to the "dependencies" object:
"@tower/search": "workspace:*"
Then install:
pnpm install
- Step 2: Write the failing processor test
Create apps/worker/src/queues/index.processor.test.ts:
import { processIndexJob } from './index.processor';
import { indexMessage } from '@tower/search';
import { IndexJobData } from '@tower/types';
jest.mock('@tower/search', () => ({
indexMessage: jest.fn().mockResolvedValue(undefined),
MESSAGES_INDEX: 'tower-messages',
}));
function makeJob(overrides: Partial<IndexJobData> = {}): IndexJobData {
return {
messageId: 'msg-1',
content: 'hello world',
senderName: 'Alice',
sourceGroupId: 'grp-1',
sourceGroupName: 'UP Parivar',
tags: ['#important'],
platform: 'whatsapp',
approvedAt: '2026-05-27T10:00:00.000Z',
...overrides,
};
}
describe('processIndexJob', () => {
beforeEach(() => jest.clearAllMocks());
it('calls indexMessage with MeiliDocument shape', async () => {
const mockClient = {} as any;
await processIndexJob(makeJob(), mockClient);
expect(indexMessage).toHaveBeenCalledWith(mockClient, {
id: 'msg-1',
content: 'hello world',
senderName: 'Alice',
sourceGroupId: 'grp-1',
sourceGroupName: 'UP Parivar',
tags: ['#important'],
platform: 'whatsapp',
approvedAt: new Date('2026-05-27T10:00:00.000Z').getTime(),
});
});
it('converts null senderName to empty string', async () => {
const mockClient = {} as any;
await processIndexJob(makeJob({ senderName: null }), mockClient);
expect(indexMessage).toHaveBeenCalledWith(
mockClient,
expect.objectContaining({ senderName: '' }),
);
});
it('converts approvedAt ISO string to Unix ms number', async () => {
const mockClient = {} as any;
await processIndexJob(makeJob({ approvedAt: '2026-01-01T00:00:00.000Z' }), mockClient);
expect(indexMessage).toHaveBeenCalledWith(
mockClient,
expect.objectContaining({ approvedAt: new Date('2026-01-01T00:00:00.000Z').getTime() }),
);
});
});
- Step 3: Run test — expect failure
pnpm --filter @tower/worker test -- --testPathPattern=index.processor
Expected: FAIL — Cannot find module './index.processor'.
- Step 4: Create
apps/worker/src/queues/index.queue.ts
import { Queue } from 'bullmq';
import { IndexJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';
export function createIndexQueue(redisUrl: string): Queue<IndexJobData> {
return new Queue<IndexJobData>('tower-index', { connection: parseRedisUrl(redisUrl) });
}
- Step 5: Create
apps/worker/src/queues/index.processor.ts
import { Worker } from 'bullmq';
import { IndexJobData } from '@tower/types';
import { MeiliSearch, MeiliDocument, indexMessage } from '@tower/search';
import { parseRedisUrl } from './redis-connection';
export async function processIndexJob(job: IndexJobData, meiliClient: MeiliSearch): Promise<void> {
const doc: MeiliDocument = {
id: job.messageId,
content: job.content,
senderName: job.senderName ?? '',
sourceGroupId: job.sourceGroupId,
sourceGroupName: job.sourceGroupName,
tags: job.tags,
platform: job.platform,
approvedAt: new Date(job.approvedAt).getTime(),
};
await indexMessage(meiliClient, doc);
}
export function createIndexWorker(redisUrl: string, meiliClient: MeiliSearch): Worker<IndexJobData> {
return new Worker<IndexJobData>(
'tower-index',
async (job) => processIndexJob(job.data, meiliClient),
{ connection: parseRedisUrl(redisUrl) },
);
}
- Step 6: Run tests — all pass
pnpm --filter @tower/worker test -- --testPathPattern=index.processor
Expected: 3 tests pass.
- Step 7: Update
apps/worker/src/main.ts
Replace the entire file with:
import { PrismaClient } from '@prisma/client';
import { createLogger } from '@tower/logger';
import { validateEnv } from '@tower/config';
import { createMeiliClient, configureIndex } from '@tower/search';
import { createIngestQueue } from './queues/ingest.queue';
import { createIngestWorker } from './queues/ingest.processor';
import { createForwardQueue } from './queues/forward.queue';
import { createForwardWorker } from './queues/forward.processor';
import { createIndexQueue } from './queues/index.queue';
import { createIndexWorker } from './queues/index.processor';
import { WhatsAppSessionPool } from './whatsapp/session-pool';
import { detectTags, isFlagged } from './whatsapp/tag-detector';
import { syncGroups } from './whatsapp/group-sync';
import { handleStarReaction } from './core/approval';
const logger = createLogger('tower-worker');
async function bootstrap() {
const env = validateEnv();
const prisma = new PrismaClient();
await prisma.$connect();
const adminJids = env.TOWER_ADMIN_JIDS
? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean)
: [];
const meiliClient = createMeiliClient(env.MEILI_URL, env.MEILI_MASTER_KEY);
await configureIndex(meiliClient).catch((err) =>
logger.warn({ err }, 'Failed to configure Meilisearch index — search may be degraded'),
);
const ingestQueue = createIngestQueue(env.REDIS_URL);
const forwardQueue = createForwardQueue(env.REDIS_URL);
const indexQueue = createIndexQueue(env.REDIS_URL);
const pool = new WhatsAppSessionPool();
const ingestWorker = createIngestWorker(env.REDIS_URL, prisma);
const forwardWorker = createForwardWorker(env.REDIS_URL, pool);
const indexWorker = createIndexWorker(env.REDIS_URL, meiliClient);
ingestWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Ingest job completed'));
ingestWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Ingest job failed'));
forwardWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Forward job completed'));
forwardWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Forward job failed'));
indexWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Index job completed'));
indexWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Index job failed'));
const accounts = await prisma.account.findMany({
where: { status: 'ACTIVE', platform: 'whatsapp' },
});
if (accounts.length === 0) {
logger.warn('No active WhatsApp accounts found — seed one in the Account table (see docs)');
}
const groupMaps = new Map<string, Map<string, string>>();
for (const account of accounts) {
groupMaps.set(account.id, new Map());
try {
await pool.add(
account.id,
account.sessionPath,
async (msg, accountId) => {
const tags = detectTags(msg.content, msg.senderJid, adminJids);
if (!isFlagged(tags)) return;
const groupMap = groupMaps.get(accountId);
if (!groupMap) {
logger.error({ accountId }, 'No group map for account — message dropped');
return;
}
const sourceGroupId = groupMap.get(msg.sourceGroupJid);
if (!sourceGroupId) {
logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message');
return;
}
await ingestQueue.add(
'ingest',
{
platformMsgId: msg.platformMsgId,
platform: 'whatsapp',
accountId,
sourceGroupId,
senderJid: msg.senderJid,
senderName: msg.senderName,
content: msg.content,
tags,
},
{ attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
);
logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued');
},
async (reaction) => {
const result = await handleStarReaction(reaction, adminJids, prisma);
if (!result) return;
const { forwardJobs, indexDoc } = result;
await indexQueue.add('index', indexDoc, {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
});
for (const job of forwardJobs) {
await forwardQueue.add('forward', job, {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
});
}
logger.info(
{ messageId: indexDoc.messageId, forwardCount: forwardJobs.length },
'Message approved — indexed and forwarded',
);
},
async (groups, accountId) => {
logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups');
const map = await syncGroups(groups, accountId, prisma);
groupMaps.set(accountId, map);
},
);
} catch (err) {
logger.error({ accountId: account.id, err }, 'Failed to start session — skipping account');
}
}
logger.info({ accountCount: accounts.length }, 'Tower worker ready');
const shutdown = async () => {
logger.info('Shutting down...');
await pool.closeAll();
await ingestWorker.close();
await forwardWorker.close();
await indexWorker.close();
await ingestQueue.close();
await forwardQueue.close();
await indexQueue.close();
await prisma.$disconnect();
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
bootstrap().catch((err) => {
console.error('Worker failed to start', err);
process.exit(1);
});
- Step 8: Run all worker tests and build
pnpm --filter @tower/worker test
pnpm --filter @tower/worker build
Expected: All tests pass (the approval.test.ts changes in Task 3 must already be committed), zero build errors.
- Step 9: Commit
git add apps/worker/src/queues/index.queue.ts \
apps/worker/src/queues/index.processor.ts \
apps/worker/src/queues/index.processor.test.ts \
apps/worker/src/main.ts \
apps/worker/package.json \
pnpm-lock.yaml
git commit -m "feat(worker): add index queue and wire Meilisearch indexing after approval"
Task 5: API — SearchModule
Files:
-
Create:
apps/api/src/modules/search/search.module.ts -
Create:
apps/api/src/modules/search/search.service.ts -
Create:
apps/api/src/modules/search/search.controller.ts -
Create:
apps/api/src/modules/search/search.controller.spec.ts -
Create:
apps/api/src/modules/search/search.service.spec.ts -
Modify:
apps/api/package.json— add@tower/search: workspace:* -
Modify:
apps/api/src/app.module.ts— registerSearchModule -
Step 1: Add
@tower/searchto API dependencies
Edit apps/api/package.json — add to "dependencies":
"@tower/search": "workspace:*"
Then:
pnpm install
- Step 2: Write failing controller spec
Create apps/api/src/modules/search/search.controller.spec.ts:
import { Test, TestingModule } from '@nestjs/testing';
import { SearchController } from './search.controller';
import { SearchService } from './search.service';
const mockSearchService = { search: jest.fn() };
describe('SearchController', () => {
let controller: SearchController;
beforeEach(async () => {
jest.clearAllMocks();
const module: TestingModule = await Test.createTestingModule({
controllers: [SearchController],
providers: [{ provide: SearchService, useValue: mockSearchService }],
}).compile();
controller = module.get<SearchController>(SearchController);
});
it('calls service with all parsed params', async () => {
mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 2, limit: 10, query: 'hello' });
await controller.search('hello', 'grp-1', 'important,event', '2', '10');
expect(mockSearchService.search).toHaveBeenCalledWith('hello', 'grp-1', ['important', 'event'], 2, 10);
});
it('defaults page to 1 and limit to 20 when not provided', async () => {
mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 1, limit: 20, query: '' });
await controller.search('');
expect(mockSearchService.search).toHaveBeenCalledWith('', undefined, undefined, 1, 20);
});
it('returns the service result directly', async () => {
const expected = { hits: [{ id: 'msg-1' }], total: 1, page: 1, limit: 20, query: 'test' };
mockSearchService.search.mockResolvedValue(expected);
const result = await controller.search('test');
expect(result).toEqual(expected);
});
it('splits tags on comma and trims whitespace', async () => {
mockSearchService.search.mockResolvedValue({ hits: [], total: 0, page: 1, limit: 20, query: '' });
await controller.search('', undefined, ' important , event ');
expect(mockSearchService.search).toHaveBeenCalledWith('', undefined, ['important', 'event'], 1, 20);
});
});
- Step 3: Write failing service spec
Create apps/api/src/modules/search/search.service.spec.ts:
import { Test, TestingModule } from '@nestjs/testing';
import { ConfigService } from '@nestjs/config';
import { SearchService } from './search.service';
import * as searchPkg from '@tower/search';
jest.mock('@tower/search', () => ({
createMeiliClient: jest.fn(),
configureIndex: jest.fn().mockResolvedValue(undefined),
MESSAGES_INDEX: 'tower-messages',
}));
describe('SearchService', () => {
let service: SearchService;
const mockSearch = jest.fn();
const mockIndex = jest.fn().mockReturnValue({ search: mockSearch });
const mockClient = { index: mockIndex };
beforeEach(async () => {
jest.clearAllMocks();
(searchPkg.createMeiliClient as jest.Mock).mockReturnValue(mockClient);
const module: TestingModule = await Test.createTestingModule({
providers: [
SearchService,
{ provide: ConfigService, useValue: { get: jest.fn().mockReturnValue('') } },
],
}).compile();
service = module.get<SearchService>(SearchService);
await service.onModuleInit();
});
it('calls configureIndex on init', () => {
expect(searchPkg.configureIndex).toHaveBeenCalledWith(mockClient);
});
it('returns hits and total', async () => {
mockSearch.mockResolvedValue({ hits: [{ id: 'msg-1', content: 'hello' }], totalHits: 1 });
const result = await service.search('hello');
expect(result.hits).toHaveLength(1);
expect(result.total).toBe(1);
expect(result.query).toBe('hello');
});
it('searches with no filter when no groupId or tags', async () => {
mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
await service.search('test');
expect(mockSearch).toHaveBeenCalledWith('test', expect.objectContaining({ filter: undefined }));
});
it('applies sourceGroupId filter', async () => {
mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
await service.search('hello', 'grp-1');
expect(mockSearch).toHaveBeenCalledWith(
'hello',
expect.objectContaining({ filter: 'sourceGroupId = "grp-1"' }),
);
});
it('applies tags filter', async () => {
mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
await service.search('hello', undefined, ['#important']);
expect(mockSearch).toHaveBeenCalledWith(
'hello',
expect.objectContaining({ filter: 'tags = "#important"' }),
);
});
it('combines groupId and tags filters with AND', async () => {
mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
await service.search('hello', 'grp-1', ['#important', '#event']);
expect(mockSearch).toHaveBeenCalledWith(
'hello',
expect.objectContaining({
filter: 'sourceGroupId = "grp-1" AND tags = "#important" AND tags = "#event"',
}),
);
});
it('defaults page to 1 and hitsPerPage to 20', async () => {
mockSearch.mockResolvedValue({ hits: [], totalHits: 0 });
await service.search('hello');
expect(mockSearch).toHaveBeenCalledWith(
'hello',
expect.objectContaining({ page: 1, hitsPerPage: 20 }),
);
});
});
- Step 4: Run specs — expect failure
pnpm --filter @tower/api test
Expected: FAIL — modules not found.
- Step 5: Create
apps/api/src/modules/search/search.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import {
MeiliSearch,
MeiliDocument,
MESSAGES_INDEX,
createMeiliClient,
configureIndex,
} from '@tower/search';
@Injectable()
export class SearchService implements OnModuleInit {
private readonly client: MeiliSearch;
constructor(private readonly config: ConfigService) {
this.client = createMeiliClient(
this.config.get('MEILI_URL', 'http://localhost:7700'),
this.config.get('MEILI_MASTER_KEY', ''),
);
}
async onModuleInit(): Promise<void> {
await configureIndex(this.client).catch((err) =>
console.warn('Failed to configure Meilisearch index:', err),
);
}
async search(
query: string,
groupId?: string,
tags?: string[],
page = 1,
limit = 20,
): Promise<{ hits: MeiliDocument[]; total: number; page: number; limit: number; query: string }> {
const filters: string[] = [];
if (groupId) filters.push(`sourceGroupId = "${groupId}"`);
if (tags?.length) filters.push(...tags.map((t) => `tags = "${t}"`));
const result = await this.client.index(MESSAGES_INDEX).search(query, {
filter: filters.length ? filters.join(' AND ') : undefined,
page,
hitsPerPage: limit,
sort: ['approvedAt:desc'],
});
return {
hits: result.hits as MeiliDocument[],
total: result.totalHits ?? 0,
page,
limit,
query,
};
}
}
- Step 6: Create
apps/api/src/modules/search/search.controller.ts
import { Controller, Get, Query } from '@nestjs/common';
import { SearchService } from './search.service';
@Controller('search')
export class SearchController {
constructor(private readonly searchService: SearchService) {}
@Get()
search(
@Query('q') q = '',
@Query('groupId') groupId?: string,
@Query('tags') tags?: string,
@Query('page') page = '1',
@Query('limit') limit = '20',
) {
const tagList = tags
? tags.split(',').map((t) => t.trim()).filter(Boolean)
: undefined;
return this.searchService.search(q, groupId, tagList, Number(page), Number(limit));
}
}
- Step 7: Create
apps/api/src/modules/search/search.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { SearchController } from './search.controller';
import { SearchService } from './search.service';
@Module({
imports: [ConfigModule],
controllers: [SearchController],
providers: [SearchService],
})
export class SearchModule {}
- Step 8: Register SearchModule in
apps/api/src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { PrismaModule } from './prisma/prisma.module';
import { HealthModule } from './modules/health/health.module';
import { SearchModule } from './modules/search/search.module';
@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
PrismaModule,
HealthModule,
SearchModule,
],
})
export class AppModule {}
- Step 9: Run all API tests and build
pnpm --filter @tower/api test
pnpm --filter @tower/api build
Expected: All specs pass (health controller + search controller + search service), zero build errors.
- Step 10: Commit
git add apps/api/src/modules/search/ \
apps/api/src/app.module.ts \
apps/api/package.json \
pnpm-lock.yaml
git commit -m "feat(api): add SearchModule with GET /search endpoint backed by Meilisearch"
Self-Review
Spec coverage:
- ✅ Approved messages indexed in Meilisearch — Task 4 (index queue + processor)
- ✅ Index stays in sync with approvals — Task 3 wires indexDoc into approval result, Task 4 enqueues it
- ✅ Full-text search endpoint — Task 5
GET /search - ✅ Filterable by group and tags —
SearchService.searchbuilds Meilisearch filter string - ✅ Paginated results —
page+limitquery params,hitsPerPagein Meilisearch query - ✅ Index configured with correct attributes —
configureIndexcalled on both worker startup and APIonModuleInit - ✅ Messages indexed even with no sync routes —
main.tsalways enqueues index job regardless offorwardJobs.length
Placeholder scan: None found. All code steps contain complete implementations.
Type consistency:
IndexJobData.approvedAtisstring(ISO) throughout types and approval.tsMeiliDocument.approvedAtisnumber(Unix ms) — conversion happens inprocessIndexJobhandleStarReactionreturnsApprovalResult | nullconsistently across approval.ts and approval.test.tsresult.forwardJobsandresult.indexDocused consistently in tests and main.ts