Dynamically determine queue for a dag or task by calling a function that takes context as input #20498

@lssatvik

Description

Right now the queue of a dag/task is determined by the queue parameter in the dag/task definition or by the config file. I want the queue parameter to also accept a function. If a function is given, Airflow should call it with the context variable and use the return value as the queue for the dag/task.

Airflow dags and tasks already support callback functions such as on_success_callback, which is given the context variable and executed on success. I want a similar capability for determining the queue.

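import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

log = logging.getLogger(__name__)
default_args = {}  # placeholder; the real default_args are not shown in this issue

def getQueue(context):
    # The context passed to callbacks is a dict, so look the values up by key.
    return f"{context['dag'].dag_id}-{context['run_id']}"

def successCallback(context):
    log.info("Success")

dag = DAG(
    'test',
    schedule_interval='0 * * * *',
    default_args=default_args,
    start_date=datetime(2021, 12, 25),
    on_success_callback=successCallback,
    queue=getQueue,  # proposed, not supported today; or queue_generator=getQueue
)

task = BashOperator(
    task_id='test',
    bash_command="echo Namaste World",
    on_success_callback=successCallback,
    queue=getQueue,  # proposed, not supported today; or queue_generator=getQueue
    dag=dag,
)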
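
How such a parameter might be resolved is not spelled out above. Here is a minimal sketch, assuming a hypothetical helper named `resolve_queue` (not an existing Airflow function) and a plain dict standing in for the real context:

```python
from types import SimpleNamespace
from typing import Any, Callable, Mapping, Union

# A queue is either a fixed name or a function of the context (the proposal).
QueueSpec = Union[str, Callable[[Mapping[str, Any]], str]]


def resolve_queue(queue: QueueSpec, context: Mapping[str, Any]) -> str:
    """Hypothetical helper: return the queue name for one task instance."""
    if callable(queue):
        resolved = queue(context)
        if not isinstance(resolved, str):
            raise TypeError("queue function must return a string")
        return resolved
    # A plain string behaves exactly as the queue parameter does today.
    return queue


def get_queue(context: Mapping[str, Any]) -> str:
    return f"{context['dag'].dag_id}-{context['run_id']}"


# Stand-in for the context; only the two keys used above are provided.
example_context = {
    "dag": SimpleNamespace(dag_id="test"),
    "run_id": "scheduled__2021-12-25T00:00:00+00:00",
}

static_result = resolve_queue("master", example_context)
dynamic_result = resolve_queue(get_queue, example_context)
print(static_result)
print(dynamic_result)
```

A string passes through unchanged, so existing dag definitions would keep working. Only a callable triggers the new per-run lookup.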

Use case/motivation

I use an independent EC2 instance as a Celery worker for every dag run. The queue for any dag run is dag_id-run_id. In all my dags the first task is always an operator running in the "master" queue that sets up an EC2 instance and starts the worker with the custom queue name.

I modified a line in the _enqueue_task_instances_with_queued_state function in scheduler_job.py:
queue = ti.queue if ti.queue == "master" else f"{ti.dag_id}-{ti.run_id}"

As a result, the queue for every task other than the EC2 operator is dag_id-run_id. Because the EC2 operator starts a Celery worker listening on that specific queue, every task not assigned to the "master" queue (which has a worker running locally) is executed on that Celery worker.
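
The routing rule from the modified line can be tried in isolation. This is only a sketch: `FakeTaskInstance` and `effective_queue` are hypothetical stand-ins for illustration, not Airflow classes or functions, and only the three attributes used by the rule are modelled.

```python
from dataclasses import dataclass

MASTER_QUEUE = "master"  # queue served by the locally running worker


@dataclass
class FakeTaskInstance:
    """Stand-in for a task instance with only the fields the rule reads."""
    dag_id: str
    run_id: str
    queue: str


def effective_queue(ti: FakeTaskInstance) -> str:
    # Same expression as the patched scheduler line: tasks on "master" stay
    # there, everything else is routed to the per-run queue.
    return ti.queue if ti.queue == MASTER_QUEUE else f"{ti.dag_id}-{ti.run_id}"


setup_ti = FakeTaskInstance(dag_id="test", run_id="run_1", queue="master")
work_ti = FakeTaskInstance(dag_id="test", run_id="run_1", queue="default")

print(effective_queue(setup_ti))
print(effective_queue(work_ti))
```

Two runs of the same dag get different queues under this rule, which is what lets each run have its own dedicated worker.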

So my setup required a small modification to the Airflow code base. It would be helpful if the scheduler could determine the queue name through a user-defined function that takes the context variable as input.

At present, Airflow allows a queue to be defined for each dag but not for each dag run, because the run_id is only determined at runtime. Also, changing the queue name at the enqueue step is not reflected in the UI, because the change is visible only to the scheduler.

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!
