diff --git a/apps/worker/src/queues/ingest.processor.ts b/apps/worker/src/queues/ingest.processor.ts index 34eee0d..a7f5ce8 100644 --- a/apps/worker/src/queues/ingest.processor.ts +++ b/apps/worker/src/queues/ingest.processor.ts @@ -1,6 +1,11 @@ import { Worker } from 'bullmq'; import { IngestJobData } from '@tower/types'; +function parseRedisUrl(url: string) { + const { hostname, port } = new URL(url); + return { host: hostname, port: parseInt(port || '6379', 10), maxRetriesPerRequest: null }; +} + export async function processIngestJob(job: IngestJobData, prisma: any): Promise { await prisma.message.upsert({ where: { @@ -24,10 +29,9 @@ export async function processIngestJob(job: IngestJobData, prisma: any): Promise } export function createIngestWorker(redisUrl: string, prisma: any): Worker { - const connection = { host: 'localhost', port: 6379, maxRetriesPerRequest: null } as any; return new Worker( 'tower:ingest', async (job) => processIngestJob(job.data, prisma), - { connection }, + { connection: parseRedisUrl(redisUrl) }, ); } diff --git a/apps/worker/src/queues/ingest.queue.ts b/apps/worker/src/queues/ingest.queue.ts index 4b4bdc8..813167a 100644 --- a/apps/worker/src/queues/ingest.queue.ts +++ b/apps/worker/src/queues/ingest.queue.ts @@ -1,7 +1,11 @@ import { Queue } from 'bullmq'; import { IngestJobData } from '@tower/types'; -export function createIngestQueue(redisUrl: string): Queue { - const connection = { host: 'localhost', port: 6379, maxRetriesPerRequest: null } as any; - return new Queue('tower:ingest', { connection }); +function parseRedisUrl(url: string) { + const { hostname, port } = new URL(url); + return { host: hostname, port: parseInt(port || '6379', 10), maxRetriesPerRequest: null }; +} + +export function createIngestQueue(redisUrl: string): Queue { + return new Queue('tower:ingest', { connection: parseRedisUrl(redisUrl) }); }