From 9e3ee0cd388e56d86ed92f2f775d8ce7339f09d2 Mon Sep 17 00:00:00 2001 From: maaz519 Date: Wed, 27 May 2026 17:28:08 +0530 Subject: [PATCH] =?UTF-8?q?feat(worker):=20wire=20multi-account=20pool,=20?= =?UTF-8?q?reactions=20=E2=86=92=20approval=20=E2=86=92=20forward=20pipeli?= =?UTF-8?q?ne?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- apps/worker/src/main.ts | 124 +++++++++++++++++++++++++--------------- 1 file changed, 79 insertions(+), 45 deletions(-) diff --git a/apps/worker/src/main.ts b/apps/worker/src/main.ts index a5cf162..11ccf2b 100644 --- a/apps/worker/src/main.ts +++ b/apps/worker/src/main.ts @@ -1,13 +1,14 @@ import { PrismaClient } from '@prisma/client'; import { createLogger } from '@tower/logger'; import { validateEnv } from '@tower/config'; -import { IngestJobData } from '@tower/types'; -import { createWhatsAppSession } from './whatsapp/session'; -import { normalizeMessage } from './whatsapp/normalizer'; -import { detectTags, isFlagged } from './whatsapp/tag-detector'; -import { syncGroups } from './whatsapp/group-sync'; import { createIngestQueue } from './queues/ingest.queue'; import { createIngestWorker } from './queues/ingest.processor'; +import { createForwardQueue } from './queues/forward.queue'; +import { createForwardWorker } from './queues/forward.processor'; +import { WhatsAppSessionPool } from './whatsapp/session-pool'; +import { detectTags, isFlagged } from './whatsapp/tag-detector'; +import { syncGroups } from './whatsapp/group-sync'; +import { handleStarReaction } from './core/approval'; const logger = createLogger('tower-worker'); @@ -21,62 +22,95 @@ async function bootstrap() { : []; const ingestQueue = createIngestQueue(env.REDIS_URL); + const forwardQueue = createForwardQueue(env.REDIS_URL); + const pool = new WhatsAppSessionPool(); + const ingestWorker = createIngestWorker(env.REDIS_URL, prisma); + const forwardWorker = createForwardWorker(env.REDIS_URL, pool); - ingestWorker.on('completed', (job) => { - logger.info({ jobId: job.id }, 'Ingest job completed'); - }); - ingestWorker.on('failed', (job, err) => { - logger.error({ jobId: job?.id, err }, 'Ingest job failed'); + ingestWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Ingest job completed')); + ingestWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Ingest job failed')); + forwardWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Forward job completed')); + forwardWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Forward job failed')); + + // Load active accounts from DB — each becomes one WhatsApp session + const accounts = await prisma.account.findMany({ + where: { status: 'ACTIVE', platform: 'whatsapp' }, }); - let groupMap = new Map(); + if (accounts.length === 0) { + logger.warn('No active WhatsApp accounts found — seed one in the Account table (see docs)'); + } - await createWhatsAppSession( - env.WHATSAPP_SESSION_PATH, - async (msg) => { - const normalized = normalizeMessage(msg); - if (!normalized) return; + // Per-account map of groupJid → DB Group id + const groupMaps = new Map>(); - const tags = detectTags(normalized.content, normalized.senderJid, adminJids); - if (!isFlagged(tags)) return; + for (const account of accounts) { + groupMaps.set(account.id, new Map()); - const sourceGroupId = groupMap.get(normalized.sourceGroupJid); - if (!sourceGroupId) { - logger.warn({ jid: normalized.sourceGroupJid }, 'Unknown group — skipping message'); - return; - } + await pool.add( + account.id, + account.sessionPath, + async (msg, accountId) => { + const tags = detectTags(msg.content, msg.senderJid, adminJids); + if (!isFlagged(tags)) return; - const jobData: IngestJobData = { - platformMsgId: normalized.platformMsgId, - platform: 'whatsapp', - sourceGroupId, - senderJid: normalized.senderJid, - senderName: normalized.senderName, - content: normalized.content, - tags, - }; + const groupMap = groupMaps.get(accountId) ?? new Map(); + const sourceGroupId = groupMap.get(msg.sourceGroupJid); + if (!sourceGroupId) { + logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message'); + return; + } - await ingestQueue.add('ingest', jobData, { - attempts: 3, - backoff: { type: 'exponential', delay: 1000 }, - }); + await ingestQueue.add( + 'ingest', + { + platformMsgId: msg.platformMsgId, + platform: 'whatsapp', + accountId, + sourceGroupId, + senderJid: msg.senderJid, + senderName: msg.senderName, + content: msg.content, + tags, + }, + { attempts: 3, backoff: { type: 'exponential', delay: 1000 } }, + ); - logger.info({ platformMsgId: normalized.platformMsgId, tags }, 'Message enqueued'); - }, - async (groups) => { - logger.info({ count: Object.keys(groups).length }, 'Syncing groups'); - groupMap = await syncGroups(groups, prisma); - logger.info({ count: groupMap.size }, 'Groups synced'); - }, - ); + logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued'); + }, + async (reaction) => { + const forwardJobs = await handleStarReaction(reaction, adminJids, prisma); + if (!forwardJobs || forwardJobs.length === 0) return; - logger.info('Tower worker ready'); + for (const job of forwardJobs) { + await forwardQueue.add('forward', job, { + attempts: 3, + backoff: { type: 'exponential', delay: 2000 }, + }); + } + + logger.info( + { count: forwardJobs.length, messageId: forwardJobs[0]?.messageId }, + 'Forward jobs enqueued', + ); + }, + async (groups, accountId) => { + logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups'); + const map = await syncGroups(groups, accountId, prisma); + groupMaps.set(accountId, map); + }, + ); + } + + logger.info({ accountCount: accounts.length }, 'Tower worker ready'); const shutdown = async () => { logger.info('Shutting down...'); await ingestWorker.close(); + await forwardWorker.close(); await ingestQueue.close(); + await forwardQueue.close(); await prisma.$disconnect(); process.exit(0); };