import { PrismaClient } from '@prisma/client';
import { createLogger } from '@tower/logger';
import { validateEnv } from '@tower/config';
import { createMeiliClient, configureIndex } from '@tower/search';
import { createIngestQueue } from './queues/ingest.queue';
import { createIngestWorker } from './queues/ingest.processor';
import { createForwardQueue } from './queues/forward.queue';
import { createForwardWorker } from './queues/forward.processor';
import { createIndexQueue } from './queues/index.queue';
import { createIndexWorker } from './queues/index.processor';
import { WhatsAppSessionPool } from './whatsapp/session-pool';
import { detectTags, isFlagged } from './whatsapp/tag-detector';
import { syncGroups } from './whatsapp/group-sync';
import { handleStarReaction } from './core/approval';

const logger = createLogger('tower-worker');

/**
 * Build the job options passed to `queue.add` for retryable jobs.
 *
 * A fresh object is returned on every call so no options object is shared
 * between jobs.
 *
 * @param {number} delay - Value used for `backoff.delay`.
 * @returns {{ attempts: number, backoff: { type: string, delay: number } }}
 */
function retryOptions(delay) {
  return { attempts: 3, backoff: { type: 'exponential', delay } };
}

/**
 * Log every `completed` and `failed` event emitted by a queue worker.
 *
 * @param {{ on: Function }} worker - Worker returned by one of the `create*Worker` factories.
 * @param {string} label - Capitalised queue name used as the log message prefix.
 */
function attachJobLogging(worker, label) {
  worker.on('completed', (job) => logger.info({ jobId: job.id }, `${label} job completed`));
  // `job` is accessed with `?.` because the original code treats it as possibly missing here.
  worker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, `${label} job failed`));
}

/**
 * Create the signal handler that releases every resource and exits the process.
 *
 * Resources are closed sequentially in the order given. A failure to close one
 * resource is logged and does not prevent the remaining ones from being closed;
 * the process then exits with code 1 instead of 0. Repeated invocations (for
 * example a second SIGINT) are ignored while a shutdown is already in progress.
 *
 * @param {Array<[string, () => Promise<unknown>]>} resources - `[name, close]` pairs.
 * @returns {() => Promise<void>} Handler suitable for `process.on(signal, ...)`.
 */
function createShutdownHandler(resources) {
  let isShuttingDown = false;

  return async function shutdown() {
    if (isShuttingDown) return;
    isShuttingDown = true;

    logger.info('Shutting down...');

    let exitCode = 0;
    for (const [resource, close] of resources) {
      try {
        await close();
      } catch (err) {
        logger.error({ err, resource }, 'Failed to release resource during shutdown');
        exitCode = 1;
      }
    }

    process.exit(exitCode);
  };
}

/**
 * Start the worker process: validate the environment, connect to the database,
 * configure the search index, create the ingest/forward/index queues and their
 * workers, open a WhatsApp session for every active account, and install the
 * SIGTERM/SIGINT shutdown handlers.
 *
 * An account whose session fails to start is logged and skipped; the remaining
 * accounts are still started.
 *
 * @returns {Promise<void>} Rejects if environment validation, the database
 *   connection, queue/worker creation or the account query fails.
 */
