diff --git a/apps/worker/src/queues/ingest.processor.test.ts b/apps/worker/src/queues/ingest.processor.test.ts index 594db90..33dd3e4 100644 --- a/apps/worker/src/queues/ingest.processor.test.ts +++ b/apps/worker/src/queues/ingest.processor.test.ts @@ -10,6 +10,7 @@ const mockPrisma = { const sampleJob: IngestJobData = { platformMsgId: 'WA_MSG_001', platform: 'whatsapp', + accountId: 'account-1', sourceGroupId: 'clxxxxxx', senderJid: '919876543210@s.whatsapp.net', senderName: 'Alice', diff --git a/apps/worker/src/whatsapp/group-sync.test.ts b/apps/worker/src/whatsapp/group-sync.test.ts index c5a4d18..7488503 100644 --- a/apps/worker/src/whatsapp/group-sync.test.ts +++ b/apps/worker/src/whatsapp/group-sync.test.ts @@ -48,7 +48,7 @@ describe('syncGroups', () => { .mockResolvedValueOnce({ id: 'db-group-1' }) .mockResolvedValueOnce({ id: 'db-group-2' }); - const result = await syncGroups(mockGroups, mockPrisma as any); + const result = await syncGroups(mockGroups, 'account-1', mockPrisma as any); expect(mockPrisma.group.upsert).toHaveBeenCalledTimes(2); expect(result.get('120363043312345678@g.us')).toBe('db-group-1'); @@ -60,6 +60,7 @@ describe('syncGroups', () => { await syncGroups( { '120363043312345678@g.us': mockGroups['120363043312345678@g.us'] }, + 'account-1', mockPrisma as any, ); @@ -71,8 +72,13 @@ describe('syncGroups', () => { name: 'UP Parivar Dallas', description: 'Main community group', isActive: true, + accountId: 'account-1', + }, + update: { + name: 'UP Parivar Dallas', + description: 'Main community group', + accountId: 'account-1', }, - update: { name: 'UP Parivar Dallas', description: 'Main community group' }, }); }); @@ -81,18 +87,19 @@ describe('syncGroups', () => { await syncGroups( { '999999999@g.us': mockGroups['999999999@g.us'] }, + 'account-1', mockPrisma as any, ); expect(mockPrisma.group.upsert).toHaveBeenCalledWith( expect.objectContaining({ - create: expect.objectContaining({ description: undefined }), + create: expect.objectContaining({ description: undefined, accountId: 'account-1' }), }), ); }); it('returns an empty map when given empty groups', async () => { - const result = await syncGroups({}, mockPrisma as any); + const result = await syncGroups({}, 'account-1', mockPrisma as any); expect(result.size).toBe(0); expect(mockPrisma.group.upsert).not.toHaveBeenCalled(); }); diff --git a/apps/worker/src/whatsapp/group-sync.ts b/apps/worker/src/whatsapp/group-sync.ts index 9198f63..11dfc5b 100644 --- a/apps/worker/src/whatsapp/group-sync.ts +++ b/apps/worker/src/whatsapp/group-sync.ts @@ -1,7 +1,11 @@ import { GroupMetadata } from '@whiskeysockets/baileys'; +import { createLogger } from '@tower/logger'; + +const logger = createLogger('group-sync'); export async function syncGroups( groups: Record, + accountId: string, prisma: any, ): Promise> { const jidToDbId = new Map(); @@ -15,11 +19,17 @@ export async function syncGroups( name: meta.subject, description: meta.desc ?? undefined, isActive: true, + accountId, + }, + update: { + name: meta.subject, + description: meta.desc ?? undefined, + accountId, }, - update: { name: meta.subject, description: meta.desc ?? undefined }, }); jidToDbId.set(jid, group.id); } + logger.info({ count: jidToDbId.size, accountId }, 'Groups synced'); return jidToDbId; } diff --git a/apps/worker/src/whatsapp/session-pool.ts b/apps/worker/src/whatsapp/session-pool.ts new file mode 100644 index 0000000..816991b --- /dev/null +++ b/apps/worker/src/whatsapp/session-pool.ts @@ -0,0 +1,57 @@ +import { WASocket } from '@whiskeysockets/baileys'; +import { NormalizedMessage, NormalizedReaction } from '@tower/types'; +import { createWhatsAppSession } from './session'; +import { createLogger } from '@tower/logger'; + +const logger = createLogger('session-pool'); + +// Callbacks the pool exposes — accountId injected by pool, not caller +export type PoolMessageCallback = (msg: NormalizedMessage, accountId: string) => Promise | void; +export type PoolReactionCallback = (reaction: NormalizedReaction, accountId: string) => Promise | void; +// groups typed as `any` to avoid leaking GroupMetadata (Baileys type) into main.ts +export type PoolGroupsCallback = (groups: any, accountId: string) => Promise | void; + +export class WhatsAppSessionPool { + private sessions = new Map(); + + async add( + accountId: string, + sessionPath: string, + onMessage: PoolMessageCallback, + onReaction: PoolReactionCallback, + onGroups: PoolGroupsCallback, + ): Promise { + logger.info({ accountId }, 'Starting session'); + const sock = await createWhatsAppSession( + accountId, + sessionPath, + (msg) => onMessage(msg, accountId), + (reaction) => onReaction(reaction, accountId), + (groups) => onGroups(groups, accountId), + ); + this.sessions.set(accountId, sock); + } + + get(accountId: string): WASocket | undefined { + return this.sessions.get(accountId); + } + + getAll(): Map { + return this.sessions; + } + + async sendMessage(accountId: string, groupJid: string, text: string): Promise { + const sock = this.sessions.get(accountId); + if (!sock) throw new Error(`No active session for account ${accountId}`); + await sock.sendMessage(groupJid, { text }); + } + + async remove(accountId: string): Promise { + const sock = this.sessions.get(accountId); + if (sock) { + await sock.logout().catch(() => {}); + this.sessions.delete(accountId); + logger.info({ accountId }, 'Session removed'); + } + } +}