Technology Apr 29, 2026 · 11 min read

How I Built a Real-Time DDoS Detection Engine from Scratch

DEV Community
by marlinekhavele
Imagine you run a cloud storage platform. Thousands of users upload files, share documents, and log in every day. Then one afternoon, traffic suddenly spikes: thousands of requests per second hammer your server from a single IP address. Your server slows down. Legitimate users can't log in. You're under attack.

The traditional answer is Fail2Ban, a tool that watches logs and blocks IPs. But what if you had to build that yourself, from first principles? That's exactly what this project is: a custom anomaly detection daemon that watches HTTP traffic in real time, learns what "normal" looks like, and automatically blocks attackers via iptables.
No Fail2Ban. No rate limiting libraries. Just Python, math, and Linux.

What the System Does

  1. Nginx receives all incoming HTTP requests and writes each one as a JSON log line
  2. The detector daemon tails that log file continuously, line by line
  3. For every request, it asks: "Is this IP behaving abnormally compared to recent history?"
  4. If yes, it adds an iptables DROP rule to block that IP at the kernel level
  5. It sends a Slack alert so the team knows what happened
  6. After a timeout (10 minutes, then 30 minutes, then 2 hours for repeat offenders), it automatically lifts the ban; a fourth ban is permanent
  7. A live web dashboard shows banned IPs, traffic rates, and system health in real time

Part 1: Reading the Logs - The Monitor

Before we can detect anything, we need to read the Nginx access log. Nginx writes one line per HTTP request. We configure it to write in JSON format so parsing is clean:

{
  "source_ip": "1.2.3.4",
  "timestamp": "2025-01-01T12:00:00+00:00",
  "method": "GET",
  "path": "/index.php",
  "status": 200,
  "response_size": 1024
}

The monitor runs as a background thread and tails this file continuously. Here's the core idea:

while True:
    line = file.readline()
    if not line:
        time.sleep(0.1)   # nothing new yet, wait a moment
        continue
    entry = parse(line)   # turn JSON into a LogEntry object
    queue.put(entry)      # hand it off to the detector

Why a queue? The monitor's job is purely I/O: reading lines as fast as they arrive. The detector's job is CPU work: running math on each entry. Separating them with a queue means they run independently. If detection is briefly slow, the queue buffers entries and nothing is lost.

Handling log rotation: Nginx logs are rotated periodically, after which new requests are written to a fresh file. If we don't handle this, our tail would keep reading the old file and miss all new traffic. We detect rotation by watching the file's inode, a unique ID the operating system assigns to each file. If the inode at the log path changes, we reopen the file from the beginning.
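The rotation check described above can be sketched roughly as follows. This is a minimal illustration, not the project's actual monitor: the class name RotationAwareTail and the method read_new_lines are hypothetical, and the sketch returns lines in batches rather than sleeping in a loop so that it is easy to test.

```python
import os


class RotationAwareTail:
    """Hypothetical helper: returns new log lines and follows log rotation."""

    def __init__(self, path: str):
        self.path = path
        self._open()

    def _open(self) -> None:
        self._fh = open(self.path, "r", encoding="utf-8")
        # Remember which inode we are reading so we can spot a replacement file.
        self._inode = os.fstat(self._fh.fileno()).st_ino

    def _drain(self) -> list[str]:
        lines = []
        while True:
            line = self._fh.readline()
            if not line:          # empty string means "nothing new yet"
                return lines
            lines.append(line.rstrip("\n"))

    def _rotated(self) -> bool:
        try:
            return os.stat(self.path).st_ino != self._inode
        except FileNotFoundError:
            return False          # mid-rotation: the new file does not exist yet

    def read_new_lines(self) -> list[str]:
        lines = self._drain()     # finish the old file first
        if self._rotated():
            self._fh.close()
            self._open()          # reopen the path and read from the beginning
            lines += self._drain()
        return lines
```

Draining the old handle before reopening means lines written just before the rotation are not lost. A real monitor would also need to cope with partially written lines, which this sketch ignores.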

Part 2: The Sliding Window - Counting Requests Without Gaps

The most important question is: "How many requests has this IP sent in the last 60 seconds?"

The naive approach is to keep a counter per IP and reset it every minute. But that creates a blind spot: if an attacker sends 1000 requests at 00:59 and 1000 more at 01:01, each batch lands in a different minute bucket and you never see the true burst of 2000 requests in two seconds.

The correct approach is a sliding window: track the exact timestamp of every request, and at any moment count only those within the last 60 seconds.

We use Python's collections.deque for this:

from collections import deque

class SlidingWindowCounter:
    def __init__(self, window_seconds=60):
        self.window_seconds = window_seconds
        self._timestamps = deque()   # stores arrival times

    def record(self, ts: float):
        self._timestamps.append(ts)  # new request arrives at the right end

    def evict_and_count(self, now: float) -> int:
        cutoff = now - self.window_seconds

        # Remove expired entries from the LEFT (oldest end)
        while self._timestamps and self._timestamps[0] < cutoff:
            self._timestamps.popleft()

        return len(self._timestamps)  # everything left is within the window

    def rate(self, now: float) -> float:
        return self.evict_and_count(now) / self.window_seconds

Why a deque and not a list?

A regular Python list is slow at removing from the front: it has to shift every remaining element left, which is O(n). A deque (double-ended queue) removes from either end in O(1). Since we always append new entries to the right and evict old entries from the left, a deque is the perfect data structure.

The eviction logic: We don't scan the whole deque looking for expired entries. We only look at the leftmost entry (index 0), the oldest one. If it's older than 60 seconds, we pop it and check the next one. Because entries are always appended in time order, once we find an entry that's within the window, everything to its right is also within the window. We stop there.

In steady traffic, this evicts 0 or 1 entries per call, which is effectively instant.

We maintain one counter per IP and one global counter for all traffic combined.
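Here is one way the per-IP and global counters could be wired together, along with the minute-boundary burst from earlier. It is a sketch under assumptions: the names per_ip, global_window, and record_request are illustrative and may not match the project's code, and the counter class is repeated in compact form so the example runs on its own.

```python
from collections import defaultdict, deque


class SlidingWindowCounter:
    """Compact copy of the counter above so this example is self-contained."""

    def __init__(self, window_seconds: float = 60):
        self.window_seconds = window_seconds
        self._timestamps = deque()

    def record(self, ts: float) -> None:
        self._timestamps.append(ts)

    def evict_and_count(self, now: float) -> int:
        cutoff = now - self.window_seconds
        while self._timestamps and self._timestamps[0] < cutoff:
            self._timestamps.popleft()
        return len(self._timestamps)


per_ip = defaultdict(SlidingWindowCounter)   # created lazily, one window per IP
global_window = SlidingWindowCounter()       # all traffic combined


def record_request(ip: str, ts: float) -> None:
    per_ip[ip].record(ts)
    global_window.record(ts)


# A burst that straddles a minute boundary: 1000 requests at t=59s, 1000 at t=61s.
for _ in range(1000):
    record_request("1.2.3.4", 59.0)
for _ in range(1000):
    record_request("1.2.3.4", 61.0)

# At t=61s the window covers 1s..61s, so the whole burst is visible.
burst_count = per_ip["1.2.3.4"].evict_and_count(now=61.0)
# At t=120s the cutoff is 60s, so only the requests from t=61s remain.
later_count = per_ip["1.2.3.4"].evict_and_count(now=120.0)
```

A per-minute counter would have reported 1000 requests in each of two separate minutes; the sliding window reports all 2000 at once.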

Part 3: Learning What "Normal" Looks Like - The Rolling Baseline

The sliding window tells us the current rate. But we can't know if that rate is suspicious without knowing what's normal for this server.

This is the baseline. It answers: "Historically, how many requests per second does this server receive?"

Building the baseline
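Every second, we take a snapshot of the global request rate (the 60-second window count divided by 60, so the unit is req/s) and store it:

# maxlen=1800 means we keep 30 minutes of history (30 * 60 = 1800 seconds)
self._per_second_counts = deque(maxlen=1800)

# Every second: store the rate in req/s rather than the raw 60-second count,
# so the baseline uses the same unit as the per-IP rate it is compared with
current_rate = global_window.rate(now)
self._per_second_counts.append((now, current_rate))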

The maxlen=1800 is doing something clever: when the deque is full and we append a new entry, Python automatically drops the oldest entry from the left. We get a rolling 30-minute window with zero manual cleanup code.

Computing mean and standard deviation

Every 60 seconds we recalculate from the stored history:

counts = [c for _, c in self._per_second_counts]
n = len(counts)

mean = sum(counts) / n
variance = sum((x - mean) ** 2 for x in counts) / n
stddev = math.sqrt(variance)

# Apply floors to prevent false positives during idle periods
mean   = max(mean, 1.0)    # assume at least 1 req/s as baseline
stddev = max(stddev, 0.5)  # assume at least 0.5 req/s variation

Why floor values? If the server is idle at 3am with zero traffic, mean=0 and stddev=0. The z-score then divides by zero, so a single request would count as infinitely anomalous and trigger a false alarm. The floors prevent this: they say "even in the quietest period, assume some baseline activity."
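To make the effect of the floors concrete, here is a small self-contained sketch of the calculation. The function name compute_baseline and its parameters are illustrative; the floor values are the ones quoted above.

```python
import math


def compute_baseline(samples, mean_floor=1.0, stddev_floor=0.5):
    """Return (mean, stddev) of per-second rates with floors applied."""
    if not samples:
        # No history yet (for example during warmup): fall back to the floors.
        return mean_floor, stddev_floor
    n = len(samples)
    mean = sum(samples) / n
    variance = sum((x - mean) ** 2 for x in samples) / n  # population variance
    return max(mean, mean_floor), max(math.sqrt(variance), stddev_floor)


idle = compute_baseline([0.0] * 1800)        # a fully idle half hour
busy = compute_baseline([8.0, 10.0, 12.0])   # mean 10, variance 8/3

# With the idle baseline, one request per second is not anomalous at all:
idle_zscore = (1.0 - idle[0]) / idle[1]
```

With the floors in place, the idle baseline becomes (1.0, 0.5) instead of (0, 0), so the z-score is always well defined.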

Per-hour slots

Traffic at 3am looks different from traffic at 3pm. We store separate baseline stats per hour of the day:

current_hour = datetime.now().hour
self._hour_stats[current_hour] = BaselineStats(mean, stddev, ...)

When looking up the baseline, we prefer the current hour's stats if they have enough samples. This means the detector naturally adapts to day/night traffic patterns without any manual configuration.
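The lookup with fallback might look something like this. The article does not say what counts as "enough samples", so MIN_SAMPLES below is a made-up value, and the BaselineStats fields and the pick_baseline function are assumptions for illustration only.

```python
from dataclasses import dataclass


@dataclass
class BaselineStats:
    # Hypothetical fields; the real class may carry more.
    mean: float
    stddev: float
    samples: int


MIN_SAMPLES = 300  # assumed threshold, not taken from the project


def pick_baseline(hour_stats: dict, rolling: BaselineStats, hour: int) -> BaselineStats:
    """Prefer the stats for this hour of day; otherwise use the rolling window."""
    slot = hour_stats.get(hour)
    if slot is not None and slot.samples >= MIN_SAMPLES:
        return slot
    return rolling


rolling = BaselineStats(mean=5.0, stddev=1.0, samples=1800)
hour_stats = {
    3: BaselineStats(mean=1.0, stddev=0.5, samples=1200),   # well-sampled night slot
    15: BaselineStats(mean=20.0, stddev=4.0, samples=40),   # too few samples so far
}

night = pick_baseline(hour_stats, rolling, hour=3)       # uses the 3am slot
afternoon = pick_baseline(hour_stats, rolling, hour=15)  # falls back to rolling
```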

Part 4: Making the Decision - Z-Score and the 5x Rule

At this point we have two inputs:

  • The current rate for an IP (from the sliding window)
  • The baseline mean and stddev (from the rolling window)

We combine these into a z-score:

z = (current_rate - baseline_mean) / baseline_stddev

The z-score measures how many standard deviations above normal the current rate is. A z-score of 3.0 means the rate is 3 standard deviations above the mean. If rates were normally distributed, a value that high would occur by chance only about 0.13% of the time (0.27% if you count deviations in both directions).

def _check_ip(self, ip, baseline, now):
    ip_rate = self.tracker.get_ip_rate(ip)
    zscore  = (ip_rate - baseline.mean) / baseline.stddev

    if zscore > 3.0:
        return AnomalyEvent(condition=f"z-score {zscore:.2f} > 3.0", ...)

    if ip_rate > baseline.mean * 5.0:
        return AnomalyEvent(condition=f"rate {ip_rate:.2f}/s > 5x baseline", ...)

Why two rules? They catch different scenarios:

  • The z-score catches relative anomalies. If the baseline is 10 req/s with a stddev of 5 and someone hits 40 req/s, the z-score is 6.0 and the rule fires because that's unusual relative to history, even though 40 is below 5x the mean (50).
  • The 5x rule catches large spikes when the baseline is noisy. A large stddev shrinks the z-score: if the baseline is 2 req/s with a stddev of 4 and someone hits 12 req/s, the z-score is only 2.5 and stays under the threshold, but 12 > 5 x 2 = 10, so the multiplier rule catches it immediately.
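A quick way to see when each rule fires is to run both checks on a few numbers. This is a standalone sketch; the function evaluate_rate and the example values are illustrative and not taken from the project.

```python
def evaluate_rate(ip_rate, mean, stddev, z_threshold=3.0, multiplier=5.0):
    """Return which rule fires ("zscore", "multiplier") or None."""
    zscore = (ip_rate - mean) / stddev
    if zscore > z_threshold:
        return "zscore"
    if ip_rate > mean * multiplier:
        return "multiplier"
    return None


# Stable baseline (mean 10, stddev 5): 40 req/s gives z = 6.0,
# although 40 is still below 5 x 10 = 50.
stable_case = evaluate_rate(40.0, mean=10.0, stddev=5.0)

# Noisy baseline (mean 2, stddev 4): 12 req/s gives z = 2.5, under 3.0,
# but 12 is above 5 x 2 = 10.
noisy_case = evaluate_rate(12.0, mean=2.0, stddev=4.0)

# Ordinary traffic: neither rule fires.
normal_case = evaluate_rate(11.0, mean=10.0, stddev=5.0)
```

The multiplier rule only matters when the standard deviation is large relative to the mean; with a tight baseline the z-score always fires first.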

Error surge tightening

There's a sneaky attack pattern: low-and-slow scanning. An attacker sends just a few requests per second, below the detection thresholds, but most of them return 404 errors because they're probing for vulnerabilities (/wp-admin, /.env, /phpMyAdmin, etc.).

We detect this separately: if an IP's 4xx/5xx error rate is more than 3x the normal error rate, we tighten the detection thresholds by 30%:

if error_surge_detected:
    zscore_threshold = 3.0 * 0.7   # → 2.1 (easier to trigger)
    rate_multiplier  = 5.0 * 0.7   # → 3.5 (easier to trigger)

This catches the scanner without blocking every IP that occasionally gets a 404.
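As a rough sketch of how the tightening could be computed, assuming "error rate" means the fraction of an IP's recent requests that returned 4xx or 5xx (the article does not define it precisely). The function name and parameters are illustrative.

```python
def detection_thresholds(ip_error_rate, baseline_error_rate,
                         surge_factor=3.0, tighten=0.7):
    """Return (zscore_threshold, rate_multiplier) for one IP."""
    # Assumption: both rates are fractions between 0 and 1.
    error_surge_detected = ip_error_rate > baseline_error_rate * surge_factor
    scale = tighten if error_surge_detected else 1.0
    return 3.0 * scale, 5.0 * scale


# A scanner: 90% of its requests fail, against a normal error rate of 10%.
scanner = detection_thresholds(ip_error_rate=0.9, baseline_error_rate=0.1)

# A normal user who hit a couple of dead links: 20% errors, no surge.
normal_user = detection_thresholds(ip_error_rate=0.2, baseline_error_rate=0.1)
```

One thing a real implementation has to decide is what to do when the baseline error rate is zero, since any single error would then count as a surge.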

Part 5: Blocking the IP with iptables

When an anomaly fires, we need to block the IP immediately. We use iptables, the standard command-line interface to the Linux kernel's built-in packet filter.

import subprocess

def block_ip(ip: str):
    subprocess.run([
        "sudo", "iptables",
        "-I", "INPUT", "1",    # INSERT at position 1 (top of the chain)
        "-s", ip,              # source IP to match
        "-j", "DROP"           # silently discard matching packets
    ])

Breaking this down:

  • -I INPUT 1 inserts our rule at the top of the INPUT chain, so it's checked before any other rules. The attacker's packets are dropped immediately.
  • -s <ip> matches packets whose source address is the offending IP.
  • -j DROP silently discards the packet. We don't send any response back (as opposed to -j REJECT, which sends an ICMP error). This is intentional: the attacker gets no feedback that they're blocked.

To verify blocks are active, you can run:

sudo iptables -L INPUT -n --line-numbers
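Lifting a ban is the mirror image of block_ip: iptables -D deletes the rule that matches the same specification. The sketch below is an assumption about how that could be written, not the project's code. It also validates the address with the standard ipaddress module before it reaches the command line; the function names are illustrative.

```python
import ipaddress
import subprocess


def rule_spec(ip: str) -> list[str]:
    """Build the match part of the rule, rejecting anything that is not IPv4."""
    addr = ipaddress.ip_address(ip)   # raises ValueError on malformed input
    if addr.version != 4:
        raise ValueError("iptables handles IPv4 only; IPv6 needs ip6tables")
    return ["-s", str(addr), "-j", "DROP"]


def unblock_command(ip: str) -> list[str]:
    # -D INPUT <spec> deletes the first rule in INPUT matching the spec.
    return ["sudo", "iptables", "-D", "INPUT", *rule_spec(ip)]


def unblock_ip(ip: str) -> None:
    # check=True raises CalledProcessError if iptables reports a failure,
    # for example when the rule no longer exists.
    subprocess.run(unblock_command(ip), check=True)
```

Separating command construction from execution keeps the part that needs root small and makes the rest easy to test.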

The backoff schedule: escalating punishment

Not all attacks are equal. A first-time offender gets an automatic unban after 10 minutes. A repeat offender escalates through longer bans.

We track how many times each IP has been banned across its entire history. When re-banning, we move to the next duration:

ban_durations = [600, 1800, 7200, -1]   # seconds; -1 = permanent

count = self._ban_history.get(ip, 0)          # how many times banned before
duration = ban_durations[min(count, len(ban_durations) - 1)]

Auto-unban runs in a background thread that wakes every 30 seconds, checks for expired bans, removes the iptables rule, and sends a Slack notification.
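Putting the schedule and the expiry check together, a minimal in-memory version could look like this. The BanBook class and its methods are hypothetical; the real blocker also has to add and remove the iptables rules and send notifications, which this sketch leaves out.

```python
BAN_DURATIONS = [600, 1800, 7200, -1]   # seconds; -1 = permanent


class BanBook:
    """Hypothetical bookkeeping for ban escalation and expiry."""

    def __init__(self):
        self._ban_history = {}   # ip -> number of bans so far
        self._active = {}        # ip -> expiry time, or None if permanent

    def ban(self, ip: str, now: float) -> int:
        count = self._ban_history.get(ip, 0)
        duration = BAN_DURATIONS[min(count, len(BAN_DURATIONS) - 1)]
        self._ban_history[ip] = count + 1
        self._active[ip] = None if duration == -1 else now + duration
        return duration

    def is_banned(self, ip: str) -> bool:
        return ip in self._active

    def release_expired(self, now: float) -> list[str]:
        """Called periodically (every 30s in the article) by the unbanner."""
        expired = [ip for ip, expiry in self._active.items()
                   if expiry is not None and expiry <= now]
        for ip in expired:
            del self._active[ip]
        return expired


book = BanBook()
durations = [book.ban("1.2.3.4", now=0.0) for _ in range(5)]
```

Because the index is clamped with min(), every ban after the third repeats the last entry, which is the permanent one.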

Part 6: The Audit Log

Every significant event (ban, unban, baseline recalculation) is written to a structured log file:

[2025-01-01T12:00:00+00:00] BAN 1.2.3.4 | IP z-score 74.60 > 3.0 | rate=42.300/s | baseline=5.000/0.500 | 600s
[2025-01-01T12:01:00+00:00] BASELINE_RECALC GLOBAL | samples=1800 hour=12 | rate=5.200/s | baseline=5.100/0.480 |
[2025-01-01T12:10:00+00:00] UNBAN 1.2.3.4 | expired | rate=0.000/s | baseline=5.000/0.500 |

This gives you a full timeline of what happened and why, which is invaluable for post-incident analysis.
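A line in this format could be produced by a small formatter along these lines. The field layout is inferred from the sample entries; the function format_audit_line and its parameter names are assumptions.

```python
from datetime import datetime, timezone


def format_audit_line(ts: datetime, action: str, target: str, condition: str,
                      rate: float, mean: float, stddev: float,
                      duration: str = "") -> str:
    """Render one pipe-separated audit entry."""
    line = (
        f"[{ts.isoformat()}] {action} {target} | {condition} | "
        f"rate={rate:.3f}/s | baseline={mean:.3f}/{stddev:.3f} | {duration}"
    )
    return line.rstrip()   # no trailing space when the duration is empty


unban_line = format_audit_line(
    datetime(2025, 1, 1, 12, 10, tzinfo=timezone.utc),
    "UNBAN", "1.2.3.4", "expired",
    rate=0.0, mean=5.0, stddev=0.5,
)
```

Keeping every entry on one line with fixed separators makes the log easy to filter with grep or to split on " | " in a script.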

Part 7: The Live Dashboard

The dashboard is served by FastAPI and auto-refreshes every 3 seconds. It shows:

  • Current global req/s vs baseline mean
  • Number of currently banned IPs
  • CPU and memory usage
  • Daemon uptime
  • Table of all active bans with time remaining
  • Top 10 source IPs by request rate

The frontend is plain JavaScript polling /api/metrics. No React, no build step, just a fetch() on a timer.

Putting It All Together

The main loop is intentionally simple:

import time
from queue import Empty   # `queue` below is the Queue instance fed by the monitor

while running:
    tracker.maybe_recalculate()    # recalc baseline every 60s

    # Drain the log queue in batches of at most 500 entries
    for _ in range(500):
        try:
            entry = queue.get_nowait()
        except Empty:
            time.sleep(0.05)       # queue is empty: pause briefly instead of spinning
            break

        if blocker.is_banned(entry.source_ip):
            continue               # already blocked, skip

        tracker.record(entry)      # update sliding windows
        event = detector.evaluate(entry)  # check for anomaly

        if event and event.kind == PER_IP:
            record = blocker.ban(event.ip, event.condition)
            notifier.send_ban(event, record)
            audit_log.write(...)

        elif event and event.kind == GLOBAL:
            notifier.send_global_alert(event)

Four threads run concurrently:

  • Main thread: the detection loop above
  • LogMonitor thread: tails the log file, feeds the queue
  • Unbanner thread: wakes every 30s to release expired bans
  • Dashboard thread: serves the FastAPI web UI

Key Lessons

  1. Use the right data structure. A deque makes each sliding-window append and eviction O(1). The wrong choice (a list, a dict of minute buckets) would have made it O(n) or introduced measurement gaps.

  2. Don't hardcode thresholds. Every threshold in this system lives in config.yaml. Tuning detection sensitivity is a matter of editing one file, not hunting through code.

  3. The baseline needs a warmup period. The system needs 30 minutes of real traffic before the baseline is trustworthy. Floor values prevent false alarms during warmup.

  4. Two detection rules are better than one. The z-score and the 5x multiplier cover different attack shapes. Neither alone is sufficient.

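  5. Be careful with iptables in Docker. The detector container needs cap_add: [NET_ADMIN] and must run as root. Rules applied inside the container affect the host's iptables chains only if the container shares the host's network namespace (for example with network_mode: host); with Docker's default bridge networking they apply only inside the container's own namespace. Affecting the host is exactly what we want here, but it is worth understanding before you do it.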

Internet → Nginx (JSON logs) → shared Docker volume
                                        ↓
                               Detection daemon
                                  ├── monitor.py    (tail logs)
                                  ├── baseline.py   (rolling stats)
                                  ├── detector.py   (z-score logic)
                                  ├── blocker.py    (iptables)
                                  ├── unbanner.py   (backoff releases)
                                  ├── notifier.py   (Slack)
                                  └── dashboard.py  (FastAPI UI)

This article was originally published by DEV Community and written by marlinekhavele.
