feat(worker): wire multi-account pool, reactions → approval → forward pipeline

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-27 17:28:08 +05:30
parent 5ad33fd416
commit 9e3ee0cd38
+79 -45
View File
@@ -1,13 +1,14 @@
import { PrismaClient } from '@prisma/client'; import { PrismaClient } from '@prisma/client';
import { createLogger } from '@tower/logger'; import { createLogger } from '@tower/logger';
import { validateEnv } from '@tower/config'; import { validateEnv } from '@tower/config';
import { IngestJobData } from '@tower/types';
import { createWhatsAppSession } from './whatsapp/session';
import { normalizeMessage } from './whatsapp/normalizer';
import { detectTags, isFlagged } from './whatsapp/tag-detector';
import { syncGroups } from './whatsapp/group-sync';
import { createIngestQueue } from './queues/ingest.queue'; import { createIngestQueue } from './queues/ingest.queue';
import { createIngestWorker } from './queues/ingest.processor'; import { createIngestWorker } from './queues/ingest.processor';
import { createForwardQueue } from './queues/forward.queue';
import { createForwardWorker } from './queues/forward.processor';
import { WhatsAppSessionPool } from './whatsapp/session-pool';
import { detectTags, isFlagged } from './whatsapp/tag-detector';
import { syncGroups } from './whatsapp/group-sync';
import { handleStarReaction } from './core/approval';
const logger = createLogger('tower-worker'); const logger = createLogger('tower-worker');
@@ -21,62 +22,95 @@ async function bootstrap() {
: []; : [];
const ingestQueue = createIngestQueue(env.REDIS_URL); const ingestQueue = createIngestQueue(env.REDIS_URL);
const forwardQueue = createForwardQueue(env.REDIS_URL);
const pool = new WhatsAppSessionPool();
const ingestWorker = createIngestWorker(env.REDIS_URL, prisma); const ingestWorker = createIngestWorker(env.REDIS_URL, prisma);
const forwardWorker = createForwardWorker(env.REDIS_URL, pool);
ingestWorker.on('completed', (job) => { ingestWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Ingest job completed'));
logger.info({ jobId: job.id }, 'Ingest job completed'); ingestWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Ingest job failed'));
}); forwardWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Forward job completed'));
ingestWorker.on('failed', (job, err) => { forwardWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Forward job failed'));
logger.error({ jobId: job?.id, err }, 'Ingest job failed');
// Load active accounts from DB — each becomes one WhatsApp session
const accounts = await prisma.account.findMany({
where: { status: 'ACTIVE', platform: 'whatsapp' },
}); });
let groupMap = new Map<string, string>(); if (accounts.length === 0) {
logger.warn('No active WhatsApp accounts found — seed one in the Account table (see docs)');
}
await createWhatsAppSession( // Per-account map of groupJid → DB Group id
env.WHATSAPP_SESSION_PATH, const groupMaps = new Map<string, Map<string, string>>();
async (msg) => {
const normalized = normalizeMessage(msg);
if (!normalized) return;
const tags = detectTags(normalized.content, normalized.senderJid, adminJids); for (const account of accounts) {
if (!isFlagged(tags)) return; groupMaps.set(account.id, new Map());
const sourceGroupId = groupMap.get(normalized.sourceGroupJid); await pool.add(
if (!sourceGroupId) { account.id,
logger.warn({ jid: normalized.sourceGroupJid }, 'Unknown group — skipping message'); account.sessionPath,
return; async (msg, accountId) => {
} const tags = detectTags(msg.content, msg.senderJid, adminJids);
if (!isFlagged(tags)) return;
const jobData: IngestJobData = { const groupMap = groupMaps.get(accountId) ?? new Map();
platformMsgId: normalized.platformMsgId, const sourceGroupId = groupMap.get(msg.sourceGroupJid);
platform: 'whatsapp', if (!sourceGroupId) {
sourceGroupId, logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message');
senderJid: normalized.senderJid, return;
senderName: normalized.senderName, }
content: normalized.content,
tags,
};
await ingestQueue.add('ingest', jobData, { await ingestQueue.add(
attempts: 3, 'ingest',
backoff: { type: 'exponential', delay: 1000 }, {
}); platformMsgId: msg.platformMsgId,
platform: 'whatsapp',
accountId,
sourceGroupId,
senderJid: msg.senderJid,
senderName: msg.senderName,
content: msg.content,
tags,
},
{ attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
);
logger.info({ platformMsgId: normalized.platformMsgId, tags }, 'Message enqueued'); logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued');
}, },
async (groups) => { async (reaction) => {
logger.info({ count: Object.keys(groups).length }, 'Syncing groups'); const forwardJobs = await handleStarReaction(reaction, adminJids, prisma);
groupMap = await syncGroups(groups, prisma); if (!forwardJobs || forwardJobs.length === 0) return;
logger.info({ count: groupMap.size }, 'Groups synced');
},
);
logger.info('Tower worker ready'); for (const job of forwardJobs) {
await forwardQueue.add('forward', job, {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
});
}
logger.info(
{ count: forwardJobs.length, messageId: forwardJobs[0]?.messageId },
'Forward jobs enqueued',
);
},
async (groups, accountId) => {
logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups');
const map = await syncGroups(groups, accountId, prisma);
groupMaps.set(accountId, map);
},
);
}
logger.info({ accountCount: accounts.length }, 'Tower worker ready');
const shutdown = async () => { const shutdown = async () => {
logger.info('Shutting down...'); logger.info('Shutting down...');
await ingestWorker.close(); await ingestWorker.close();
await forwardWorker.close();
await ingestQueue.close(); await ingestQueue.close();
await forwardQueue.close();
await prisma.$disconnect(); await prisma.$disconnect();
process.exit(0); process.exit(0);
}; };