feat: add BullMQ ingest queue and processor

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-27 15:15:08 +05:30
parent 7151e0dd6d
commit a4135fe983
3 changed files with 93 additions and 0 deletions
@@ -0,0 +1,53 @@
import { processIngestJob } from './ingest.processor';
import { IngestJobData } from '@tower/types';
const mockPrisma = {
message: {
upsert: jest.fn(),
},
};
const sampleJob: IngestJobData = {
platformMsgId: 'WA_MSG_001',
platform: 'whatsapp',
sourceGroupId: 'clxxxxxx',
senderJid: '919876543210@s.whatsapp.net',
senderName: 'Alice',
content: '#important update from the committee',
tags: ['#important'],
};
describe('processIngestJob', () => {
beforeEach(() => jest.clearAllMocks());
it('upserts a message with PENDING status', async () => {
mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-db-id' });
await processIngestJob(sampleJob, mockPrisma as any);
expect(mockPrisma.message.upsert).toHaveBeenCalledWith({
where: { platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'WA_MSG_001' } },
create: {
platform: 'whatsapp',
platformMsgId: 'WA_MSG_001',
sourceGroupId: 'clxxxxxx',
senderJid: '919876543210@s.whatsapp.net',
senderName: 'Alice',
content: '#important update from the committee',
tags: ['#important'],
status: 'PENDING',
},
update: {},
});
});
it('does not throw when message already exists (idempotent upsert)', async () => {
mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-db-id' });
await expect(processIngestJob(sampleJob, mockPrisma as any)).resolves.not.toThrow();
});
it('propagates DB errors', async () => {
mockPrisma.message.upsert.mockRejectedValue(new Error('DB connection lost'));
await expect(processIngestJob(sampleJob, mockPrisma as any)).rejects.toThrow('DB connection lost');
});
});
@@ -0,0 +1,33 @@
import { Worker } from 'bullmq';
import { IngestJobData } from '@tower/types';
export async function processIngestJob(job: IngestJobData, prisma: any): Promise<void> {
await prisma.message.upsert({
where: {
platform_platformMsgId: {
platform: job.platform,
platformMsgId: job.platformMsgId,
},
},
create: {
platform: job.platform,
platformMsgId: job.platformMsgId,
sourceGroupId: job.sourceGroupId,
senderJid: job.senderJid,
senderName: job.senderName,
content: job.content,
tags: job.tags,
status: 'PENDING',
},
update: {},
});
}
export function createIngestWorker(redisUrl: string, prisma: any): Worker<IngestJobData> {
const connection = { host: 'localhost', port: 6379, maxRetriesPerRequest: null } as any;
return new Worker<IngestJobData>(
'tower:ingest',
async (job) => processIngestJob(job.data, prisma),
{ connection },
);
}
+7
View File
@@ -0,0 +1,7 @@
import { Queue } from 'bullmq';
import { IngestJobData } from '@tower/types';
export function createIngestQueue(redisUrl: string): Queue<IngestJobData> {
const connection = { host: 'localhost', port: 6379, maxRetriesPerRequest: null } as any;
return new Queue<IngestJobData>('tower:ingest', { connection });
}