πŸ€– Agent-Orchestrated-ETL

Intelligent, Self-Optimizing Data Pipelines Powered by AI Agents

A revolutionary hybrid system combining Apache Airflow's robust workflow orchestration with LangChain's intelligent agent capabilities to create adaptive, self-healing data pipelines that learn and optimize automatically.


✨ Key Features

  • 🧠 AI-Driven Pipeline Generation - Automatically analyzes data sources and generates optimal ETL workflows
  • πŸ”„ Self-Healing Pipelines - Agents detect failures and implement recovery strategies autonomously
  • ⚑ Dynamic Optimization - Real-time performance tuning based on execution patterns
  • πŸ”Œ Universal Connectors - Native support for S3, PostgreSQL, REST APIs, files, and more
  • πŸ“Š Intelligent Monitoring - Proactive issue detection with automated resolution suggestions
  • πŸš€ Production Ready - Enterprise-grade security, scaling, and compliance features
  • 🎯 Autonomous SDLC - Self-improving system with progressive enhancement generations
  • ⚑ Advanced Scaling - Multi-level caching, load balancing, and predictive auto-scaling
  • πŸ›‘οΈ Comprehensive Security - Multi-layer validation, encryption, and real-time threat detection
  • πŸ”¬ Research-Ready - Built-in frameworks for ML experimentation and benchmarking
  • πŸ“‘ Real-time Streaming - High-performance message processing with backpressure handling
  • πŸ€– AutoML Integration - Intelligent pipeline optimization with automated model selection
  • ☁️ Cross-Cloud Federation - Seamless multi-provider deployment and orchestration
  • πŸ“ˆ Predictive Scaling - ML-powered resource optimization and cost management
  • πŸ‘οΈ Smart Observability - Anomaly detection with intelligent alerting and SLI/SLO monitoring

πŸš€ Quick Start

Prerequisites

  • Python 3.8+
  • Docker & Docker Compose (optional, for containerized setup)

Installation

# Clone the repository
git clone https://github.com/danieleschmidt/agent-orchestrated-etl.git
cd agent-orchestrated-etl

# Install dependencies
pip install -r requirements.txt

# Set up Airflow
export AIRFLOW_HOME=$(pwd)/airflow
airflow db init

# Initialize agent system
python setup_agent.py

# Start the system
airflow webserver --port 8080
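
On a fresh Airflow 2.x database you also need an admin account before the web UI will let you log in. This uses the standard Airflow CLI; swap in real credentials for anything beyond local testing:

# Create an admin user for the Airflow UI (Airflow 2.x)
airflow users create \
  --username admin --password admin \
  --firstname Admin --lastname User \
  --role Admin --email admin@example.com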

Docker Setup (Recommended)

# Start all services
docker-compose up -d

# Access Airflow UI
open http://localhost:8080
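
To confirm the stack came up cleanly, the standard Docker Compose commands apply:

# Check container status and tail logs from all services
docker-compose ps
docker-compose logs -f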

πŸ—οΈ Architecture Overview

Our intelligent agent architecture consists of three core components:

  • 🎯 Orchestrator Agent: Analyzes data sources and orchestrates optimal pipeline creation
  • βš™οΈ ETL Agents: Specialized agents handling extraction, transformation, and loading operations
  • πŸ‘οΈ Monitor Agent: Continuously monitors pipeline health and suggests performance optimizations

πŸ“– Detailed Architecture Documentation
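
To make the division of labor concrete, here is a minimal sketch of how the three roles could cooperate on a single run. The class and method names below are purely illustrative, not the project's actual API; see the architecture documentation for the real design.

# Illustrative sketch only -- these classes and methods are hypothetical,
# not the library's real API.

class OrchestratorAgent:
    """Analyzes a data source and plans the pipeline."""

    def plan(self, source):
        # A real implementation would inspect schema, volume, and access patterns.
        print(f"planning pipeline for {source}")
        return ["extract", "transform", "load"]


class ETLAgent:
    """Executes a single pipeline stage."""

    def run(self, task, data):
        print(f"running {task}")
        return data  # pass data through to the next stage


class MonitorAgent:
    """Watches execution and proposes recovery on failure."""

    def observe(self, task, succeeded):
        if not succeeded:
            print(f"{task} failed; proposing a recovery strategy")


orchestrator, etl, monitor = OrchestratorAgent(), ETLAgent(), MonitorAgent()
data = None
for task in orchestrator.plan("s3://my-bucket/data/"):
    try:
        data = etl.run(task, data)
        monitor.observe(task, succeeded=True)
    except Exception:
        monitor.observe(task, succeeded=False)

In the actual system these roles map onto LangChain-powered agents coordinated through Airflow, rather than a plain Python loop.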

πŸ’‘ Usage Examples

from agent_orchestrated_etl import DataOrchestrator

# create a pipeline from an S3 source and run it
orch = DataOrchestrator()
pipeline = orch.create_pipeline(
    source="s3://my-bucket/data/",
)
pipeline.execute()

# override the load step
custom_pipeline = orch.create_pipeline(
    source="s3://my-bucket/data/",
    operations={"load": lambda data: print("loaded", data)},
)
custom_pipeline.execute()

# create a pipeline from a REST API
api_pipeline = orch.create_pipeline(
    source="api://example.com/endpoint",
)
api_pipeline.execute()
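
Following the same pattern as the load override above, the operations mapping can presumably target other stages as well. The "transform" key below is an assumption extrapolated from that pattern, not a documented parameter:

# assumption: a "transform" key is accepted like the "load" key above
transform_pipeline = orch.create_pipeline(
    source="s3://my-bucket/data/",
    operations={
        "transform": lambda data: [row for row in data if row],  # drop empty rows
    },
)
transform_pipeline.execute()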

πŸ–₯️ CLI Commands

Execute pipelines directly from the command line with powerful options:

# Run pipeline with monitoring
run_pipeline s3 --output results.json --monitor events.log

# Preview pipeline without execution
run_pipeline s3 --list-tasks

# Generate Airflow DAG
generate_dag s3 dag.py --dag-id my_dag

# List available data sources
run_pipeline --list-sources

CLI Options:

  • --list-tasks: Preview execution order without running
  • --list-sources: Show supported data sources
  • --monitor <file>: Capture task events to file
  • --output <file>: Specify output location
  • --dag-id <id>: Set custom DAG identifier
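
Assuming the generated file is a standard Airflow DAG module, deploying it follows the usual Airflow convention of copying it into the dags folder under AIRFLOW_HOME:

# Generate the DAG, then hand it to Airflow via the dags folder
generate_dag s3 dag.py --dag-id my_dag
mkdir -p "$AIRFLOW_HOME/dags"
cp dag.py "$AIRFLOW_HOME/dags/"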

πŸ”§ Development Setup

Install development dependencies and enable pre-commit hooks:

# Install in development mode with all dependencies
pip install -e .[dev]

# Set up pre-commit hooks for code quality
pre-commit install

# Run the test suite
pytest -q

# Run tests with coverage
coverage run -m pytest -q
coverage report -m

# Check code complexity
radon cc src/agent_orchestrated_etl -s -a

# Run linting and formatting
black . && isort . && flake8

🀝 Contributing

We welcome contributions! Please see our Contributing Guide for details.

Quick Contributing Steps

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

πŸ“š Documentation

πŸ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.


⭐ Star us on GitHub | πŸ“ Report Issues | πŸ’¬ Join Discussions

Made with ❀️ by the Agent-Orchestrated-ETL team
