feat(worker): add index queue and wire Meilisearch indexing after approval

Adds index.queue.ts and index.processor.ts to handle BullMQ indexing jobs,
updates main.ts to create a Meilisearch client, configure the index on startup,
and enqueue index + forward jobs from ApprovalResult after a star reaction.
This commit is contained in:
2026-05-28 00:02:47 +05:30
parent 7d905b166e
commit 1e421c0073
6 changed files with 167 additions and 52 deletions
+1
View File
@@ -13,6 +13,7 @@
"@prisma/client": "^6.0.0", "@prisma/client": "^6.0.0",
"@tower/config": "workspace:*", "@tower/config": "workspace:*",
"@tower/logger": "workspace:*", "@tower/logger": "workspace:*",
"@tower/search": "workspace:*",
"@tower/types": "workspace:*", "@tower/types": "workspace:*",
"@whiskeysockets/baileys": "7.0.0-rc13", "@whiskeysockets/baileys": "7.0.0-rc13",
"bullmq": "^5.0.0", "bullmq": "^5.0.0",
+71 -52
View File
@@ -1,10 +1,13 @@
import { PrismaClient } from '@prisma/client'; import { PrismaClient } from '@prisma/client';
import { createLogger } from '@tower/logger'; import { createLogger } from '@tower/logger';
import { validateEnv } from '@tower/config'; import { validateEnv } from '@tower/config';
import { createMeiliClient, configureIndex } from '@tower/search';
import { createIngestQueue } from './queues/ingest.queue'; import { createIngestQueue } from './queues/ingest.queue';
import { createIngestWorker } from './queues/ingest.processor'; import { createIngestWorker } from './queues/ingest.processor';
import { createForwardQueue } from './queues/forward.queue'; import { createForwardQueue } from './queues/forward.queue';
import { createForwardWorker } from './queues/forward.processor'; import { createForwardWorker } from './queues/forward.processor';
import { createIndexQueue } from './queues/index.queue';
import { createIndexWorker } from './queues/index.processor';
import { WhatsAppSessionPool } from './whatsapp/session-pool'; import { WhatsAppSessionPool } from './whatsapp/session-pool';
import { detectTags, isFlagged } from './whatsapp/tag-detector'; import { detectTags, isFlagged } from './whatsapp/tag-detector';
import { syncGroups } from './whatsapp/group-sync'; import { syncGroups } from './whatsapp/group-sync';
@@ -21,19 +24,27 @@ async function bootstrap() {
? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean) ? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean)
: []; : [];
const meiliClient = createMeiliClient(env.MEILI_URL, env.MEILI_MASTER_KEY);
await configureIndex(meiliClient).catch((err) =>
logger.warn({ err }, 'Failed to configure Meilisearch index — search may be degraded'),
);
const ingestQueue = createIngestQueue(env.REDIS_URL); const ingestQueue = createIngestQueue(env.REDIS_URL);
const forwardQueue = createForwardQueue(env.REDIS_URL); const forwardQueue = createForwardQueue(env.REDIS_URL);
const indexQueue = createIndexQueue(env.REDIS_URL);
const pool = new WhatsAppSessionPool(); const pool = new WhatsAppSessionPool();
const ingestWorker = createIngestWorker(env.REDIS_URL, prisma); const ingestWorker = createIngestWorker(env.REDIS_URL, prisma);
const forwardWorker = createForwardWorker(env.REDIS_URL, pool); const forwardWorker = createForwardWorker(env.REDIS_URL, pool);
const indexWorker = createIndexWorker(env.REDIS_URL, meiliClient);
ingestWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Ingest job completed')); ingestWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Ingest job completed'));
ingestWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Ingest job failed')); ingestWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Ingest job failed'));
forwardWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Forward job completed')); forwardWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Forward job completed'));
forwardWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Forward job failed')); forwardWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Forward job failed'));
indexWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Index job completed'));
indexWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Index job failed'));
// Load active accounts from DB — each becomes one WhatsApp session
const accounts = await prisma.account.findMany({ const accounts = await prisma.account.findMany({
where: { status: 'ACTIVE', platform: 'whatsapp' }, where: { status: 'ACTIVE', platform: 'whatsapp' },
}); });
@@ -42,70 +53,76 @@ async function bootstrap() {
logger.warn('No active WhatsApp accounts found — seed one in the Account table (see docs)'); logger.warn('No active WhatsApp accounts found — seed one in the Account table (see docs)');
} }
// Per-account map of groupJid → DB Group id
const groupMaps = new Map<string, Map<string, string>>(); const groupMaps = new Map<string, Map<string, string>>();
for (const account of accounts) { for (const account of accounts) {
groupMaps.set(account.id, new Map()); groupMaps.set(account.id, new Map());
try { try {
await pool.add( await pool.add(
account.id, account.id,
account.sessionPath, account.sessionPath,
async (msg, accountId) => { async (msg, accountId) => {
const tags = detectTags(msg.content, msg.senderJid, adminJids); const tags = detectTags(msg.content, msg.senderJid, adminJids);
if (!isFlagged(tags)) return; if (!isFlagged(tags)) return;
const groupMap = groupMaps.get(accountId); const groupMap = groupMaps.get(accountId);
if (!groupMap) { if (!groupMap) {
logger.error({ accountId }, 'No group map for account — message dropped'); logger.error({ accountId }, 'No group map for account — message dropped');
return; return;
} }
const sourceGroupId = groupMap.get(msg.sourceGroupJid); const sourceGroupId = groupMap.get(msg.sourceGroupJid);
if (!sourceGroupId) { if (!sourceGroupId) {
logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message'); logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message');
return; return;
} }
await ingestQueue.add( await ingestQueue.add(
'ingest', 'ingest',
{ {
platformMsgId: msg.platformMsgId, platformMsgId: msg.platformMsgId,
platform: 'whatsapp', platform: 'whatsapp',
accountId, accountId,
sourceGroupId, sourceGroupId,
senderJid: msg.senderJid, senderJid: msg.senderJid,
senderName: msg.senderName, senderName: msg.senderName,
content: msg.content, content: msg.content,
tags, tags,
}, },
{ attempts: 3, backoff: { type: 'exponential', delay: 1000 } }, { attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
); );
logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued'); logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued');
}, },
async (reaction) => { async (reaction) => {
const forwardJobs = await handleStarReaction(reaction, adminJids, prisma); const result = await handleStarReaction(reaction, adminJids, prisma);
if (!forwardJobs || forwardJobs.length === 0) return; if (!result) return;
for (const job of forwardJobs) { const { forwardJobs, indexDoc } = result;
await forwardQueue.add('forward', job, {
await indexQueue.add('index', indexDoc, {
attempts: 3, attempts: 3,
backoff: { type: 'exponential', delay: 2000 }, backoff: { type: 'exponential', delay: 1000 },
}); });
}
logger.info( for (const job of forwardJobs) {
{ count: forwardJobs.length, messageId: forwardJobs[0]?.messageId }, await forwardQueue.add('forward', job, {
'Forward jobs enqueued', attempts: 3,
); backoff: { type: 'exponential', delay: 2000 },
}, });
async (groups, accountId) => { }
logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups');
const map = await syncGroups(groups, accountId, prisma); logger.info(
groupMaps.set(accountId, map); { messageId: indexDoc.messageId, forwardCount: forwardJobs.length },
}, 'Message approved — indexed and forwarded',
); );
},
async (groups, accountId) => {
logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups');
const map = await syncGroups(groups, accountId, prisma);
groupMaps.set(accountId, map);
},
);
} catch (err) { } catch (err) {
logger.error({ accountId: account.id, err }, 'Failed to start session — skipping account'); logger.error({ accountId: account.id, err }, 'Failed to start session — skipping account');
} }
@@ -118,8 +135,10 @@ async function bootstrap() {
await pool.closeAll(); await pool.closeAll();
await ingestWorker.close(); await ingestWorker.close();
await forwardWorker.close(); await forwardWorker.close();
await indexWorker.close();
await ingestQueue.close(); await ingestQueue.close();
await forwardQueue.close(); await forwardQueue.close();
await indexQueue.close();
await prisma.$disconnect(); await prisma.$disconnect();
process.exit(0); process.exit(0);
}; };
@@ -0,0 +1,59 @@
import { processIndexJob } from './index.processor';
import { indexMessage } from '@tower/search';
import { IndexJobData } from '@tower/types';
jest.mock('@tower/search', () => ({
indexMessage: jest.fn().mockResolvedValue(undefined),
MESSAGES_INDEX: 'tower-messages',
}));
function makeJob(overrides: Partial<IndexJobData> = {}): IndexJobData {
return {
messageId: 'msg-1',
content: 'hello world',
senderName: 'Alice',
sourceGroupId: 'grp-1',
sourceGroupName: 'UP Parivar',
tags: ['#important'],
platform: 'whatsapp',
approvedAt: '2026-05-27T10:00:00.000Z',
...overrides,
};
}
describe('processIndexJob', () => {
beforeEach(() => jest.clearAllMocks());
it('calls indexMessage with MeiliDocument shape', async () => {
const mockClient = {} as any;
await processIndexJob(makeJob(), mockClient);
expect(indexMessage).toHaveBeenCalledWith(mockClient, {
id: 'msg-1',
content: 'hello world',
senderName: 'Alice',
sourceGroupId: 'grp-1',
sourceGroupName: 'UP Parivar',
tags: ['#important'],
platform: 'whatsapp',
approvedAt: new Date('2026-05-27T10:00:00.000Z').getTime(),
});
});
it('converts null senderName to empty string', async () => {
const mockClient = {} as any;
await processIndexJob(makeJob({ senderName: null }), mockClient);
expect(indexMessage).toHaveBeenCalledWith(
mockClient,
expect.objectContaining({ senderName: '' }),
);
});
it('converts approvedAt ISO string to Unix ms number', async () => {
const mockClient = {} as any;
await processIndexJob(makeJob({ approvedAt: '2026-01-01T00:00:00.000Z' }), mockClient);
expect(indexMessage).toHaveBeenCalledWith(
mockClient,
expect.objectContaining({ approvedAt: new Date('2026-01-01T00:00:00.000Z').getTime() }),
);
});
});
+26
View File
@@ -0,0 +1,26 @@
import { Worker } from 'bullmq';
import { IndexJobData } from '@tower/types';
import { MeiliSearch, MeiliDocument, indexMessage } from '@tower/search';
import { parseRedisUrl } from './redis-connection';
export async function processIndexJob(job: IndexJobData, meiliClient: MeiliSearch): Promise<void> {
const doc: MeiliDocument = {
id: job.messageId,
content: job.content,
senderName: job.senderName ?? '',
sourceGroupId: job.sourceGroupId,
sourceGroupName: job.sourceGroupName,
tags: job.tags,
platform: job.platform,
approvedAt: new Date(job.approvedAt).getTime(),
};
await indexMessage(meiliClient, doc);
}
export function createIndexWorker(redisUrl: string, meiliClient: MeiliSearch): Worker<IndexJobData> {
return new Worker<IndexJobData>(
'tower-index',
async (job) => processIndexJob(job.data, meiliClient),
{ connection: parseRedisUrl(redisUrl) },
);
}
+7
View File
@@ -0,0 +1,7 @@
import { Queue } from 'bullmq';
import { IndexJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';
export function createIndexQueue(redisUrl: string): Queue<IndexJobData> {
return new Queue<IndexJobData>('tower-index', { connection: parseRedisUrl(redisUrl) });
}
+3
View File
@@ -154,6 +154,9 @@ importers:
'@tower/logger': '@tower/logger':
specifier: workspace:* specifier: workspace:*
version: link:../../packages/logger version: link:../../packages/logger
'@tower/search':
specifier: workspace:*
version: link:../../packages/search
'@tower/types': '@tower/types':
specifier: workspace:* specifier: workspace:*
version: link:../../packages/types version: link:../../packages/types