From 5ad33fd416e9b3dbedb69083465f5dee4bf04048 Mon Sep 17 00:00:00 2001 From: maaz519 Date: Wed, 27 May 2026 17:23:08 +0530 Subject: [PATCH] feat(worker): forward queue + processor with 20/min rate limiter --- .../src/queues/forward.processor.test.ts | 52 +++++++++++++++++++ apps/worker/src/queues/forward.processor.ts | 30 +++++++++++ apps/worker/src/queues/forward.queue.ts | 9 ++++ 3 files changed, 91 insertions(+) create mode 100644 apps/worker/src/queues/forward.processor.test.ts create mode 100644 apps/worker/src/queues/forward.processor.ts create mode 100644 apps/worker/src/queues/forward.queue.ts diff --git a/apps/worker/src/queues/forward.processor.test.ts b/apps/worker/src/queues/forward.processor.test.ts new file mode 100644 index 0000000..f93c822 --- /dev/null +++ b/apps/worker/src/queues/forward.processor.test.ts @@ -0,0 +1,52 @@ +import { processForwardJob } from './forward.processor'; +import { ForwardJobData } from '@tower/types'; + +const mockPool = { sendMessage: jest.fn().mockResolvedValue(undefined) }; + +const baseJob: ForwardJobData = { + messageId: 'msg_1', + content: 'Event this Saturday at the temple', + sourceGroupName: 'UP Parivar Dallas', + senderName: 'Rajesh', + toGroupJid: '120363099999@g.us', + fromAccountId: 'acc_1', +}; + +describe('processForwardJob', () => { + beforeEach(() => jest.clearAllMocks()); + + it('sends a formatted message via the pool', async () => { + await processForwardJob(baseJob, mockPool as any); + expect(mockPool.sendMessage).toHaveBeenCalledWith( + 'acc_1', + '120363099999@g.us', + expect.stringContaining('Event this Saturday at the temple'), + ); + }); + + it('includes source group name in the forwarded text', async () => { + await processForwardJob(baseJob, mockPool as any); + const [, , text] = mockPool.sendMessage.mock.calls[0]; + expect(text).toContain('UP Parivar Dallas'); + }); + + it('includes sender name in the forwarded text', async () => { + await processForwardJob(baseJob, mockPool as any); + const [, , text] = mockPool.sendMessage.mock.calls[0]; + expect(text).toContain('Rajesh'); + }); + + it('handles missing senderName without throwing', async () => { + await processForwardJob({ ...baseJob, senderName: undefined }, mockPool as any); + expect(mockPool.sendMessage).toHaveBeenCalledTimes(1); + }); + + it('throws when pool has no session for the account', async () => { + const brokenPool = { + sendMessage: jest.fn().mockRejectedValue(new Error('No active session for account acc_99')), + }; + await expect( + processForwardJob({ ...baseJob, fromAccountId: 'acc_99' }, brokenPool as any), + ).rejects.toThrow('No active session'); + }); +}); diff --git a/apps/worker/src/queues/forward.processor.ts b/apps/worker/src/queues/forward.processor.ts new file mode 100644 index 0000000..2c68ba3 --- /dev/null +++ b/apps/worker/src/queues/forward.processor.ts @@ -0,0 +1,30 @@ +import { Worker } from 'bullmq'; +import { ForwardJobData } from '@tower/types'; +import { parseRedisUrl } from './redis-connection'; +import { WhatsAppSessionPool } from '../whatsapp/session-pool'; + +function formatForwardText(job: ForwardJobData): string { + const sender = job.senderName ? `_${job.senderName}_` : '_Unknown_'; + return `📢 *${job.sourceGroupName}*\n${sender}:\n\n${job.content}\n\n_— Forwarded by TOWER_`; +} + +export async function processForwardJob( + job: ForwardJobData, + pool: WhatsAppSessionPool, +): Promise { + await pool.sendMessage(job.fromAccountId, job.toGroupJid, formatForwardText(job)); +} + +export function createForwardWorker( + redisUrl: string, + pool: WhatsAppSessionPool, +): Worker { + return new Worker( + 'tower-forward', + async (job) => processForwardJob(job.data, pool), + { + connection: parseRedisUrl(redisUrl), + limiter: { max: 20, duration: 60_000 }, // 20 forwards per minute — avoids WhatsApp bans + }, + ); +} diff --git a/apps/worker/src/queues/forward.queue.ts b/apps/worker/src/queues/forward.queue.ts new file mode 100644 index 0000000..9d6150e --- /dev/null +++ b/apps/worker/src/queues/forward.queue.ts @@ -0,0 +1,9 @@ +import { Queue } from 'bullmq'; +import { ForwardJobData } from '@tower/types'; +import { parseRedisUrl } from './redis-connection'; + +export function createForwardQueue(redisUrl: string): Queue { + return new Queue('tower-forward', { + connection: parseRedisUrl(redisUrl), + }); +}