Architecture
Process Asynchronously
Don’t block webhook responses with long-running processing:Copy
// Good: Respond immediately, process asynchronously
app.post('/webhooks/inbound', express.json(), async (req, res) => {
// Acknowledge receipt immediately
res.sendStatus(200);
// Queue each event for async processing
for (const event of req.body) {
const relay = event.msys?.relay_message;
if (relay) {
await emailQueue.add('process-inbound', relay);
}
}
});
// Process in background
emailQueue.process('process-inbound', async (job) => {
const relay = job.data;
await processEmail(relay);
});
Copy
// Bad: Blocking the webhook response
app.post('/webhooks/inbound', express.json(), async (req, res) => {
for (const event of req.body) {
const relay = event.msys?.relay_message;
if (!relay) continue;
// This might timeout
await downloadAllAttachments(relay);
await processEmail(relay);
await sendNotifications(relay);
await updateDatabase(relay);
}
res.sendStatus(200);
});
Implement Idempotency
Handle duplicate webhooks gracefully:Copy
import Redis from 'ioredis';
const redis = new Redis();
async function processEmailOnce(email) {
const key = `processed:${email.id}`;
// Try to set the key (only succeeds if it doesn't exist)
const isNew = await redis.set(key, '1', 'EX', 86400, 'NX');
if (!isNew) {
console.log(`Already processed: ${email.id}`);
return { skipped: true };
}
// Process the email
await processEmail(email);
return { processed: true };
}
Use Separate Queues by Priority
Copy
import Queue from 'bull';
const highPriorityQueue = new Queue('email-high', redisUrl);
const normalQueue = new Queue('email-normal', redisUrl);
const lowPriorityQueue = new Queue('email-low', redisUrl);
function routeToQueue(email) {
if (isVipCustomer(email.from)) {
return highPriorityQueue;
}
if (analyzeForSpam(email).score > 3) {
return lowPriorityQueue;
}
return normalQueue;
}
// Process with different concurrency
highPriorityQueue.process(10, processEmail);
normalQueue.process(5, processEmail);
lowPriorityQueue.process(2, processEmail);
Reliability
Store Before Processing
Save the raw email before any processing:Copy
app.post('/webhooks/inbound', express.json(), async (req, res) => {
// Store raw payload first
await storeRawPayload({
payload: req.body,
receivedAt: new Date()
});
res.sendStatus(200);
// Then process each event (can be retried from stored data if it fails)
for (const event of req.body) {
const relay = event.msys?.relay_message;
if (!relay) continue;
try {
await processEmail(relay);
await markAsProcessed(relay.rcpt_to, relay.msg_from);
} catch (error) {
await markAsFailed(relay, error);
await scheduleRetry(relay);
}
}
});
Implement Retry Logic
Copy
class EmailProcessor {
constructor() {
this.maxRetries = 3;
this.retryDelays = [1000, 5000, 30000]; // 1s, 5s, 30s
}
async processWithRetry(email, attempt = 0) {
try {
await this.process(email);
return { success: true };
} catch (error) {
if (attempt < this.maxRetries && this.isRetryable(error)) {
await sleep(this.retryDelays[attempt]);
return this.processWithRetry(email, attempt + 1);
}
await this.handleFailure(email, error);
return { success: false, error: error.message };
}
}
isRetryable(error) {
// Retry network errors and 5xx responses
return error.code === 'ECONNREFUSED' ||
error.code === 'ETIMEDOUT' ||
(error.status >= 500 && error.status < 600);
}
async handleFailure(email, error) {
await storeFailedEmail(email, error);
await alertOpsTeam(email, error);
}
}
Handle Attachment Expiration
Download attachments promptly before URLs expire:Copy
app.post('/webhooks/inbound', express.json(), async (req, res) => {
res.sendStatus(200);
for (const event of req.body) {
const relay = event.msys?.relay_message;
if (!relay) continue;
// Parse raw MIME to extract attachments
const parsed = await simpleParser(relay.content.email_rfc822);
const attachments = parsed.attachments || [];
// Download attachments immediately (URLs expire in 24 hours)
const savedAttachments = await Promise.all(
attachments.map(async (attachment) => {
try {
const response = await fetch(attachment.url);
const buffer = await response.arrayBuffer();
const savedPath = await saveToStorage(
attachment.filename,
Buffer.from(buffer)
);
return { ...attachment, savedPath };
} catch (error) {
console.error(`Failed to download ${attachment.filename}:`, error);
return { ...attachment, error: error.message };
}
})
);
// Process with saved attachments
await processEmail({ relay, attachments: savedAttachments });
}
});
Error Handling
Graceful Degradation
Handle partial failures gracefully:Copy
async function processEmail(email) {
const results = {
stored: false,
attachmentsProcessed: false,
notificationsSent: false,
errors: []
};
// Store email (critical)
try {
await storeEmail(email);
results.stored = true;
} catch (error) {
results.errors.push({ stage: 'store', error: error.message });
// Fail fast for critical operations
throw error;
}
// Process attachments (non-critical)
try {
await processAttachments(email.attachments);
results.attachmentsProcessed = true;
} catch (error) {
results.errors.push({ stage: 'attachments', error: error.message });
// Continue despite failure
}
// Send notifications (non-critical)
try {
await sendNotifications(email);
results.notificationsSent = true;
} catch (error) {
results.errors.push({ stage: 'notifications', error: error.message });
// Continue despite failure
}
return results;
}
Structured Error Logging
Copy
function logError(context, error) {
console.error(JSON.stringify({
level: 'error',
timestamp: new Date().toISOString(),
context: {
emailId: context.emailId,
sender: context.from,
stage: context.stage
},
error: {
message: error.message,
stack: error.stack,
code: error.code
}
}));
}
Performance
Batch Database Operations
Copy
// Bad: Individual inserts
for (const email of emails) {
await db.insert('emails', email);
}
// Good: Batch insert
await db.batchInsert('emails', emails);
Limit Concurrent Processing
Copy
import pLimit from 'p-limit';
const limit = pLimit(10); // Max 10 concurrent operations
async function processEmails(emails) {
const promises = emails.map(email =>
limit(() => processEmail(email))
);
return Promise.allSettled(promises);
}
Cache Common Lookups
Copy
import NodeCache from 'node-cache';
const customerCache = new NodeCache({ stdTTL: 300 }); // 5 minute TTL
async function getCustomer(email) {
const domain = email.split('@')[1];
let customer = customerCache.get(domain);
if (!customer) {
customer = await db.findCustomerByDomain(domain);
if (customer) {
customerCache.set(domain, customer);
}
}
return customer;
}
Monitoring
Track Key Metrics
Copy
import { Counter, Histogram, Gauge } from 'prom-client';
const emailsReceived = new Counter({
name: 'inbound_emails_total',
help: 'Total inbound emails received',
labelNames: ['domain', 'status']
});
const processingTime = new Histogram({
name: 'inbound_email_processing_seconds',
help: 'Email processing time',
labelNames: ['type'],
buckets: [0.1, 0.5, 1, 2, 5, 10]
});
const queueSize = new Gauge({
name: 'inbound_email_queue_size',
help: 'Current email queue size'
});
async function processEmail(email) {
const timer = processingTime.startTimer({ type: 'full' });
try {
await doProcessing(email);
emailsReceived.inc({ domain: email.domain, status: 'success' });
} catch (error) {
emailsReceived.inc({ domain: email.domain, status: 'error' });
throw error;
} finally {
timer();
}
}
Set Up Alerts
Copy
// Alert on high spam rate
async function checkSpamRate() {
const recent = await getRecentEmails(100);
const spamCount = recent.filter(e => e.isSpam).length;
const spamRate = spamCount / recent.length;
if (spamRate > 0.3) { // 30% spam
await alertOps({
type: 'high_spam_rate',
rate: spamRate,
message: `Spam rate is ${(spamRate * 100).toFixed(1)}%`
});
}
}
// Alert on processing failures
async function checkFailureRate() {
const stats = await getProcessingStats('1h');
if (stats.failureRate > 0.05) { // 5% failure
await alertOps({
type: 'high_failure_rate',
rate: stats.failureRate,
message: `Processing failure rate is ${(stats.failureRate * 100).toFixed(1)}%`
});
}
}
Testing
Test Webhook Handling
Copy
import { describe, it, expect } from 'vitest';
import request from 'supertest';
import app from './app';
describe('Inbound Webhook', () => {
const validCredentials = Buffer.from('webhook_user:webhook_pass').toString('base64');
it('should process valid relay event', async () => {
const payload = [{
msys: {
relay_message: {
msg_from: 'sender@example.com',
rcpt_to: 'support@mail.example.com',
friendly_from: 'Sender <sender@example.com>',
content: {
subject: 'Test email',
text: 'Test content',
headers: [
{ 'Message-ID': '<test-123@example.com>' }
],
to: ['support@mail.example.com']
}
}
}
}];
const response = await request(app)
.post('/webhooks/inbound')
.set('Authorization', `Basic ${validCredentials}`)
.send(payload);
expect(response.status).toBe(200);
});
it('should reject missing credentials', async () => {
const response = await request(app)
.post('/webhooks/inbound')
.send([]);
expect(response.status).toBe(401);
});
});
Test Edge Cases
Copy
describe('Email Processing', () => {
it('should handle missing fields', async () => {
const email = {
id: 'test_456',
from: 'sender@example.com',
to: ['support@example.com']
// Missing subject, text, html
};
const result = await processEmail(email);
expect(result.success).toBe(true);
});
it('should handle expired attachment URLs', async () => {
const email = {
id: 'test_789',
attachments: [{
filename: 'file.pdf',
url: 'https://expired-url.com/file.pdf'
}]
};
const result = await processEmail(email);
expect(result.attachmentErrors).toHaveLength(1);
});
});
Checklist
Setup
Setup
- Configure inbound domain with MX records
- Set up webhook endpoint with credential verification
- Configure spam filtering sensitivity
- Set up monitoring and alerting
Processing
Processing
- Implement async processing (don’t block webhooks)
- Store raw emails before processing
- Implement idempotency (handle duplicates)
- Download attachments promptly
- Implement retry logic for failures
Security
Security
- Verify webhook credentials
- Validate and sanitize all inputs
- Validate attachment types from content
- Implement rate limiting
- Scan attachments for malware
Operations
Operations
- Monitor processing metrics
- Set up alerts for anomalies
- Log all processing for debugging
- Plan for handling failures
- Document your email routing rules