feat: pass JID on connect; extract startAccount() helper; poll 30s for new accounts
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+36
-13
@@ -45,19 +45,10 @@ async function bootstrap() {
|
||||
indexWorker.on('completed', (job) => logger.info({ jobId: job.id }, 'Index job completed'));
|
||||
indexWorker.on('failed', (job, err) => logger.error({ jobId: job?.id, err }, 'Index job failed'));
|
||||
|
||||
const accounts = await prisma.account.findMany({
|
||||
where: { status: 'ACTIVE', platform: 'whatsapp' },
|
||||
});
|
||||
|
||||
if (accounts.length === 0) {
|
||||
logger.warn('No active WhatsApp accounts found — seed one in the Account table (see docs)');
|
||||
}
|
||||
|
||||
const groupMaps = new Map<string, Map<string, string>>();
|
||||
|
||||
for (const account of accounts) {
|
||||
async function startAccount(account: { id: string; sessionPath: string }) {
|
||||
groupMaps.set(account.id, new Map());
|
||||
|
||||
try {
|
||||
await pool.add(
|
||||
account.id,
|
||||
@@ -129,13 +120,13 @@ async function bootstrap() {
|
||||
}).catch((err) => logger.error({ accountId, err }, 'Failed to store QR in DB'));
|
||||
logger.info({ accountId }, 'QR code updated');
|
||||
},
|
||||
async (status, accountId) => {
|
||||
async (status, accountId, jid?) => {
|
||||
if (status === 'connected') {
|
||||
await prisma.account.update({
|
||||
where: { id: accountId },
|
||||
data: { qrCode: null, status: 'ACTIVE' },
|
||||
data: { qrCode: null, status: 'ACTIVE', ...(jid ? { jid } : {}) },
|
||||
}).catch((err) => logger.error({ accountId, err }, 'Failed to update account status'));
|
||||
logger.info({ accountId }, 'Account connected — QR cleared');
|
||||
logger.info({ accountId, jid }, 'Account connected — QR cleared');
|
||||
} else if (status === 'logged_out') {
|
||||
await prisma.account.update({
|
||||
where: { id: accountId },
|
||||
@@ -150,8 +141,40 @@ async function bootstrap() {
|
||||
}
|
||||
}
|
||||
|
||||
// Load ACTIVE and DISCONNECTED accounts at startup (DISCONNECTED ones need re-auth)
|
||||
const accounts = await prisma.account.findMany({
|
||||
where: { status: { in: ['ACTIVE', 'DISCONNECTED'] }, platform: 'whatsapp' },
|
||||
select: { id: true, sessionPath: true },
|
||||
});
|
||||
|
||||
if (accounts.length === 0) {
|
||||
logger.warn('No WhatsApp accounts found — add one via the dashboard');
|
||||
}
|
||||
|
||||
for (const account of accounts) {
|
||||
await startAccount(account);
|
||||
}
|
||||
|
||||
logger.info({ accountCount: accounts.length }, 'Tower worker ready');
|
||||
|
||||
// Poll every 30s for accounts added via the dashboard while worker is running
|
||||
setInterval(async () => {
|
||||
try {
|
||||
const all = await prisma.account.findMany({
|
||||
where: { status: { in: ['ACTIVE', 'DISCONNECTED'] }, platform: 'whatsapp' },
|
||||
select: { id: true, sessionPath: true },
|
||||
});
|
||||
for (const account of all) {
|
||||
if (!pool.get(account.id)) {
|
||||
logger.info({ accountId: account.id }, 'New account detected — starting session');
|
||||
await startAccount(account);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logger.error({ err }, 'Error polling for new accounts');
|
||||
}
|
||||
}, 30_000);
|
||||
|
||||
const shutdown = async () => {
|
||||
logger.info('Shutting down...');
|
||||
await pool.closeAll();
|
||||
|
||||
@@ -106,4 +106,21 @@ describe('WhatsAppSessionPool', () => {
|
||||
await capturedOnQr('test-qr');
|
||||
expect(onQr).toHaveBeenCalledWith('test-qr', 'acc_1');
|
||||
});
|
||||
|
||||
it('add() injects accountId and jid into onStatus callback', async () => {
|
||||
const onStatus = jest.fn();
|
||||
const { createWhatsAppSession } = require('./session');
|
||||
|
||||
let capturedOnStatus: any;
|
||||
(createWhatsAppSession as jest.Mock).mockImplementationOnce(
|
||||
(_id: string, _path: string, _onMsg: any, _onReaction: any, _onGroups: any, _onReconnect: any, _onQr: any, statusCb: any) => {
|
||||
capturedOnStatus = statusCb;
|
||||
return Promise.resolve({ sendMessage: jest.fn(), logout: jest.fn(), end: jest.fn() });
|
||||
},
|
||||
);
|
||||
|
||||
await pool.add('acc_1', './sessions/1', jest.fn(), jest.fn(), jest.fn(), undefined, onStatus);
|
||||
await capturedOnStatus('connected', '1234@s.whatsapp.net');
|
||||
expect(onStatus).toHaveBeenCalledWith('connected', 'acc_1', '1234@s.whatsapp.net');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -12,7 +12,7 @@ export type PoolReactionCallback = (reaction: NormalizedReaction, accountId: str
|
||||
// groups typed as `any` to avoid leaking GroupMetadata (Baileys type) into main.ts
|
||||
export type PoolGroupsCallback = (groups: any, accountId: string) => Promise<void> | void;
|
||||
export type PoolQrCallback = (qr: string, accountId: string) => Promise<void> | void;
|
||||
export type PoolStatusCallback = (status: string, accountId: string) => Promise<void> | void;
|
||||
export type PoolStatusCallback = (status: string, accountId: string, jid?: string) => Promise<void> | void;
|
||||
|
||||
export class WhatsAppSessionPool {
|
||||
private sessions = new Map<string, WASocket>();
|
||||
@@ -38,7 +38,7 @@ export class WhatsAppSessionPool {
|
||||
this.sessions.set(accountId, newSocket);
|
||||
},
|
||||
onQr ? (qr) => onQr(qr, accountId) : undefined,
|
||||
onStatus ? (status) => onStatus(status, accountId) : undefined,
|
||||
onStatus ? (status, jid?) => onStatus(status, accountId, jid) : undefined,
|
||||
);
|
||||
this.sessions.set(accountId, sock);
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ describe('createWhatsAppSession', () => {
|
||||
const onStatus = jest.fn();
|
||||
await createWhatsAppSession('acc_1', '/sessions/1', jest.fn(), jest.fn(), jest.fn(), undefined, undefined, onStatus);
|
||||
await connectionUpdateHandler({ connection: 'open' });
|
||||
expect(onStatus).toHaveBeenCalledWith('connected');
|
||||
expect(onStatus).toHaveBeenCalledWith('connected', undefined);
|
||||
});
|
||||
|
||||
it('calls onStatus with "disconnected" on non-logout close', async () => {
|
||||
|
||||
@@ -18,7 +18,7 @@ export type OnMessageCallback = (msg: NormalizedMessage) => Promise<void> | void
|
||||
export type OnReactionCallback = (reaction: NormalizedReaction) => Promise<void> | void;
|
||||
export type OnGroupsCallback = (groups: Record<string, GroupMetadata>) => Promise<void> | void;
|
||||
export type OnQrCallback = (qr: string) => Promise<void> | void;
|
||||
export type OnStatusCallback = (status: 'connected' | 'disconnected' | 'logged_out') => Promise<void> | void;
|
||||
export type OnStatusCallback = (status: 'connected' | 'disconnected' | 'logged_out', jid?: string) => Promise<void> | void;
|
||||
|
||||
export async function createWhatsAppSession(
|
||||
accountId: string,
|
||||
@@ -77,7 +77,7 @@ export async function createWhatsAppSession(
|
||||
}, 5000);
|
||||
}
|
||||
} else if (connection === 'open') {
|
||||
await onStatus?.('connected');
|
||||
await onStatus?.('connected', sock.user?.id ?? undefined);
|
||||
try {
|
||||
logger.info({ accountId }, 'WhatsApp connected');
|
||||
const groups = await sock.groupFetchAllParticipating();
|
||||
|
||||
Reference in New Issue
Block a user