Dynamic queue selection for heterogeneous workers #57477
Replies: 2 comments
-
I can provide some context for question 1. As for question 2, that's a very good question and I would also like to know the answer.
-
Not sure about 2 either, worth checking. But I think the right solution for this would be to build a dynamic queue capability into Airflow itself, rather than doing it via cluster policy. This would, however, require an approach similar to custom timetables: likely a dynamic "custom queue selector" object that you pass instead of a queue. The issue is that the queue is determined in the scheduler before the task is run (a classic chicken-and-egg problem: we cannot run code in a worker to determine the queue, because we need a queue to run the task on in the first place!). And, due to Airflow's security model, DAG authors should not be able to write dynamic code that determines the queue. Such code has to be installed as a plugin by the Deployment Manager (similar to how cluster policies are), not by DAG authors.
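For readers wondering what "installed by the Deployment Manager, similar to cluster policies" can look like in practice: a minimal, hedged sketch of a `task_instance_mutation_hook` in `airflow_local_settings.py`. The routing rule and queue name below are made up for illustration.

```python
# airflow_local_settings.py -- installed by the Deployment Manager,
# not by DAG authors. Sketch only: the routing rule is illustrative.
from airflow.models.taskinstance import TaskInstance


def task_instance_mutation_hook(task_instance: TaskInstance) -> None:
    # Re-route selected tasks to a dedicated worker queue based on a naming
    # convention; replace the condition with whatever signal you actually have.
    if task_instance.task_id.startswith("gpu_"):
        task_instance.queue = "gpu-workers"
```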
-
Our Airflow deployment uses heterogeneous workers, where we route tasks to the respective workers. We have use cases where we would like to determine the routing dynamically. Examples include:
I've experimented with using triggers to set `TaskInstance.queue`. Unfortunately, when testing in practice we run into issues, as there are multiple places where `TaskInstance.queue` is refreshed from the actual DAG's task definition (`airflow.models.taskinstance._refresh_from_task`), taking the original value from the `serialized_dag` table. Ultimately, to stop the worker/scheduler from putting the original queue back, I'd need to make sure that `serialized_dag` gets the new value (permanently).

Question 1: Is there an alternative way to dynamically select queues for tasks, or do I have to solve it in the DAG processor?
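For illustration only (not the code from the experiment above, which isn't shown here): out-of-band rerouting essentially amounts to writing a new `queue` value onto the `TaskInstance` row in the metadata database. The function name and arguments below are hypothetical, and, as described above, `_refresh_from_task` can later restore the serialized value.

```python
# Illustrative sketch: mutate a task instance's queue directly in the metadata DB.
from airflow.models import TaskInstance
from airflow.utils.session import provide_session


@provide_session
def reroute_task(dag_id, task_id, run_id, new_queue, session=None):
    ti = (
        session.query(TaskInstance)
        .filter_by(dag_id=dag_id, task_id=task_id, run_id=run_id)
        .one()
    )
    ti.queue = new_queue  # may be overwritten again by _refresh_from_task
    session.merge(ti)
```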
As a potential solution, I've been playing with the thought of creating a `@property` that shadows `BaseOperator.queue`, through which we influence DAG parsing (and thus `serialized_dag`). However, with Airflow 3.x's DAG versioning in mind, I'm not too happy about creating `count(DAGRun)` entries in the serialized DAG table.

Question 2: Does modifying the queue argument create new DAG versions in Airflow 3.x? I could not locate any information on DAG versioning in the documentation. The Astronomer documentation on DAG versioning, together with an initial code walkthrough, seems to suggest that `queue` is included when evaluating new DAG versions, as it is part of the `serialized_dag` table.
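A rough sketch of the property-shadowing idea described above, so that the routed value is what the DAG processor serializes. It assumes `BaseOperator.__init__` assigns `self.queue` (hence the no-op setter), and `pick_queue` is a placeholder for the real routing rule; this is not a confirmed or recommended pattern.

```python
from airflow.models.baseoperator import BaseOperator


def pick_queue(task_id: str) -> str:
    # Placeholder routing rule, evaluated at parse time.
    return "gpu-workers" if task_id.startswith("gpu_") else "default"


class RoutedOperator(BaseOperator):
    """Shadows BaseOperator.queue so the routed value lands in serialized_dag."""

    @property
    def queue(self) -> str:
        return pick_queue(self.task_id)

    @queue.setter
    def queue(self, value):
        # BaseOperator.__init__ assigns self.queue = queue; ignore the static
        # value so the property above stays authoritative.
        pass

    def execute(self, context):
        self.log.info("Running on queue %s", self.queue)
```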