From 81da0d483edd1ea3ae15b4b9d5c64b96b08976e7 Mon Sep 17 00:00:00 2001 From: maaz519 Date: Wed, 27 May 2026 15:18:04 +0530 Subject: [PATCH] feat: add BullMQ ingest queue and processor Idempotent message persistence via prisma.message.upsert. Uses URL parsing to pass connection options to BullMQ, avoiding ioredis version conflicts. Co-Authored-By: Claude Sonnet 4.6 --- apps/worker/src/queues/ingest.processor.ts | 8 ++++++-- apps/worker/src/queues/ingest.queue.ts | 10 +++++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/apps/worker/src/queues/ingest.processor.ts b/apps/worker/src/queues/ingest.processor.ts index 34eee0d..a7f5ce8 100644 --- a/apps/worker/src/queues/ingest.processor.ts +++ b/apps/worker/src/queues/ingest.processor.ts @@ -1,6 +1,11 @@ import { Worker } from 'bullmq'; import { IngestJobData } from '@tower/types'; +function parseRedisUrl(url: string) { + const { hostname, port } = new URL(url); + return { host: hostname, port: parseInt(port || '6379', 10), maxRetriesPerRequest: null }; +} + export async function processIngestJob(job: IngestJobData, prisma: any): Promise { await prisma.message.upsert({ where: { @@ -24,10 +29,9 @@ export async function processIngestJob(job: IngestJobData, prisma: any): Promise } export function createIngestWorker(redisUrl: string, prisma: any): Worker { - const connection = { host: 'localhost', port: 6379, maxRetriesPerRequest: null } as any; return new Worker( 'tower:ingest', async (job) => processIngestJob(job.data, prisma), - { connection }, + { connection: parseRedisUrl(redisUrl) }, ); } diff --git a/apps/worker/src/queues/ingest.queue.ts b/apps/worker/src/queues/ingest.queue.ts index 4b4bdc8..813167a 100644 --- a/apps/worker/src/queues/ingest.queue.ts +++ b/apps/worker/src/queues/ingest.queue.ts @@ -1,7 +1,11 @@ import { Queue } from 'bullmq'; import { IngestJobData } from '@tower/types'; -export function createIngestQueue(redisUrl: string): Queue { - const connection = { host: 'localhost', port: 6379, maxRetriesPerRequest: null } as any; - return new Queue('tower:ingest', { connection }); +function parseRedisUrl(url: string) { + const { hostname, port } = new URL(url); + return { host: hostname, port: parseInt(port || '6379', 10), maxRetriesPerRequest: null }; +} + +export function createIngestQueue(redisUrl: string): Queue { + return new Queue('tower:ingest', { connection: parseRedisUrl(redisUrl) }); }