Skip to main content
Follow these best practices to build reliable, secure, and maintainable inbound email processing systems.

Architecture

Process Asynchronously

Don’t block webhook responses with long-running processing:
// Good: Respond immediately, process asynchronously
//
// Handles POST /webhooks/inbound. The body is expected to be a JSON array of
// events; each event may carry a relay message at `msys.relay_message`.
// Every relay message is placed on `emailQueue` under the 'process-inbound'
// job name so the heavy work happens outside the request/response cycle.
app.post('/webhooks/inbound', express.json(), async (req, res) => {
  const events = req.body;

  // Reject malformed batches up front: iterating a non-array body would throw
  // after the response was already sent and surface as an unhandled rejection.
  if (!Array.isArray(events)) {
    res.sendStatus(400);
    return;
  }

  // Acknowledge receipt immediately
  res.sendStatus(200);

  // Queue each event for async processing
  for (const event of events) {
    const relay = event?.msys?.relay_message;
    if (!relay) continue;

    try {
      await emailQueue.add('process-inbound', relay);
    } catch (error) {
      // The response has already been sent, so the failure cannot be reported
      // to the sender. Log it and keep enqueueing the remaining events.
      console.error('Failed to enqueue inbound email:', error);
    }
  }
});

// Background worker for 'process-inbound' jobs: the job payload is the relay
// message that the webhook handler enqueued.
emailQueue.process('process-inbound', async ({ data: relay }) => {
  await processEmail(relay);
});
// Bad: Blocking the webhook response
//
// Anti-pattern, shown for contrast: every relay message in the batch is fully
// processed inside the request handler, and the 200 is only sent after the
// whole loop has finished.
app.post('/webhooks/inbound', express.json(), async (req, res) => {
  for (const event of req.body) {
    const relay = event.msys?.relay_message;
    // Events without a relay message are skipped
    if (!relay) continue;

    // This might timeout
    // The four steps below run one after another for each event, so the
    // response time grows with the batch size and with each step's latency.
    await downloadAllAttachments(relay);
    await processEmail(relay);
    await sendNotifications(relay);
    await updateDatabase(relay);
  }

  // The sender only gets its acknowledgement here, after all work is done
  res.sendStatus(200);
});

Implement Idempotency

Webhook deliveries can be retried, so the same email may arrive more than once. Handle duplicate webhooks gracefully:
import Redis from 'ioredis';
const redis = new Redis();

/**
 * Processes an email at most once per 24 hours, using a Redis key as the
 * deduplication marker.
 *
 * @param {{ id: string }} email - Email to process; `id` is the dedup key.
 * @returns {Promise<{ skipped: true } | { processed: true }>} `skipped` when
 *   the id was already claimed, `processed` when this call handled it.
 * @throws {TypeError} If `email.id` is missing (every id-less email would
 *   otherwise share one key and all but the first would be dropped).
 * @throws Rethrows any error from `processEmail` after releasing the claim.
 */
async function processEmailOnce(email) {
  if (email.id == null || email.id === '') {
    throw new TypeError('processEmailOnce requires email.id for deduplication');
  }

  const key = `processed:${email.id}`;

  // Try to set the key (only succeeds if it doesn't exist)
  const isNew = await redis.set(key, '1', 'EX', 86400, 'NX');

  if (!isNew) {
    console.log(`Already processed: ${email.id}`);
    return { skipped: true };
  }

  // Process the email
  try {
    await processEmail(email);
  } catch (error) {
    // Release the claim so a retry or redelivery is not mistaken for a
    // duplicate and silently skipped.
    try {
      await redis.del(key);
    } catch (cleanupError) {
      // Do not let the cleanup failure mask the processing error.
      console.error(`Failed to release dedup key ${key}:`, cleanupError);
    }
    throw error;
  }

  return { processed: true };
}

Use Separate Queues by Priority

import Queue from 'bull';

// One Bull queue per priority tier. All three are created with the same
// `redisUrl` (defined elsewhere) and differ only in queue name.
const highPriorityQueue = new Queue('email-high', redisUrl);
const normalQueue = new Queue('email-normal', redisUrl);
const lowPriorityQueue = new Queue('email-low', redisUrl);

/**
 * Picks the queue an email should be placed on.
 *
 * VIP senders always win; otherwise a spam score above 3 demotes the email
 * to the low-priority queue, and everything else goes to the normal queue.
 *
 * @param {{ from: string }} email - Email to route.
 * @returns The high-, low- or normal-priority queue.
 */