async function bootstrap() {
  const env = validateEnv();

  const prisma = new PrismaClient();
  await prisma.$connect();

  // Comma-separated list; blank entries are dropped.
  const adminJids = env.TOWER_ADMIN_JIDS
    ? env.TOWER_ADMIN_JIDS.split(',')
        .map((j) => j.trim())
        .filter(Boolean)
    : [];

  const meiliClient = createMeiliClient(env.MEILI_URL, env.MEILI_MASTER_KEY);
  // Index configuration is best-effort: a failure is logged and startup continues.
  await configureIndex(meiliClient).catch((err) =>
    logger.warn({ err }, 'Failed to configure Meilisearch index — search may be degraded'),
  );

  const ingestQueue = createIngestQueue(env.REDIS_URL);
  const forwardQueue = createForwardQueue(env.REDIS_URL);
  const indexQueue = createIndexQueue(env.REDIS_URL);

  const pool = new WhatsAppSessionPool();

  const ingestWorker = createIngestWorker(env.REDIS_URL, prisma);
  const forwardWorker = createForwardWorker(env.REDIS_URL, pool);
  const indexWorker = createIndexWorker(env.REDIS_URL, meiliClient);

  attachJobLogging(ingestWorker, 'Ingest');
  attachJobLogging(forwardWorker, 'Forward');
  attachJobLogging(indexWorker, 'Index');

  const accounts = await prisma.account.findMany({
    where: { status: 'ACTIVE', platform: 'whatsapp' },
  });

  if (accounts.length === 0) {
    logger.warn('No active WhatsApp accounts found — seed one in the Account table (see docs)');
  }

  // account id -> group map for that account. Each inner map is looked up by a
  // message's `sourceGroupJid` and is replaced wholesale by the group-sync
  // callback below with whatever `syncGroups` returns.
  const groupMaps = new Map();

  for (const account of accounts) {
    // Seed an empty map so the message callback can tell "account known, group
    // unknown" apart from "account unknown".
    groupMaps.set(account.id, new Map());

    try {
      await pool.add(
        account.id,
        account.sessionPath,

        // Message callback: enqueue flagged messages from known groups for ingestion.
        async (msg, accountId) => {
          const tags = detectTags(msg.content, msg.senderJid, adminJids);
          if (!isFlagged(tags)) return;

          const groupMap = groupMaps.get(accountId);
          if (!groupMap) {
            logger.error({ accountId }, 'No group map for account — message dropped');
            return;
          }

          const sourceGroupId = groupMap.get(msg.sourceGroupJid);
          if (!sourceGroupId) {
            logger.warn(
              { jid: msg.sourceGroupJid, accountId },
              'Unknown group — skipping message',
            );
            return;
          }

          await ingestQueue.add(
            'ingest',
            {
              platformMsgId: msg.platformMsgId,
              platform: 'whatsapp',
              accountId,
              sourceGroupId,
              senderJid: msg.senderJid,
              senderName: msg.senderName,
              content: msg.content,
              tags,
            },
            retryOptions(1000),
          );

          logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued');
        },

        // Reaction callback: when `handleStarReaction` yields a result, enqueue
        // one index job followed by one forward job per entry in `forwardJobs`.
        async (reaction) => {
          const result = await handleStarReaction(reaction, adminJids, prisma);
          if (!result) return;

          const { forwardJobs, indexDoc } = result;

          await indexQueue.add('index', indexDoc, retryOptions(1000));

          for (const job of forwardJobs) {
            await forwardQueue.add('forward', job, retryOptions(2000));
          }

          logger.info(
            { messageId: indexDoc.messageId, forwardCount: forwardJobs.length },
            'Message approved — indexed and forwarded',
          );
        },

        // Group callback: persist the groups and swap in the refreshed map.
        async (groups, accountId) => {
          logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups');
          const map = await syncGroups(groups, accountId, prisma);
          groupMaps.set(accountId, map);
        },
      );
    } catch (err) {
      logger.error({ accountId: account.id, err }, 'Failed to start session — skipping account');
    }
  }

  logger.info({ accountCount: accounts.length }, 'Tower worker ready');

  // Close order matches the original: sessions, workers, queues, then the database.
  const shutdown = createShutdownHandler([
    ['whatsapp session pool', () => pool.closeAll()],
    ['ingest worker', () => ingestWorker.close()],
    ['forward worker', () => forwardWorker.close()],
    ['index worker', () => indexWorker.close()],
    ['ingest queue', () => ingestQueue.close()],
    ['forward queue', () => forwardQueue.close()],
    ['index queue', () => indexQueue.close()],
    ['prisma client', () => prisma.$disconnect()],
  ]);

  process.on('SIGTERM', shutdown);
  process.on('SIGINT', shutdown);
}

bootstrap().catch((err) => {
  console.error('Worker failed to start', err);
  process.exit(1);
});