diff --git a/apps/worker/src/queues/ingest.processor.ts b/apps/worker/src/queues/ingest.processor.ts index a7f5ce8..9904607 100644 --- a/apps/worker/src/queues/ingest.processor.ts +++ b/apps/worker/src/queues/ingest.processor.ts @@ -1,10 +1,6 @@ import { Worker } from 'bullmq'; import { IngestJobData } from '@tower/types'; - -function parseRedisUrl(url: string) { - const { hostname, port } = new URL(url); - return { host: hostname, port: parseInt(port || '6379', 10), maxRetriesPerRequest: null }; -} +import { parseRedisUrl } from './redis-connection'; export async function processIngestJob(job: IngestJobData, prisma: any): Promise { await prisma.message.upsert({ diff --git a/apps/worker/src/queues/ingest.queue.ts b/apps/worker/src/queues/ingest.queue.ts index 813167a..e6d7a13 100644 --- a/apps/worker/src/queues/ingest.queue.ts +++ b/apps/worker/src/queues/ingest.queue.ts @@ -1,10 +1,6 @@ import { Queue } from 'bullmq'; import { IngestJobData } from '@tower/types'; - -function parseRedisUrl(url: string) { - const { hostname, port } = new URL(url); - return { host: hostname, port: parseInt(port || '6379', 10), maxRetriesPerRequest: null }; -} +import { parseRedisUrl } from './redis-connection'; export function createIngestQueue(redisUrl: string): Queue { return new Queue('tower:ingest', { connection: parseRedisUrl(redisUrl) }); diff --git a/apps/worker/src/queues/redis-connection.ts b/apps/worker/src/queues/redis-connection.ts new file mode 100644 index 0000000..05d70b4 --- /dev/null +++ b/apps/worker/src/queues/redis-connection.ts @@ -0,0 +1,4 @@ +export function parseRedisUrl(url: string) { + const { hostname, port } = new URL(url); + return { host: hostname, port: parseInt(port || '6379', 10), maxRetriesPerRequest: null }; +} diff --git a/apps/worker/src/whatsapp/normalizer.test.ts b/apps/worker/src/whatsapp/normalizer.test.ts index 274e97c..9ab8393 100644 --- a/apps/worker/src/whatsapp/normalizer.test.ts +++ b/apps/worker/src/whatsapp/normalizer.test.ts @@ -69,4 +69,9 @@ describe('normalizeMessage', () => { const result = normalizeMessage({ message: { conversation: 'hi' } } as proto.IWebMessageInfo); expect(result).toBeNull(); }); + + it('returns null when message id is missing (prevents empty-key upsert collisions)', () => { + const result = normalizeMessage(makeMsg({ key: { remoteJid: '120363043312345678@g.us', fromMe: false, id: undefined, participant: '919876543210@s.whatsapp.net' } })); + expect(result).toBeNull(); + }); }); diff --git a/apps/worker/src/whatsapp/normalizer.ts b/apps/worker/src/whatsapp/normalizer.ts index 585f154..4c6f3a9 100644 --- a/apps/worker/src/whatsapp/normalizer.ts +++ b/apps/worker/src/whatsapp/normalizer.ts @@ -35,10 +35,13 @@ export function normalizeMessage(msg: proto.IWebMessageInfo): NormalizedMessage if (!msg.message) return null; + const platformMsgId = key.id; + if (!platformMsgId) return null; + const content = extractText(msg); return { - platformMsgId: key.id ?? '', + platformMsgId, sourceGroupJid: remoteJid, senderJid: key.participant ?? '', senderName: msg.pushName ?? undefined, diff --git a/apps/worker/src/whatsapp/session.ts b/apps/worker/src/whatsapp/session.ts index dccac70..0ce4535 100644 --- a/apps/worker/src/whatsapp/session.ts +++ b/apps/worker/src/whatsapp/session.ts @@ -41,16 +41,24 @@ export async function createWhatsAppSession( setTimeout(() => createWhatsAppSession(sessionPath, onMessage, onGroups), 5000); } } else if (connection === 'open') { - logger.info('WhatsApp connected'); - const groups = await sock.groupFetchAllParticipating(); - onGroups(groups); + try { + logger.info('WhatsApp connected'); + const groups = await sock.groupFetchAllParticipating(); + await Promise.resolve(onGroups(groups)).catch((err) => + logger.error({ err }, 'Group sync error'), + ); + } catch (err) { + logger.error({ err }, 'Failed to fetch groups on connect'); + } } }); sock.ev.on('messages.upsert', ({ messages, type }) => { if (type !== 'notify') return; for (const msg of messages) { - onMessage(msg); + void Promise.resolve(onMessage(msg)).catch((err) => + logger.error({ err }, 'Error processing message'), + ); } });