feat(worker): WhatsAppSessionPool + group-sync accepts accountId
This commit is contained in:
@@ -10,6 +10,7 @@ const mockPrisma = {
|
||||
const sampleJob: IngestJobData = {
|
||||
platformMsgId: 'WA_MSG_001',
|
||||
platform: 'whatsapp',
|
||||
accountId: 'account-1',
|
||||
sourceGroupId: 'clxxxxxx',
|
||||
senderJid: '919876543210@s.whatsapp.net',
|
||||
senderName: 'Alice',
|
||||
|
||||
@@ -48,7 +48,7 @@ describe('syncGroups', () => {
|
||||
.mockResolvedValueOnce({ id: 'db-group-1' })
|
||||
.mockResolvedValueOnce({ id: 'db-group-2' });
|
||||
|
||||
const result = await syncGroups(mockGroups, mockPrisma as any);
|
||||
const result = await syncGroups(mockGroups, 'account-1', mockPrisma as any);
|
||||
|
||||
expect(mockPrisma.group.upsert).toHaveBeenCalledTimes(2);
|
||||
expect(result.get('120363043312345678@g.us')).toBe('db-group-1');
|
||||
@@ -60,6 +60,7 @@ describe('syncGroups', () => {
|
||||
|
||||
await syncGroups(
|
||||
{ '120363043312345678@g.us': mockGroups['120363043312345678@g.us'] },
|
||||
'account-1',
|
||||
mockPrisma as any,
|
||||
);
|
||||
|
||||
@@ -71,8 +72,13 @@ describe('syncGroups', () => {
|
||||
name: 'UP Parivar Dallas',
|
||||
description: 'Main community group',
|
||||
isActive: true,
|
||||
accountId: 'account-1',
|
||||
},
|
||||
update: {
|
||||
name: 'UP Parivar Dallas',
|
||||
description: 'Main community group',
|
||||
accountId: 'account-1',
|
||||
},
|
||||
update: { name: 'UP Parivar Dallas', description: 'Main community group' },
|
||||
});
|
||||
});
|
||||
|
||||
@@ -81,18 +87,19 @@ describe('syncGroups', () => {
|
||||
|
||||
await syncGroups(
|
||||
{ '999999999@g.us': mockGroups['999999999@g.us'] },
|
||||
'account-1',
|
||||
mockPrisma as any,
|
||||
);
|
||||
|
||||
expect(mockPrisma.group.upsert).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
create: expect.objectContaining({ description: undefined }),
|
||||
create: expect.objectContaining({ description: undefined, accountId: 'account-1' }),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('returns an empty map when given empty groups', async () => {
|
||||
const result = await syncGroups({}, mockPrisma as any);
|
||||
const result = await syncGroups({}, 'account-1', mockPrisma as any);
|
||||
expect(result.size).toBe(0);
|
||||
expect(mockPrisma.group.upsert).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
import { GroupMetadata } from '@whiskeysockets/baileys';
|
||||
import { createLogger } from '@tower/logger';
|
||||
|
||||
const logger = createLogger('group-sync');
|
||||
|
||||
export async function syncGroups(
|
||||
groups: Record<string, GroupMetadata>,
|
||||
accountId: string,
|
||||
prisma: any,
|
||||
): Promise<Map<string, string>> {
|
||||
const jidToDbId = new Map<string, string>();
|
||||
@@ -15,11 +19,17 @@ export async function syncGroups(
|
||||
name: meta.subject,
|
||||
description: meta.desc ?? undefined,
|
||||
isActive: true,
|
||||
accountId,
|
||||
},
|
||||
update: {
|
||||
name: meta.subject,
|
||||
description: meta.desc ?? undefined,
|
||||
accountId,
|
||||
},
|
||||
update: { name: meta.subject, description: meta.desc ?? undefined },
|
||||
});
|
||||
jidToDbId.set(jid, group.id);
|
||||
}
|
||||
|
||||
logger.info({ count: jidToDbId.size, accountId }, 'Groups synced');
|
||||
return jidToDbId;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
import { WASocket } from '@whiskeysockets/baileys';
|
||||
import { NormalizedMessage, NormalizedReaction } from '@tower/types';
|
||||
import { createWhatsAppSession } from './session';
|
||||
import { createLogger } from '@tower/logger';
|
||||
|
||||
const logger = createLogger('session-pool');
|
||||
|
||||
// Callbacks the pool exposes — accountId injected by pool, not caller
|
||||
export type PoolMessageCallback = (msg: NormalizedMessage, accountId: string) => Promise<void> | void;
|
||||
export type PoolReactionCallback = (reaction: NormalizedReaction, accountId: string) => Promise<void> | void;
|
||||
// groups typed as `any` to avoid leaking GroupMetadata (Baileys type) into main.ts
|
||||
export type PoolGroupsCallback = (groups: any, accountId: string) => Promise<void> | void;
|
||||
|
||||
export class WhatsAppSessionPool {
|
||||
private sessions = new Map<string, WASocket>();
|
||||
|
||||
async add(
|
||||
accountId: string,
|
||||
sessionPath: string,
|
||||
onMessage: PoolMessageCallback,
|
||||
onReaction: PoolReactionCallback,
|
||||
onGroups: PoolGroupsCallback,
|
||||
): Promise<void> {
|
||||
logger.info({ accountId }, 'Starting session');
|
||||
const sock = await createWhatsAppSession(
|
||||
accountId,
|
||||
sessionPath,
|
||||
(msg) => onMessage(msg, accountId),
|
||||
(reaction) => onReaction(reaction, accountId),
|
||||
(groups) => onGroups(groups, accountId),
|
||||
);
|
||||
this.sessions.set(accountId, sock);
|
||||
}
|
||||
|
||||
get(accountId: string): WASocket | undefined {
|
||||
return this.sessions.get(accountId);
|
||||
}
|
||||
|
||||
getAll(): Map<string, WASocket> {
|
||||
return this.sessions;
|
||||
}
|
||||
|
||||
async sendMessage(accountId: string, groupJid: string, text: string): Promise<void> {
|
||||
const sock = this.sessions.get(accountId);
|
||||
if (!sock) throw new Error(`No active session for account ${accountId}`);
|
||||
await sock.sendMessage(groupJid, { text });
|
||||
}
|
||||
|
||||
async remove(accountId: string): Promise<void> {
|
||||
const sock = this.sessions.get(accountId);
|
||||
if (sock) {
|
||||
await sock.logout().catch(() => {});
|
||||
this.sessions.delete(accountId);
|
||||
logger.info({ accountId }, 'Session removed');
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user