function routeToQueue(email) {
  const senderIsVip = isVipCustomer(email.from);
  if (senderIsVip) return highPriorityQueue;

  // Spam analysis only runs for non-VIP senders.
  const { score } = analyzeForSpam(email);
  return score > 3 ? lowPriorityQueue : normalQueue;
}

// Process with different concurrency
//
// Bull calls the processor with a Job object, not with the payload that was
// added to the queue, so unwrap `job.data` before handing it to processEmail.
highPriorityQueue.process(10, (job) => processEmail(job.data));
normalQueue.process(5, (job) => processEmail(job.data));
lowPriorityQueue.process(2, (job) => processEmail(job.data));

Reliability

Store Before Processing

Save the raw email before any processing:
// Handles POST /webhooks/inbound by persisting the raw payload first, then
// acknowledging, then processing each relay message. Because the raw payload
// is stored before anything else, failed events can be replayed later.
app.post('/webhooks/inbound', express.json(), async (req, res) => {
  // Store raw payload first
  try {
    await storeRawPayload({
      payload: req.body,
      receivedAt: new Date()
    });
  } catch (error) {
    // Nothing was persisted, so do not acknowledge: answer with an error
    // instead of leaving the request hanging on an unhandled rejection.
    console.error('Failed to store raw inbound payload:', error);
    res.sendStatus(500);
    return;
  }

  res.sendStatus(200);

  // A non-array body is already stored for inspection; there is nothing to
  // iterate, and iterating it would throw after the response was sent.
  if (!Array.isArray(req.body)) return;

  // Then process each event (can be retried from stored data if it fails)
  for (const event of req.body) {
    const relay = event?.msys?.relay_message;
    if (!relay) continue;

    try {
      await processEmail(relay);
      await markAsProcessed(relay.rcpt_to, relay.msg_from);
    } catch (error) {
      try {
        await markAsFailed(relay, error);
        await scheduleRetry(relay);
      } catch (bookkeepingError) {
        // The response is already sent; log and continue with the next event
        // rather than aborting the rest of the batch.
        console.error('Failed to record inbound processing failure:', bookkeepingError);
      }
    }
  }
});

Implement Retry Logic

/**
 * Runs `this.process(email)` with a bounded number of retries for transient
 * errors. `process` is not defined here; it is expected to be supplied by a
 * subclass or assigned on the instance.
 */
class EmailProcessor {
  constructor() {
    // Maximum number of retries after the first attempt.
    this.maxRetries = 3;
    // Delay in milliseconds before retry 1, 2, 3, ...
    this.retryDelays = [1000, 5000, 30000]; // 1s, 5s, 30s
  }

  /**
   * Processes an email, retrying retryable failures.
   *
   * @param {object} email - Email passed through to `this.process`.
   * @param {number} [attempt=0] - Zero-based index of the current attempt.
   * @returns {Promise<{ success: true } | { success: false, error: string }>}
   *   Never rejects for processing errors; rejects only if `handleFailure`
   *   (or `sleep`) itself throws.
   */
  async processWithRetry(email, attempt = 0) {
    try {
      await this.process(email);
      return { success: true };
    } catch (error) {
      if (attempt < this.maxRetries && this.isRetryable(error)) {
        await sleep(this.retryDelayFor(attempt));
        return this.processWithRetry(email, attempt + 1);
      }

      await this.handleFailure(email, error);
      return { success: false, error: error.message };
    }
  }

  /**
   * Returns the delay to wait before the retry that follows `attempt`.
   *
   * When `maxRetries` is larger than the delay table, the last configured
   * delay is reused instead of reading past the end of the array (which
   * would yield `undefined`, i.e. no back-off at all).
   *
   * @param {number} attempt - Zero-based index of the attempt that just failed.
   * @returns {number} Delay in milliseconds; 0 when no delays are configured.
   */
  retryDelayFor(attempt) {
    const delays = this.retryDelays;
    if (delays.length === 0) return 0;
    return delays[Math.min(attempt, delays.length - 1)];
  }

  /**
   * Decides whether a failure is worth retrying.
   *
   * @param {{ code?: string, status?: number }} error
   * @returns {boolean} True for refused/timed-out connections and 5xx statuses.
   */
  isRetryable(error) {
    // Retry network errors and 5xx responses
    return error.code === 'ECONNREFUSED' ||
           error.code === 'ETIMEDOUT' ||
           (error.status >= 500 && error.status < 600);
  }

