
Data pipeline and MLOps techniques to automatically scrape data, version it with DVC, track runs with MLflow to pick the best model, build and deploy a Docker container, and test API performance with Grafana and Prometheus.


muhammadhani18/Data-Pipeline-for-Time-Series-Pollution-Prediction


Task 1: Managing Environmental Data with DVC

Fetching Data

WeatherAPI is used to collect weather data and air quality data. The collected data is stored in a JSON file (data/data.json).
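Below is a minimal sketch of what the collector script (weather_data_collector.py, scheduled in the next section) might look like. The endpoint parameters, city, and output path are assumptions for illustration, not the repository's exact code:

import json
import os
from datetime import datetime, timezone

import requests

API_KEY = os.environ["WEATHER_API_KEY"]  # assumed environment variable
URL = "https://api.weatherapi.com/v1/current.json"

def fetch_weather(city: str = "Lahore") -> dict:
    """Fetch current weather plus air quality (aqi=yes) for one city."""
    resp = requests.get(URL, params={"key": API_KEY, "q": city, "aqi": "yes"}, timeout=30)
    resp.raise_for_status()
    record = resp.json()
    record["fetched_at"] = datetime.now(timezone.utc).isoformat()
    return record

def append_record(record: dict, path: str = "data/data.json") -> None:
    """Append the new record to the JSON file tracked by DVC."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    records = []
    if os.path.exists(path):
        with open(path) as f:
            records = json.load(f)
    records.append(record)
    with open(path, "w") as f:
        json.dump(records, f, indent=2)

if __name__ == "__main__":
    append_record(fetch_weather())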

DVC integration

  1. Run dvc init to initialize the DVC repository.
  2. Run dvc add data/data.json to track data.json; DVC records it in data.json.dvc.
  3. Run dvc remote add --default myremote gdrive://{PATH_ID}/path -f to link a Google Drive remote.
  4. Run dvc remote modify myremote gdrive_acknowledge_abuse true to set the required Google Drive configuration.
  5. Run dvc push to push the data to Google Drive.
  6. Commit and push the .dvc file and DVC config changes to GitHub as well.

Task Scheduling

Prepare Your Python Script

  1. Save your Python script as weather_data_collector.py in a directory, e.g., C:\MyScripts.
  2. Test the script by running it manually in the command prompt: python C:\MyScripts\weather_data_collector.py

Create a .bat File to Run the Script

  1. Create a new file named getData.bat in the same directory as your script (e.g., C:\MyScripts).
  2. Run the .bat file manually to confirm that it executes the script properly.

Schedule the Task

  1. Open Task Scheduler:
    • Press Win + R, type taskschd.msc, and press Enter.
  2. Click Create Task in the right-hand panel.
  3. Configure the task:
    • General Tab:
      • Name the task, e.g., "Weather Data Collector".
      • Select "Run whether user is logged on or not".
      • Check "Run with highest privileges".
    • Triggers Tab:
      • Click New.
      • Choose Daily and set the Start time (e.g., the current time).
      • Set "Repeat task every" to 20 minutes.
      • Set "For a duration of" to Indefinitely.
      • Click OK.
    • Actions Tab:
      • Click New.
      • Choose Start a Program.
      • Browse to the getData.bat file you created earlier.
      • Click OK.
    • Conditions Tab:
      • Uncheck "Start the task only if the computer is on AC power" (optional).
    • Settings Tab:
      • Ensure the following options are selected:
        • "Allow task to be run on demand".
        • "If the task is already running, then the following rule applies: Stop the existing instance".
  4. Click OK and enter your password if prompted.

Test the Task

  1. In Task Scheduler, find your task under the Task Scheduler Library.
  2. Right-click the task and choose Run.
  3. Verify that the script executed by checking the data.json file (see the snippet after this list).
  4. Wait 20 minutes to ensure the task runs on schedule.
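A quick, optional way to confirm that a scheduled run refreshed the data, assuming the data/data.json path from Task 1:

import os
from datetime import datetime

# The timestamp should advance roughly every 20 minutes if the task is firing.
mtime = os.path.getmtime("data/data.json")
print("data.json last updated:", datetime.fromtimestamp(mtime))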


Task 2: Air Quality Prediction Documentation

This documentation provides a step-by-step explanation of the workflow for preprocessing the data, training and saving the ARIMA model, and setting up a Flask API for generating predictions.


Table of Contents

  1. Data Preprocessing
  2. Model Training and Saving
  3. Flask API

Data Preprocessing

The first step involves preparing the data for training and testing the model. Below is the logical flow of the preprocessing, followed by a short code sketch:

  1. Load Raw Data:

    • The raw air quality dataset is loaded. This dataset contains timestamps and various air quality indicators like pm2_5, o3, pm10, etc.
  2. Handle Missing Values:

    • Missing values in the dataset are handled by either filling them using imputation techniques or dropping incomplete rows. Consistent and complete data is essential for training time-series models.
  3. Set Datetime Index:

    • The dataset is indexed by the datetime column to ensure it is in a time-series format. This allows models to interpret temporal dependencies.
  4. Normalize Data (Optional):

    • Depending on the dataset, the feature values may be scaled or normalized for better model performance.
  5. Split into Train and Test Sets:

    • The preprocessed data is divided into a training set (80%) and a test set (20%) to evaluate the model's accuracy.
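A minimal pandas sketch of this flow; the timestamp column name and the input path are assumptions, so adjust them to the actual dataset:

import pandas as pd

def preprocess(raw_path: str = "data/preprocessed_data.csv"):
    """Return (train_df, test_df) using an 80/20 chronological split."""
    # 1. Load the raw data; assume a 'timestamp' column plus pollutant columns.
    df = pd.read_csv(raw_path, parse_dates=["timestamp"])

    # 2. Handle missing values: forward-fill, then drop anything still incomplete.
    df = df.ffill().dropna()

    # 3. Index by datetime so the frame is a proper time series.
    df = df.set_index("timestamp").sort_index()

    # 4. (Optional) normalize features, e.g. min-max scaling:
    # df = (df - df.min()) / (df.max() - df.min())

    # 5. Split chronologically (no shuffling for time-series data).
    split = int(len(df) * 0.8)
    return df.iloc[:split], df.iloc[split:]

The split is chronological rather than random so the test set always lies after the training period, which is what ARIMA evaluation expects.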

Model Training and Saving

The second step involves training and saving an ARIMA model for air quality prediction. The steps are listed below, followed by a condensed code sketch.

  1. Select Target Columns:

    • Multiple target columns (e.g., pm2_5, o3) are identified. Each column is treated as an independent time-series for prediction.
  2. Hyperparameter Optimization:

    • A grid search is performed over a range of ARIMA parameters (p, d, q) to identify the best configuration for each target column. The parameters are:
      • p: Number of lag observations.
      • d: Degree of differencing.
      • q: Size of the moving average window.
  3. Train ARIMA Model:

    • The ARIMA model is trained on the training set of the target column using the best hyperparameters.
  4. Evaluate the Model:

    • The trained model is evaluated on the test set using metrics like:
      • RMSE (Root Mean Squared Error): Measures prediction error.
      • MAE (Mean Absolute Error): Measures average magnitude of error.
  5. Select the Best Column:

    • The column with the lowest RMSE is selected as the target for predictions.
  6. Save the Model:

    • The trained ARIMA model for the best column is saved as a serialized file (arima_model.pkl) using pickle. Additionally, the name of the best column is saved for use in the Flask API.
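A condensed sketch of this training loop using statsmodels' ARIMA; the (p, d, q) grid ranges, the column list, and the output layout are assumptions (the repository may save the model and the best-column name separately rather than in one pickle):

import itertools
import pickle

import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error
from statsmodels.tsa.arima.model import ARIMA

def fit_best_arima(train, test):
    """Grid-search (p, d, q) for one series; return (fitted model, RMSE, MAE)."""
    best_fit, best_rmse, best_mae = None, float("inf"), None
    for order in itertools.product(range(3), range(2), range(3)):
        try:
            fit = ARIMA(train, order=order).fit()
        except Exception:
            continue  # skip orders that fail to converge
        pred = fit.forecast(steps=len(test))
        rmse = np.sqrt(mean_squared_error(test, pred))
        if rmse < best_rmse:
            best_fit, best_rmse, best_mae = fit, rmse, mean_absolute_error(test, pred)
    return best_fit, best_rmse, best_mae

# train_df and test_df are the frames produced by the preprocessing step above.
results = {col: fit_best_arima(train_df[col], test_df[col]) for col in ["pm2_5", "o3", "pm10"]}
best_col = min(results, key=lambda c: results[c][1])  # lowest RMSE wins

with open("models/arima_model.pkl", "wb") as f:
    pickle.dump({"column": best_col, "model": results[best_col][0]}, f)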

Flask API

The third step involves setting up a Flask API to serve predictions using the saved ARIMA model. The steps are listed below, followed by a minimal code sketch.

  1. API Setup:

    • A Flask application is set up to expose a prediction endpoint (/predict).
  2. Load the Trained Model:

    • The saved ARIMA model (arima_model.pkl) and the best column information are loaded when the API starts.
  3. Prediction Logic:

    • When a GET request is made to the /predict endpoint, the following steps are performed:
      • The number of prediction steps (steps) is extracted from the request parameters.
      • The ARIMA model generates predictions for the specified number of steps.
  4. Return Predictions:

    • The predictions are returned in JSON format for easy integration into frontend applications.
  5. Run the API:

    • The Flask API is set to run on http://localhost:4000 by default.
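A minimal sketch of the prediction service; it assumes the single-pickle layout from the training sketch above, while the repository's actual app.py may load the model and column name differently:

import pickle

from flask import Flask, jsonify, request

app = Flask(__name__)

# Load the serialized model and the best-column name once at startup.
with open("models/arima_model.pkl", "rb") as f:
    artifact = pickle.load(f)

@app.route("/predict", methods=["GET"])
def predict():
    """Forecast the requested number of steps for the best column."""
    steps = request.args.get("steps", default=10, type=int)
    forecast = artifact["model"].forecast(steps=steps)
    return jsonify({"column": artifact["column"],
                    "predictions": [float(x) for x in forecast]})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=4000)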

Steps to Run the Workflow

  1. Preprocess the Data:

    • Ensure the data/preprocessed_data.csv file is ready.
    • Run the preprocessing script or code to clean and split the data.
  2. Train and Save the Model:

    • Execute the training script to train the ARIMA model and save the best model to models/arima_model.pkl.
  3. Start the Flask API:

    • Navigate to the api/ folder.
    • Install dependencies using pip install -r requirements.txt.
    • Run the Flask app using python app.py.
  4. Test the API:

    • Use a tool like Postman or cURL to send a GET request to http://localhost:4000/predict?steps=10, or use the Python snippet after this list.
    • Verify that predictions are returned in JSON format.
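As an alternative to Postman or cURL, a quick check from Python (the steps value here is arbitrary):

import requests

resp = requests.get("http://localhost:4000/predict", params={"steps": 10}, timeout=10)
resp.raise_for_status()
print(resp.json())  # expect a JSON body containing the predictions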

Task 3: Monitoring Flask Application with Prometheus and Grafana

1. Prerequisites

Ensure you have the following installed:

  1. Docker and Docker Compose for containerized setup.
  2. Basic knowledge of Flask and Python.

2. Setting Up Prometheus and Grafana

2.1. Create a docker-compose.yml File

version: '3'
services:
  prometheus:
    image: prom/prometheus
    ports:
      - 9090:9090
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    networks:
      - monitoring
  
  grafana:
    image: grafana/grafana
    ports:
      - 3000:3000
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    networks:
      - monitoring

networks:
  monitoring:
    external: true

2.2. Create the prometheus.yml Configuration File

global:
  scrape_interval: 15s  # How often Prometheus scrapes metrics

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  - job_name: 'flask-app'
    static_configs:
      - targets: ['host.docker.internal:4000']  # Replace with your Flask app's host and port

3. Exposing Metrics in Flask App

3.1. Install Dependencies

pip install flask prometheus-client prometheus-flask-exporter

3.2. Modify app.py to Expose Metrics

Here’s an example of how to modify your Flask app to expose metrics at /metrics.

from flask import Flask, request, jsonify, render_template
from prometheus_flask_exporter import PrometheusMetrics

# Initialize Flask app
app = Flask(__name__)

# Attach Prometheus Metrics to Flask app
metrics = PrometheusMetrics(app)

@app.route("/")
def index():
    """Serve the homepage."""
    return render_template("index.html")

@app.route("/predict", methods=["GET"])
def predict():
    """API endpoint for predictions."""
    steps = request.args.get("steps", default=10, type=int)
    # Dummy response for demonstration
    return jsonify({"predictions": [i for i in range(steps)]})

if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=4000)

4. Running Prometheus and Grafana

Run docker-compose up -d to start Prometheus and Grafana, then start your Flask app. Make sure the services share the same Docker network (the compose file expects an external network named monitoring, which you can create with docker network create monitoring). In Prometheus or Grafana, run queries such as flask_http_request_total to pull request metrics from the Flask app. A sample Grafana dashboard is shown below.

(Grafana dashboard screenshot)
