From 952a0e9b49a40d2940d36d9d10ee0f0afc33ce0e Mon Sep 17 00:00:00 2001 From: maaz519 Date: Fri, 29 May 2026 11:44:54 +0530 Subject: [PATCH] feat: pass JID on connect; extract startAccount() helper; poll 30s for new accounts Co-Authored-By: Claude Sonnet 4.6 --- apps/worker/src/main.ts | 49 ++++++++++++++----- apps/worker/src/whatsapp/session-pool.test.ts | 17 +++++++ apps/worker/src/whatsapp/session-pool.ts | 4 +- apps/worker/src/whatsapp/session.test.ts | 2 +- apps/worker/src/whatsapp/session.ts | 4 +- 5 files changed, 58 insertions(+), 18 deletions(-) diff --git a/apps/worker/src/main.ts b/apps/worker/src/main.ts index b163df5..dfde1db 100644 --- a/apps/worker/src/main.ts +++ b/apps/worker/src/main.ts @@ -45,19 +45,10 @@ async function bootstrap() { indexWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Index job completed')); indexWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Index job failed')); - const accounts = await prisma.account.findMany({ - where: { status: 'ACTIVE', platform: 'whatsapp' }, - }); - - if (accounts.length === 0) { - logger.warn('No active WhatsApp accounts found — seed one in the Account table (see docs)'); - } - const groupMaps = new Map>(); - for (const account of accounts) { + async function startAccount(account: { id: string; sessionPath: string }) { groupMaps.set(account.id, new Map()); - try { await pool.add( account.id, @@ -129,13 +120,13 @@ async function bootstrap() { }).catch((err) => logger.error({ accountId, err }, 'Failed to store QR in DB')); logger.info({ accountId }, 'QR code updated'); }, - async (status, accountId) => { + async (status, accountId, jid?) => { if (status === 'connected') { await prisma.account.update({ where: { id: accountId }, - data: { qrCode: null, status: 'ACTIVE' }, + data: { qrCode: null, status: 'ACTIVE', ...(jid ? { jid } : {}) }, }).catch((err) => logger.error({ accountId, err }, 'Failed to update account status')); - logger.info({ accountId }, 'Account connected — QR cleared'); + logger.info({ accountId, jid }, 'Account connected — QR cleared'); } else if (status === 'logged_out') { await prisma.account.update({ where: { id: accountId }, @@ -150,8 +141,40 @@ async function bootstrap() { } } + // Load ACTIVE and DISCONNECTED accounts at startup (DISCONNECTED ones need re-auth) + const accounts = await prisma.account.findMany({ + where: { status: { in: ['ACTIVE', 'DISCONNECTED'] }, platform: 'whatsapp' }, + select: { id: true, sessionPath: true }, + }); + + if (accounts.length === 0) { + logger.warn('No WhatsApp accounts found — add one via the dashboard'); + } + + for (const account of accounts) { + await startAccount(account); + } + logger.info({ accountCount: accounts.length }, 'Tower worker ready'); + // Poll every 30s for accounts added via the dashboard while worker is running + setInterval(async () => { + try { + const all = await prisma.account.findMany({ + where: { status: { in: ['ACTIVE', 'DISCONNECTED'] }, platform: 'whatsapp' }, + select: { id: true, sessionPath: true }, + }); + for (const account of all) { + if (!pool.get(account.id)) { + logger.info({ accountId: account.id }, 'New account detected — starting session'); + await startAccount(account); + } + } + } catch (err) { + logger.error({ err }, 'Error polling for new accounts'); + } + }, 30_000); + const shutdown = async () => { logger.info('Shutting down...'); await pool.closeAll(); diff --git a/apps/worker/src/whatsapp/session-pool.test.ts b/apps/worker/src/whatsapp/session-pool.test.ts index 18d642d..b9ecf6b 100644 --- a/apps/worker/src/whatsapp/session-pool.test.ts +++ b/apps/worker/src/whatsapp/session-pool.test.ts @@ -106,4 +106,21 @@ describe('WhatsAppSessionPool', () => { await capturedOnQr('test-qr'); expect(onQr).toHaveBeenCalledWith('test-qr', 'acc_1'); }); + + it('add() injects accountId and jid into onStatus callback', async () => { + const onStatus = jest.fn(); + const { createWhatsAppSession } = require('./session'); + + let capturedOnStatus: any; + (createWhatsAppSession as jest.Mock).mockImplementationOnce( + (_id: string, _path: string, _onMsg: any, _onReaction: any, _onGroups: any, _onReconnect: any, _onQr: any, statusCb: any) => { + capturedOnStatus = statusCb; + return Promise.resolve({ sendMessage: jest.fn(), logout: jest.fn(), end: jest.fn() }); + }, + ); + + await pool.add('acc_1', './sessions/1', jest.fn(), jest.fn(), jest.fn(), undefined, onStatus); + await capturedOnStatus('connected', '1234@s.whatsapp.net'); + expect(onStatus).toHaveBeenCalledWith('connected', 'acc_1', '1234@s.whatsapp.net'); + }); }); diff --git a/apps/worker/src/whatsapp/session-pool.ts b/apps/worker/src/whatsapp/session-pool.ts index 619f892..02e6b1c 100644 --- a/apps/worker/src/whatsapp/session-pool.ts +++ b/apps/worker/src/whatsapp/session-pool.ts @@ -12,7 +12,7 @@ export type PoolReactionCallback = (reaction: NormalizedReaction, accountId: str // groups typed as `any` to avoid leaking GroupMetadata (Baileys type) into main.ts export type PoolGroupsCallback = (groups: any, accountId: string) => Promise | void; export type PoolQrCallback = (qr: string, accountId: string) => Promise | void; -export type PoolStatusCallback = (status: string, accountId: string) => Promise | void; +export type PoolStatusCallback = (status: string, accountId: string, jid?: string) => Promise | void; export class WhatsAppSessionPool { private sessions = new Map(); @@ -38,7 +38,7 @@ export class WhatsAppSessionPool { this.sessions.set(accountId, newSocket); }, onQr ? (qr) => onQr(qr, accountId) : undefined, - onStatus ? (status) => onStatus(status, accountId) : undefined, + onStatus ? (status, jid?) => onStatus(status, accountId, jid) : undefined, ); this.sessions.set(accountId, sock); } diff --git a/apps/worker/src/whatsapp/session.test.ts b/apps/worker/src/whatsapp/session.test.ts index c2fe80e..ef3fd6b 100644 --- a/apps/worker/src/whatsapp/session.test.ts +++ b/apps/worker/src/whatsapp/session.test.ts @@ -70,7 +70,7 @@ describe('createWhatsAppSession', () => { const onStatus = jest.fn(); await createWhatsAppSession('acc_1', '/sessions/1', jest.fn(), jest.fn(), jest.fn(), undefined, undefined, onStatus); await connectionUpdateHandler({ connection: 'open' }); - expect(onStatus).toHaveBeenCalledWith('connected'); + expect(onStatus).toHaveBeenCalledWith('connected', undefined); }); it('calls onStatus with "disconnected" on non-logout close', async () => { diff --git a/apps/worker/src/whatsapp/session.ts b/apps/worker/src/whatsapp/session.ts index fd5fdcf..b1e2cdd 100644 --- a/apps/worker/src/whatsapp/session.ts +++ b/apps/worker/src/whatsapp/session.ts @@ -18,7 +18,7 @@ export type OnMessageCallback = (msg: NormalizedMessage) => Promise | void export type OnReactionCallback = (reaction: NormalizedReaction) => Promise | void; export type OnGroupsCallback = (groups: Record) => Promise | void; export type OnQrCallback = (qr: string) => Promise | void; -export type OnStatusCallback = (status: 'connected' | 'disconnected' | 'logged_out') => Promise | void; +export type OnStatusCallback = (status: 'connected' | 'disconnected' | 'logged_out', jid?: string) => Promise | void; export async function createWhatsAppSession( accountId: string, @@ -77,7 +77,7 @@ export async function createWhatsAppSession( }, 5000); } } else if (connection === 'open') { - await onStatus?.('connected'); + await onStatus?.('connected', sock.user?.id ?? undefined); try { logger.info({ accountId }, 'WhatsApp connected'); const groups = await sock.groupFetchAllParticipating();