  /**
   * Final failure path: stores the failed email, then alerts the ops team.
   *
   * @param {object} email
   * @param {Error} error
   */
  async handleFailure(email, error) {
    await storeFailedEmail(email, error);
    await alertOpsTeam(email, error);
  }
}

Handle Attachment Expiration

Download attachments promptly before URLs expire:
// Handles POST /webhooks/inbound: acknowledges first, then for each relay
// message parses the raw MIME, downloads every attachment right away and
// hands the email plus the saved attachments to processEmail.
app.post('/webhooks/inbound', express.json(), async (req, res) => {
  res.sendStatus(200);

  for (const event of req.body) {
    const relay = event.msys?.relay_message;
    if (!relay) continue;

    try {
      // Parse raw MIME to extract attachments
      const parsed = await simpleParser(relay.content.email_rfc822);
      const attachments = parsed.attachments || [];

      // Download attachments immediately (URLs expire in 24 hours)
      const savedAttachments = await Promise.all(
        attachments.map(async (attachment) => {
          try {
            const response = await fetch(attachment.url);

            // fetch() only rejects on network failure. An expired or missing
            // URL resolves with a 4xx/5xx response whose body is an error
            // page, which must not be saved as if it were the attachment.
            if (!response.ok) {
              throw new Error(`HTTP ${response.status} ${response.statusText}`);
            }

            const buffer = await response.arrayBuffer();

            const savedPath = await saveToStorage(
              attachment.filename,
              Buffer.from(buffer)
            );

            return { ...attachment, savedPath };
          } catch (error) {
            // Best effort: record the failure on the attachment and carry on
            console.error(`Failed to download ${attachment.filename}:`, error);
            return { ...attachment, error: error.message };
          }
        })
      );

      // Process with saved attachments
      await processEmail({ relay, attachments: savedAttachments });
    } catch (error) {
      // The response is already sent; log and continue with the next event
      // instead of aborting the batch with an unhandled rejection.
      console.error('Failed to process inbound email:', error);
    }
  }
});

Error Handling

Graceful Degradation

Handle partial failures gracefully:
/**
 * Runs the processing pipeline for one email and reports which stages
 * succeeded.
 *
 * Storing the email is critical: if it fails, the error is rethrown.
 * Attachment processing and notifications are non-critical: their failures
 * are recorded in `errors` and the pipeline continues.
 *
 * @param {{ attachments?: unknown }} email - Email to process.
 * @returns {Promise<{
 *   stored: boolean,
 *   attachmentsProcessed: boolean,
 *   notificationsSent: boolean,
 *   errors: Array<{ stage: string, error: string }>
 * }>}
 * @throws Rethrows whatever `storeEmail` throws.
 */
async function processEmail(email) {
  const errors = [];
  const results = {
    stored: false,
    attachmentsProcessed: false,
    notificationsSent: false,
    errors
  };

  const recordFailure = (stage, error) => {
    errors.push({ stage, error: error.message });
  };

  // Critical stage: fail fast
  try {
    await storeEmail(email);
  } catch (error) {
    recordFailure('store', error);
    throw error;
  }
  results.stored = true;

  // Non-critical stages, in order: [stage name, result flag, task]
  const optionalStages = [
    ['attachments', 'attachmentsProcessed', () => processAttachments(email.attachments)],
    ['notifications', 'notificationsSent', () => sendNotifications(email)]
  ];

  for (const [stage, flag, runStage] of optionalStages) {
    try {
      await runStage();
      results[flag] = true;
    } catch (error) {
      // Continue despite failure
      recordFailure(stage, error);
    }
  }

  return results;
}

Structured Error Logging

/**
 * Writes one error as a single-line JSON record to stderr.
 *
 * @param {{ emailId?: string, from?: string, stage?: string }} context -
 *   Where the error happened; `from` is logged under the key `sender`.
 * @param {{ message?: string, stack?: string, code?: string }} error -
 *   The error to log.
 */
function logError(context, error) {
  const timestamp = new Date().toISOString();
  const { emailId, from: sender, stage } = context;
  const { message, stack, code } = error;

  const entry = {
    level: 'error',
    timestamp,
    context: { emailId, sender, stage },
    error: { message, stack, code }
  };

  console.error(JSON.stringify(entry));
}

Performance

Batch Database Operations

// Bad: Individual inserts
// Each iteration awaits its own insert, so the calls run strictly one after
// another: one database call per email.
for (const email of emails) {
  await db.insert('emails', email);
}

