Technology · Apr 18, 2026

Pulse Check: Real-time Arrhythmia Detection with Isolation Forest and InfluxDB

by Beck_Moulton

Ever tried to find a needle in a haystack? Now imagine the haystack is moving at 250Hz, and that "needle" is a life-threatening heart arrhythmia.

In the world of wearable technology, handling high-frequency ECG/EKG data isn't just a "big data" problem—it's a "fast data" problem. To keep users safe, we need real-time anomaly detection that can distinguish between a jogger's racing heart and a genuine medical emergency.

In this guide, we'll build a sophisticated monitoring pipeline using the Isolation Forest algorithm for unsupervised learning, InfluxDB for high-velocity time-series storage, and Tornado for an asynchronous ingestion layer. We’ll also tackle the "Advanced" challenge: keeping inference latency low enough for edge-adjacent processing.

The Architecture

Monitoring heart rates requires a system that never sleeps. We need a non-blocking ingestion layer to handle the UDP/HTTP streams from wearables, a lightning-fast database, and a mathematical model that doesn't need labeled "bad" data to find outliers.

graph TD
    A[ECG Wearable Device] -->|High-Freq Stream| B[Tornado Async Server]
    B -->|Write Batch| C[(InfluxDB TSDB)]
    C -->|Windowed Query| D[Feature Engineering]
    D -->|Feature Vector| E[Isolation Forest Model]
    E -->|Anomaly Score| F{Decision Engine}
    F -->|Score > Threshold| G[🚨 Trigger Alert]
    F -->|Normal| H[✅ Log Metrics]

    subgraph "Inference Loop"
    D
    E
    F
    end

Prerequisites

To follow this advanced tutorial, you’ll need:

  • Python 3.9+
  • Scikit-learn: For our Isolation Forest implementation.
  • InfluxDB: A purpose-built time-series database that handles high-velocity writes and windowed queries.
  • Tornado: For handling concurrent high-throughput connections.

Step 1: High-Speed Ingestion with Tornado

Standard Flask or Django won't cut it here. We need Tornado's non-blocking I/O to ensure we don't drop packets when a wearable sends 250 samples per second.

import tornado.escape
import tornado.ioloop
import tornado.web
from influxdb_client import InfluxDBClient, Point, WriteOptions

# InfluxDB Configuration
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=WriteOptions(batch_size=500, flush_interval=1000))

class ECGDataHandler(tornado.web.RequestHandler):
    async def post(self):
        # High-frequency data usually arrives as a JSON array of voltage values
        data = tornado.escape.json_decode(self.request.body)
        device_id = data.get("device_id")
        voltages = data.get("samples") # List of floats

        points = [
            Point("ecg_signal")
            .tag("device_id", device_id)
            .field("voltage", v)
            for v in voltages
        ]

        write_api.write(bucket="health_data", record=points)
        self.set_status(202) # Accepted

def make_app():
    return tornado.web.Application([
        (r"/ingest", ECGDataHandler),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

Step 2: Why Isolation Forest?

For real-time anomaly detection, traditional thresholding (e.g., "if HR > 100") fails to catch subtle arrhythmia patterns like PVCs (Premature Ventricular Contractions).

Isolation Forest is perfect here because:

  1. Unsupervised: We don't need 10,000 labeled heart attacks to train it.
  2. Efficiency: It has a linear time complexity, making it suitable for high-frequency streams.
  3. The Logic: It works by randomly selecting a feature and a split value. Anomalies are "easier" to isolate and thus end up with shorter paths in the tree.
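To make point 3 concrete, here is a minimal sketch on synthetic data (the baseline distribution and spike value are illustrative): a point far from the baseline is isolated in fewer random splits, so `decision_function` assigns it a lower score.

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(0)
normal = rng.normal(0.5, 0.05, size=(500, 1))  # baseline voltages
spike = np.array([[1.8]])                      # an obvious outlier

model = IsolationForest(n_estimators=100, random_state=42).fit(normal)

# Lower decision_function score = shorter isolation path = more anomalous
print("typical score:", model.decision_function(normal).mean())
print("spike score:  ", model.decision_function(spike)[0])
```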

Step 3: The Inference Engine

We’ll pull a sliding window of data from InfluxDB, extract features (like R-R intervals), and run them through our model.

import numpy as np
from sklearn.ensemble import IsolationForest

def detect_anomalies(ecg_window):
    """
    Input: A window of normalized ECG voltage values.
    Logic: Uses Isolation Forest to detect morphology outliers.
    """
    # Reshape for Scikit-learn (N samples, M features)
    # In a real scenario, you'd extract features like Mean, Std Dev, or FFT components
    data = np.array(ecg_window).reshape(-1, 1)

    # contamination=0.01 means we expect 1% of data to be anomalous
    model = IsolationForest(n_estimators=100, contamination=0.01, random_state=42)

    # Fit and predict
    # -1 for anomaly, 1 for normal
    preds = model.fit_predict(data)

    return preds

# A tiny mock window (at 250 Hz, a one-second window holds 250 samples)
sample_window = [0.5, 0.52, 0.48, 1.2, 0.51] # 1.2 is a spike!
results = detect_anomalies(sample_window)
print(f"Anomaly detected at indices: {np.where(results == -1)[0]}")
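The docstring above mentions R-R intervals as a feature. Here is a hedged sketch of extracting them with `scipy.signal.find_peaks` — the `rr_intervals` helper, the 250 Hz sample rate, and the peak-height threshold are illustrative assumptions, not part of the original pipeline:

```python
import numpy as np
from scipy.signal import find_peaks

def rr_intervals(voltages, fs=250, height=1.0, min_distance_s=0.3):
    """Estimate R-R intervals (in seconds) from a raw ECG window.
    height and min_distance_s are illustrative thresholds; tune per device."""
    peaks, _ = find_peaks(np.asarray(voltages), height=height,
                          distance=int(min_distance_s * fs))
    return np.diff(peaks) / fs  # seconds between successive R-peaks

# Synthetic trace: R-peaks every 200 samples (0.8 s at 250 Hz)
sig = np.zeros(1000)
sig[50::200] = 1.5
print(rr_intervals(sig))
```

Feeding these intervals (rather than raw voltages) into the Isolation Forest lets the model flag rhythm anomalies instead of just voltage spikes.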

The "Official" Way: Scaling to Production

Building a prototype is easy; scaling it to handle 100,000 concurrent patients is where the real engineering begins. You'll need to consider model quantization for edge deployment and distributed stream processing.

For a deeper dive into production-ready patterns, including advanced signal processing and MLOps for medical IoT, I highly recommend checking out the technical deep-dives at WellAlly Tech Blog. They cover how to bridge the gap between "it works on my machine" and "it works on a patient's wrist."

Step 4: Optimizing Latency for Wearables

To achieve "Advanced" level performance, we can't retrain the model on every heartbeat. Use these strategies:

  1. Warm Starting: Only retrain the Isolation Forest every few hours. Use the existing tree structure for real-time decision_function calls.
  2. Dimensionality Reduction: Instead of raw voltage, use PCA (Principal Component Analysis) to reduce the input features before feeding the forest.
  3. Tornado PeriodicCallback: Run the inference loop as a background task in Tornado so it doesn't block the ingestion of new data.

from tornado.ioloop import PeriodicCallback

def run_inference_cycle():
    # 1. Query last 10 seconds of data from InfluxDB
    # 2. Run Isolation Forest
    # 3. If anomaly score > threshold, trigger AlertHandler
    pass

# Schedule inference every 5 seconds (register before IOLoop.current().start())
PeriodicCallback(run_inference_cycle, 5000).start()
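Strategy 1 (fit once, score many) can be sketched like this — the baseline data and the `score_window` helper are hypothetical stand-ins for your own training window and tuning:

```python
import numpy as np
from sklearn.ensemble import IsolationForest

# Fit once on a historical baseline; reuse the pre-built trees for every window
baseline = np.random.default_rng(1).normal(0.5, 0.05, size=(2000, 1))
model = IsolationForest(n_estimators=100, random_state=42).fit(baseline)

def score_window(window):
    """Score a fresh window without refitting -- decision_function only.
    Lower scores are more anomalous; compare against a tuned threshold."""
    return model.decision_function(np.asarray(window).reshape(-1, 1))
```

Because `decision_function` only walks the already-built trees, each call is cheap enough to run inside the periodic inference loop without blocking ingestion.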

Conclusion

Detecting arrhythmias using Isolation Forest and InfluxDB turns a chaotic stream of electricity into actionable health insights. By leveraging the asynchronous nature of Tornado, we ensure that our system remains responsive even under heavy load.

Your turn: Have you tried using unsupervised learning for time-series data? What’s your biggest struggle with high-frequency ingestion? Drop a comment below! 👇

Source

This article was originally published by DEV Community and written by Beck_Moulton.
