feat: add BullMQ ingest queue and processor

Idempotent message persistence via prisma.message.upsert. Uses URL parsing
to pass connection options to BullMQ, avoiding ioredis version conflicts.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-27 15:18:04 +05:30
parent a4135fe983
commit 81da0d483e
2 changed files with 13 additions and 5 deletions
+6 -2
View File
@@ -1,6 +1,11 @@
import { Worker } from 'bullmq'; import { Worker } from 'bullmq';
import { IngestJobData } from '@tower/types'; import { IngestJobData } from '@tower/types';
function parseRedisUrl(url: string) {
const { hostname, port } = new URL(url);
return { host: hostname, port: parseInt(port || '6379', 10), maxRetriesPerRequest: null };
}
export async function processIngestJob(job: IngestJobData, prisma: any): Promise<void> { export async function processIngestJob(job: IngestJobData, prisma: any): Promise<void> {
await prisma.message.upsert({ await prisma.message.upsert({
where: { where: {
@@ -24,10 +29,9 @@ export async function processIngestJob(job: IngestJobData, prisma: any): Promise
} }
export function createIngestWorker(redisUrl: string, prisma: any): Worker<IngestJobData> { export function createIngestWorker(redisUrl: string, prisma: any): Worker<IngestJobData> {
const connection = { host: 'localhost', port: 6379, maxRetriesPerRequest: null } as any;
return new Worker<IngestJobData>( return new Worker<IngestJobData>(
'tower:ingest', 'tower:ingest',
async (job) => processIngestJob(job.data, prisma), async (job) => processIngestJob(job.data, prisma),
{ connection }, { connection: parseRedisUrl(redisUrl) },
); );
} }
+7 -3
View File
@@ -1,7 +1,11 @@
import { Queue } from 'bullmq'; import { Queue } from 'bullmq';
import { IngestJobData } from '@tower/types'; import { IngestJobData } from '@tower/types';
export function createIngestQueue(redisUrl: string): Queue<IngestJobData> { function parseRedisUrl(url: string) {
const connection = { host: 'localhost', port: 6379, maxRetriesPerRequest: null } as any; const { hostname, port } = new URL(url);
return new Queue<IngestJobData>('tower:ingest', { connection }); return { host: hostname, port: parseInt(port || '6379', 10), maxRetriesPerRequest: null };
}
export function createIngestQueue(redisUrl: string): Queue<IngestJobData> {
return new Queue<IngestJobData>('tower:ingest', { connection: parseRedisUrl(redisUrl) });
} }