// Good: Batch insert
// The whole array is handed to the database layer in a single call.
await db.batchInsert('emails', emails);

Limit Concurrent Processing

import pLimit from 'p-limit';

// Shared limiter: every task wrapped with `limit(...)` counts against the
// same cap, across all callers in this module.
const limit = pLimit(10); // Max 10 concurrent operations

/**
 * Processes a batch of emails through the shared limiter.
 *
 * @param {Array<object>} emails - Emails to process.
 * @returns {Promise<Array<PromiseSettledResult<unknown>>>} One settled result
 *   per email, in input order; a failing email does not reject the batch.
 */
async function processEmails(emails) {
  // Wrap each email so that processEmail only starts when the limiter allows.
  const schedule = (email) => limit(() => processEmail(email));
  const pending = emails.map((email) => schedule(email));

  const settled = await Promise.allSettled(pending);
  return settled;
}

Cache Common Lookups

import NodeCache from 'node-cache';

// In-memory cache for customer lookups; `stdTTL` is given in seconds.
const customerCache = new NodeCache({ stdTTL: 300 }); // 5 minute TTL

/**
 * Looks up the customer that owns the domain of an email address, caching
 * successful lookups in `customerCache`.
 *
 * @param {string} email - Email address, e.g. `user@example.com`.
 * @returns {Promise<object|null|undefined>} The customer; whatever
 *   `db.findCustomerByDomain` returns when there is none; or `undefined`
 *   when `email` has no domain part (no lookup is made in that case).
 */
async function getCustomer(email) {
  // The domain is everything after the LAST '@': a quoted local part may
  // itself contain '@', so splitting on the first one picks the wrong piece.
  const atIndex = typeof email === 'string' ? email.lastIndexOf('@') : -1;

  // Domains are case-insensitive; normalizing keeps `Example.COM` and
  // `example.com` on the same cache entry.
  const domain = atIndex === -1
    ? ''
    : email.slice(atIndex + 1).trim().toLowerCase();

  // No domain means nothing to look up; avoid querying with `undefined`.
  if (domain === '') {
    return undefined;
  }

  let customer = customerCache.get(domain);
  if (!customer) {
    customer = await db.findCustomerByDomain(domain);
    // Only hits are cached, so a customer created later is picked up
    // without waiting for a TTL to expire.
    if (customer) {
      customerCache.set(domain, customer);
    }
  }

  return customer;
}

Monitoring

Track Key Metrics

import { Counter, Histogram, Gauge } from 'prom-client';

// Counter of inbound emails, labelled by `domain` and by outcome `status`.
const emailsReceived = new Counter({
  name: 'inbound_emails_total',
  help: 'Total inbound emails received',
  labelNames: ['domain', 'status']
});

// Histogram of processing durations, labelled by `type`. The metric name
// indicates seconds, so the bucket bounds presumably run from 0.1 s to 10 s.
const processingTime = new Histogram({
  name: 'inbound_email_processing_seconds',
  help: 'Email processing time',
  labelNames: ['type'],
  buckets: [0.1, 0.5, 1, 2, 5, 10]
});

// Gauge for the current queue size. Nothing in this snippet sets it, so it
// has to be updated wherever the queue length is known.
const queueSize = new Gauge({
  name: 'inbound_email_queue_size',
  help: 'Current email queue size'
});

/**
 * Processes one email while recording metrics: the duration is observed on
 * `processingTime` (type 'full') and the outcome is counted on
 * `emailsReceived` with the email's domain and a 'success'/'error' status.
 *
 * @param {{ domain: string }} email - Email to process.
 * @returns {Promise<void>}
 * @throws Rethrows whatever `doProcessing` throws, after counting the error.
 */
async function processEmail(email) {
  const stopTimer = processingTime.startTimer({ type: 'full' });
  const countOutcome = (status) => {
    emailsReceived.inc({ domain: email.domain, status });
  };

  try {
    await doProcessing(email);
    countOutcome('success');
  } catch (failure) {
    countOutcome('error');
    throw failure;
  } finally {
    // Always record the duration, whether processing succeeded or not.
    stopTimer();
  }
}

Set Up Alerts

