If someone floods your web server with thousands of requests per second, what happens? Your server slows down, legitimate users can't get through, and eventually everything crashes. This kind of attack is called a DDoS — a Distributed Denial of Service attack.
In this post, I'll walk you through exactly how I built a tool that watches incoming traffic in real time, learns what "normal" looks like, detects when something is wrong, and automatically blocks the attacker — all in pure Python, without any rate-limiting libraries.
By the end you'll understand:
- How a sliding window works to measure request rates
- How a rolling baseline learns from your own traffic
- How the detection logic decides when something is suspicious
- How iptables blocks an IP at the Linux kernel level
Let's go.
What Are We Building?
Imagine a security guard standing at the door of a nightclub. They don't just check IDs — they also watch how many people are arriving per minute. If suddenly 500 people show up in 60 seconds when the normal rate is 20 per minute, something is wrong.
That's exactly what our tool does, but for HTTP requests to a web server.
Here's the big picture:
Internet
│
▼
iptables (kernel firewall — blocks banned IPs before they reach Nginx)
│
▼
Nginx (reverse proxy — logs every request as JSON)
│
▼
Nextcloud (the actual cloud storage app)
Meanwhile, in parallel:
Nginx log ──► Python daemon reads it ──► Detects anomalies ──► Bans IPs via iptables
└──► Slack alert
Our Python daemon is always running in the background, reading the log, measuring rates, and reacting. It is not a cron job — it is a continuous process with four threads running simultaneously, each with a specific job.
Part 1: The Sliding Window
The Problem with Simple Counters
The most naive approach is a simple counter: "count requests per minute and reset every 60 seconds." But this has a well-known flaw called the boundary problem.
Imagine the window resets at 12:00:00. An attacker sends 99 requests at 11:59:59 and another 99 at 12:00:01. Your counter sees 99 in each window — fine! But in reality, 198 requests arrived in just 2 seconds. The counter completely missed it.
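To make the boundary problem concrete, here is a small sketch that counts the same burst two ways. The function names and the timestamps (seconds relative to an arbitrary start) are illustrative assumptions, not code from the project.

```python
from collections import Counter

WINDOW_S = 60  # assumed window length in seconds


def fixed_window_counts(timestamps):
    """Count requests per fixed slot: slot 0 = [0, 60), slot 1 = [60, 120), ..."""
    return Counter(int(ts // WINDOW_S) for ts in timestamps)


def max_sliding_count(timestamps):
    """Largest number of requests seen in any trailing 60-second span."""
    ordered = sorted(timestamps)
    best, left = 0, 0
    for right, ts in enumerate(ordered):
        # Move the left edge forward until it is inside the trailing window
        while ordered[left] < ts - WINDOW_S:
            left += 1
        best = max(best, right - left + 1)
    return best


# 99 requests one second before the reset, 99 one second after it
burst = [59.0] * 99 + [61.0] * 99

print(fixed_window_counts(burst))  # each fixed slot sees only 99
print(max_sliding_count(burst))    # the sliding view sees all 198
```

The fixed counter never sees more than 99 in a slot, while the trailing 60-second view counts the full 198.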
The Deque Solution
We fix this with a sliding window using Python's collections.deque. Instead of counting in a fixed time slot, we store the exact Unix timestamp of every recent request and always look backwards exactly 60 seconds.
from collections import deque
# One deque per IP address — stores Unix timestamps
per_ip_window = {}
def record_request(ip: str, timestamp: float):
    if ip not in per_ip_window:
        per_ip_window[ip] = deque()
    dq = per_ip_window[ip]
    # Add this request's timestamp to the right
    dq.append(timestamp)
    # Evict timestamps older than 60 seconds from the left
    cutoff = timestamp - 60
    while dq and dq[0] < cutoff:
        dq.popleft()  # O(1): deque left-pop is constant time
    # What remains = requests in the last 60 seconds
    raw_count = len(dq)
    rate_per_s = raw_count / 60  # convert to req/s to match baseline units
    return rate_per_s
Why deque instead of a regular list? Because removing from the left of a list is O(n) — it shifts every element. A deque removes from the left in O(1) constant time. Under a flood where we might evict thousands of entries per second, this difference is the gap between keeping up with the attack and falling behind.
We maintain two of these structures simultaneously:
- Per-IP window — tracks how fast one specific IP is sending requests
- Global window — tracks total traffic from all IPs combined
The global window catches distributed attacks where no single IP looks bad, but the total is overwhelming.
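A minimal sketch of keeping both windows in step might look like this. The WindowTracker class and its method names are hypothetical, chosen for illustration; only the deque eviction logic mirrors the function above.

```python
from collections import defaultdict, deque

WINDOW_S = 60


class WindowTracker:
    """Hypothetical helper that updates the per-IP and global windows together."""

    def __init__(self):
        self.per_ip = defaultdict(deque)  # ip -> deque of timestamps
        self.global_window = deque()      # timestamps from all IPs

    @staticmethod
    def _push(dq, ts):
        dq.append(ts)
        cutoff = ts - WINDOW_S
        while dq and dq[0] < cutoff:
            dq.popleft()
        return len(dq) / WINDOW_S  # req/s over the last 60 seconds

    def record(self, ip, ts):
        ip_rate = self._push(self.per_ip[ip], ts)
        global_rate = self._push(self.global_window, ts)
        return ip_rate, global_rate


tracker = WindowTracker()
# 300 different IPs each send 2 requests within the same ten seconds
for i in range(300):
    ts = 100.0 + i / 30
    ip = f"10.0.{i // 250}.{i % 250}"
    tracker.record(ip, ts)
    ip_rate, global_rate = tracker.record(ip, ts)

print(round(ip_rate, 3), round(global_rate, 1))
```

Each IP on its own stays at about 0.033 req/s, yet the global window reaches 10 req/s. That is the distributed pattern the global window exists to catch.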
Tracing Through an Example
12:00:01  IP sends request. dq = [1.0]. rate = 1/60 = 0.017 req/s
12:00:30  IP sends 10 more. dq = [1.0, ..., 30.0]. rate = 11/60 = 0.18 req/s
12:01:02  IP sends 1 more. cutoff = 62.0 - 60 = 2.0 → evict dq[0]=1.0 (1.0 < 2.0)
          dq = [2.0, ..., 62.0]. rate = 11/60, still ~0.18 req/s ← normal
12:01:03  Attacker sends 500 requests in 1 second
          cutoff = 63.0 - 60 = 3.0 → evict 2.0; dq now has 510 entries
          rate = 510/60 = 8.5 req/s ← SUSPICIOUS
Part 2: The Rolling Baseline
Why Not Hardcode a Threshold?
You might think: "Just flag anyone over 100 requests per minute." But this fails immediately in the real world. A news article might suddenly make your site popular — totally normal traffic that would look like an attack. At 3am your server might handle 5 req/s normally; during business hours, 50 req/s is normal. A fixed threshold would either miss real attacks during busy periods or generate constant false positives during quiet ones.
The solution is to learn from your own traffic and adapt constantly.
Rolling Mean and Standard Deviation
Every second, we record how many requests arrived globally. We keep the last 30 minutes of these per-second counts in a deque (maxlen=1800). Every 60 seconds we recalculate:
import math
def recalculate_baseline(counts: list[int]) -> tuple[float | None, float | None]:
    if len(counts) < 2:
        return None, None  # not enough data, don't fire yet
    n = len(counts)
    mean = sum(counts) / n
    var = sum((x - mean) ** 2 for x in counts) / n  # population variance
    stddev = math.sqrt(var)
    # Proportional stddev floor: prevents near-zero variance
    # from causing absurd z-scores on perfectly uniform traffic.
    # The floor scales with mean, so it is never a fixed absolute value.
    # Note: mean itself is NEVER modified; only stddev gets a floor.
    if mean > 0:
        stddev = max(stddev, mean * 0.3)
    return mean, stddev
Why a proportional stddev floor? Consider a server getting exactly 1 request per second from background scanners. Every second has count=1, so mean=1.0 and stddev=0.0. Dividing by zero is undefined, so suppose the code clamps stddev to a tiny epsilon such as 0.000001. Now a user loads a page and sends 3 requests in one second: z-score = (3-1)/0.000001 = 2,000,000. They get banned for loading a web page.
By enforcing stddev >= mean * 0.3, we ensure z-scores stay meaningful even when traffic is perfectly uniform. The key point: we never modify the mean — only the standard deviation gets a floor, and that floor scales proportionally with traffic.
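As a rough worked example of the floor's effect, the sketch below scores the same small uptick against perfectly uniform traffic with and without it. The zscore helper, the 1e-6 epsilon, and the 1.5 req/s uptick are assumptions made for this illustration.

```python
import math


def zscore(value, counts, floor_ratio=None):
    """Z-score of `value` against `counts`, optionally with a proportional stddev floor."""
    mean = sum(counts) / len(counts)
    stddev = math.sqrt(sum((x - mean) ** 2 for x in counts) / len(counts))
    if floor_ratio is not None and mean > 0:
        stddev = max(stddev, mean * floor_ratio)
    stddev = max(stddev, 1e-6)  # assumed epsilon so the unfloored case cannot divide by zero
    return (value - mean) / stddev


uniform = [1] * 300  # exactly one request every second for five minutes

without_floor = zscore(1.5, uniform)
with_floor = zscore(1.5, uniform, floor_ratio=0.3)

print(f"without floor: {without_floor:,.0f}")
print(f"with floor:    {with_floor:.2f}")
```

Without the floor, a move from 1.0 to 1.5 req/s scores in the hundreds of thousands. With the floor it scores about 1.67, comfortably under a threshold of 3.0.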
Cold-Start Guard
There is one more important detail: we do not fire any bans until the baseline has collected at least 120 per-second data points (about 2 minutes of traffic). A brand-new baseline with 5 data points is not reliable enough to justify blocking anyone. This cold-start guard prevents false positives on startup without hardcoding any request-rate threshold.
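One way the per-second counts and the cold-start check could fit together is sketched below. BaselineRecorder is a hypothetical, simplified stand-in (it ignores the IP and status that the real recorder receives), and counting idle seconds as zero is an assumption rather than something the project is known to do.

```python
from collections import deque

WINDOW_SECONDS = 1800  # 30 minutes of per-second counts
MIN_SAMPLES = 120      # cold-start guard


class BaselineRecorder:
    """Hypothetical helper: buckets request timestamps into per-second counts."""

    def __init__(self):
        self.rolling_window = deque(maxlen=WINDOW_SECONDS)  # (second, count) pairs
        self._current_second = None
        self._current_count = 0

    def record_request(self, ts: float) -> None:
        second = int(ts)
        if self._current_second is None:
            self._current_second = second
        # Close out finished seconds, including idle ones with zero requests
        while self._current_second < second:
            self.rolling_window.append((self._current_second, self._current_count))
            self._current_second += 1
            self._current_count = 0
        self._current_count += 1

    def is_mature(self) -> bool:
        return len(self.rolling_window) >= MIN_SAMPLES


rec = BaselineRecorder()
for ts in range(60):
    rec.record_request(float(ts))
mature_after_one_minute = rec.is_mature()

rec.record_request(125.0)
print(mature_after_one_minute, rec.is_mature())
```

After one minute only 59 seconds have been closed out, so detection stays off. Once the request at second 125 arrives, 125 samples exist and the guard lifts.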
Per-Hour Slots — Preferring Recent Data
Alongside the 30-minute window, we also maintain per-hour slots:
from datetime import datetime, timezone

# { "2026-04-27T14": [count_s1, count_s2, ...] }
hourly_data = {}
current_hour = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H")
if len(hourly_data.get(current_hour, [])) >= 120:
    counts = hourly_data[current_hour]  # prefer this hour's data
else:
    # rolling_window is the 30-minute deque of (second, count) pairs
    counts = [c for _, c in rolling_window]  # fall back to 30-min window
Why? If it's 3:05pm and the 3pm slot already holds five minutes of data, that data is more representative of current traffic than a 30-minute window that still mixes in counts from before 3pm. The hourly preference kicks in after 120 seconds of same-hour data and gives the baseline a more accurate picture of what is normal right now.
After the server has been running for several hours, you can see two hourly slots with visibly different mean values on the dashboard — proof the baseline is adapting to real traffic patterns.
Part 3: The Detection Logic
Now we have two things: the current rate in req/s (from the sliding window) and what normal looks like in req/s (from the baseline). How do we decide if something is an attack?
Z-Score: How Many Standard Deviations Away Are We?
A z-score measures how far a value is from the mean, in units of standard deviations:
z-score = (current_rate - mean) / stddev
If traffic is normally 2.0 req/s with a stddev of 0.6, and an IP suddenly sends 10 req/s:
z-score = (10.0 - 2.0) / 0.6 = 13.3
A z-score of 13.3 is far out in the tail: if per-second rates followed a normal distribution, a value this extreme would occur by random chance less than 0.000001% of the time. Something is very wrong.
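The tail probability can be checked with the standard library, assuming rates are normally distributed. Real traffic rarely is, so treat the number as a heuristic rather than a guarantee. The helper name is made up for this sketch.

```python
import math


def upper_tail_probability(z: float) -> float:
    """P(Z > z) for a standard normal variable, via the complementary error function."""
    return 0.5 * math.erfc(z / math.sqrt(2))


z = (10.0 - 2.0) / 0.6
p_three_sigma = upper_tail_probability(3.0)
p_attack = upper_tail_probability(z)

print(f"z = {z:.1f}")
print(f"P(Z > 3.0) = {p_three_sigma:.5f}")
print(f"P(Z > {z:.1f}) = {p_attack:.1e}")
```

A z-score of 3.0 corresponds to roughly a 0.135% chance under this assumption, while 13.3 is many orders of magnitude rarer than the 0.000001% bound.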
Two Conditions — Whichever Fires First
The spec requires us to flag an anomaly if either condition triggers, whichever comes first:
def check_for_anomaly(rate_per_s: float, mean: float, stddev: float) -> str | None:
    # Both rate_per_s and mean are in req/s (consistent units)
    # stddev can still be 0 when mean is 0, so skip the z-score in that case
    zscore = (rate_per_s - mean) / stddev if stddev > 0 else 0.0
    # Condition 1: statistically anomalous
    if zscore > 3.0:
        return f"zscore={zscore:.2f} > 3.0 | rate={rate_per_s:.4f} req/s | mean={mean:.4f} req/s"
    # Condition 2: absolute spike, 5x the mean
    # Catches massive floods even when stddev is high
    if rate_per_s > 5.0 * mean:
        return f"spike: {rate_per_s:.4f} req/s > 5x mean={mean:.4f} req/s"
    return None  # all good
The z-score catches statistically unusual traffic. The spike multiplier (5×) catches massive floods immediately even when variance is high. Both conditions use req/s throughout — no unit mixing.
Error Surge — Tightening Thresholds
Attackers often probe for vulnerabilities by sending requests that return 404 or 403 errors. Our daemon tracks the error rate (fraction of 4xx/5xx responses) for each IP over the last 60 seconds, and compares it against the global baseline error rate:
# Compare THIS IP's error rate against the GLOBAL baseline
# (max(..., 1) guards against dividing by zero on an empty window)
ip_error_rate = errors_from_this_ip / max(total_from_this_ip, 1)
global_error_rate = total_errors_all_ips / max(total_requests_all_ips, 1)
if global_error_rate > 0 and ip_error_rate >= 3 * global_error_rate:
    # This IP is generating 3x more errors than normal
    # It's probably scanning/probing, so watch it more closely
    zscore_threshold *= 0.5  # 3.0 → 1.5 (easier to flag)
    spike_multiplier *= 0.5  # 5.0 → 2.5
This means a scanning IP gets flagged much sooner than a regular flooder, because it reveals its intent through error patterns before it even sends enough traffic to trigger the rate-based detection.
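One way to apply the tightening without letting repeated checks halve the thresholds again and again is to compute them fresh for each check. The function and constant names below are hypothetical; the 3x and 0.5 factors come from the snippet above.

```python
BASE_ZSCORE_THRESHOLD = 3.0
BASE_SPIKE_MULTIPLIER = 5.0
ERROR_SURGE_FACTOR = 3.0  # IP error rate must be at least 3x the global rate
TIGHTEN_FACTOR = 0.5


def effective_thresholds(ip_errors, ip_total, global_errors, global_total):
    """Return (zscore_threshold, spike_multiplier) for one IP on one check."""
    if ip_total == 0 or global_total == 0:
        return BASE_ZSCORE_THRESHOLD, BASE_SPIKE_MULTIPLIER
    ip_error_rate = ip_errors / ip_total
    global_error_rate = global_errors / global_total
    surging = (
        global_error_rate > 0
        and ip_error_rate >= ERROR_SURGE_FACTOR * global_error_rate
    )
    if surging:
        return (
            BASE_ZSCORE_THRESHOLD * TIGHTEN_FACTOR,
            BASE_SPIKE_MULTIPLIER * TIGHTEN_FACTOR,
        )
    return BASE_ZSCORE_THRESHOLD, BASE_SPIKE_MULTIPLIER


scanner = effective_thresholds(45, 50, 100, 2000)  # 90% errors vs 5% globally
regular = effective_thresholds(1, 50, 100, 2000)   # 2% errors vs 5% globally
print(scanner, regular)
```

Because the base values are never mutated, an IP that stops producing errors goes back to the normal thresholds on its next check.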
Part 4: Blocking with iptables
When we confirm an anomaly from a specific IP, we block it immediately using iptables — the Linux kernel's built-in firewall.
import subprocess
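def ban_ip(ip: str) -> None:
    # Needs root. check=True raises CalledProcessError if iptables fails,
    # so a ban that did not apply is never silently ignored.
    subprocess.run([
        "iptables",
        "-I", "INPUT", "1",  # insert at the TOP of the INPUT chain
        "-s", ip,            # from this source IP
        "-j", "DROP",        # silently drop all packets
    ], check=True)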
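Why does this work so effectively? iptables rules are enforced inside the Linux kernel, before the packet reaches any process listening on the host. A banned IP's packets are dropped before Nginx ever sees them, so your server spends almost no CPU on them. The attacker gets no response at all; their connection just times out. One caveat: when Nginx runs in a Docker container with published ports, traffic is forwarded to the container rather than delivered through the INPUT chain, so the same DROP rule typically belongs in Docker's DOCKER-USER chain instead.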
The -I INPUT 1 flag is important — it inserts the rule at position 1 (the very top of the chain) so it is checked first, before any other existing rules.
To remove a ban:
def unban_ip(ip: str) -> None:
    subprocess.run([
        "iptables", "-D", "INPUT",
        "-s", ip, "-j", "DROP",
    ])
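Because the IP string comes from a log line that a client can influence, it may be worth validating it before it reaches the iptables command. The build_rule helper below is a hypothetical sketch that only builds the argument list (it does not run anything), and it assumes IPv4 only.

```python
import ipaddress


def build_rule(action: str, ip: str) -> list[str]:
    """Build the iptables argument list for a "ban" or "unban" of one IPv4 address."""
    addr = ipaddress.ip_address(ip)  # raises ValueError for anything that is not an IP
    if addr.version != 4:
        raise ValueError("this sketch only handles IPv4; IPv6 would need ip6tables")
    if action == "ban":
        return ["iptables", "-I", "INPUT", "1", "-s", str(addr), "-j", "DROP"]
    if action == "unban":
        return ["iptables", "-D", "INPUT", "-s", str(addr), "-j", "DROP"]
    raise ValueError(f"unknown action: {action}")


ban_cmd = build_rule("ban", "203.0.113.7")
unban_cmd = build_rule("unban", "203.0.113.7")
print(ban_cmd)
```

The resulting list can be passed to subprocess.run as-is. Keeping the arguments as a list rather than a shell string means a malformed value is never interpreted by a shell.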
The Backoff Schedule
We don't ban permanently on the first offence. This is fair to legitimate users who might be behind a misconfigured corporate proxy sharing an IP. Instead, we use an escalating schedule:
First offence: ban 10 minutes
Second offence: ban 30 minutes
Third offence: ban 2 hours
Fourth offence: permanent ban
After each ban expires, the unbanner thread checks whether the IP re-offends. If it does, the next ban is longer. A Slack notification is sent on every transition so the operator always knows what is happening.
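The schedule above maps naturally onto a small lookup. This is a sketch with made-up names; using None to mean a permanent ban is an assumption of the example.

```python
BAN_SCHEDULE_S = [10 * 60, 30 * 60, 2 * 60 * 60]  # 10 minutes, 30 minutes, 2 hours


def ban_duration(offence_number: int):
    """Return the ban length in seconds for the Nth offence (1-based), or None for permanent."""
    if offence_number < 1:
        raise ValueError("offence_number starts at 1")
    if offence_number <= len(BAN_SCHEDULE_S):
        return BAN_SCHEDULE_S[offence_number - 1]
    return None  # fourth offence and beyond: permanent


durations = [ban_duration(n) for n in range(1, 5)]
print(durations)
```

The unbanner would then only need the ban's start time and this duration to decide whether a rule is due for removal.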
Part 5: Putting It All Together
The daemon runs as four background threads that all run simultaneously, forever:
Thread 1: monitor — tails the Nginx JSON log line by line (50ms poll)
Thread 2: baseline — recalculates mean/stddev every 60 seconds
Thread 3: unbanner — checks every 30 seconds if any bans have expired
Thread 4: dashboard — serves the live metrics webpage on port 8080
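A stripped-down sketch of that thread layout is shown below. The worker body is a placeholder that just counts wake-ups, the dashboard interval is invented for the example (the real thread serves HTTP), and the shared Event is one common way to get a clean shutdown.

```python
import threading
import time


def run_forever(name, interval_s, stop, ticks):
    """Placeholder worker: a real thread would tail the log, recalculate, and so on."""
    while not stop.is_set():
        ticks[name] = ticks.get(name, 0) + 1
        stop.wait(interval_s)  # sleeps, but wakes immediately on shutdown


stop = threading.Event()
ticks = {}
intervals = {"monitor": 0.05, "baseline": 60, "unbanner": 30, "dashboard": 1}
threads = [
    threading.Thread(target=run_forever, args=(name, secs, stop, ticks),
                     name=name, daemon=True)
    for name, secs in intervals.items()
]
for t in threads:
    t.start()

time.sleep(0.2)  # let the workers run briefly
stop.set()       # ask every thread to exit
for t in threads:
    t.join(timeout=1)

print(sorted(ticks))
```

Using stop.wait(interval) instead of time.sleep(interval) lets the 60-second baseline thread exit right away when the daemon shuts down.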
The monitor is the heart of the system. Here is the flow for every single log line:
New log line arrives from Nginx
│
▼
Parse JSON → extract source_ip, timestamp, method, path, status, response_size
│
▼
baseline.record_request(ip, ts, status) ← always feeds the rolling window
│
▼
Update sliding windows (per-IP deque and global deque)
Evict timestamps older than 60 seconds
Compute rate_per_s = len(deque) / 60
│
▼
Is baseline mature? (sample_count >= 120)
│
├── No → skip detection, keep collecting data
│
└── Yes → run anomaly check
    │
    ├── Per-IP: zscore > 3.0 OR rate > 5x mean?
    │   Yes → iptables DROP + Slack ban alert (within 10 seconds)
    │
    └── Global: total rate anomalous?
        Yes → Slack alert only (don't block everyone)
The monitor never waits between requests: it handles each log line as soon as it appears in the file, which with the 50ms poll means within about 50 milliseconds of Nginx writing it.
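The first two steps of that flow, tailing the file and parsing each line, could be sketched as follows. The field names match the ones listed in the flow, but treating timestamp as Unix seconds is an assumption, and follow and parse_line are hypothetical helpers.

```python
import io
import json
import threading
import time


def parse_line(line: str):
    """Return the fields the detector needs, or None for a malformed line."""
    try:
        entry = json.loads(line)
        return {
            "source_ip": entry["source_ip"],
            "timestamp": float(entry["timestamp"]),  # assumed to be Unix seconds
            "status": int(entry["status"]),
        }
    except (ValueError, KeyError, TypeError):
        return None


def follow(fh, stop: threading.Event, poll_s: float = 0.05):
    """Yield complete lines from an open file, polling for new ones until `stop` is set."""
    pending = ""
    while True:
        chunk = fh.readline()
        if chunk:
            pending += chunk  # a partial line is kept until its newline arrives
            if pending.endswith("\n"):
                yield pending
                pending = ""
        elif stop.is_set():
            return
        else:
            time.sleep(poll_s)


# Demo on an in-memory "log"; stop is pre-set so the loop ends at end of input
demo_log = io.StringIO(
    '{"source_ip": "203.0.113.7", "timestamp": 1714226400.5, "status": 404}\n'
    "not json\n"
)
stop = threading.Event()
stop.set()
rows = [parse_line(line) for line in follow(demo_log, stop)]
print(rows)
```

Returning None for lines that fail to parse keeps one corrupt log entry from crashing the monitor thread.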
Part 6: The Live Dashboard
The dashboard is served by Python's built-in http.server — no Flask, no FastAPI, no frameworks. Two routes:
- GET / serves the HTML page
- GET /api/stats returns a JSON snapshot of current state
The HTML page uses a 3-second JavaScript setInterval to call /api/stats and update the display without a page refresh:
async function refresh() {
const r = await fetch('/api/stats', { cache: 'no-store' });
const d = await r.json();
document.getElementById('mean').textContent = d.mean; // effective_mean in req/s
document.getElementById('stddev').textContent = d.stddev; // in req/s
document.getElementById('uptime').textContent = d.uptime;
// ... update banned IPs table, top 10 IPs, CPU/memory bars
}
setInterval(refresh, 3000); // every 3 seconds
The dashboard shows:
- Banned IPs — with the condition that triggered the ban, rate, baseline mean at ban time, level, and time remaining
- Global req/s — current rate from the 60-second global window
- Top 10 source IPs — ranked by request count in the last 60 seconds
- CPU and memory usage (from psutil)
- Effective mean and stddev: the real computed baseline values
- Hourly slots — the per-hour mean values, proving the baseline learns over time
- Uptime — how long the daemon has been running
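On the server side, the two routes could be handled with http.server roughly like this. The handler name, the placeholder snapshot values, and the use of an OS-assigned port for the demo are all assumptions; the real dashboard listens on port 8080 and reads live state.

```python
import http.client
import json
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

START_TIME = time.time()


def snapshot() -> dict:
    """Placeholder stats; a real daemon would read these from shared state."""
    return {"mean": 2.0, "stddev": 0.6, "uptime": round(time.time() - START_TIME, 1)}


class DashboardHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/api/stats":
            body, ctype = json.dumps(snapshot()).encode(), "application/json"
        elif self.path == "/":
            body, ctype = b"<h1>Detector dashboard</h1>", "text/html"
        else:
            self.send_error(404)
            return
        self.send_response(200)
        self.send_header("Content-Type", ctype)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        pass  # keep the demo quiet


# Port 0 asks the OS for any free port, which avoids clashes in this demo
server = ThreadingHTTPServer(("127.0.0.1", 0), DashboardHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

conn = http.client.HTTPConnection("127.0.0.1", server.server_address[1], timeout=5)
conn.request("GET", "/api/stats")
stats = json.loads(conn.getresponse().read())
conn.close()
server.shutdown()
server.server_close()
print(stats["mean"], stats["stddev"])
```

Running the server in a daemon thread matches the four-thread layout: the dashboard never blocks the monitor, and it exits with the main process.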
What I Learned
Start with the data structure. The choice of deque over list for the sliding window is what makes the system work under a real flood. O(1) vs O(n) on the eviction path is the difference between keeping up and falling behind.
Never hardcode effective_mean. A fixed threshold makes assumptions about your traffic that will be wrong. A system that learns from its own data handles traffic spikes, quiet nights, and growth automatically.
The proportional stddev floor is subtle but critical. Without it, perfectly uniform traffic (like exactly 1 scanner per second) collapses stddev to zero and makes every fluctuation look like a 3-sigma event. The fix — stddev >= mean * 0.3 — scales with traffic and never substitutes a fake mean value.
Separation of concerns makes debugging much easier. Because each module (monitor, baseline, detector, blocker, notifier) has one job, you can look at the audit log and immediately know which piece fired and why.
The kernel is your friend. iptables drops packets before your application software ever sees them. This is far more efficient than rate-limiting inside Python, because by the time Python code runs, the OS has already spent resources accepting the TCP connection.
The Full Code
The complete project is on GitHub: github.com/lucadavid075/hng-anomaly-detector
It includes:
- Docker Compose stack (Nginx + Nextcloud + Detector + Certbot)
- All Python source files, fully commented
- Slack-integrated alerting
This article was originally published by DEV Community and written by Luke David.