- Project Overview
- Technology Stack
- Directory Structure
- Detailed Implementation Plan
- Dataset Selection
- Building the Project: Step-by-Step Instructions
- Recommended Datasets
- Additional Resources
## Project Overview

Anomaly Detection in IoT Streams aims to identify unusual patterns or behaviors in data generated by IoT devices. Detecting anomalies is crucial for maintaining the integrity, security, and efficiency of IoT systems.
## Technology Stack

- Python: Primary language for data processing, machine learning, and backend services.
- scikit-learn: For traditional ML algorithms.
- TensorFlow / PyTorch: For deep learning models, if needed.
- Pandas: Data manipulation and analysis.
- NumPy: Numerical computations.
- Apache Kafka: Real-time data streaming.
- Apache Spark Streaming: Real-time data processing.
- Docker: Containerization of services.
- Kubernetes: Orchestration of containerized applications.
- MLflow: Experiment tracking and model management.
- GitHub Actions: CI/CD pipelines.
- InfluxDB: Time-series database for IoT data.
- PostgreSQL: For storing metadata and model information.
- Grafana: Dashboarding and visualization of metrics.
- Plotly / Dash: Interactive data visualization.
- Git: Source code management.
- DVC (Data Version Control): Data and model versioning.
## Directory Structure

```
Anomaly-detection-in-IoT-streams/
├── data/
│   ├── raw/
│   ├── processed/
│   └── external/
├── notebooks/
│   ├── EDA.ipynb
│   └── Model_Training.ipynb
├── src/
│   ├── data/
│   │   ├── load_data.py
│   │   └── preprocess.py
│   ├── models/
│   │   ├── train_model.py
│   │   └── predict.py
│   ├── utils/
│   │   └── helpers.py
│   └── main.py
├── config/
│   └── config.yaml
├── tests/
│   ├── test_data.py
│   ├── test_models.py
│   └── test_utils.py
├── docker/
│   ├── Dockerfile
│   └── docker-compose.yaml
├── scripts/
│   └── deploy.sh
├── mlruns/
├── .gitignore
├── README.md
├── requirements.txt
└── setup.py
```
## Detailed Implementation Plan

### 1. Project Setup

- Initialize a Git repository.
- Set up the Python environment using `virtualenv` or `conda`.
- Create the directory structure as outlined above.
- Define project configurations in `config/config.yaml` (a sample sketch follows this list).
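A minimal sketch of what `config/config.yaml` might contain; the keys below (Kafka settings, data paths, model parameters) are illustrative assumptions rather than a required schema:

```yaml
# config/config.yaml — illustrative example; adapt keys to your needs
kafka:
  bootstrap_servers: "localhost:9092"
  topic: "iot-data"

data:
  raw_path: "data/raw/iot_data.csv"
  processed_path: "data/processed/iot_data_processed.csv"

model:
  type: "isolation_forest"
  contamination: 0.05
  output_path: "models/anomaly_model.pkl"
```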
### 2. Data Ingestion

- Data Sources: Identify IoT devices and data sources.
- Streaming Setup: Configure Apache Kafka for real-time data ingestion.
- Storage: Store raw data in `data/raw/`.
### 3. Data Preprocessing

- Cleaning: Handle missing values and outliers.
- Transformation: Normalize/scale data and perform feature engineering.
- Storage: Save processed data in `data/processed/`.
### 4. Exploratory Data Analysis (EDA)

- Use Jupyter notebooks to perform EDA.
- Visualize data distributions, correlations, and temporal patterns.
### 5. Model Development

- Training: Develop and train ML models for anomaly detection.
- Examples: Isolation Forest, Autoencoders, LSTM-based models.
- Evaluation: Assess model performance using metrics like Precision, Recall, and F1-score (see the sketch after this list).
- Selection: Choose the best-performing model.
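For the evaluation step, a hedged sketch of computing Precision, Recall, and F1 with scikit-learn; it assumes a labeled test set (`y_true` marks anomalies as 1) and an Isolation-Forest-style model that returns -1 for anomalies:

```python
# Illustrative evaluation helper; assumes labeled data is available.
from sklearn.metrics import precision_score, recall_score, f1_score

def evaluate(model, X_test, y_true):
    # Map IsolationForest output (-1 anomaly, 1 normal) to 1/0 labels.
    y_pred = (model.predict(X_test) == -1).astype(int)
    return {
        "precision": precision_score(y_true, y_pred),
        "recall": recall_score(y_true, y_pred),
        "f1": f1_score(y_true, y_pred),
    }
```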
### 6. Deployment

- Containerize the application using Docker.
- Use Kubernetes for orchestrating containers (a minimal manifest sketch follows this list).
- Implement REST APIs for model inference using frameworks like FastAPI or Flask.
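As a starting point for the Kubernetes piece, a minimal Deployment plus Service sketch; the file path, image name, replica count, and port are assumptions that mirror the Docker setup shown later in this guide:

```yaml
# k8s/deployment.yaml — illustrative sketch, not a production config
apiVersion: apps/v1
kind: Deployment
metadata:
  name: anomaly-detector
spec:
  replicas: 2
  selector:
    matchLabels:
      app: anomaly-detector
  template:
    metadata:
      labels:
        app: anomaly-detector
    spec:
      containers:
        - name: anomaly-detector
          image: yourdockerhubusername/anomaly-detector:latest
          ports:
            - containerPort: 8000
---
apiVersion: v1
kind: Service
metadata:
  name: anomaly-detector
spec:
  selector:
    app: anomaly-detector
  ports:
    - port: 80
      targetPort: 8000
```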
### 7. MLOps Integration

- CI/CD: Set up GitHub Actions for automated testing and deployment.
- Experiment Tracking: Use MLflow to track experiments and manage models.
- Monitoring: Use Grafana and Prometheus to monitor system and model performance.
### 8. Visualization

- Develop dashboards to visualize real-time anomaly detection results.
- Provide insights and alerts for detected anomalies.
### 9. Testing

- Write unit and integration tests for data processing, model training, and API endpoints (see the pytest sketch below).
- Ensure code quality and reliability.
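A small pytest sketch for `tests/test_models.py`; the data shape and model settings are illustrative assumptions:

```python
# tests/test_models.py — illustrative unit test sketch
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest

def test_isolation_forest_flags_obvious_outlier():
    # A tight cluster of normal points plus one far-away point.
    rng = np.random.default_rng(42)
    normal = rng.normal(0, 0.1, size=(100, 2))
    df = pd.DataFrame(np.vstack([normal, [[10.0, 10.0]]]),
                      columns=["sensor1", "sensor2"])
    model = IsolationForest(contamination=0.05, random_state=42).fit(df)
    # The injected outlier should be labeled -1 (anomaly).
    assert model.predict(df.tail(1))[0] == -1
```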
### 10. Documentation

- Maintain comprehensive documentation in `README.md`.
- Document APIs and usage instructions.
## Dataset Selection

Candidate sources (detailed recommendations appear in the Recommended Datasets section near the end of this document):

- Kaggle IoT Datasets
- UCI Machine Learning Repository
- Yahoo Webscope
- Custom Dataset: If existing datasets do not fit your use case, consider collecting data from IoT devices relevant to your application.

Ensure that the chosen dataset includes the following (a quick schema check is sketched after this list):

- Time-series data: Timestamped sensor readings.
- Multi-dimensional features: Multiple sensor metrics.
- Labeled anomalies: For supervised learning, if available.
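A quick sanity check for whichever dataset you choose; the column names (`timestamp`, `sensor1`, `sensor2`, `label`) are placeholders for your actual schema:

```python
# Hypothetical dataset sanity check; column names are placeholders.
import pandas as pd

df = pd.read_csv("data/raw/iot_data.csv")
assert df["timestamp"].is_monotonic_increasing      # time-series ordering
assert {"sensor1", "sensor2"}.issubset(df.columns)  # multi-dimensional features
if "label" in df.columns:                           # labeled anomalies, if present
    print(df["label"].value_counts())
```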
## Building the Project: Step-by-Step Instructions

### Step 1: Clone the Repository

```bash
git clone https://github.com/yourusername/Anomaly-detection-in-IoT-streams.git
cd Anomaly-detection-in-IoT-streams
```
### Step 2: Set Up the Environment

Create a virtual environment and install dependencies.

```bash
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```
### Step 3: Set Up Kafka

- Install Kafka: Follow the Kafka Quickstart to install and start Kafka.
- Create Topics:

```bash
kafka-topics.sh --create --topic iot-data --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
```
### Step 4: Build the Data Producer

Develop a producer script to simulate IoT data or connect to actual IoT devices.

```python
# src/data/load_data.py
import json
import random
import time

from kafka import KafkaProducer


def produce_iot_data():
    # Serialize each record as UTF-8 JSON before sending to Kafka.
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    while True:
        # Simulate two sensor readings tagged with an epoch timestamp.
        data = {
            'sensor1': random.random(),
            'sensor2': random.random(),
            'timestamp': int(time.time()),
        }
        producer.send('iot-data', data)
        time.sleep(1)


if __name__ == "__main__":
    produce_iot_data()
```
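The guide doesn't prescribe a consumer, but a matching sketch that drains the `iot-data` topic into `data/raw/` might look like the following (file placement and field names are assumptions):

```python
# Hypothetical consumer sketch: drains the 'iot-data' topic into a CSV.
import csv
import json

from kafka import KafkaConsumer


def consume_iot_data(output_path='data/raw/iot_data.csv'):
    consumer = KafkaConsumer(
        'iot-data',
        bootstrap_servers='localhost:9092',
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='earliest',
    )
    with open(output_path, 'a', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=['sensor1', 'sensor2', 'timestamp'])
        if f.tell() == 0:
            writer.writeheader()  # only write the header for a fresh file
        for message in consumer:
            writer.writerow(message.value)
            f.flush()


if __name__ == "__main__":
    consume_iot_data()
```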
### Step 5: Preprocess the Data

Implement data preprocessing steps.

```python
# src/data/preprocess.py
import pandas as pd
from sklearn.preprocessing import StandardScaler


def preprocess(raw_data_path, processed_data_path):
    df = pd.read_csv(raw_data_path)
    # Forward-fill missing values.
    df = df.ffill()
    # Standardize sensor columns to zero mean and unit variance.
    scaler = StandardScaler()
    df[['sensor1', 'sensor2']] = scaler.fit_transform(df[['sensor1', 'sensor2']])
    df.to_csv(processed_data_path, index=False)


if __name__ == "__main__":
    preprocess('data/raw/iot_data.csv', 'data/processed/iot_data_processed.csv')
```
### Step 6: Exploratory Data Analysis

Use Jupyter notebooks for EDA.

```bash
jupyter notebook notebooks/EDA.ipynb
```
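A few starter cells for the notebook, using Plotly from the technology stack; the plotting choices are suggestions, not prescriptions:

```python
# Illustrative EDA cells for notebooks/EDA.ipynb
import pandas as pd
import plotly.express as px

df = pd.read_csv('data/processed/iot_data_processed.csv')

# Distribution of a sensor reading.
px.histogram(df, x='sensor1', nbins=50).show()

# Pairwise correlations between features.
print(df[['sensor1', 'sensor2']].corr())

# Temporal pattern of one sensor.
px.line(df, x='timestamp', y='sensor1').show()
```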
### Step 7: Train the Model

Train anomaly detection models.

```python
# src/models/train_model.py
import joblib
import pandas as pd
from sklearn.ensemble import IsolationForest


def train_model(data_path, model_path):
    df = pd.read_csv(data_path)
    # contamination sets the expected fraction of anomalies in the data.
    model = IsolationForest(contamination=0.05)
    model.fit(df[['sensor1', 'sensor2']])
    joblib.dump(model, model_path)


if __name__ == "__main__":
    train_model('data/processed/iot_data_processed.csv', 'models/anomaly_model.pkl')
```
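The directory structure also lists `src/models/predict.py`; one plausible batch-scoring sketch for it (the exact interface is an assumption):

```python
# src/models/predict.py — illustrative batch-scoring sketch
import joblib
import pandas as pd


def predict(data_path, model_path='models/anomaly_model.pkl'):
    df = pd.read_csv(data_path)
    model = joblib.load(model_path)
    # IsolationForest: -1 = anomaly, 1 = normal.
    df['anomaly'] = model.predict(df[['sensor1', 'sensor2']]) == -1
    return df


if __name__ == "__main__":
    results = predict('data/processed/iot_data_processed.csv')
    print(results[results['anomaly']])
```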
### Step 8: Serve the Model via a REST API

Create a REST API for model inference.

```python
# src/main.py
import joblib
import pandas as pd
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
model = joblib.load('models/anomaly_model.pkl')


class DataPoint(BaseModel):
    sensor1: float
    sensor2: float
    timestamp: int


@app.post("/predict")
def predict(data: DataPoint):
    df = pd.DataFrame([data.dict()])  # use data.model_dump() on Pydantic v2
    # IsolationForest returns -1 for anomalies, 1 for normal points.
    prediction = model.predict(df[['sensor1', 'sensor2']])
    return {"anomaly": bool(prediction[0] == -1)}
```
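Once the API is running, you can exercise the endpoint with a request like this (the values are arbitrary; the response depends on your trained model):

```bash
curl -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{"sensor1": 0.42, "sensor2": 3.7, "timestamp": 1700000000}'
# Example response: {"anomaly": true}
```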
### Step 9: Containerize with Docker

```dockerfile
# docker/Dockerfile
FROM python:3.9
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ .
# The API loads models/anomaly_model.pkl at startup, so include it in the image.
COPY models/ ./models/
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
```

Build and run the Docker container (note the `-f` flag, since the Dockerfile lives in `docker/`).

```bash
docker build -t anomaly-detector -f docker/Dockerfile .
docker run -d -p 8000:8000 anomaly-detector
```
### Step 10: Track Experiments with MLflow

Install MLflow and launch the tracking UI.

```bash
pip install mlflow
mlflow ui
```
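A hedged example of wiring MLflow into the training script; the experiment name and logged parameters are illustrative:

```python
# Illustrative MLflow tracking additions to src/models/train_model.py
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import IsolationForest


def train_with_tracking(data_path):
    df = pd.read_csv(data_path)
    mlflow.set_experiment("iot-anomaly-detection")  # experiment name is an assumption
    with mlflow.start_run():
        contamination = 0.05
        mlflow.log_param("contamination", contamination)
        model = IsolationForest(contamination=contamination)
        model.fit(df[['sensor1', 'sensor2']])
        # Fraction of training points flagged as anomalous.
        anomaly_rate = (model.predict(df[['sensor1', 'sensor2']]) == -1).mean()
        mlflow.log_metric("train_anomaly_rate", float(anomaly_rate))
        mlflow.sklearn.log_model(model, "model")
```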
### Step 11: Set Up CI/CD with GitHub Actions

Create a `.github/workflows/ci-cd.yml` file.

```yaml
name: CI/CD Pipeline

on:
  push:
    branches: [ main ]

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
      - name: Run Tests
        run: |
          pytest
      - name: Build Docker Image
        run: |
          docker build -t anomaly-detector -f docker/Dockerfile .
      # Pushing requires registry credentials; the secret names below are
      # conventional placeholders — create them in your repository settings.
      - name: Log in to Docker Hub
        uses: docker/login-action@v2
        with:
          username: ${{ secrets.DOCKERHUB_USERNAME }}
          password: ${{ secrets.DOCKERHUB_TOKEN }}
      - name: Push Docker Image
        uses: docker/build-push-action@v2
        with:
          file: docker/Dockerfile
          push: true
          tags: yourdockerhubusername/anomaly-detector:latest
```
### Step 12: Set Up Monitoring with Grafana

Configure Grafana to visualize metrics from the application and Kafka.

```bash
docker run -d -p 3000:3000 grafana/grafana
```
## Recommended Datasets

For this project, it's essential to select a dataset that reflects real-world IoT scenarios. Here are some recommended options:

- Kaggle IoT Network Intrusion Dataset
  - Description: Contains network traffic data from IoT devices with labeled anomalies.
  - Link: Kaggle IoT Network Intrusion Dataset
- Kaggle ESP32 IoT Smart Home Dataset
  - Description: Captures sensor data from a smart home environment.
  - Link: Kaggle ESP32 IoT Smart Home Dataset
- Yahoo Webscope Anomaly Detection Dataset
  - Description: Provides time-series data with annotated anomalies.
  - Link: Yahoo Webscope
- Custom Dataset
  - Description: If existing datasets don't meet your specific requirements, consider generating synthetic data or collecting data from actual IoT devices.

Recommendation: Start with the Kaggle ESP32 IoT Smart Home Dataset, whose sensor readings and labeled anomalies align well with the project objectives.
## Additional Resources

- FastAPI Documentation: https://fastapi.tiangolo.com/
- Docker Documentation: https://docs.docker.com/
- Kubernetes Documentation: https://kubernetes.io/docs/home/
- MLflow Documentation: https://mlflow.org/docs/latest/index.html
- Grafana Documentation: https://grafana.com/docs/
- Apache Kafka Documentation: https://kafka.apache.org/documentation/
- Scikit-learn Anomaly Detection: https://scikit-learn.org/stable/modules/outlier_detection.html