fix: address code quality issues in session pool, approval, and main

- session.ts: add onReconnect callback so reconnected socket replaces stale pool entry
- session-pool.ts: pass onReconnect to update pool on reconnect; add closeAll() to gracefully end WebSocket connections on shutdown
- approval.ts: use approved flag so double-approval race (updateMany count=0) returns null instead of sending duplicate forward jobs
- main.ts: wrap pool.add() in try/catch to continue if one account fails; replace silent groupMap fallback with explicit error log and early return; call pool.closeAll() before BullMQ shutdown to prevent hanging sockets
- Tests: add closeAll() test to session-pool.test.ts; add double-approval race test to approval.test.ts (56 tests total, all passing)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-27 17:35:52 +05:30
parent 9e3ee0cd38
commit 41aabc4c0d
6 changed files with 82 additions and 5 deletions
+23
View File
@@ -76,6 +76,29 @@ describe('handleStarReaction', () => {
expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull(); expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
}); });
it('returns null on double-approval race (updateMany returns count=0)', async () => {
const prisma = {
message: {
findUnique: jest.fn().mockResolvedValue({
id: 'msg_1',
status: 'PENDING',
approval: null,
content: 'hello',
senderName: 'Alice',
sourceGroup: { name: 'UP Parivar Dallas', syncRoutesFrom: [] },
}),
},
$transaction: jest.fn().mockImplementation(async (fn: any) => fn({
message: { updateMany: jest.fn().mockResolvedValue({ count: 0 }) },
approval: { create: jest.fn().mockResolvedValue({}) },
})),
} as any;
const result = await handleStarReaction(makeReaction(), adminJids, prisma);
expect(result).toBeNull();
expect(prisma.$transaction).toHaveBeenCalled();
});
it('approves message and returns empty array when no sync routes', async () => { it('approves message and returns empty array when no sync routes', async () => {
const prisma = { const prisma = {
message: { message: {
+4
View File
@@ -30,12 +30,14 @@ export async function handleStarReaction(
if (message.status !== 'PENDING') return null; if (message.status !== 'PENDING') return null;
if (message.approval) return null; if (message.approval) return null;
let approved = false;
await prisma.$transaction(async (tx: any) => { await prisma.$transaction(async (tx: any) => {
const updated = await tx.message.updateMany({ const updated = await tx.message.updateMany({
where: { id: message.id, status: 'PENDING' }, where: { id: message.id, status: 'PENDING' },
data: { status: 'APPROVED' }, data: { status: 'APPROVED' },
}); });
if (updated.count === 0) return; // another admin approved first — idempotent if (updated.count === 0) return; // another admin approved first — idempotent
approved = true;
await tx.approval.create({ await tx.approval.create({
data: { data: {
messageId: message.id, messageId: message.id,
@@ -45,6 +47,8 @@ export async function handleStarReaction(
}); });
}); });
if (!approved) return null;
const jobs: ForwardJobData[] = message.sourceGroup.syncRoutesFrom const jobs: ForwardJobData[] = message.sourceGroup.syncRoutesFrom
.filter((route: any) => route.targetGroup != null) .filter((route: any) => route.targetGroup != null)
.map((route: any) => ({ .map((route: any) => ({
+10 -1
View File
@@ -48,6 +48,7 @@ async function bootstrap() {
for (const account of accounts) { for (const account of accounts) {
groupMaps.set(account.id, new Map()); groupMaps.set(account.id, new Map());
try {
await pool.add( await pool.add(
account.id, account.id,
account.sessionPath, account.sessionPath,
@@ -55,7 +56,11 @@ async function bootstrap() {
const tags = detectTags(msg.content, msg.senderJid, adminJids); const tags = detectTags(msg.content, msg.senderJid, adminJids);
if (!isFlagged(tags)) return; if (!isFlagged(tags)) return;
const groupMap = groupMaps.get(accountId) ?? new Map(); const groupMap = groupMaps.get(accountId);
if (!groupMap) {
logger.error({ accountId }, 'No group map for account — message dropped');
return;
}
const sourceGroupId = groupMap.get(msg.sourceGroupJid); const sourceGroupId = groupMap.get(msg.sourceGroupJid);
if (!sourceGroupId) { if (!sourceGroupId) {
logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message'); logger.warn({ jid: msg.sourceGroupJid, accountId }, 'Unknown group — skipping message');
@@ -101,12 +106,16 @@ async function bootstrap() {
groupMaps.set(accountId, map); groupMaps.set(accountId, map);
}, },
); );
} catch (err) {
logger.error({ accountId: account.id, err }, 'Failed to start session — skipping account');
}
} }
logger.info({ accountCount: accounts.length }, 'Tower worker ready'); logger.info({ accountCount: accounts.length }, 'Tower worker ready');
const shutdown = async () => { const shutdown = async () => {
logger.info('Shutting down...'); logger.info('Shutting down...');
await pool.closeAll();
await ingestWorker.close(); await ingestWorker.close();
await forwardWorker.close(); await forwardWorker.close();
await ingestQueue.close(); await ingestQueue.close();
@@ -5,6 +5,7 @@ jest.mock('./session', () => ({
createWhatsAppSession: jest.fn().mockResolvedValue({ createWhatsAppSession: jest.fn().mockResolvedValue({
sendMessage: jest.fn().mockResolvedValue({}), sendMessage: jest.fn().mockResolvedValue({}),
logout: jest.fn().mockResolvedValue({}), logout: jest.fn().mockResolvedValue({}),
end: jest.fn(),
}), }),
})); }));
@@ -55,6 +56,21 @@ describe('WhatsAppSessionPool', () => {
await expect(pool.remove('acc_unknown')).resolves.not.toThrow(); await expect(pool.remove('acc_unknown')).resolves.not.toThrow();
}); });
it('closeAll() calls end() on all sessions and clears the pool', async () => {
await pool.add('acc_1', './sessions/1', jest.fn(), jest.fn(), jest.fn());
await pool.add('acc_2', './sessions/2', jest.fn(), jest.fn(), jest.fn());
expect(pool.getAll().size).toBe(2);
const sock1 = pool.get('acc_1')!;
const sock2 = pool.get('acc_2')!;
await pool.closeAll();
expect(sock1.end).toHaveBeenCalledWith(undefined);
expect(sock2.end).toHaveBeenCalledWith(undefined);
expect(pool.getAll().size).toBe(0);
});
it('add() injects accountId into onMessage callback', async () => { it('add() injects accountId into onMessage callback', async () => {
const onMessage = jest.fn(); const onMessage = jest.fn();
const { createWhatsAppSession } = require('./session'); const { createWhatsAppSession } = require('./session');
+17
View File
@@ -28,6 +28,10 @@ export class WhatsAppSessionPool {
(msg) => onMessage(msg, accountId), (msg) => onMessage(msg, accountId),
(reaction) => onReaction(reaction, accountId), (reaction) => onReaction(reaction, accountId),
(groups) => onGroups(groups, accountId), (groups) => onGroups(groups, accountId),
(newSocket) => {
logger.info({ accountId }, 'Session reconnected — updating pool');
this.sessions.set(accountId, newSocket);
},
); );
this.sessions.set(accountId, sock); this.sessions.set(accountId, sock);
} }
@@ -57,4 +61,17 @@ export class WhatsAppSessionPool {
logger.info({ accountId }, 'Session removed'); logger.info({ accountId }, 'Session removed');
} }
} }
async closeAll(): Promise<void> {
logger.info({ count: this.sessions.size }, 'Closing all WhatsApp sessions');
for (const [accountId, sock] of this.sessions) {
try {
sock.end(undefined);
logger.info({ accountId }, 'Session closed');
} catch (err) {
logger.error({ accountId, err }, 'Error closing session');
}
}
this.sessions.clear();
}
} }
+11 -3
View File
@@ -23,6 +23,7 @@ export async function createWhatsAppSession(
onMessage: OnMessageCallback, onMessage: OnMessageCallback,
onReaction: OnReactionCallback, onReaction: OnReactionCallback,
onGroups: OnGroupsCallback, onGroups: OnGroupsCallback,
onReconnect?: (newSocket: WASocket) => void,
): Promise<WASocket> { ): Promise<WASocket> {
const { state, saveCreds } = await useMultiFileAuthState(sessionPath); const { state, saveCreds } = await useMultiFileAuthState(sessionPath);
const { version } = await fetchLatestBaileysVersion(); const { version } = await fetchLatestBaileysVersion();
@@ -46,10 +47,17 @@ export async function createWhatsAppSession(
logger.info({ reason, shouldReconnect }, 'Connection closed'); logger.info({ reason, shouldReconnect }, 'Connection closed');
if (shouldReconnect) { if (shouldReconnect) {
logger.info('Reconnecting in 5s...'); logger.info('Reconnecting in 5s...');
setTimeout( setTimeout(async () => {
() => createWhatsAppSession(accountId, sessionPath, onMessage, onReaction, onGroups), const newSocket = await createWhatsAppSession(
5000, accountId,
sessionPath,
onMessage,
onReaction,
onGroups,
onReconnect,
); );
onReconnect?.(newSocket);
}, 5000);
} }
} else if (connection === 'open') { } else if (connection === 'open') {
try { try {