fix: harden session error handling, normalizer null guard, extract redis-connection
- Wrap groupFetchAllParticipating in try/catch to prevent unhandled rejection on connect - Catch errors from async onMessage/onGroups callbacks via Promise.resolve().catch - Return null from normalizer when key.id is missing (prevents empty upsert key collision) - Extract parseRedisUrl to redis-connection.ts to eliminate duplication Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,10 +1,6 @@
|
|||||||
import { Worker } from 'bullmq';
|
import { Worker } from 'bullmq';
|
||||||
import { IngestJobData } from '@tower/types';
|
import { IngestJobData } from '@tower/types';
|
||||||
|
import { parseRedisUrl } from './redis-connection';
|
||||||
function parseRedisUrl(url: string) {
|
|
||||||
const { hostname, port } = new URL(url);
|
|
||||||
return { host: hostname, port: parseInt(port || '6379', 10), maxRetriesPerRequest: null };
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function processIngestJob(job: IngestJobData, prisma: any): Promise<void> {
|
export async function processIngestJob(job: IngestJobData, prisma: any): Promise<void> {
|
||||||
await prisma.message.upsert({
|
await prisma.message.upsert({
|
||||||
|
|||||||
@@ -1,10 +1,6 @@
|
|||||||
import { Queue } from 'bullmq';
|
import { Queue } from 'bullmq';
|
||||||
import { IngestJobData } from '@tower/types';
|
import { IngestJobData } from '@tower/types';
|
||||||
|
import { parseRedisUrl } from './redis-connection';
|
||||||
function parseRedisUrl(url: string) {
|
|
||||||
const { hostname, port } = new URL(url);
|
|
||||||
return { host: hostname, port: parseInt(port || '6379', 10), maxRetriesPerRequest: null };
|
|
||||||
}
|
|
||||||
|
|
||||||
export function createIngestQueue(redisUrl: string): Queue<IngestJobData> {
|
export function createIngestQueue(redisUrl: string): Queue<IngestJobData> {
|
||||||
return new Queue<IngestJobData>('tower:ingest', { connection: parseRedisUrl(redisUrl) });
|
return new Queue<IngestJobData>('tower:ingest', { connection: parseRedisUrl(redisUrl) });
|
||||||
|
|||||||
@@ -0,0 +1,4 @@
|
|||||||
|
export function parseRedisUrl(url: string) {
|
||||||
|
const { hostname, port } = new URL(url);
|
||||||
|
return { host: hostname, port: parseInt(port || '6379', 10), maxRetriesPerRequest: null };
|
||||||
|
}
|
||||||
@@ -69,4 +69,9 @@ describe('normalizeMessage', () => {
|
|||||||
const result = normalizeMessage({ message: { conversation: 'hi' } } as proto.IWebMessageInfo);
|
const result = normalizeMessage({ message: { conversation: 'hi' } } as proto.IWebMessageInfo);
|
||||||
expect(result).toBeNull();
|
expect(result).toBeNull();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('returns null when message id is missing (prevents empty-key upsert collisions)', () => {
|
||||||
|
const result = normalizeMessage(makeMsg({ key: { remoteJid: '120363043312345678@g.us', fromMe: false, id: undefined, participant: '919876543210@s.whatsapp.net' } }));
|
||||||
|
expect(result).toBeNull();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -35,10 +35,13 @@ export function normalizeMessage(msg: proto.IWebMessageInfo): NormalizedMessage
|
|||||||
|
|
||||||
if (!msg.message) return null;
|
if (!msg.message) return null;
|
||||||
|
|
||||||
|
const platformMsgId = key.id;
|
||||||
|
if (!platformMsgId) return null;
|
||||||
|
|
||||||
const content = extractText(msg);
|
const content = extractText(msg);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
platformMsgId: key.id ?? '',
|
platformMsgId,
|
||||||
sourceGroupJid: remoteJid,
|
sourceGroupJid: remoteJid,
|
||||||
senderJid: key.participant ?? '',
|
senderJid: key.participant ?? '',
|
||||||
senderName: msg.pushName ?? undefined,
|
senderName: msg.pushName ?? undefined,
|
||||||
|
|||||||
@@ -41,16 +41,24 @@ export async function createWhatsAppSession(
|
|||||||
setTimeout(() => createWhatsAppSession(sessionPath, onMessage, onGroups), 5000);
|
setTimeout(() => createWhatsAppSession(sessionPath, onMessage, onGroups), 5000);
|
||||||
}
|
}
|
||||||
} else if (connection === 'open') {
|
} else if (connection === 'open') {
|
||||||
|
try {
|
||||||
logger.info('WhatsApp connected');
|
logger.info('WhatsApp connected');
|
||||||
const groups = await sock.groupFetchAllParticipating();
|
const groups = await sock.groupFetchAllParticipating();
|
||||||
onGroups(groups);
|
await Promise.resolve(onGroups(groups)).catch((err) =>
|
||||||
|
logger.error({ err }, 'Group sync error'),
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
logger.error({ err }, 'Failed to fetch groups on connect');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
sock.ev.on('messages.upsert', ({ messages, type }) => {
|
sock.ev.on('messages.upsert', ({ messages, type }) => {
|
||||||
if (type !== 'notify') return;
|
if (type !== 'notify') return;
|
||||||
for (const msg of messages) {
|
for (const msg of messages) {
|
||||||
onMessage(msg);
|
void Promise.resolve(onMessage(msg)).catch((err) =>
|
||||||
|
logger.error({ err }, 'Error processing message'),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user