如何在触发DAG后安排DAG任务?
我正在使用TriggerDagRunOperator,以便一个控制器DAG可以触发一个目标DAG。但是,一旦控制器DAG触发了目标DAG,目标DAG就会切换到"running",但它的任何任务都不会被调度。我希望一旦控制器DAG触发目标DAG,就安排目标DAG的任务。
# Controller DAG's callable
def conditionally_trigger(context, dag_run_object):
condition_param = context['params']['condition_param']
if condition_param:
return dag_run_obj
return None
# Target DAG's callable
def say_hello():
print("Hello")
# Controller DAG
controller_dag = DAG(
dag_id="controller",
default_args = {
"owner":"Patrick Stump",
"start_date":datetime.utcnow(),
},
schedule_interval='@once',
)
# Target DAG
target_dag = DAG(
dag_id="target",
default_args = {
"owner":"Patrick Stump",
"start_date":datetime.utcnow(),
},
schedule_interval=None,
)
# Controller DAG's task
controller_task = TriggerDagRunOperator(
task_id="trigger_dag",
trigger_dag_id="target",
python_callable=conditionally_trigger,
params={'condition_param':True},
dag=controller_dag,
)
# Target DAG's task -- never scheduled!
target_task = PythonOperator(
task_id="print_hello",
python_callable=say_hello,
dag=target_dag,
)
提前感谢:)
转载请注明出处:http://www.400tyeyaji.com/article/20230526/1968679.html