// Alert on high spam rate
//
// Looks at the 100 most recent emails and alerts ops when more than 30% of
// them are flagged as spam.
async function checkSpamRate() {
  const sample = await getRecentEmails(100);
  const flagged = sample.reduce(
    (count, message) => (message.isSpam ? count + 1 : count),
    0
  );
  const spamRate = flagged / sample.length;

  // Written as a negated `>` so that NaN (empty sample) also returns here.
  const aboveThreshold = spamRate > 0.3; // 30% spam
  if (!aboveThreshold) {
    return;
  }

  const percentage = (spamRate * 100).toFixed(1);
  await alertOps({
    type: 'high_spam_rate',
    rate: spamRate,
    message: `Spam rate is ${percentage}%`
  });
}

// Alert on processing failures
//
// Reads the processing stats for the '1h' window and alerts ops when the
// failure rate is above 5%.
async function checkFailureRate() {
  const { failureRate } = await getProcessingStats('1h');

  // Written as a negated `>` so a missing rate (undefined) returns here too.
  const aboveThreshold = failureRate > 0.05; // 5% failure
  if (!aboveThreshold) {
    return;
  }

  const percentage = (failureRate * 100).toFixed(1);
  await alertOps({
    type: 'high_failure_rate',
    rate: failureRate,
    message: `Processing failure rate is ${percentage}%`
  });
}

Testing

Test Webhook Handling

import { describe, it, expect } from 'vitest';
import request from 'supertest';
import app from './app';

// Integration tests for the /webhooks/inbound endpoint, driven through
// supertest against the exported Express app.
describe('Inbound Webhook', () => {
  // Base64 of "user:pass", used below as the HTTP Basic Authorization value
  const validCredentials = Buffer.from('webhook_user:webhook_pass').toString('base64');

  it('should process valid relay event', async () => {
    // A batch with a single event carrying a relay message at
    // `msys.relay_message`, the same shape the webhook handlers read.
    const payload = [{
      msys: {
        relay_message: {
          msg_from: 'sender@example.com',
          rcpt_to: 'support@mail.example.com',
          friendly_from: 'Sender <sender@example.com>',
          content: {
            subject: 'Test email',
            text: 'Test content',
            headers: [
              { 'Message-ID': '<test-123@example.com>' }
            ],
            to: ['support@mail.example.com']
          }
        }
      }
    }];

    const response = await request(app)
      .post('/webhooks/inbound')
      .set('Authorization', `Basic ${validCredentials}`)
      .send(payload);

    // Only the acknowledgement is checked; processing results are not
    expect(response.status).toBe(200);
  });

  it('should reject missing credentials', async () => {
    // No Authorization header is set on this request
    const response = await request(app)
      .post('/webhooks/inbound')
      .send([]);

    expect(response.status).toBe(401);
  });
});

Test Edge Cases

// Edge-case tests for processEmail. The assertions use the result shape that
// processEmail returns in the "Graceful Degradation" section:
// { stored, attachmentsProcessed, notificationsSent, errors: [{ stage, error }] }.
describe('Email Processing', () => {
  it('should handle missing fields', async () => {
    const email = {
      id: 'test_456',
      from: 'sender@example.com',
      to: ['support@example.com']
      // Missing subject, text, html
    };

    const result = await processEmail(email);
    // The critical step is storing the email; optional content fields must
    // not prevent it.
    expect(result.stored).toBe(true);
  });

  it('should handle expired attachment URLs', async () => {
    const email = {
      id: 'test_789',
      attachments: [{
        filename: 'file.pdf',
        url: 'https://expired-url.com/file.pdf'
      }]
    };

    const result = await processEmail(email);
    // Attachment failures are non-critical: they are reported in `errors`
    // under the 'attachments' stage instead of rejecting.
    const attachmentErrors = result.errors.filter(
      ({ stage }) => stage === 'attachments'
    );
    expect(attachmentErrors).toHaveLength(1);
  });
});

Checklist

  • Configure inbound domain with MX records
  • Set up webhook endpoint with credential verification
  • Configure spam filtering sensitivity
  • Set up monitoring and alerting
  • Implement async processing (don’t block webhooks)
  • Store raw emails before processing
  • Implement idempotency (handle duplicates)
  • Download attachments promptly
  • Implement retry logic for failures
  • Verify webhook credentials
  • Validate and sanitize all inputs
  • Validate attachment types by inspecting the file content (e.g., magic bytes), not just the declared filename or MIME type
  • Implement rate limiting
  • Scan attachments for malware
  • Monitor processing metrics
  • Set up alerts for anomalies
  • Log all processing for debugging
  • Plan for handling failures
  • Document your email routing rules