Ever tried to find a needle in a haystack? Now imagine the haystack is moving at 250 Hz, and the "needle" is a life-threatening heart arrhythmia.
In the world of wearable technology, handling high-frequency ECG/EKG data isn't just a "big data" problem; it's a "fast data" problem. To keep users safe, we need real-time anomaly detection that can distinguish between a jogger's racing heart and a genuine medical emergency.
In this guide, we'll build a sophisticated monitoring pipeline using the Isolation Forest algorithm for unsupervised learning, InfluxDB for high-velocity time-series storage, and Tornado for an asynchronous ingestion layer. We’ll also tackle the "Advanced" challenge: keeping inference latency low enough for edge-adjacent processing.
The Architecture
Monitoring heart rates requires a system that never sleeps. We need a non-blocking ingestion layer to handle the HTTP streams from wearables, a time-series database built for high write rates, and a mathematical model that doesn't need labeled "bad" data to find outliers.
graph TD
    A[ECG Wearable Device] -->|High-Freq Stream| B[Tornado Async Server]
    B -->|Write Batch| C[(InfluxDB TSDB)]
    C -->|Windowed Query| D[Feature Engineering]
    D -->|Feature Vector| E[Isolation Forest Model]
    E -->|Anomaly Score| F{Decision Engine}
    F -->|Score > Threshold| G[🚨 Trigger Alert]
    F -->|Normal| H[✅ Log Metrics]
    subgraph "Inference Loop"
        D
        E
        F
    end
Prerequisites
To follow this advanced tutorial, you’ll need:
- Python 3.9+
- Scikit-learn and NumPy: For our Isolation Forest implementation.
- InfluxDB with the influxdb-client Python package: A time-series database well suited to high-rate ECG samples.
- Tornado: For handling concurrent high-throughput connections.
Step 1: High-Speed Ingestion with Tornado
A synchronous, thread-per-request setup (the default for Flask or Django) is a poor fit here. We use Tornado's non-blocking I/O so that one slow connection doesn't stall the others when every wearable sends 250 samples per second.
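import time

import tornado.escape
import tornado.ioloop
import tornado.web
from influxdb_client import InfluxDBClient, Point, WriteOptions, WritePrecision

SAMPLE_RATE_HZ = 250  # sampling rate used throughout this guide
SAMPLE_PERIOD_NS = 1_000_000_000 // SAMPLE_RATE_HZ

# InfluxDB Configuration (placeholder credentials)
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
# Batching mode buffers points and flushes them in the background
write_api = client.write_api(write_options=WriteOptions(batch_size=500, flush_interval=1000))


class ECGDataHandler(tornado.web.RequestHandler):
    async def post(self):
        # High-frequency data usually arrives as a JSON array of voltage values
        data = tornado.escape.json_decode(self.request.body)
        device_id = data.get("device_id")
        voltages = data.get("samples")  # List of floats
        if not device_id or not voltages:
            self.set_status(400)  # Bad Request
            return

        # Give every sample its own timestamp: points sharing a measurement,
        # tag set and timestamp overwrite each other in InfluxDB.
        # Assumption: the batch ends "now" and samples are evenly spaced.
        start_ns = time.time_ns() - len(voltages) * SAMPLE_PERIOD_NS
        points = [
            Point("ecg_signal")
            .tag("device_id", device_id)
            .field("voltage", float(v))
            .time(start_ns + i * SAMPLE_PERIOD_NS, WritePrecision.NS)
            for i, v in enumerate(voltages)
        ]
        write_api.write(bucket="health_data", record=points)
        self.set_status(202)  # Accepted


def make_app():
    return tornado.web.Application([
        (r"/ingest", ECGDataHandler),
    ])


if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()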
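To try the endpoint, you need a request body in the shape the handler expects: a `device_id` plus a `samples` list. Here is a minimal sketch of a client-side batch builder. The helper name `build_batch`, the device ID `wearable-001`, and the sine-wave samples are all made up for illustration; a real device would send measured voltages.

```python
import json
import math

SAMPLE_RATE_HZ = 250  # matches the sampling rate used in this guide


def build_batch(device_id, seconds=1.0):
    """Return a JSON body holding `seconds` worth of synthetic samples."""
    n = int(SAMPLE_RATE_HZ * seconds)
    # Synthetic stand-in for ECG voltages: a slow sine wave around 0.5
    samples = [
        round(0.5 + 0.05 * math.sin(2 * math.pi * i / SAMPLE_RATE_HZ), 4)
        for i in range(n)
    ]
    return json.dumps({"device_id": device_id, "samples": samples})


body = build_batch("wearable-001")
print(len(json.loads(body)["samples"]), "samples in batch")
```

POST the resulting `body` to `http://localhost:8888/ingest` with any HTTP client; the handler answers 202 once the points are queued for writing.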
Step 2: Why Isolation Forest?
For real-time anomaly detection, traditional thresholding (e.g., "if HR > 100") fails to catch subtle arrhythmia patterns like PVCs (Premature Ventricular Contractions).
Isolation Forest is perfect here because:
- Unsupervised: We don't need 10,000 labeled heart attacks to train it.
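- Efficiency: Each tree is built on a small random subsample, and scoring a point only means walking each tree once, so the cost grows linearly with the number of points scored. That makes it suitable for high-frequency streams.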
- The Logic: It works by randomly selecting a feature and a split value. Anomalies are "easier" to isolate and thus end up with shorter paths in the tree.
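The "shorter paths" idea is easy to see in a toy version. The sketch below is not scikit-learn's implementation; it is a hand-rolled, one-dimensional illustration that counts how many random splits it takes to isolate a value. The function names and the synthetic window are assumptions made for this example.

```python
import random


def isolation_depth(target, values, rng, depth=0, max_depth=50):
    """Count the random splits needed to isolate `target` within `values`."""
    if len(values) <= 1 or depth >= max_depth:
        return depth
    lo, hi = min(values), max(values)
    if lo == hi:
        return depth
    split = rng.uniform(lo, hi)
    # Keep only the side of the split that still contains the target
    if target < split:
        side = [v for v in values if v < split]
    else:
        side = [v for v in values if v >= split]
    return isolation_depth(target, side, rng, depth + 1, max_depth)


def mean_depth(target, values, trees=200, seed=0):
    """Average the isolation depth over many random 'trees'."""
    rng = random.Random(seed)
    return sum(isolation_depth(target, values, rng) for _ in range(trees)) / trees


# Synthetic window: 255 ordinary samples around 0.5 plus one spike at 1.2
data_rng = random.Random(1)
normal = [data_rng.gauss(0.5, 0.02) for _ in range(255)]
window = normal + [1.2]

spike_depth = mean_depth(1.2, window)
typical_depth = mean_depth(normal[0], window)
print(f"spike: {spike_depth:.1f} splits, typical sample: {typical_depth:.1f} splits")
```

The spike sits far from everything else, so most random splits cut it off immediately, while an ordinary sample needs many more splits before it stands alone.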
Step 3: The Inference Engine
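In the full pipeline, we pull a sliding window of data from InfluxDB, extract features (like R-R intervals), and run them through our model. To keep the idea visible, the simplified example below skips the query and feature steps and scores raw voltage values directly.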
import numpy as np
from sklearn.ensemble import IsolationForest


def detect_anomalies(ecg_window):
    """
    Input: A window of normalized ECG voltage values.
    Logic: Uses Isolation Forest to detect morphology outliers.
    Returns: An array with -1 for anomalous samples and 1 for normal ones.
    """
    # Reshape for Scikit-learn (N samples, M features); here M = 1 (raw voltage)
    # In a real scenario, you'd extract features like Mean, Std Dev, or FFT components
    data = np.asarray(ecg_window, dtype=float).reshape(-1, 1)

    # contamination=0.01 means we expect 1% of data to be anomalous
    model = IsolationForest(n_estimators=100, contamination=0.01, random_state=42)

    # Fit and predict in one step: -1 for anomaly, 1 for normal
    preds = model.fit_predict(data)
    return preds


# Tiny mock window for illustration; 1000 ms at 250 Hz would hold 250 samples
sample_window = [0.5, 0.52, 0.48, 1.2, 0.51]  # 1.2 is a spike!
results = detect_anomalies(sample_window)
print(f"Anomaly detected at indices: {np.where(results == -1)[0]}")
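The R-R intervals mentioned above are the gaps between successive heartbeat peaks. Here is a rough sketch of how they could be pulled out of a window. It uses a naive threshold-based peak picker, not a clinical-grade QRS detector, and the threshold of 0.8, the 200 ms refractory period, and the synthetic signal are assumptions chosen for this example.

```python
def rr_intervals_ms(samples, sample_rate_hz=250, threshold=0.8, refractory_ms=200):
    """Return the gaps (in ms) between successive peaks above `threshold`."""
    refractory = int(refractory_ms * sample_rate_hz / 1000)
    peaks = []
    for i in range(1, len(samples) - 1):
        is_peak = (
            samples[i] > threshold
            and samples[i] >= samples[i - 1]
            and samples[i] > samples[i + 1]
        )
        # Ignore peaks that follow the previous one too closely
        if is_peak and (not peaks or i - peaks[-1] >= refractory):
            peaks.append(i)
    return [(b - a) * 1000 / sample_rate_hz for a, b in zip(peaks, peaks[1:])]


# Synthetic 4-second signal: flat baseline with five beats, one of them early
signal = [0.5] * 1000
for idx in (100, 300, 420, 700, 900):
    signal[idx] = 1.2

intervals = rr_intervals_ms(signal)
print(intervals)
```

In this synthetic signal the early beat shows up as a short interval followed by a long one, which is the kind of pattern a feature vector built from R-R intervals would expose to the forest.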
The "Official" Way: Scaling to Production
Building a prototype is easy; scaling it to handle 100,000 concurrent patients is where the real engineering begins. You'll need to consider model quantization for edge deployment and distributed stream processing.
For a deeper dive into production-ready patterns, including advanced signal processing and MLOps for medical IoT, I highly recommend checking out the technical deep-dives at WellAlly Tech Blog. They cover how to bridge the gap between "it works on my machine" and "it works on a patient's wrist."
Step 4: Optimizing Latency for Wearables
To achieve "Advanced" level performance, we can't retrain the model on every heartbeat. Use these strategies:
- Warm Starting: Only retrain the Isolation Forest every few hours. Use the existing tree structure for real-time decision_function calls.
- Dimensionality Reduction: Instead of raw voltage, use PCA (Principal Component Analysis) to reduce the input features before feeding the forest.
- Tornado PeriodicCallback: Run the inference loop as a periodic background task in Tornado. The callback itself runs on the IOLoop thread, so hand the blocking query and model call to an executor to keep it from stalling the ingestion of new data.
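The first strategy, fitting once and reusing the trees, can be sketched as follows. This is one possible shape under stated assumptions: the three summary features, the helper names, and the synthetic training windows are invented for illustration, and a real system would fit on recorded windows believed to be mostly normal.

```python
import numpy as np
from sklearn.ensemble import IsolationForest


def window_features(window):
    """Summarise one ECG window as [mean, std, peak-to-peak]."""
    w = np.asarray(window, dtype=float)
    return np.array([w.mean(), w.std(), w.max() - w.min()])


# Fit once (for example at startup, then again every few hours)
rng = np.random.default_rng(0)
training_windows = rng.normal(0.5, 0.02, size=(500, 250))  # synthetic stand-in
X_train = np.vstack([window_features(w) for w in training_windows])
model = IsolationForest(n_estimators=100, contamination=0.01, random_state=42)
model.fit(X_train)


def score_window(window):
    """Score a window with the already-fitted trees; no refitting happens here."""
    features = window_features(window).reshape(1, -1)
    return float(model.decision_function(features)[0])


normal_window = rng.normal(0.5, 0.02, size=250)
spiky_window = normal_window.copy()
spiky_window[100] = 1.2  # inject a single spike

normal_score = score_window(normal_window)
spiky_score = score_window(spiky_window)
print(f"normal: {normal_score:.3f}, spiky: {spiky_score:.3f}")
```

Note the sign convention: scikit-learn's `decision_function` returns lower values for more abnormal inputs, with negative values on the anomalous side, so an alert rule built on it should fire when the score drops below a threshold.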
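from tornado.ioloop import IOLoop, PeriodicCallback


def run_inference_cycle():
    # 1. Query last 10 seconds of data from InfluxDB
    # 2. Run Isolation Forest
    # 3. If the anomaly score crosses the threshold, trigger AlertHandler
    pass


async def scheduled_inference():
    # The callback runs on the IOLoop thread, so push the blocking
    # query and scoring work to the default thread pool
    await IOLoop.current().run_in_executor(None, run_inference_cycle)


# Run inference every 5 seconds
PeriodicCallback(scheduled_inference, 5000).start()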
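For the first step of the cycle, querying the last 10 seconds, a Flux query along these lines might work. This is a sketch that only builds the query string; the helper name `build_window_query` is hypothetical, and the measurement, field, tag, and bucket names are taken from the ingestion code in Step 1.

```python
import json


def build_window_query(device_id, seconds=10, bucket="health_data"):
    """Build a Flux query for the last `seconds` of voltage samples from one device."""
    # json.dumps yields a double-quoted, escaped string literal
    quoted_id = json.dumps(str(device_id))
    quoted_bucket = json.dumps(bucket)
    return (
        f'from(bucket: {quoted_bucket})\n'
        f'  |> range(start: -{int(seconds)}s)\n'
        f'  |> filter(fn: (r) => r._measurement == "ecg_signal")\n'
        f'  |> filter(fn: (r) => r._field == "voltage")\n'
        f'  |> filter(fn: (r) => r.device_id == {quoted_id})\n'
        f'  |> sort(columns: ["_time"])'
    )


query = build_window_query("wearable-001")
print(query)
```

With the client from Step 1, the string could be passed to `client.query_api().query(query)` and the voltages read from each returned record with `get_value()`. Since that call blocks, it belongs inside the function that runs in the executor.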
Conclusion
Detecting arrhythmias using Isolation Forest and InfluxDB turns a chaotic stream of electricity into actionable health insights. By leveraging the asynchronous nature of Tornado, we ensure that our system remains responsive even under heavy load.
Your turn: Have you tried using unsupervised learning for time-series data? What’s your biggest struggle with high-frequency ingestion? Drop a comment below! 👇
This article was originally published by DEV Community and written by Beck_Moulton.