diff --git a/apps/worker/package.json b/apps/worker/package.json index 469aa98..0ba0d4e 100644 --- a/apps/worker/package.json +++ b/apps/worker/package.json @@ -13,6 +13,7 @@ "@prisma/client": "^6.0.0", "@tower/config": "workspace:*", "@tower/logger": "workspace:*", + "@tower/search": "workspace:*", "@tower/types": "workspace:*", "@whiskeysockets/baileys": "7.0.0-rc13", "bullmq": "^5.0.0", diff --git a/apps/worker/src/main.ts b/apps/worker/src/main.ts index 6bfa1b4..3af418d 100644 --- a/apps/worker/src/main.ts +++ b/apps/worker/src/main.ts @@ -1,10 +1,13 @@ import { PrismaClient } from '@prisma/client'; import { createLogger } from '@tower/logger'; import { validateEnv } from '@tower/config'; +import { createMeiliClient, configureIndex } from '@tower/search'; import { createIngestQueue } from './queues/ingest.queue'; import { createIngestWorker } from './queues/ingest.processor'; import { createForwardQueue } from './queues/forward.queue'; import { createForwardWorker } from './queues/forward.processor'; +import { createIndexQueue } from './queues/index.queue'; +import { createIndexWorker } from './queues/index.processor'; import { WhatsAppSessionPool } from './whatsapp/session-pool'; import { detectTags, isFlagged } from './whatsapp/tag-detector'; import { syncGroups } from './whatsapp/group-sync'; @@ -21,19 +24,27 @@ async function bootstrap() { ? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean) : []; + const meiliClient = createMeiliClient(env.MEILI_URL, env.MEILI_MASTER_KEY); + await configureIndex(meiliClient).catch((err) => + logger.warn({ err }, 'Failed to configure Meilisearch index — search may be degraded'), + ); + const ingestQueue = createIngestQueue(env.REDIS_URL); const forwardQueue = createForwardQueue(env.REDIS_URL); + const indexQueue = createIndexQueue(env.REDIS_URL); const pool = new WhatsAppSessionPool(); const ingestWorker = createIngestWorker(env.REDIS_URL, prisma); const forwardWorker = createForwardWorker(env.REDIS_URL, pool); + const indexWorker = createIndexWorker(env.REDIS_URL, meiliClient); ingestWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Ingest job completed')); ingestWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Ingest job failed')); forwardWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Forward job completed')); forwardWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Forward job failed')); + indexWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Index job completed')); + indexWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Index job failed')); - // Load active accounts from DB — each becomes one WhatsApp session const accounts = await prisma.account.findMany({ where: { status: 'ACTIVE', platform: 'whatsapp' }, }); @@ -42,70 +53,76 @@ async function bootstrap() { logger.warn('No active WhatsApp accounts found — seed one in the Account table (see docs)'); } - // Per-account map of groupJid → DB Group id const groupMaps = new Map>(); for (const account of accounts) { groupMaps.set(account.id, new Map()); try { - await pool.add( - account.id, - account.sessionPath, - async (msg, accountId) => { - const tags = detectTags(msg.content, msg.senderJid, adminJids); - if (!isFlagged(tags)) return; + await pool.add( + account.id, + account.sessionPath, + async (msg, accountId) => { + const tags = detectTags(msg.content, msg.senderJid, adminJids); + if (!isFlagged(tags)) return; - const groupMap = groupMaps.get(accountId); - if (!groupMap) { - logger.error({ accountId }, 'No group map for account — message dropped'); - return; - } - const sourceGroupId = groupMap.get(msg.sourceGroupJid); - if (!sourceGroupId) { - logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message'); - return; - } + const groupMap = groupMaps.get(accountId); + if (!groupMap) { + logger.error({ accountId }, 'No group map for account — message dropped'); + return; + } + const sourceGroupId = groupMap.get(msg.sourceGroupJid); + if (!sourceGroupId) { + logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message'); + return; + } - await ingestQueue.add( - 'ingest', - { - platformMsgId: msg.platformMsgId, - platform: 'whatsapp', - accountId, - sourceGroupId, - senderJid: msg.senderJid, - senderName: msg.senderName, - content: msg.content, - tags, - }, - { attempts: 3, backoff: { type: 'exponential', delay: 1000 } }, - ); + await ingestQueue.add( + 'ingest', + { + platformMsgId: msg.platformMsgId, + platform: 'whatsapp', + accountId, + sourceGroupId, + senderJid: msg.senderJid, + senderName: msg.senderName, + content: msg.content, + tags, + }, + { attempts: 3, backoff: { type: 'exponential', delay: 1000 } }, + ); - logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued'); - }, - async (reaction) => { - const forwardJobs = await handleStarReaction(reaction, adminJids, prisma); - if (!forwardJobs || forwardJobs.length === 0) return; + logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued'); + }, + async (reaction) => { + const result = await handleStarReaction(reaction, adminJids, prisma); + if (!result) return; - for (const job of forwardJobs) { - await forwardQueue.add('forward', job, { + const { forwardJobs, indexDoc } = result; + + await indexQueue.add('index', indexDoc, { attempts: 3, - backoff: { type: 'exponential', delay: 2000 }, + backoff: { type: 'exponential', delay: 1000 }, }); - } - logger.info( - { count: forwardJobs.length, messageId: forwardJobs[0]?.messageId }, - 'Forward jobs enqueued', - ); - }, - async (groups, accountId) => { - logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups'); - const map = await syncGroups(groups, accountId, prisma); - groupMaps.set(accountId, map); - }, - ); + for (const job of forwardJobs) { + await forwardQueue.add('forward', job, { + attempts: 3, + backoff: { type: 'exponential', delay: 2000 }, + }); + } + + logger.info( + { messageId: indexDoc.messageId, forwardCount: forwardJobs.length }, + 'Message approved — indexed and forwarded', + ); + }, + async (groups, accountId) => { + logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups'); + const map = await syncGroups(groups, accountId, prisma); + groupMaps.set(accountId, map); + }, + ); } catch (err) { logger.error({ accountId: account.id, err }, 'Failed to start session — skipping account'); } @@ -118,8 +135,10 @@ async function bootstrap() { await pool.closeAll(); await ingestWorker.close(); await forwardWorker.close(); + await indexWorker.close(); await ingestQueue.close(); await forwardQueue.close(); + await indexQueue.close(); await prisma.$disconnect(); process.exit(0); }; diff --git a/apps/worker/src/queues/index.processor.test.ts b/apps/worker/src/queues/index.processor.test.ts new file mode 100644 index 0000000..267c2bb --- /dev/null +++ b/apps/worker/src/queues/index.processor.test.ts @@ -0,0 +1,59 @@ +import { processIndexJob } from './index.processor'; +import { indexMessage } from '@tower/search'; +import { IndexJobData } from '@tower/types'; + +jest.mock('@tower/search', () => ({ + indexMessage: jest.fn().mockResolvedValue(undefined), + MESSAGES_INDEX: 'tower-messages', +})); + +function makeJob(overrides: Partial = {}): IndexJobData { + return { + messageId: 'msg-1', + content: 'hello world', + senderName: 'Alice', + sourceGroupId: 'grp-1', + sourceGroupName: 'UP Parivar', + tags: ['#important'], + platform: 'whatsapp', + approvedAt: '2026-05-27T10:00:00.000Z', + ...overrides, + }; +} + +describe('processIndexJob', () => { + beforeEach(() => jest.clearAllMocks()); + + it('calls indexMessage with MeiliDocument shape', async () => { + const mockClient = {} as any; + await processIndexJob(makeJob(), mockClient); + expect(indexMessage).toHaveBeenCalledWith(mockClient, { + id: 'msg-1', + content: 'hello world', + senderName: 'Alice', + sourceGroupId: 'grp-1', + sourceGroupName: 'UP Parivar', + tags: ['#important'], + platform: 'whatsapp', + approvedAt: new Date('2026-05-27T10:00:00.000Z').getTime(), + }); + }); + + it('converts null senderName to empty string', async () => { + const mockClient = {} as any; + await processIndexJob(makeJob({ senderName: null }), mockClient); + expect(indexMessage).toHaveBeenCalledWith( + mockClient, + expect.objectContaining({ senderName: '' }), + ); + }); + + it('converts approvedAt ISO string to Unix ms number', async () => { + const mockClient = {} as any; + await processIndexJob(makeJob({ approvedAt: '2026-01-01T00:00:00.000Z' }), mockClient); + expect(indexMessage).toHaveBeenCalledWith( + mockClient, + expect.objectContaining({ approvedAt: new Date('2026-01-01T00:00:00.000Z').getTime() }), + ); + }); +}); diff --git a/apps/worker/src/queues/index.processor.ts b/apps/worker/src/queues/index.processor.ts new file mode 100644 index 0000000..30f8168 --- /dev/null +++ b/apps/worker/src/queues/index.processor.ts @@ -0,0 +1,26 @@ +import { Worker } from 'bullmq'; +import { IndexJobData } from '@tower/types'; +import { MeiliSearch, MeiliDocument, indexMessage } from '@tower/search'; +import { parseRedisUrl } from './redis-connection'; + +export async function processIndexJob(job: IndexJobData, meiliClient: MeiliSearch): Promise { + const doc: MeiliDocument = { + id: job.messageId, + content: job.content, + senderName: job.senderName ?? '', + sourceGroupId: job.sourceGroupId, + sourceGroupName: job.sourceGroupName, + tags: job.tags, + platform: job.platform, + approvedAt: new Date(job.approvedAt).getTime(), + }; + await indexMessage(meiliClient, doc); +} + +export function createIndexWorker(redisUrl: string, meiliClient: MeiliSearch): Worker { + return new Worker( + 'tower-index', + async (job) => processIndexJob(job.data, meiliClient), + { connection: parseRedisUrl(redisUrl) }, + ); +} diff --git a/apps/worker/src/queues/index.queue.ts b/apps/worker/src/queues/index.queue.ts new file mode 100644 index 0000000..817ba69 --- /dev/null +++ b/apps/worker/src/queues/index.queue.ts @@ -0,0 +1,7 @@ +import { Queue } from 'bullmq'; +import { IndexJobData } from '@tower/types'; +import { parseRedisUrl } from './redis-connection'; + +export function createIndexQueue(redisUrl: string): Queue { + return new Queue('tower-index', { connection: parseRedisUrl(redisUrl) }); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8437459..a0c769c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -154,6 +154,9 @@ importers: '@tower/logger': specifier: workspace:* version: link:../../packages/logger + '@tower/search': + specifier: workspace:* + version: link:../../packages/search '@tower/types': specifier: workspace:* version: link:../../packages/types