feat(worker): forward queue + processor with 20/min rate limiter

This commit is contained in:
2026-05-27 17:23:08 +05:30
parent 57a06bc517
commit 5ad33fd416
3 changed files with 91 additions and 0 deletions
@@ -0,0 +1,52 @@
import { processForwardJob } from './forward.processor';
import { ForwardJobData } from '@tower/types';
const mockPool = { sendMessage: jest.fn().mockResolvedValue(undefined) };
const baseJob: ForwardJobData = {
messageId: 'msg_1',
content: 'Event this Saturday at the temple',
sourceGroupName: 'UP Parivar Dallas',
senderName: 'Rajesh',
toGroupJid: '120363099999@g.us',
fromAccountId: 'acc_1',
};
describe('processForwardJob', () => {
beforeEach(() => jest.clearAllMocks());
it('sends a formatted message via the pool', async () => {
await processForwardJob(baseJob, mockPool as any);
expect(mockPool.sendMessage).toHaveBeenCalledWith(
'acc_1',
'120363099999@g.us',
expect.stringContaining('Event this Saturday at the temple'),
);
});
it('includes source group name in the forwarded text', async () => {
await processForwardJob(baseJob, mockPool as any);
const [, , text] = mockPool.sendMessage.mock.calls[0];
expect(text).toContain('UP Parivar Dallas');
});
it('includes sender name in the forwarded text', async () => {
await processForwardJob(baseJob, mockPool as any);
const [, , text] = mockPool.sendMessage.mock.calls[0];
expect(text).toContain('Rajesh');
});
it('handles missing senderName without throwing', async () => {
await processForwardJob({ ...baseJob, senderName: undefined }, mockPool as any);
expect(mockPool.sendMessage).toHaveBeenCalledTimes(1);
});
it('throws when pool has no session for the account', async () => {
const brokenPool = {
sendMessage: jest.fn().mockRejectedValue(new Error('No active session for account acc_99')),
};
await expect(
processForwardJob({ ...baseJob, fromAccountId: 'acc_99' }, brokenPool as any),
).rejects.toThrow('No active session');
});
});
@@ -0,0 +1,30 @@
import { Worker } from 'bullmq';
import { ForwardJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';
import { WhatsAppSessionPool } from '../whatsapp/session-pool';
function formatForwardText(job: ForwardJobData): string {
const sender = job.senderName ? `_${job.senderName}_` : '_Unknown_';
return `📢 *${job.sourceGroupName}*\n${sender}:\n\n${job.content}\n\n_— Forwarded by TOWER_`;
}
export async function processForwardJob(
job: ForwardJobData,
pool: WhatsAppSessionPool,
): Promise<void> {
await pool.sendMessage(job.fromAccountId, job.toGroupJid, formatForwardText(job));
}
export function createForwardWorker(
redisUrl: string,
pool: WhatsAppSessionPool,
): Worker<ForwardJobData> {
return new Worker<ForwardJobData>(
'tower-forward',
async (job) => processForwardJob(job.data, pool),
{
connection: parseRedisUrl(redisUrl),
limiter: { max: 20, duration: 60_000 }, // 20 forwards per minute — avoids WhatsApp bans
},
);
}
+9
View File
@@ -0,0 +1,9 @@
import { Queue } from 'bullmq';
import { ForwardJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';
export function createForwardQueue(redisUrl: string): Queue<ForwardJobData> {
return new Queue<ForwardJobData>('tower-forward', {
connection: parseRedisUrl(redisUrl),
});
}