From a4135fe9834abc23153643eab6caccf487737ca7 Mon Sep 17 00:00:00 2001 From: maaz519 Date: Wed, 27 May 2026 15:15:08 +0530 Subject: [PATCH] feat: add BullMQ ingest queue and processor Co-Authored-By: Claude Sonnet 4.6 --- .../src/queues/ingest.processor.test.ts | 53 +++++++++++++++++++ apps/worker/src/queues/ingest.processor.ts | 33 ++++++++++++ apps/worker/src/queues/ingest.queue.ts | 7 +++ 3 files changed, 93 insertions(+) create mode 100644 apps/worker/src/queues/ingest.processor.test.ts create mode 100644 apps/worker/src/queues/ingest.processor.ts create mode 100644 apps/worker/src/queues/ingest.queue.ts diff --git a/apps/worker/src/queues/ingest.processor.test.ts b/apps/worker/src/queues/ingest.processor.test.ts new file mode 100644 index 0000000..594db90 --- /dev/null +++ b/apps/worker/src/queues/ingest.processor.test.ts @@ -0,0 +1,53 @@ +import { processIngestJob } from './ingest.processor'; +import { IngestJobData } from '@tower/types'; + +const mockPrisma = { + message: { + upsert: jest.fn(), + }, +}; + +const sampleJob: IngestJobData = { + platformMsgId: 'WA_MSG_001', + platform: 'whatsapp', + sourceGroupId: 'clxxxxxx', + senderJid: '919876543210@s.whatsapp.net', + senderName: 'Alice', + content: '#important update from the committee', + tags: ['#important'], +}; + +describe('processIngestJob', () => { + beforeEach(() => jest.clearAllMocks()); + + it('upserts a message with PENDING status', async () => { + mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-db-id' }); + + await processIngestJob(sampleJob, mockPrisma as any); + + expect(mockPrisma.message.upsert).toHaveBeenCalledWith({ + where: { platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'WA_MSG_001' } }, + create: { + platform: 'whatsapp', + platformMsgId: 'WA_MSG_001', + sourceGroupId: 'clxxxxxx', + senderJid: '919876543210@s.whatsapp.net', + senderName: 'Alice', + content: '#important update from the committee', + tags: ['#important'], + status: 'PENDING', + }, + update: {}, + }); + }); + + it('does not throw when message already exists (idempotent upsert)', async () => { + mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-db-id' }); + await expect(processIngestJob(sampleJob, mockPrisma as any)).resolves.not.toThrow(); + }); + + it('propagates DB errors', async () => { + mockPrisma.message.upsert.mockRejectedValue(new Error('DB connection lost')); + await expect(processIngestJob(sampleJob, mockPrisma as any)).rejects.toThrow('DB connection lost'); + }); +}); diff --git a/apps/worker/src/queues/ingest.processor.ts b/apps/worker/src/queues/ingest.processor.ts new file mode 100644 index 0000000..34eee0d --- /dev/null +++ b/apps/worker/src/queues/ingest.processor.ts @@ -0,0 +1,33 @@ +import { Worker } from 'bullmq'; +import { IngestJobData } from '@tower/types'; + +export async function processIngestJob(job: IngestJobData, prisma: any): Promise { + await prisma.message.upsert({ + where: { + platform_platformMsgId: { + platform: job.platform, + platformMsgId: job.platformMsgId, + }, + }, + create: { + platform: job.platform, + platformMsgId: job.platformMsgId, + sourceGroupId: job.sourceGroupId, + senderJid: job.senderJid, + senderName: job.senderName, + content: job.content, + tags: job.tags, + status: 'PENDING', + }, + update: {}, + }); +} + +export function createIngestWorker(redisUrl: string, prisma: any): Worker { + const connection = { host: 'localhost', port: 6379, maxRetriesPerRequest: null } as any; + return new Worker( + 'tower:ingest', + async (job) => processIngestJob(job.data, prisma), + { connection }, + ); +} diff --git a/apps/worker/src/queues/ingest.queue.ts b/apps/worker/src/queues/ingest.queue.ts new file mode 100644 index 0000000..4b4bdc8 --- /dev/null +++ b/apps/worker/src/queues/ingest.queue.ts @@ -0,0 +1,7 @@ +import { Queue } from 'bullmq'; +import { IngestJobData } from '@tower/types'; + +export function createIngestQueue(redisUrl: string): Queue { + const connection = { host: 'localhost', port: 6379, maxRetriesPerRequest: null } as any; + return new Queue('tower:ingest', { connection }); +}