good forst commit

This commit is contained in:
2026-06-09 02:02:40 +05:30
parent 801c1d7121
commit 249d759e6a
215 changed files with 15425 additions and 1240 deletions
+59 -99
View File
@@ -1,6 +1,9 @@
import { handleStarReaction } from './approval';
import { handleReaction } from './approval';
import { approveMessage } from './approve-message';
import { NormalizedReaction } from '@tower/types';
jest.mock('./approve-message');
function makeReaction(overrides: Partial<NormalizedReaction> = {}): NormalizedReaction {
return {
reactorJid: '919876543210@s.whatsapp.net',
@@ -12,131 +15,88 @@ function makeReaction(overrides: Partial<NormalizedReaction> = {}): NormalizedRe
};
}
const adminJids = ['919876543210@s.whatsapp.net'];
function makeMessage(overrides: object = {}) {
return {
id: 'msg_1',
tenantId: 'tnt_1',
status: 'PENDING',
approval: null,
content: 'hello world',
senderName: 'Alice',
sourceGroupId: 'grp_1',
tags: ['#important'],
platform: 'whatsapp',
sourceGroup: { name: 'UP Parivar Dallas', syncRoutesFrom: [] },
sourceGroup: { platformId: '120363043312345678@g.us' },
...overrides,
};
}
function makePrisma(messageOverrides: object = {}, txCount = 1) {
function makePool(isAdmin: boolean) {
return {
message: { findUnique: jest.fn().mockResolvedValue(makeMessage(messageOverrides)) },
$transaction: jest.fn().mockImplementation(async (fn: any) =>
fn({
message: { updateMany: jest.fn().mockResolvedValue({ count: txCount }) },
approval: { create: jest.fn().mockResolvedValue({}) },
get: jest.fn().mockReturnValue({
groupMetadata: jest.fn().mockResolvedValue({
participants: [
{ id: '919876543210@s.whatsapp.net', admin: isAdmin ? 'admin' : undefined },
{ id: 'other@s.whatsapp.net', admin: 'superadmin' },
],
}),
),
} as any;
}),
};
}
describe('handleStarReaction', () => {
it('returns null for non-star emoji', async () => {
expect(await handleStarReaction(makeReaction({ emoji: '👍' }), adminJids, {} as any)).toBeNull();
});
const mockPrisma = {
message: { findUnique: jest.fn() },
tenantRule: { findMany: jest.fn() },
};
it('returns null when reactor is not an admin', async () => {
expect(
await handleStarReaction(makeReaction({ reactorJid: 'stranger@s.whatsapp.net' }), adminJids, {} as any),
).toBeNull();
const mockForwardQueue = { add: jest.fn() };
const mockIndexQueue = { add: jest.fn() };
describe('handleReaction', () => {
beforeEach(() => {
jest.clearAllMocks();
mockPrisma.message.findUnique.mockResolvedValue(makeMessage());
(approveMessage as jest.Mock).mockResolvedValue({ forwardJobs: [], indexDoc: {} });
});
it('returns null when message not found', async () => {
const prisma = { message: { findUnique: jest.fn().mockResolvedValue(null) } } as any;
expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
expect(prisma.message.findUnique).toHaveBeenCalledWith({
where: { platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'TARGET_MSG_123' } },
include: {
approval: true,
sourceGroup: {
include: { syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } } },
},
},
});
mockPrisma.message.findUnique.mockResolvedValue(null);
expect(await handleReaction(makeReaction(), mockPrisma, makePool(true))).toBeNull();
});
it('returns null when message status is not PENDING', async () => {
const prisma = {
message: { findUnique: jest.fn().mockResolvedValue(makeMessage({ status: 'REJECTED' })) },
} as any;
expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
it('returns null when message is not PENDING', async () => {
mockPrisma.message.findUnique.mockResolvedValue(makeMessage({ status: 'APPROVED' }));
expect(await handleReaction(makeReaction(), mockPrisma, makePool(true))).toBeNull();
});
it('returns null when approval record already exists', async () => {
const prisma = {
message: {
findUnique: jest.fn().mockResolvedValue(makeMessage({ status: 'APPROVED', approval: { id: 'appr_1' } })),
},
} as any;
expect(await handleStarReaction(makeReaction(), adminJids, prisma)).toBeNull();
it('returns null when no reaction rule matches', async () => {
mockPrisma.tenantRule.findMany.mockResolvedValue([]);
expect(await handleReaction(makeReaction(), mockPrisma, makePool(true))).toBeNull();
});
it('returns null on double-approval race (updateMany count=0)', async () => {
const result = await handleStarReaction(makeReaction(), adminJids, makePrisma({}, 0));
expect(result).toBeNull();
it('returns null when reactor is not a group admin', async () => {
mockPrisma.tenantRule.findMany.mockResolvedValue([
{ id: 'r1', matchType: 'REACTION_EMOJI', matchValue: '⭐', action: 'AUTO_APPROVE', priority: 0 },
]);
expect(await handleReaction(makeReaction(), mockPrisma, makePool(false))).toBeNull();
});
it('returns ApprovalResult with empty forwardJobs and valid indexDoc when no sync routes', async () => {
const result = await handleStarReaction(makeReaction(), adminJids, makePrisma());
it('approves when reactor is admin and reaction rule matches', async () => {
mockPrisma.tenantRule.findMany.mockResolvedValue([
{ id: 'r1', matchType: 'REACTION_EMOJI', matchValue: '⭐', action: 'AUTO_APPROVE', priority: 0 },
]);
const result = await handleReaction(makeReaction(), mockPrisma, makePool(true));
expect(result).not.toBeNull();
expect(result!.forwardJobs).toEqual([]);
expect(result!.indexDoc).toMatchObject({
messageId: 'msg_1',
content: 'hello world',
senderName: 'Alice',
sourceGroupId: 'grp_1',
sourceGroupName: 'UP Parivar Dallas',
tags: ['#important'],
platform: 'whatsapp',
});
expect(result!.indexDoc.approvedAt).toMatch(/^\d{4}-\d{2}-\d{2}T/);
expect(approveMessage).toHaveBeenCalledWith('msg_1', 'tnt_1', '919876543210@s.whatsapp.net', mockPrisma);
});
it('returns ForwardJobData for each active sync route', async () => {
const prisma = makePrisma({
content: 'important announcement',
senderName: 'Bob',
sourceGroup: {
name: 'Source Group',
syncRoutesFrom: [
{ targetGroup: { platformId: '999@g.us', accountId: 'acc_2' } },
{ targetGroup: { platformId: '888@g.us', accountId: null } },
],
},
});
it('returns null when socket not available', async () => {
mockPrisma.tenantRule.findMany.mockResolvedValue([
{ id: 'r1', matchType: 'REACTION_EMOJI', matchValue: '⭐', action: 'AUTO_APPROVE', priority: 0 },
]);
const pool = { get: jest.fn().mockReturnValue(undefined) };
expect(await handleReaction(makeReaction(), mockPrisma, pool)).toBeNull();
});
const result = await handleStarReaction(makeReaction(), adminJids, prisma);
expect(result!.forwardJobs).toHaveLength(2);
expect(result!.forwardJobs[0]).toMatchObject({
messageId: 'msg_1',
content: 'important announcement',
sourceGroupName: 'Source Group',
senderName: 'Bob',
toGroupJid: '999@g.us',
fromAccountId: 'acc_2',
});
expect(result!.forwardJobs[1]).toMatchObject({
toGroupJid: '888@g.us',
fromAccountId: 'acc_1',
});
expect(result!.indexDoc).toMatchObject({
messageId: 'msg_1',
content: 'important announcement',
senderName: 'Bob',
sourceGroupId: 'grp_1',
tags: ['#important'],
platform: 'whatsapp',
});
it('returns null when approveMessage returns null', async () => {
mockPrisma.tenantRule.findMany.mockResolvedValue([
{ id: 'r1', matchType: 'REACTION_EMOJI', matchValue: '⭐', action: 'AUTO_APPROVE', priority: 0 },
]);
(approveMessage as jest.Mock).mockResolvedValue(null);
expect(await handleReaction(makeReaction(), mockPrisma, makePool(true))).toBeNull();
});
});
+77 -51
View File
@@ -1,80 +1,106 @@
import { NormalizedReaction, ForwardJobData, IndexJobData } from '@tower/types';
import { createLogger } from '@tower/logger';
import { matchReactionRules, TenantRuleRow } from '../whatsapp/match-rules';
import { approveMessage } from './approve-message';
const logger = createLogger('approval');
export interface ApprovalResult {
forwardJobs: ForwardJobData[];
indexDoc: IndexJobData;
}
export async function handleStarReaction(
export async function handleReaction(
reaction: NormalizedReaction,
adminJids: string[],
prisma: any,
pool: any,
forwardQueue?: any,
indexQueue?: any,
): Promise<ApprovalResult | null> {
if (reaction.emoji !== '⭐') return null;
if (!adminJids.includes(reaction.reactorJid)) return null;
logger.info({ emoji: reaction.emoji, reactorJid: reaction.reactorJid }, 'Reaction received');
// Find the target message to resolve tenant
const message = await prisma.message.findUnique({
where: {
platform_platformMsgId: {
// TODO: derive platform from NormalizedReaction when multi-platform support is added
platform: 'whatsapp',
platformMsgId: reaction.targetMsgId,
},
},
include: {
approval: true,
sourceGroup: {
include: {
syncRoutesFrom: { where: { isActive: true }, include: { targetGroup: true } },
},
},
},
select: { id: true, tenantId: true, status: true, sourceGroup: { select: { platformId: true } } },
});
if (!message) return null;
if (message.status !== 'PENDING') return null;
if (message.approval) return null;
let approved = false;
await prisma.$transaction(async (tx: any) => {
const updated = await tx.message.updateMany({
where: { id: message.id, status: 'PENDING' },
data: { status: 'APPROVED' },
});
if (updated.count === 0) return;
approved = true;
await tx.approval.create({
data: {
messageId: message.id,
adminId: reaction.reactorJid,
decision: 'APPROVED',
},
});
// Load active reaction rules for this tenant
const ruleRows: TenantRuleRow[] = await prisma.tenantRule.findMany({
where: { tenantId: message.tenantId, isActive: true, matchType: 'REACTION_EMOJI' },
select: { id: true, matchType: true, matchValue: true, action: true, priority: true },
orderBy: { priority: 'asc' },
});
if (!approved) return null;
const matched = matchReactionRules(reaction.emoji, ruleRows);
if (!matched) {
logger.info({ emoji: reaction.emoji }, 'No reaction rule matched — ignoring');
return null;
}
const forwardJobs: ForwardJobData[] = message.sourceGroup.syncRoutesFrom
.filter((route: any) => route.targetGroup != null)
.map((route: any) => ({
messageId: message.id,
content: message.content,
sourceGroupName: message.sourceGroup.name,
senderName: message.senderName ?? undefined,
toGroupJid: route.targetGroup.platformId,
fromAccountId: route.targetGroup.accountId ?? reaction.accountId,
}));
logger.info({ emoji: reaction.emoji, action: matched.action }, 'Reaction rule matched');
const indexDoc: IndexJobData = {
messageId: message.id,
content: message.content,
senderName: message.senderName ?? null,
sourceGroupId: message.sourceGroupId,
sourceGroupName: message.sourceGroup.name,
tags: message.tags,
platform: message.platform,
approvedAt: new Date().toISOString(),
};
// For approval-type actions, verify the reactor is a group admin
if (matched.action === 'AUTO_APPROVE' || matched.action === 'FLAG') {
const sock = pool.get(reaction.accountId);
if (!sock) {
logger.warn({ accountId: reaction.accountId }, 'No active socket for group admin check');
return null;
}
return { forwardJobs, indexDoc };
let isAdmin = false;
try {
const metadata = await sock.groupMetadata(reaction.sourceGroupJid);
const participantJids = metadata.participants?.map((p: any) => ({ id: p.id, admin: p.admin })) ?? [];
logger.info({ participantJids, reactorJid: reaction.reactorJid }, 'Group metadata participants');
// Compare both with and without server suffix
const reactorBase = reaction.reactorJid.replace(/@.+$/, '');
isAdmin = metadata.participants?.some(
(p: any) => {
const pBase = p.id.replace(/@.+$/, '');
return (p.id === reaction.reactorJid || pBase === reactorBase) &&
(p.admin === 'admin' || p.admin === 'superadmin');
},
) ?? false;
} catch (err) {
logger.warn({ err, reactorJid: reaction.reactorJid }, 'Failed to check group admin status — rejecting reaction');
return null;
}
if (!isAdmin) {
logger.warn({ reactorJid: reaction.reactorJid }, 'Reactor is not a group admin — ignoring reaction');
return null;
}
}
// Handle the matched action
if (matched.action === 'SKIP' || matched.action === 'REJECT') {
// For reactions, SKIP and REJECT don't make sense for already-PENDING messages.
// We simply don't act on them.
logger.info({ action: matched.action }, 'Reaction matched SKIP/REJECT — no action taken');
return null;
}
// AUTO_APPROVE or FLAG — approve the message (both lead to approval for reactions)
const result = await approveMessage(message.id, message.tenantId, reaction.reactorJid, prisma);
if (!result) {
logger.warn({ messageId: message.id }, 'Could not approve message via reaction');
return null;
}
logger.info(
{ messageId: message.id, forwardCount: result.forwardJobs.length },
'Message approved by reaction',
);
return result;
}
+86
View File
@@ -0,0 +1,86 @@
import { ForwardJobData, IndexJobData } from '@tower/types';
import { createLogger } from '@tower/logger';
const logger = createLogger('approve-message');
export interface ApproveMessageResult {
forwardJobs: ForwardJobData[];
indexDoc: IndexJobData;
}
/**
* Approve a PENDING message: mark APPROVED in a transaction, create an Approval row,
* and return the forward jobs + index document.
* Returns null if the message is not in PENDING state or is already approved.
*/
export async function approveMessage(
messageId: string,
tenantId: string,
adminId: string, // JID of the reactor or system
prisma: any,
): Promise<ApproveMessageResult | null> {
const message = await prisma.message.findUnique({
where: { id: messageId },
include: {
approval: true,
sourceGroup: {
include: {
syncRoutesFrom: {
where: { isActive: true, targetGroup: { isActive: true } },
include: { targetGroup: { select: { platformId: true, isActive: true, accountId: true } } },
},
},
},
},
});
if (!message) return null;
if (message.status !== 'PENDING') return null;
if (message.approval) return null;
let approved = false;
await prisma.$transaction(async (tx: any) => {
const updated = await tx.message.updateMany({
where: { id: message.id, status: 'PENDING' },
data: { status: 'APPROVED' },
});
if (updated.count === 0) return;
approved = true;
await tx.approval.create({
data: {
tenantId: message.tenantId,
messageId: message.id,
adminId,
decision: 'APPROVED',
},
});
});
if (!approved) return null;
const forwardJobs: ForwardJobData[] = (message.sourceGroup?.syncRoutesFrom ?? [])
.filter((route: any) => route.targetGroup != null)
.map((route: any) => ({
tenantId: message.tenantId,
messageId: message.id,
content: message.content,
sourceGroupName: message.sourceGroup.name,
senderName: message.senderName ?? undefined,
toGroupJid: route.targetGroup.platformId,
fromAccountId: route.targetGroup.accountId ?? message.sourceGroup.accountId ?? '',
}));
const indexDoc: IndexJobData = {
tenantId: message.tenantId,
messageId: message.id,
content: message.content,
senderName: message.senderName ?? null,
sourceGroupId: message.sourceGroupId,
sourceGroupName: message.sourceGroup.name,
tags: message.tags ?? [],
platform: message.platform,
approvedAt: new Date().toISOString(),
};
return { forwardJobs, indexDoc };
}
@@ -0,0 +1,46 @@
import { createEmailService, EmailService } from './email.service';
describe('email.service', () => {
const originalEnv = process.env;
beforeEach(() => {
process.env = { ...originalEnv };
delete process.env['SMTP_HOST'];
delete process.env['SMTP_PORT'];
delete process.env['SMTP_USER'];
delete process.env['SMTP_PASS'];
delete process.env['SMTP_FROM'];
delete process.env['SMTP_SECURE'];
});
afterAll(() => {
process.env = originalEnv;
});
it('returns a no-op service when SMTP_HOST is unset', () => {
const service = createEmailService();
expect(service).toBeDefined();
expect(typeof service.sendPendingClaimNotification).toBe('function');
});
it('logs but does not throw when no-op service is asked to send', async () => {
const service: EmailService = createEmailService();
await expect(
service.sendPendingClaimNotification({
to: 'owner@example.com',
groupName: 'Family Chat',
expiresAt: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000),
}),
).resolves.toBeUndefined();
});
it('creates a transport when SMTP_HOST is set', () => {
process.env['SMTP_HOST'] = 'smtp.ethereal.email';
process.env['SMTP_PORT'] = '587';
process.env['SMTP_USER'] = 'user@test';
process.env['SMTP_PASS'] = 'pass';
process.env['SMTP_FROM'] = 'TOWER <noreply@tower.local>';
const service = createEmailService();
expect(service).toBeDefined();
});
});
+86
View File
@@ -0,0 +1,86 @@
import { createLogger } from '@tower/logger';
import nodemailer, { Transporter } from 'nodemailer';
const logger = createLogger('email');
export interface EmailService {
sendPendingClaimNotification(input: {
to: string;
groupName: string;
expiresAt: Date;
}): Promise<void>;
}
interface SmtpConfig {
host: string;
port: number;
user?: string;
pass?: string;
from: string;
secure: boolean;
}
function readSmtpConfig(): SmtpConfig | null {
const host = process.env['SMTP_HOST']?.trim();
if (!host) return null;
return {
host,
port: Number(process.env['SMTP_PORT'] ?? 587),
user: process.env['SMTP_USER']?.trim() || undefined,
pass: process.env['SMTP_PASS']?.trim() || undefined,
from: process.env['SMTP_FROM']?.trim() || 'TOWER <noreply@tower.local>',
secure: (process.env['SMTP_SECURE'] ?? '').toLowerCase() === 'true',
};
}
export function createEmailService(): EmailService {
const cfg = readSmtpConfig();
if (!cfg) {
logger.warn('SMTP_HOST not set — email notifications disabled');
return new NoopEmailService();
}
const transporter: Transporter = nodemailer.createTransport({
host: cfg.host,
port: cfg.port,
secure: cfg.secure,
...(cfg.user ? { auth: { user: cfg.user, pass: cfg.pass ?? '' } } : {}),
});
logger.info({ host: cfg.host, port: cfg.port, from: cfg.from }, 'SMTP transport ready');
return new SmtpEmailService(transporter, cfg.from);
}
class NoopEmailService implements EmailService {
async sendPendingClaimNotification(input: { to: string; groupName: string; expiresAt: Date }): Promise<void> {
logger.info(
{ to: input.to, groupName: input.groupName, expiresAt: input.expiresAt.toISOString() },
'[email-disabled] would send GROUP_PENDING_CLAIM notification',
);
}
}
class SmtpEmailService implements EmailService {
constructor(
private readonly transporter: Transporter,
private readonly from: string,
) {}
async sendPendingClaimNotification(input: { to: string; groupName: string; expiresAt: Date }): Promise<void> {
const expiresInDays = Math.max(1, Math.ceil((input.expiresAt.getTime() - Date.now()) / (24 * 60 * 60 * 1000)));
const subject = `[TOWER] New group awaiting claim: ${input.groupName}`;
const text = [
`A WhatsApp group joined the TOWER bot and needs to be claimed by a tenant admin.`,
``,
`Group: ${input.groupName}`,
`Claim by: ${input.expiresAt.toISOString()} (within ${expiresInDays} day${expiresInDays === 1 ? '' : 's'})`,
``,
`Sign in to your TOWER portal and go to Groups → Pending Claim to take ownership.`,
`If no owner claims it before the deadline, the bot will stop listening and the group will be marked EXPIRED.`,
].join('\n');
try {
await this.transporter.sendMail({ from: this.from, to: input.to, subject, text });
logger.info({ to: input.to, groupName: input.groupName }, 'Sent GROUP_PENDING_CLAIM notification');
} catch (err) {
logger.error({ err, to: input.to, groupName: input.groupName }, 'Failed to send GROUP_PENDING_CLAIM notification');
}
}
}
+118 -20
View File
@@ -9,9 +9,12 @@ import { createForwardWorker } from './queues/forward.processor';
import { createIndexQueue } from './queues/index.queue';
import { createIndexWorker } from './queues/index.processor';
import { WhatsAppSessionPool } from './whatsapp/session-pool';
import { detectTags, isFlagged } from './whatsapp/tag-detector';
import { matchContentRules, TenantRuleRow } from './whatsapp/match-rules';
import { syncGroups } from './whatsapp/group-sync';
import { handleStarReaction } from './core/approval';
import { handleReaction } from './core/approval';
import { approveMessage } from './core/approve-message';
import { startOtpSenderLoop } from './whatsapp/otp-sender';
import { handleCommand } from './whatsapp/command-handler';
const logger = createLogger('tower-worker');
@@ -19,11 +22,6 @@ async function bootstrap() {
const env = validateEnv();
const prisma = new PrismaClient();
await prisma.$connect();
const adminJids = env.TOWER_ADMIN_JIDS
? env.TOWER_ADMIN_JIDS.split(',').map((j) => j.trim()).filter(Boolean)
: [];
const meiliClient = createMeiliClient(env.MEILI_URL, env.MEILI_MASTER_KEY);
await configureIndex(meiliClient).catch((err) =>
logger.warn({ err }, 'Failed to configure Meilisearch index — search may be degraded'),
@@ -34,7 +32,7 @@ async function bootstrap() {
const indexQueue = createIndexQueue(env.REDIS_URL);
const pool = new WhatsAppSessionPool();
const ingestWorker = createIngestWorker(env.REDIS_URL, prisma);
const ingestWorker = createIngestWorker(env.REDIS_URL, prisma, pool, forwardQueue, indexQueue);
const forwardWorker = createForwardWorker(env.REDIS_URL, pool);
const indexWorker = createIndexWorker(env.REDIS_URL, meiliClient);
@@ -54,8 +52,12 @@ async function bootstrap() {
account.id,
account.sessionPath,
async (msg, accountId) => {
const tags = detectTags(msg.content, msg.senderJid, adminJids);
if (!isFlagged(tags)) return;
logger.info({ groupJid: msg.sourceGroupJid, senderJid: msg.senderJid, content: msg.content?.slice(0, 80) }, 'Message received');
// Command handler intercepts STOP/START/PORTAL from non-bot members
await handleCommand(msg, accountId, prisma, pool).catch((err) =>
logger.error({ err }, 'Command handler error'),
);
const groupMap = groupMaps.get(accountId);
if (!groupMap) {
@@ -68,9 +70,76 @@ async function bootstrap() {
return;
}
// Resolve tenant from the group; drop messages from unclaimed groups
const group = await prisma.group.findUnique({
where: { id: sourceGroupId },
select: { tenantId: true },
});
if (!group || !group.tenantId) {
logger.info({ sourceGroupId }, 'Group not yet claimed — message dropped');
return;
}
// Check tenant state
const tenant = await prisma.tenant.findUnique({
where: { id: group.tenantId },
select: { isActive: true, isForwardingPaused: true },
});
if (!tenant || !tenant.isActive) {
logger.info({ tenantId: group.tenantId, sourceGroupId }, 'Tenant is inactive — message dropped');
return;
}
if (tenant.isForwardingPaused) {
logger.info({ tenantId: group.tenantId, sourceGroupId }, 'Forwarding paused for tenant — message dropped');
return;
}
// Load active rules for this tenant
const ruleRows: TenantRuleRow[] = await prisma.tenantRule.findMany({
where: { tenantId: group.tenantId, isActive: true },
select: { id: true, matchType: true, matchValue: true, action: true, priority: true },
orderBy: { priority: 'asc' },
});
const { tags, effectiveAction } = matchContentRules(msg.content, ruleRows);
logger.info({ tags, effectiveAction }, 'Rule match result');
// No matching rules — drop the message
if (tags.length === 0) return;
// SKIP action — silently drop
if (effectiveAction === 'SKIP') {
logger.info({ platformMsgId: msg.platformMsgId }, 'Message skipped by rule');
return;
}
// For AUTO_APPROVE, check if the sender is a group admin
let finalAction = effectiveAction;
if (effectiveAction === 'AUTO_APPROVE') {
try {
const sock = pool.get(accountId);
if (sock) {
const metadata = await sock.groupMetadata(msg.sourceGroupJid);
const isAdmin = metadata.participants?.some(
(p: any) => p.id === msg.senderJid && (p.admin === 'admin' || p.admin === 'superadmin'),
);
if (isAdmin) {
finalAction = 'AUTO_APPROVE';
} else {
logger.info({ senderJid: msg.senderJid }, 'Sender is not a group admin — downgrading AUTO_APPROVE to FLAG');
finalAction = 'FLAG';
}
}
} catch (err) {
logger.warn({ err, senderJid: msg.senderJid }, 'Failed to check group admin status — downgrading to FLAG');
finalAction = 'FLAG';
}
}
await ingestQueue.add(
'ingest',
{
tenantId: group.tenantId,
platformMsgId: msg.platformMsgId,
platform: 'whatsapp',
accountId,
@@ -79,14 +148,15 @@ async function bootstrap() {
senderName: msg.senderName,
content: msg.content,
tags,
effectiveAction: finalAction ?? undefined,
},
{ attempts: 3, backoff: { type: 'exponential', delay: 1000 } },
);
logger.info({ platformMsgId: msg.platformMsgId, tags }, 'Message enqueued');
logger.info({ platformMsgId: msg.platformMsgId, tags, effectiveAction: finalAction }, 'Message enqueued');
},
async (reaction) => {
const result = await handleStarReaction(reaction, adminJids, prisma);
const result = await handleReaction(reaction, prisma, pool, forwardQueue, indexQueue);
if (!result) return;
const { forwardJobs, indexDoc } = result;
@@ -110,13 +180,13 @@ async function bootstrap() {
},
async (groups, accountId) => {
logger.info({ count: Object.keys(groups).length, accountId }, 'Syncing groups');
const map = await syncGroups(groups, accountId, prisma);
const map = await syncGroups(groups, accountId, prisma, pool);
groupMaps.set(accountId, map);
},
async (qr, accountId) => {
await prisma.account.update({
where: { id: accountId },
data: { qrCode: qr },
data: { qrCode: qr, status: 'PAIRING' },
}).catch((err) => logger.error({ accountId, err }, 'Failed to store QR in DB'));
logger.info({ accountId }, 'QR code updated');
},
@@ -133,6 +203,12 @@ async function bootstrap() {
data: { status: 'DISCONNECTED' },
}).catch((err) => logger.error({ accountId, err }, 'Failed to update account status'));
logger.info({ accountId }, 'Account logged out — awaiting QR scan');
} else if (status === 'disconnected') {
await prisma.account.update({
where: { id: accountId },
data: { status: 'DISCONNECTED' },
}).catch((err) => logger.error({ accountId, err }, 'Failed to update account status'));
logger.info({ accountId }, 'Account disconnected');
}
},
);
@@ -141,14 +217,14 @@ async function bootstrap() {
}
}
// Load ACTIVE and DISCONNECTED accounts at startup (DISCONNECTED ones need re-auth)
// Load bot accounts at startup: ACTIVE, DISCONNECTED (need re-auth), PAIRING (mid-pairing)
const accounts = await prisma.account.findMany({
where: { status: { in: ['ACTIVE', 'DISCONNECTED'] }, platform: 'whatsapp' },
where: { isBot: true, status: { in: ['ACTIVE', 'DISCONNECTED', 'PAIRING'] }, platform: 'whatsapp' },
select: { id: true, sessionPath: true },
});
if (accounts.length === 0) {
logger.warn('No WhatsApp accounts found — add one via the dashboard');
logger.warn('No bot accounts found — pair one via /settings/bot');
}
for (const account of accounts) {
@@ -157,16 +233,18 @@ async function bootstrap() {
logger.info({ accountCount: accounts.length }, 'Tower worker ready');
// Poll every 30s for accounts added via the dashboard while worker is running
// Poll every 30s for accounts added via the dashboard while worker is running.
// Every 60s, also re-fetch groups for each active session so newly-added groups
// are detected without requiring a worker restart.
setInterval(async () => {
try {
const all = await prisma.account.findMany({
where: { status: { in: ['ACTIVE', 'DISCONNECTED'] }, platform: 'whatsapp' },
where: { isBot: true, status: { in: ['ACTIVE', 'DISCONNECTED', 'PAIRING'] }, platform: 'whatsapp' },
select: { id: true, sessionPath: true },
});
for (const account of all) {
if (!pool.get(account.id)) {
logger.info({ accountId: account.id }, 'New account detected — starting session');
logger.info({ accountId: account.id }, 'New bot account detected — starting session');
await startAccount(account);
}
}
@@ -175,6 +253,26 @@ async function bootstrap() {
}
}, 30_000);
setInterval(async () => {
try {
for (const [accountId, sock] of pool.getAll()) {
try {
const groups = await sock.groupFetchAllParticipating();
logger.info({ count: Object.keys(groups).length, accountId }, 'Periodic group re-sync');
const map = await syncGroups(groups, accountId, prisma, pool);
groupMaps.set(accountId, map);
} catch (err) {
logger.warn({ accountId, err }, 'Periodic group re-sync failed');
}
}
} catch (err) {
logger.error({ err }, 'Error in periodic group re-sync loop');
}
}, 60_000);
// OTP-sender loop: pick up unsent OtpChallenges and DM them via the bot
startOtpSenderLoop(prisma, pool, logger);
const shutdown = async () => {
logger.info('Shutting down...');
await pool.closeAll();
@@ -4,6 +4,7 @@ import { ForwardJobData } from '@tower/types';
const mockPool = { sendMessage: jest.fn().mockResolvedValue(undefined) };
const baseJob: ForwardJobData = {
tenantId: 'tnt_1',
messageId: 'msg_1',
content: 'Event this Saturday at the temple',
sourceGroupName: 'UP Parivar Dallas',
@@ -9,6 +9,7 @@ jest.mock('@tower/search', () => ({
function makeJob(overrides: Partial<IndexJobData> = {}): IndexJobData {
return {
tenantId: 'tnt-1',
messageId: 'msg-1',
content: 'hello world',
senderName: 'Alice',
@@ -24,11 +25,12 @@ function makeJob(overrides: Partial<IndexJobData> = {}): IndexJobData {
describe('processIndexJob', () => {
beforeEach(() => jest.clearAllMocks());
it('calls indexMessage with MeiliDocument shape', async () => {
it('calls indexMessage with MeiliDocument shape including tenantId', async () => {
const mockClient = {} as any;
await processIndexJob(makeJob(), mockClient);
expect(indexMessage).toHaveBeenCalledWith(mockClient, {
id: 'msg-1',
tenantId: 'tnt-1',
content: 'hello world',
senderName: 'Alice',
sourceGroupId: 'grp-1',
@@ -10,6 +10,7 @@ export async function processIndexJob(job: IndexJobData, meiliClient: MeiliSearc
}
const doc: MeiliDocument = {
id: job.messageId,
tenantId: job.tenantId,
content: job.content,
senderName: job.senderName ?? '',
sourceGroupId: job.sourceGroupId,
@@ -1,17 +1,24 @@
import { processIngestJob } from './ingest.processor';
import { IngestJobData } from '@tower/types';
import { approveMessage } from '../core/approve-message';
jest.mock('../core/approve-message');
const mockPrisma = {
message: {
upsert: jest.fn(),
},
message: { upsert: jest.fn(), findUnique: jest.fn() },
group: { findUnique: jest.fn() },
tenant: { findUnique: jest.fn() },
memberOptOut: { findFirst: jest.fn() },
towerUser: { upsert: jest.fn() },
approval: { upsert: jest.fn() },
};
const sampleJob: IngestJobData = {
tenantId: 'tnt-1',
platformMsgId: 'WA_MSG_001',
platform: 'whatsapp',
accountId: 'account-1',
sourceGroupId: 'clxxxxxx',
sourceGroupId: 'grp-1',
senderJid: '919876543210@s.whatsapp.net',
senderName: 'Alice',
content: '#important update from the committee',
@@ -19,21 +26,40 @@ const sampleJob: IngestJobData = {
};
describe('processIngestJob', () => {
beforeEach(() => jest.clearAllMocks());
it('upserts a message with PENDING status', async () => {
beforeEach(() => {
jest.clearAllMocks();
mockPrisma.group.findUnique.mockResolvedValue({ id: 'grp-1', claimStatus: 'CLAIMED' });
mockPrisma.tenant.findUnique.mockResolvedValue({ isActive: true, isForwardingPaused: false });
mockPrisma.memberOptOut.findFirst.mockResolvedValue(null);
mockPrisma.towerUser.upsert.mockResolvedValue({ id: 'user-1' });
mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-db-id' });
(approveMessage as jest.Mock).mockResolvedValue({ forwardJobs: [], indexDoc: {} });
});
it('drops message when group is not CLAIMED', async () => {
mockPrisma.group.findUnique.mockResolvedValue({ id: 'grp-1', claimStatus: 'PENDING_CLAIM' });
await processIngestJob(sampleJob, mockPrisma as any);
expect(mockPrisma.message.upsert).not.toHaveBeenCalled();
});
it('drops message when sender has opted out', async () => {
mockPrisma.memberOptOut.findFirst.mockResolvedValue({ id: 'opt-1' });
await processIngestJob(sampleJob, mockPrisma as any);
expect(mockPrisma.message.upsert).not.toHaveBeenCalled();
});
it('upserts a message with PENDING status by default', async () => {
await processIngestJob(sampleJob, mockPrisma as any);
expect(mockPrisma.message.upsert).toHaveBeenCalledWith({
where: { platform_platformMsgId: { platform: 'whatsapp', platformMsgId: 'WA_MSG_001' } },
create: {
tenantId: 'tnt-1',
platform: 'whatsapp',
platformMsgId: 'WA_MSG_001',
sourceGroupId: 'clxxxxxx',
sourceGroupId: 'grp-1',
senderJid: '919876543210@s.whatsapp.net',
senderName: 'Alice',
senderTowerUserId: 'user-1',
content: '#important update from the committee',
tags: ['#important'],
status: 'PENDING',
@@ -51,4 +77,35 @@ describe('processIngestJob', () => {
mockPrisma.message.upsert.mockRejectedValue(new Error('DB connection lost'));
await expect(processIngestJob(sampleJob, mockPrisma as any)).rejects.toThrow('DB connection lost');
});
it('creates REJECTED message when effectiveAction is REJECT', async () => {
mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-rejected' });
await processIngestJob({ ...sampleJob, effectiveAction: 'REJECT' }, mockPrisma as any);
expect(mockPrisma.message.upsert).toHaveBeenCalledWith(
expect.objectContaining({
create: expect.objectContaining({ status: 'REJECTED' }),
}),
);
expect(mockPrisma.approval.upsert).toHaveBeenCalledWith({
where: { messageId: 'msg-rejected' },
update: {},
create: {
tenantId: 'tnt-1',
messageId: 'msg-rejected',
adminId: 'system',
decision: 'REJECTED',
},
});
});
it('calls approveMessage when effectiveAction is AUTO_APPROVE', async () => {
mockPrisma.message.upsert.mockResolvedValue({ id: 'msg-auto' });
await processIngestJob({ ...sampleJob, effectiveAction: 'AUTO_APPROVE' }, mockPrisma as any);
expect(approveMessage).toHaveBeenCalledWith('msg-auto', 'tnt-1', 'system', mockPrisma);
});
it('does not call approveMessage for FLAG (default)', async () => {
await processIngestJob(sampleJob, mockPrisma as any);
expect(approveMessage).not.toHaveBeenCalled();
});
});
+116 -7
View File
@@ -1,9 +1,71 @@
import { Worker } from 'bullmq';
import { IngestJobData } from '@tower/types';
import { Worker, Queue } from 'bullmq';
import { IngestJobData, ForwardJobData, IndexJobData } from '@tower/types';
import { parseRedisUrl } from './redis-connection';
import { approveMessage } from '../core/approve-message';
import { createLogger } from '@tower/logger';
export async function processIngestJob(job: IngestJobData, prisma: any): Promise<void> {
await prisma.message.upsert({
const logger = createLogger('ingest-processor');
export async function processIngestJob(
job: IngestJobData,
prisma: any,
pool?: any,
forwardQueue?: Queue<ForwardJobData>,
indexQueue?: Queue<IndexJobData>,
): Promise<void> {
// Defensive: drop messages from non-CLAIMED groups
const group = await prisma.group.findUnique({
where: { id: job.sourceGroupId },
select: { claimStatus: true },
});
if (!group || group.claimStatus !== 'CLAIMED') {
return;
}
// Safety net: drop if tenant is inactive or paused
const tenant = await prisma.tenant.findUnique({
where: { id: job.tenantId },
select: { isActive: true, isForwardingPaused: true },
});
if (!tenant || !tenant.isActive || tenant.isForwardingPaused) {
return;
}
// If the sender has opted out of this group, drop the message
const phoneHash = `jid:${job.senderJid}`;
const optOut = await prisma.memberOptOut.findFirst({
where: {
tenantId: job.tenantId,
groupId: job.sourceGroupId,
user: { jid: job.senderJid, phoneHash },
},
select: { id: true },
});
if (optOut) {
return;
}
// Resolve or create TowerUser for sender
const user = await prisma.towerUser.upsert({
where: { tenantId_phoneHash: { tenantId: job.tenantId, phoneHash } },
update: {},
create: {
tenantId: job.tenantId,
phoneHash,
jid: job.senderJid,
displayName: job.senderName ?? job.senderJid,
},
});
// Determine the initial status based on effectiveAction
let initialStatus: string;
if (job.effectiveAction === 'REJECT') {
initialStatus = 'REJECTED';
} else {
initialStatus = 'PENDING';
}
const msg = await prisma.message.upsert({
where: {
platform_platformMsgId: {
platform: job.platform,
@@ -11,23 +73,70 @@ export async function processIngestJob(job: IngestJobData, prisma: any): Promise
},
},
create: {
tenantId: job.tenantId,
platform: job.platform,
platformMsgId: job.platformMsgId,
sourceGroupId: job.sourceGroupId,
senderJid: job.senderJid,
senderName: job.senderName,
senderTowerUserId: user.id,
content: job.content,
tags: job.tags,
status: 'PENDING',
status: initialStatus,
},
update: {},
});
// For REJECT, create an approval record so it's searchable as rejected
if (job.effectiveAction === 'REJECT') {
await prisma.approval.upsert({
where: { messageId: msg.id },
update: {},
create: {
tenantId: job.tenantId,
messageId: msg.id,
adminId: 'system',
decision: 'REJECTED',
},
});
logger.info({ messageId: msg.id }, 'Message rejected by rule');
return;
}
// For AUTO_APPROVE, immediately approve via approveMessage helper
if (job.effectiveAction === 'AUTO_APPROVE') {
const result = await approveMessage(msg.id, job.tenantId, 'system', prisma);
if (result) {
if (forwardQueue) {
for (const fwd of result.forwardJobs) {
await forwardQueue.add('forward', fwd, {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
});
}
}
if (indexQueue) {
await indexQueue.add('index', result.indexDoc, {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
});
}
logger.info({ messageId: msg.id, forwardCount: result.forwardJobs.length }, 'Message auto-approved');
}
return;
}
}
export function createIngestWorker(redisUrl: string, prisma: any): Worker<IngestJobData> {
export function createIngestWorker(
redisUrl: string,
prisma: any,
pool?: any,
forwardQueue?: Queue<ForwardJobData>,
indexQueue?: Queue<IndexJobData>,
): Worker<IngestJobData> {
return new Worker<IngestJobData>(
'tower-ingest',
async (job) => processIngestJob(job.data, prisma),
async (job) => processIngestJob(job.data, prisma, pool, forwardQueue, indexQueue),
{ connection: parseRedisUrl(redisUrl) },
);
}
@@ -0,0 +1,124 @@
import { handleCommand } from './command-handler';
import type { NormalizedMessage } from '@tower/types';
function makeMsg(overrides: Partial<NormalizedMessage> = {}): NormalizedMessage {
return {
platformMsgId: 'WA_MSG_001',
sourceGroupJid: '120363043312345678@g.us',
senderJid: '919876543210@s.whatsapp.net',
senderName: 'Alice',
content: '',
accountId: 'acc-1',
...overrides,
};
}
const mockPrisma: any = {
group: { findUnique: jest.fn() },
towerUser: { upsert: jest.fn() },
consentRecord: { findFirst: jest.fn(), create: jest.fn(), update: jest.fn(), updateMany: jest.fn() },
memberOptOut: { create: jest.fn() },
auditEvent: { create: jest.fn() },
};
const mockPool: any = {
sendMessage: jest.fn().mockResolvedValue(undefined),
};
describe('handleCommand', () => {
beforeEach(() => {
jest.clearAllMocks();
});
it('returns false for non-command messages', async () => {
const handled = await handleCommand(makeMsg({ content: 'hello world' }), 'acc-1', mockPrisma, mockPool);
expect(handled).toBe(false);
expect(mockPool.sendMessage).not.toHaveBeenCalled();
});
describe('STOP', () => {
it('creates opt-out and revokes consent for claimed group', async () => {
mockPrisma.group.findUnique.mockResolvedValue({ id: 'grp-1', tenantId: 'tnt-1', claimStatus: 'CLAIMED' });
mockPrisma.towerUser.upsert.mockResolvedValue({ id: 'user-1' });
mockPrisma.consentRecord.updateMany.mockResolvedValue({ count: 1 });
mockPrisma.memberOptOut.create.mockResolvedValue({});
mockPrisma.auditEvent.create.mockResolvedValue({});
const handled = await handleCommand(
makeMsg({ content: 'STOP' }),
'acc-1',
mockPrisma,
mockPool,
);
expect(handled).toBe(true);
expect(mockPrisma.consentRecord.updateMany).toHaveBeenCalled();
expect(mockPrisma.memberOptOut.create).toHaveBeenCalledWith(
expect.objectContaining({ data: expect.objectContaining({ reason: 'STOP_KEYWORD' }) }),
);
expect(mockPool.sendMessage).toHaveBeenCalledWith(
'acc-1',
'919876543210@s.whatsapp.net',
expect.stringContaining("opted out"),
);
});
it('is a no-op for non-claimed groups', async () => {
mockPrisma.group.findUnique.mockResolvedValue({ id: 'grp-1', claimStatus: 'PENDING_CLAIM' });
const handled = await handleCommand(
makeMsg({ content: 'STOP' }),
'acc-1',
mockPrisma,
mockPool,
);
expect(handled).toBe(false);
expect(mockPrisma.consentRecord.updateMany).not.toHaveBeenCalled();
});
});
describe('START', () => {
it('re-grants default scopes', async () => {
mockPrisma.group.findUnique.mockResolvedValue({ id: 'grp-1', tenantId: 'tnt-1', claimStatus: 'CLAIMED' });
mockPrisma.towerUser.upsert.mockResolvedValue({ id: 'user-1' });
mockPrisma.consentRecord.findFirst.mockResolvedValue({
id: 'c-1',
scopes: ['INGEST'],
retentionDays: 90,
});
mockPrisma.consentRecord.update.mockResolvedValue({});
const handled = await handleCommand(
makeMsg({ content: 'start' }),
'acc-1',
mockPrisma,
mockPool,
);
expect(handled).toBe(true);
expect(mockPrisma.consentRecord.update).toHaveBeenCalledWith(
expect.objectContaining({
where: { id: 'c-1' },
data: expect.objectContaining({ status: 'GRANTED' }),
}),
);
});
});
describe('PORTAL', () => {
it('sends an onboarding link', async () => {
mockPrisma.group.findUnique.mockResolvedValue({ id: 'grp-1', tenantId: 'tnt-1', claimStatus: 'CLAIMED' });
mockPrisma.towerUser.upsert.mockResolvedValue({ id: 'user-1' });
const handled = await handleCommand(
makeMsg({ content: 'portal' }),
'acc-1',
mockPrisma,
mockPool,
);
expect(handled).toBe(true);
expect(mockPool.sendMessage).toHaveBeenCalledWith(
'acc-1',
'919876543210@s.whatsapp.net',
expect.stringContaining('/onboard?token='),
);
});
});
});
+232
View File
@@ -0,0 +1,232 @@
import type { NormalizedMessage } from '@tower/types';
import { createLogger } from '@tower/logger';
import { WhatsAppSessionPool } from './session-pool';
const logger = createLogger('command-handler');
const PORTAL_BASE = process.env['TOWER_PORTAL_BASE_URL'] ?? 'http://localhost:3000';
const STOP_REGEX = /^\s*stop\s*$/i;
const START_REGEX = /^\s*start\s*$/i;
const PORTAL_REGEX = /^\s*portal\s*$/i;
const COMMANDS_REGEX = /^\s*commands\s*$/i;
export async function handleCommand(
msg: NormalizedMessage,
accountId: string,
prisma: any,
pool: WhatsAppSessionPool,
): Promise<boolean> {
if (!msg.content) return false;
const text = msg.content.trim();
if (STOP_REGEX.test(text)) {
return await handleStop(msg, accountId, prisma, pool);
}
if (START_REGEX.test(text)) {
return await handleStart(msg, accountId, prisma, pool);
}
if (PORTAL_REGEX.test(text)) {
return await handlePortal(msg, accountId, prisma, pool);
}
if (COMMANDS_REGEX.test(text)) {
return await handleCommands(msg, accountId, pool);
}
return false;
}
async function handleStop(
msg: NormalizedMessage,
accountId: string,
prisma: any,
pool: WhatsAppSessionPool,
): Promise<boolean> {
const group = await prisma.group.findUnique({
where: { platform_platformId: { platform: 'whatsapp', platformId: msg.sourceGroupJid } },
select: { id: true, tenantId: true, claimStatus: true },
});
if (!group || group.claimStatus !== 'CLAIMED' || !group.tenantId) {
return false;
}
// Find or create a TowerUser by jid (phone not yet known for STOP-only flow)
const phoneHash = `stop:${msg.senderJid}`;
const user = await prisma.towerUser.upsert({
where: { tenantId_phoneHash: { tenantId: group.tenantId, phoneHash } },
update: { jid: msg.senderJid },
create: {
tenantId: group.tenantId,
phoneHash,
jid: msg.senderJid,
displayName: msg.senderName ?? msg.senderJid,
},
});
// Revoke all active consents in this group
await prisma.consentRecord.updateMany({
where: { userId: user.id, tenantId: group.tenantId, groupId: group.id, status: 'GRANTED' },
data: { status: 'REVOKED', revokedAt: new Date() },
});
await prisma.memberOptOut.create({
data: {
tenantId: group.tenantId,
userId: user.id,
groupId: group.id,
reason: 'STOP_KEYWORD',
},
});
await prisma.auditEvent.create({
data: {
tenantId: group.tenantId,
actorType: 'MEMBER',
actorId: user.id,
action: 'MEMBER_OPT_OUT',
resourceType: 'TowerUser',
resourceId: user.id,
payload: { jid: msg.senderJid, groupId: group.id, reason: 'STOP_KEYWORD' },
},
});
try {
await pool.sendMessage(
accountId,
msg.senderJid,
"You've been opted out. Type START in this group to rejoin.",
);
} catch (err) {
logger.warn({ err, jid: msg.senderJid }, 'Failed to send STOP confirmation DM');
}
logger.info({ jid: msg.senderJid, groupId: group.id }, 'STOP processed');
return true;
}
async function handleStart(
msg: NormalizedMessage,
accountId: string,
prisma: any,
pool: WhatsAppSessionPool,
): Promise<boolean> {
const group = await prisma.group.findUnique({
where: { platform_platformId: { platform: 'whatsapp', platformId: msg.sourceGroupJid } },
select: { id: true, tenantId: true, claimStatus: true },
});
if (!group || group.claimStatus !== 'CLAIMED' || !group.tenantId) {
return false;
}
const phoneHash = `stop:${msg.senderJid}`;
const user = await prisma.towerUser.upsert({
where: { tenantId_phoneHash: { tenantId: group.tenantId, phoneHash } },
update: { jid: msg.senderJid },
create: {
tenantId: group.tenantId,
phoneHash,
jid: msg.senderJid,
displayName: msg.senderName ?? msg.senderJid,
},
});
const existing = await prisma.consentRecord.findFirst({
where: { userId: user.id, tenantId: group.tenantId, groupId: group.id },
});
if (existing) {
await prisma.consentRecord.update({
where: { id: existing.id },
data: {
status: 'GRANTED',
scopes: existing.scopes.length > 0 ? existing.scopes : ['INGEST', 'DISPLAY'],
retentionDays: existing.retentionDays,
revokedAt: null,
effectiveAt: new Date(),
},
});
} else {
await prisma.consentRecord.create({
data: {
tenantId: group.tenantId,
groupId: group.id,
userId: user.id,
scopes: ['INGEST', 'DISPLAY'],
retentionDays: 90,
policyVersion: 'v1',
status: 'GRANTED',
proofEventId: user.id,
},
});
}
try {
await pool.sendMessage(
accountId,
msg.senderJid,
"Welcome back. Your default scopes (INGEST, DISPLAY) are re-granted. Visit your portal to customize: " +
`${PORTAL_BASE}/my`,
);
} catch (err) {
logger.warn({ err, jid: msg.senderJid }, 'Failed to send START confirmation DM');
}
logger.info({ jid: msg.senderJid, groupId: group.id }, 'START processed');
return true;
}
async function handlePortal(
msg: NormalizedMessage,
accountId: string,
prisma: any,
pool: WhatsAppSessionPool,
): Promise<boolean> {
const group = await prisma.group.findUnique({
where: { platform_platformId: { platform: 'whatsapp', platformId: msg.sourceGroupJid } },
select: { id: true, tenantId: true, claimStatus: true },
});
if (!group || group.claimStatus !== 'CLAIMED' || !group.tenantId) {
return false;
}
// Reuse same phoneHash scheme as STOP/START so the same user is found
const phoneHash = `stop:${msg.senderJid}`;
const user = await prisma.towerUser.upsert({
where: { tenantId_phoneHash: { tenantId: group.tenantId, phoneHash } },
update: { jid: msg.senderJid },
create: {
tenantId: group.tenantId,
phoneHash,
jid: msg.senderJid,
displayName: msg.senderName ?? msg.senderJid,
},
});
const onboardingToken = await issueOnboardingToken(prisma, group.tenantId, group.id, msg.senderJid);
try {
await pool.sendMessage(
accountId,
msg.senderJid,
`Manage your data: ${PORTAL_BASE}/onboard?token=${onboardingToken}`,
);
} catch (err) {
logger.warn({ err, jid: msg.senderJid }, 'Failed to send PORTAL link DM');
}
return true;
}
async function handleCommands(
msg: NormalizedMessage,
accountId: string,
pool: WhatsAppSessionPool,
): Promise<boolean> {
try {
await pool.sendMessage(
accountId,
msg.senderJid,
'TOWER commands: STOP (opt out), START (rejoin), PORTAL (get your data link).',
);
} catch (err) {
logger.warn({ err, jid: msg.senderJid }, 'Failed to send COMMANDS reply');
}
return true;
}
async function issueOnboardingToken(
prisma: any,
tenantId: string,
groupId: string,
jid: string,
): Promise<string> {
// For Phase 2B the onboarding token is the base64url({groupId, jid, tenantId}) shape.
// The API re-verifies this token (decodes the payload) and trusts the OTP step
// for actual authentication. In a future phase we can sign with JWT_SECRET here.
const payload = { tenantId, groupId, jid };
return Buffer.from(JSON.stringify(payload), 'utf8').toString('base64url');
}
+193 -62
View File
@@ -1,45 +1,48 @@
import { syncGroups } from './group-sync';
import { GroupMetadata } from '@whiskeysockets/baileys';
const makeGroup = (id: string, name: string, desc?: string, participants?: any[]): GroupMetadata => ({
id,
subject: name,
desc,
participants: participants ?? [],
creation: 0,
owner: undefined,
restrict: false,
announce: false,
subjectOwner: undefined,
subjectTime: 0,
size: 0,
ephemeralDuration: 0,
inviteCode: undefined,
});
const mockGroups: Record<string, GroupMetadata> = {
'120363043312345678@g.us': {
id: '120363043312345678@g.us',
subject: 'UP Parivar Dallas',
desc: 'Main community group',
participants: [],
creation: 0,
owner: undefined,
restrict: false,
announce: false,
subjectOwner: undefined,
subjectTime: 0,
size: 0,
ephemeralDuration: 0,
inviteCode: undefined,
},
'999999999@g.us': {
id: '999999999@g.us',
subject: 'Events Committee',
desc: undefined,
participants: [],
creation: 0,
owner: undefined,
restrict: false,
announce: false,
subjectOwner: undefined,
subjectTime: 0,
size: 0,
ephemeralDuration: 0,
inviteCode: undefined,
},
'120363043312345678@g.us': makeGroup('120363043312345678@g.us', 'UP Parivar Dallas', 'Main community group', [
{ id: 'superadmin@s.whatsapp.net', admin: 'superadmin' },
{ id: 'admin@s.whatsapp.net', admin: 'admin' },
]),
'999999999@g.us': makeGroup('999999999@g.us', 'Events Committee', undefined, [
{ id: 'creator@s.whatsapp.net', admin: 'superadmin' },
]),
};
const mockPrisma = {
group: {
findUnique: jest.fn().mockResolvedValue(null),
findMany: jest.fn().mockResolvedValue([]),
upsert: jest.fn(),
update: jest.fn(),
},
tenant: { findFirst: jest.fn().mockResolvedValue({ id: 'tenant-system' }) },
groupClaimToken: { create: jest.fn().mockResolvedValue({ id: 'tok_1' }) },
auditEvent: { create: jest.fn().mockResolvedValue(undefined) },
};
const mockPool = {
sendMessage: jest.fn().mockResolvedValue(undefined),
} as any;
describe('syncGroups', () => {
beforeEach(() => jest.clearAllMocks());
@@ -48,59 +51,187 @@ describe('syncGroups', () => {
.mockResolvedValueOnce({ id: 'db-group-1' })
.mockResolvedValueOnce({ id: 'db-group-2' });
const result = await syncGroups(mockGroups, 'account-1', mockPrisma as any);
const result = await syncGroups(mockGroups, 'account-1', mockPrisma as any, mockPool);
expect(mockPrisma.group.upsert).toHaveBeenCalledTimes(2);
expect(result.get('120363043312345678@g.us')).toBe('db-group-1');
expect(result.get('999999999@g.us')).toBe('db-group-2');
});
it('calls upsert with correct create payload', async () => {
it('upserts with no claimStatus, isActive, and accountId', async () => {
mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-1' });
await syncGroups(
{ '120363043312345678@g.us': mockGroups['120363043312345678@g.us'] },
'account-1',
mockPrisma as any,
);
expect(mockPrisma.group.upsert).toHaveBeenCalledWith({
where: { platform_platformId: { platform: 'whatsapp', platformId: '120363043312345678@g.us' } },
create: {
platform: 'whatsapp',
platformId: '120363043312345678@g.us',
name: 'UP Parivar Dallas',
description: 'Main community group',
isActive: true,
accountId: 'account-1',
},
update: {
name: 'UP Parivar Dallas',
description: 'Main community group',
accountId: 'account-1',
},
});
});
it('handles groups with no description', async () => {
mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-2' });
await syncGroups(
{ '999999999@g.us': mockGroups['999999999@g.us'] },
'account-1',
mockPrisma as any,
mockPool,
);
expect(mockPrisma.group.upsert).toHaveBeenCalledWith(
expect.objectContaining({
create: expect.objectContaining({ description: undefined, accountId: 'account-1' }),
where: { platform_platformId: { platform: 'whatsapp', platformId: '120363043312345678@g.us' } },
create: expect.objectContaining({
isActive: true,
accountId: 'account-1',
}),
update: expect.objectContaining({
name: 'UP Parivar Dallas',
accountId: 'account-1',
}),
}),
);
// No claimStatus or claimExpiresAt in upsert
expect(mockPrisma.group.upsert.mock.calls[0][0].create).not.toHaveProperty('claimStatus');
});
it('sends intro message AND DMs claim link to superadmin on new group', async () => {
mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-1' });
await syncGroups(
{ '120363043312345678@g.us': mockGroups['120363043312345678@g.us'] },
'account-1',
mockPrisma as any,
mockPool,
);
// Intro message to group
expect(mockPool.sendMessage).toHaveBeenCalledWith(
'account-1',
'120363043312345678@g.us',
expect.stringContaining("I'm TOWER"),
);
// Claim link DM to superadmin
expect(mockPool.sendMessage).toHaveBeenCalledWith(
'account-1',
'superadmin@s.whatsapp.net',
expect.stringContaining('tower.app/claim-group?token='),
);
});
it('generates GroupClaimToken for new group', async () => {
mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-1' });
await syncGroups(
{ '120363043312345678@g.us': mockGroups['120363043312345678@g.us'] },
'account-1',
mockPrisma as any,
mockPool,
);
expect(mockPrisma.groupClaimToken.create).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.objectContaining({
groupId: 'db-group-1',
creatorJid: 'superadmin@s.whatsapp.net',
}),
}),
);
});
it('returns an empty map when given empty groups', async () => {
const result = await syncGroups({}, 'account-1', mockPrisma as any);
it('emits GROUP_CLAIM_TOKEN_SENT audit event', async () => {
mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-1' });
await syncGroups(
{ '120363043312345678@g.us': mockGroups['120363043312345678@g.us'] },
'account-1',
mockPrisma as any,
mockPool,
);
expect(mockPrisma.auditEvent.create).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.objectContaining({
action: 'GROUP_CLAIM_TOKEN_SENT',
resourceType: 'Group',
resourceId: 'db-group-1',
actorType: 'SYSTEM',
}),
}),
);
});
it('does NOT send claim link when group is already claimed (bot re-added)', async () => {
mockPrisma.group.findUnique.mockResolvedValue({ id: 'db-group-1', tenantId: 'tnt-A', accountId: 'old-account' });
mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-1' });
await syncGroups(
{ '120363043312345678@g.us': mockGroups['120363043312345678@g.us'] },
'account-1',
mockPrisma as any,
mockPool,
);
// Intro still sent
expect(mockPool.sendMessage).toHaveBeenCalledWith(
'account-1',
'120363043312345678@g.us',
expect.any(String),
);
// But no token generated
expect(mockPrisma.groupClaimToken.create).not.toHaveBeenCalled();
});
it('returns empty map when given empty groups', async () => {
const result = await syncGroups({}, 'account-1', mockPrisma as any, mockPool);
expect(result.size).toBe(0);
expect(mockPrisma.group.upsert).not.toHaveBeenCalled();
});
// --- removal / re-add detection -------------------------------------------
it('marks groups inactive and emits GROUP_BOT_REMOVED when bot is removed', async () => {
mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-current' });
mockPrisma.group.findMany
.mockResolvedValueOnce([{ id: 'db-group-stale', platformId: 'old-group@g.us', tenantId: 'tnt-A', name: 'Old Group' }])
.mockResolvedValueOnce([]);
await syncGroups(mockGroups, 'account-1', mockPrisma as any, mockPool);
expect(mockPrisma.group.update).toHaveBeenCalledWith(
expect.objectContaining({ where: { id: 'db-group-stale' }, data: { isActive: false } }),
);
expect(mockPrisma.auditEvent.create).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.objectContaining({
action: 'GROUP_BOT_REMOVED',
tenantId: 'tnt-A',
resourceId: 'db-group-stale',
}),
}),
);
});
it('marks groups active and emits GROUP_BOT_RE_ADDED when bot is re-added', async () => {
mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-1' });
mockPrisma.group.findMany
.mockResolvedValueOnce([])
.mockResolvedValueOnce([{ id: 'db-group-returned', platformId: '120363043312345678@g.us', tenantId: 'tnt-A', name: 'Returned Group' }]);
await syncGroups(mockGroups, 'account-1', mockPrisma as any, mockPool);
expect(mockPrisma.group.update).toHaveBeenCalledWith(
expect.objectContaining({ where: { id: 'db-group-returned' }, data: { isActive: true } }),
);
expect(mockPrisma.auditEvent.create).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.objectContaining({
action: 'GROUP_BOT_RE_ADDED',
tenantId: 'tnt-A',
resourceId: 'db-group-returned',
}),
}),
);
});
it('does not touch any groups when all current groups are accounted for', async () => {
mockPrisma.group.upsert.mockResolvedValue({ id: 'db-group-1' });
mockPrisma.group.findMany.mockResolvedValue([]);
await syncGroups(mockGroups, 'account-1', mockPrisma as any, mockPool);
expect(mockPrisma.group.update).not.toHaveBeenCalled();
});
});
+129
View File
@@ -1,16 +1,32 @@
import { randomBytes } from 'crypto';
import { GroupMetadata } from '@whiskeysockets/baileys';
import { createLogger } from '@tower/logger';
import { WhatsAppSessionPool } from './session-pool';
const logger = createLogger('group-sync');
const TOKEN_TTL_MS = 48 * 60 * 60 * 1000;
const INTRO_MESSAGE =
"Hi, I'm TOWER. I'll be archiving messages from this group.";
const CLAIM_DM = (groupName: string, token: string) =>
`TOWER was added to "${groupName}". Claim it here (one-time, expires 48h): tower.app/claim-group?token=${token}`;
export async function syncGroups(
groups: Record<string, GroupMetadata>,
accountId: string,
prisma: any,
pool: WhatsAppSessionPool,
): Promise<Map<string, string>> {
const jidToDbId = new Map<string, string>();
for (const [jid, meta] of Object.entries(groups)) {
const existing = await prisma.group.findUnique({
where: { platform_platformId: { platform: 'whatsapp', platformId: jid } },
select: { id: true, tenantId: true, accountId: true },
});
const group = await prisma.group.upsert({
where: { platform_platformId: { platform: 'whatsapp', platformId: jid } },
create: {
@@ -28,8 +44,121 @@ export async function syncGroups(
},
});
jidToDbId.set(jid, group.id);
// New or re-added group — send intro + DM claim link to superadmin
if (!existing || existing.accountId !== accountId) {
logger.info({ jid, groupId: group.id, name: meta.subject }, 'New group detected — sending intro');
// Intro message to group
try {
await pool.sendMessage(accountId, jid, INTRO_MESSAGE);
} catch (err) {
logger.warn({ jid, err }, 'Failed to post intro message');
}
// If the group is already claimed (bot re-added), don't generate a new token
if (existing?.tenantId) {
logger.info({ jid, groupId: group.id, tenantId: existing.tenantId }, 'Group already claimed — skipping token');
continue;
}
// Find superadmin (the group creator)
const superadmin = meta.participants?.find((p: any) => p.admin === 'superadmin');
if (!superadmin) {
logger.warn({ jid, groupId: group.id }, 'No superadmin found in group metadata — cannot send claim link');
continue;
}
// Generate claim token
const token = randomBytes(32).toString('hex');
await prisma.groupClaimToken.create({
data: {
groupId: group.id,
token,
creatorJid: superadmin.id,
expiresAt: new Date(Date.now() + TOKEN_TTL_MS),
},
});
// DM the superadmin
try {
await pool.sendMessage(accountId, superadmin.id, CLAIM_DM(meta.subject, token));
logger.info({ jid, groupId: group.id, superadminJid: superadmin.id }, 'Claim link DM sent');
} catch (err) {
logger.warn({ jid, superadminJid: superadmin.id, err }, 'Failed to DM claim link to superadmin');
}
// Audit event
const auditTenantId = await firstTenantId(prisma);
await prisma.auditEvent
.create({
data: {
tenantId: auditTenantId ?? 'system',
actorType: 'SYSTEM',
action: 'GROUP_CLAIM_TOKEN_SENT',
resourceType: 'Group',
resourceId: group.id,
payload: { jid, name: meta.subject, superadminJid: superadmin.id },
},
})
.catch((err: unknown) => logger.warn({ err }, 'Failed to write GROUP_CLAIM_TOKEN_SENT audit event'));
}
}
// --- Detect removed groups ------------------------------------------------
const currentJids = new Set(Object.keys(groups));
const activeDbGroups = await prisma.group.findMany({
where: { accountId, isActive: true },
select: { id: true, platformId: true, tenantId: true, name: true },
});
const removedGroups = activeDbGroups.filter((g: any) => !currentJids.has(g.platformId));
for (const g of removedGroups) {
logger.info({ groupId: g.id, name: g.name, platformId: g.platformId }, 'Bot removed from group');
await prisma.group.update({ where: { id: g.id }, data: { isActive: false } });
await prisma.auditEvent
.create({
data: {
tenantId: g.tenantId ?? (await firstTenantId(prisma)) ?? 'system',
actorType: 'SYSTEM',
action: 'GROUP_BOT_REMOVED',
resourceType: 'Group',
resourceId: g.id,
payload: { name: g.name, platformId: g.platformId },
},
})
.catch((err: unknown) => logger.warn({ err }, 'Failed to write GROUP_BOT_REMOVED audit event'));
}
// --- Detect re-added groups -----------------------------------------------
const inactiveDbGroups = await prisma.group.findMany({
where: { accountId, isActive: false },
select: { id: true, platformId: true, tenantId: true, name: true },
});
const reAddedGroups = inactiveDbGroups.filter((g: any) => currentJids.has(g.platformId));
for (const g of reAddedGroups) {
logger.info({ groupId: g.id, name: g.name, platformId: g.platformId }, 'Bot re-added to group');
await prisma.group.update({ where: { id: g.id }, data: { isActive: true } });
await prisma.auditEvent
.create({
data: {
tenantId: g.tenantId ?? (await firstTenantId(prisma)) ?? 'system',
actorType: 'SYSTEM',
action: 'GROUP_BOT_RE_ADDED',
resourceType: 'Group',
resourceId: g.id,
payload: { name: g.name, platformId: g.platformId },
},
})
.catch((err: unknown) => logger.warn({ err }, 'Failed to write GROUP_BOT_RE_ADDED audit event'));
}
logger.info({ count: jidToDbId.size, accountId }, 'Groups synced');
return jidToDbId;
}
async function firstTenantId(prisma: any): Promise<string | null> {
const t = await prisma.tenant.findFirst({ select: { id: true } });
return t?.id ?? null;
}
@@ -0,0 +1,103 @@
import { matchContentRules, matchReactionRules, TenantRuleRow } from './match-rules';
const makeRule = (overrides: Partial<TenantRuleRow> = {}): TenantRuleRow => ({
id: 'r1',
matchType: 'HASHTAG',
matchValue: '#gooo',
action: 'FLAG',
priority: 0,
...overrides,
});
describe('matchContentRules', () => {
it('matches hashtag at start of message', () => {
const result = matchContentRules('#gooo hello', [makeRule()]);
expect(result.tags).toContain('#gooo');
expect(result.effectiveAction).toBe('FLAG');
});
it('matches hashtag in middle of message', () => {
const result = matchContentRules('hello #gooo world', [makeRule()]);
expect(result.tags).toContain('#gooo');
});
it('matches hashtag at end of message', () => {
const result = matchContentRules('hello #gooo', [makeRule()]);
expect(result.tags).toContain('#gooo');
});
it('matches hashtag with punctuation after', () => {
const result = matchContentRules('check #gooo, please', [makeRule()]);
expect(result.tags).toContain('#gooo');
});
it('does not match hashtag as part of another word', () => {
const result = matchContentRules('this is #goooogle', [makeRule()]);
expect(result.tags).not.toContain('#gooo');
});
it('does not match hashtag attached to preceding text', () => {
const result = matchContentRules('foo#gooo bar', [makeRule()]);
expect(result.tags).not.toContain('#gooo');
});
it('matches PREFIX rule when text starts with prefix', () => {
const result = matchContentRules('/tower help', [
makeRule({ matchType: 'PREFIX', matchValue: '/tower', action: 'FLAG' }),
]);
expect(result.tags).toContain('/tower...');
});
it('does not match PREFIX when text does not start with it', () => {
const result = matchContentRules('do /tower help', [
makeRule({ matchType: 'PREFIX', matchValue: '/tower', action: 'FLAG' }),
]);
expect(result.tags).toEqual([]);
});
it('returns no matches when no rules match', () => {
const result = matchContentRules('hello world', [makeRule()]);
expect(result.tags).toEqual([]);
expect(result.effectiveAction).toBeNull();
});
it('SKIP takes precedence over FLAG', () => {
const result = matchContentRules('#gooo event', [
makeRule({ matchValue: '#gooo', action: 'FLAG', priority: 0 }),
makeRule({ id: 'r2', matchValue: '#gooo', action: 'SKIP', priority: 1, matchType: 'HASHTAG' }),
]);
expect(result.effectiveAction).toBe('SKIP');
});
it('REJECT takes precedence over AUTO_APPROVE', () => {
const result = matchContentRules('#gooo event', [
makeRule({ matchValue: '#gooo', action: 'AUTO_APPROVE', priority: 0 }),
makeRule({ id: 'r2', matchValue: '#gooo', action: 'REJECT', priority: 1, matchType: 'HASHTAG' }),
]);
expect(result.effectiveAction).toBe('REJECT');
});
it('AUTO_APPROVE takes precedence over FLAG', () => {
const result = matchContentRules('#gooo event', [
makeRule({ matchValue: '#gooo', action: 'FLAG', priority: 0 }),
makeRule({ id: 'r2', matchValue: '#gooo', action: 'AUTO_APPROVE', priority: 1, matchType: 'HASHTAG' }),
]);
expect(result.effectiveAction).toBe('AUTO_APPROVE');
});
});
describe('matchReactionRules', () => {
it('matches reaction emoji', () => {
const result = matchReactionRules('⭐', [
makeRule({ matchType: 'REACTION_EMOJI', matchValue: '⭐', action: 'AUTO_APPROVE' }),
]);
expect(result).toEqual({ action: 'AUTO_APPROVE' });
});
it('returns null when no reaction rule matches', () => {
const result = matchReactionRules('👍', [
makeRule({ matchType: 'REACTION_EMOJI', matchValue: '⭐', action: 'AUTO_APPROVE' }),
]);
expect(result).toBeNull();
});
});
+109
View File
@@ -0,0 +1,109 @@
/**
* DB-driven rule matching — replaces the old hardcoded tag-detector.ts.
* Loads active TenantRule rows from the DB and matches them against
* message content (for HASHTAG / PREFIX rules) or reaction emoji (for REACTION_EMOJI rules).
*/
export interface TenantRuleRow {
id: string;
matchType: 'HASHTAG' | 'PREFIX' | 'REACTION_EMOJI';
matchValue: string;
action: 'FLAG' | 'AUTO_APPROVE' | 'SKIP' | 'REJECT';
priority: number;
}
export interface MatchResult {
tags: string[];
effectiveAction: 'FLAG' | 'AUTO_APPROVE' | 'SKIP' | 'REJECT' | null;
}
/**
* Match message content against a set of HASHTAG and PREFIX rules.
* Returns detected tags and the most important effective action.
* SKIP and REJECT take precedence over AUTO_APPROVE and FLAG.
* Within the same precedence tier, the highest priority (lowest number) wins.
*/
export function matchContentRules(
text: string,
rules: TenantRuleRow[],
): MatchResult {
const tags: string[] = [];
const relevantRules = rules.filter(
(r) => r.matchType === 'HASHTAG' || r.matchType === 'PREFIX',
);
let effectiveAction: MatchResult['effectiveAction'] = null;
// Action precedence: SKIP > REJECT > AUTO_APPROVE > FLAG
const actionRank: Record<string, number> = {
SKIP: 4,
REJECT: 3,
AUTO_APPROVE: 2,
FLAG: 1,
};
for (const rule of relevantRules) {
let matched = false;
if (rule.matchType === 'HASHTAG') {
// Match the hashtag as a standalone token.
// `\b` doesn't work here because `#` is a non-word character,
// so `\b#gooo` fails at the start of a message. Use `(?:^|\W)`
// to allow start-of-string or a non-word char before the hashtag.
const escaped = escapeRegex(rule.matchValue);
const pattern = new RegExp(`(?:^|\\W)${escaped}(?:$|\\W)`, 'i');
if (pattern.test(text)) {
tags.push(rule.matchValue);
matched = true;
}
} else if (rule.matchType === 'PREFIX') {
// Match if text starts with the prefix
if (text.trimStart().startsWith(rule.matchValue)) {
tags.push(`${rule.matchValue}...`);
matched = true;
}
}
if (matched) {
const rank = actionRank[rule.action] ?? 0;
const currentRank = effectiveAction ? (actionRank[effectiveAction] ?? 0) : 0;
if (rank > currentRank) {
effectiveAction = rule.action;
}
}
}
return { tags, effectiveAction };
}
/**
* Match a reaction emoji against REACTION_EMOJI rules.
* Returns the first matching rule action, or null.
*/
/**
* Strip Unicode variation selectors (U+FE0FU+FE0F) and other
* formatting codepoints that can differ between platforms.
*/
function stripVariationSelectors(s: string): string {
return [...s].filter((c) => c.codePointAt(0) !== 0xfe0f).join('');
}
export function matchReactionRules(
emoji: string,
rules: TenantRuleRow[],
): { action: 'FLAG' | 'AUTO_APPROVE' | 'SKIP' | 'REJECT' } | null {
const reactionRules = rules.filter((r) => r.matchType === 'REACTION_EMOJI');
const cleanEmoji = stripVariationSelectors(emoji);
for (const rule of reactionRules) {
if (cleanEmoji === stripVariationSelectors(rule.matchValue)) {
return { action: rule.action };
}
}
return null;
}
function escapeRegex(str: string): string {
return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}
+4 -4
View File
@@ -46,6 +46,7 @@ export function normalizeMessage(
export function normalizeReaction(
msg: proto.IWebMessageInfo,
accountId: string,
selfJid?: string,
): NormalizedReaction | null {
const key = msg.key;
if (!key) return null;
@@ -53,8 +54,7 @@ export function normalizeReaction(
const remoteJid = key.remoteJid ?? '';
// only group messages (group JIDs end with @g.us)
if (!remoteJid.endsWith('@g.us')) return null;
// skip our own outgoing messages
if (key.fromMe) return null;
// Allow fromMe reactions — admin may star from the connected account
const reaction = msg.message?.reactionMessage;
if (!reaction) return null;
@@ -62,8 +62,8 @@ export function normalizeReaction(
const targetMsgId = reaction.key?.id;
if (!targetMsgId) return null;
// Ensure reactorJid is not empty (group message must have a participant)
const reactorJid = key.participant;
// For fromMe reactions Baileys uses LID internally; use selfJid (PN format) to match TOWER_ADMIN_JIDS
const reactorJid = key.fromMe ? (selfJid ?? key.participant) : key.participant;
if (!reactorJid) return null;
return {
+57
View File
@@ -0,0 +1,57 @@
import { WhatsAppSessionPool } from './session-pool';
import { createLogger } from '@tower/logger';
const POLL_INTERVAL_MS = 5_000;
const MAX_ATTEMPTS = 3;
const OTP_MESSAGE = (code: string): string =>
`Your TOWER verification code is ${code}. It expires in 5 minutes. Reply STOP to opt out.`;
export function startOtpSenderLoop(
prisma: any,
pool: WhatsAppSessionPool,
logger: ReturnType<typeof createLogger>,
): void {
let running = false;
const tick = async (): Promise<void> => {
if (running) return;
running = true;
try {
const pending = await prisma.otpChallenge.findMany({
where: { sentAt: null, expiresAt: { gt: new Date() } },
orderBy: { createdAt: 'asc' },
take: 10,
});
for (const challenge of pending) {
const accounts = await prisma.account.findMany({
where: { isBot: true, status: 'ACTIVE' },
select: { id: true },
});
if (accounts.length === 0) {
logger.warn({ challengeId: challenge.id }, 'No active bot — OTP delivery deferred');
continue;
}
const account = accounts[0];
try {
await pool.sendMessage(account.id, challenge.jid, OTP_MESSAGE(challenge.code));
await prisma.otpChallenge.update({
where: { id: challenge.id },
data: { sentAt: new Date() },
});
logger.info({ challengeId: challenge.id, jid: challenge.jid }, 'OTP sent');
} catch (err) {
logger.error({ err, challengeId: challenge.id }, 'Failed to send OTP');
}
}
} catch (err) {
logger.error({ err }, 'otp-sender tick failed');
} finally {
running = false;
}
};
setTimeout(() => void tick(), 3_000);
setInterval(() => void tick(), POLL_INTERVAL_MS);
logger.info(`otp-sender loop scheduled (every ${POLL_INTERVAL_MS / 1000}s)`);
}
+3 -2
View File
@@ -37,7 +37,7 @@ export async function createWhatsAppSession(
version,
auth: state,
printQRInTerminal: false,
logger: logger as any,
logger: { level: 'silent', trace: () => {}, debug: () => {}, info: () => {}, warn: () => {}, error: () => {}, fatal: () => {}, child: () => ({ level: 'silent', trace: () => {}, debug: () => {}, info: () => {}, warn: () => {}, error: () => {}, fatal: () => {}, child: () => ({} as any) }) } as any,
});
sock.ev.on('creds.update', saveCreds);
@@ -91,10 +91,11 @@ export async function createWhatsAppSession(
});
sock.ev.on('messages.upsert', ({ messages, type }) => {
logger.info({ type, count: messages.length }, 'messages.upsert received');
if (type !== 'notify') return;
for (const msg of messages) {
if (msg.message?.reactionMessage) {
const reaction = normalizeReaction(msg, accountId);
const reaction = normalizeReaction(msg, accountId, sock.user?.id);
if (reaction) {
void Promise.resolve(onReaction(reaction)).catch((err) =>
logger.error({ err }, 'Error processing reaction'),