diff --git a/apps/worker/src/core/approval.test.ts b/apps/worker/src/core/approval.test.ts index 1917f8e..1799852 100644 --- a/apps/worker/src/core/approval.test.ts +++ b/apps/worker/src/core/approval.test.ts @@ -76,6 +76,29 @@ describe('handleStarReaction', () => { expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull(); }); + it('returns null on double-approval race (updateMany returns count=0)', async () => { + const prisma = { + message: { + findUnique: jest.fn().mockResolvedValue({ + id: 'msg_1', + status: 'PENDING', + approval: null, + content: 'hello', + senderName: 'Alice', + sourceGroup: { name: 'UP Parivar Dallas', syncRoutesFrom: [] }, + }), + }, + $transaction: jest.fn().mockImplementation(async (fn: any) => fn({ + message: { updateMany: jest.fn().mockResolvedValue({ count: 0 }) }, + approval: { create: jest.fn().mockResolvedValue({}) }, + })), + } as any; + + const result = await handleStarReaction(makeReaction(), adminJids, prisma); + expect(result).toBeNull(); + expect(prisma.$transaction).toHaveBeenCalled(); + }); + it('approves message and returns empty array when no sync routes', async () => { const prisma = { message: { diff --git a/apps/worker/src/core/approval.ts b/apps/worker/src/core/approval.ts index 578a7f0..6502433 100644 --- a/apps/worker/src/core/approval.ts +++ b/apps/worker/src/core/approval.ts @@ -30,12 +30,14 @@ export async function handleStarReaction( if (message.status !== 'PENDING') return null; if (message.approval) return null; + let approved = false; await prisma.$transaction(async (tx: any) => { const updated = await tx.message.updateMany({ where: { id: message.id, status: 'PENDING' }, data: { status: 'APPROVED' }, }); if (updated.count === 0) return; // another admin approved first — idempotent + approved = true; await tx.approval.create({ data: { messageId: message.id, @@ -45,6 +47,8 @@ export async function handleStarReaction( }); }); + if (!approved) return null; + const jobs: ForwardJobData[] = message.sourceGroup.syncRoutesFrom .filter((route: any) => route.targetGroup != null) .map((route: any) => ({ diff --git a/apps/worker/src/main.ts b/apps/worker/src/main.ts index 11ccf2b..6bfa1b4 100644 --- a/apps/worker/src/main.ts +++ b/apps/worker/src/main.ts @@ -48,6 +48,7 @@ async function bootstrap() { for (const account of accounts) { groupMaps.set(account.id, new Map()); + try { await pool.add( account.id, account.sessionPath, @@ -55,7 +56,11 @@ async function bootstrap() { const tags = detectTags(msg.content, msg.senderJid, adminJids); if (!isFlagged(tags)) return; - const groupMap = groupMaps.get(accountId) ?? new Map(); + const groupMap = groupMaps.get(accountId); + if (!groupMap) { + logger.error({ accountId }, 'No group map for account — message dropped'); + return; + } const sourceGroupId = groupMap.get(msg.sourceGroupJid); if (!sourceGroupId) { logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message'); @@ -101,12 +106,16 @@ async function bootstrap() { groupMaps.set(accountId, map); }, ); + } catch (err) { + logger.error({ accountId: account.id, err }, 'Failed to start session — skipping account'); + } } logger.info({ accountCount: accounts.length }, 'Tower worker ready'); const shutdown = async () => { logger.info('Shutting down...'); + await pool.closeAll(); await ingestWorker.close(); await forwardWorker.close(); await ingestQueue.close(); diff --git a/apps/worker/src/whatsapp/session-pool.test.ts b/apps/worker/src/whatsapp/session-pool.test.ts index 3daeb4d..3695bdc 100644 --- a/apps/worker/src/whatsapp/session-pool.test.ts +++ b/apps/worker/src/whatsapp/session-pool.test.ts @@ -5,6 +5,7 @@ jest.mock('./session', () => ({ createWhatsAppSession: jest.fn().mockResolvedValue({ sendMessage: jest.fn().mockResolvedValue({}), logout: jest.fn().mockResolvedValue({}), + end: jest.fn(), }), })); @@ -55,6 +56,21 @@ describe('WhatsAppSessionPool', () => { await expect(pool.remove('acc_unknown')).resolves.not.toThrow(); }); + it('closeAll() calls end() on all sessions and clears the pool', async () => { + await pool.add('acc_1', './sessions/1', jest.fn(), jest.fn(), jest.fn()); + await pool.add('acc_2', './sessions/2', jest.fn(), jest.fn(), jest.fn()); + expect(pool.getAll().size).toBe(2); + + const sock1 = pool.get('acc_1')!; + const sock2 = pool.get('acc_2')!; + + await pool.closeAll(); + + expect(sock1.end).toHaveBeenCalledWith(undefined); + expect(sock2.end).toHaveBeenCalledWith(undefined); + expect(pool.getAll().size).toBe(0); + }); + it('add() injects accountId into onMessage callback', async () => { const onMessage = jest.fn(); const { createWhatsAppSession } = require('./session'); diff --git a/apps/worker/src/whatsapp/session-pool.ts b/apps/worker/src/whatsapp/session-pool.ts index 0c8c24f..73c4fcf 100644 --- a/apps/worker/src/whatsapp/session-pool.ts +++ b/apps/worker/src/whatsapp/session-pool.ts @@ -28,6 +28,10 @@ export class WhatsAppSessionPool { (msg) => onMessage(msg, accountId), (reaction) => onReaction(reaction, accountId), (groups) => onGroups(groups, accountId), + (newSocket) => { + logger.info({ accountId }, 'Session reconnected — updating pool'); + this.sessions.set(accountId, newSocket); + }, ); this.sessions.set(accountId, sock); } @@ -57,4 +61,17 @@ export class WhatsAppSessionPool { logger.info({ accountId }, 'Session removed'); } } + + async closeAll(): Promise { + logger.info({ count: this.sessions.size }, 'Closing all WhatsApp sessions'); + for (const [accountId, sock] of this.sessions) { + try { + sock.end(undefined); + logger.info({ accountId }, 'Session closed'); + } catch (err) { + logger.error({ accountId, err }, 'Error closing session'); + } + } + this.sessions.clear(); + } } diff --git a/apps/worker/src/whatsapp/session.ts b/apps/worker/src/whatsapp/session.ts index e42a134..75c8888 100644 --- a/apps/worker/src/whatsapp/session.ts +++ b/apps/worker/src/whatsapp/session.ts @@ -23,6 +23,7 @@ export async function createWhatsAppSession( onMessage: OnMessageCallback, onReaction: OnReactionCallback, onGroups: OnGroupsCallback, + onReconnect?: (newSocket: WASocket) => void, ): Promise { const { state, saveCreds } = await useMultiFileAuthState(sessionPath); const { version } = await fetchLatestBaileysVersion(); @@ -46,10 +47,17 @@ export async function createWhatsAppSession( logger.info({ reason, shouldReconnect }, 'Connection closed'); if (shouldReconnect) { logger.info('Reconnecting in 5s...'); - setTimeout( - () => createWhatsAppSession(accountId, sessionPath, onMessage, onReaction, onGroups), - 5000, - ); + setTimeout(async () => { + const newSocket = await createWhatsAppSession( + accountId, + sessionPath, + onMessage, + onReaction, + onGroups, + onReconnect, + ); + onReconnect?.(newSocket); + }, 5000); } } else if (connection === 'open') { try {