如何在触发DAG后安排DAG任务?

我正在使用TriggerDagRunOperator,以便一个控制器DAG可以触发一个目标DAG。但是,一旦控制器DAG触发了目标DAG,目标DAG就会切换到"running",但它的任何任务都不会被调度。我希望一旦控制器DAG触发目标DAG,就安排目标DAG的任务。

# Controller DAG's callable
def conditionally_trigger(context, dag_run_object):
   condition_param = context['params']['condition_param']
   if condition_param:
      return dag_run_obj
   return None

# Target DAG's callable
def say_hello():
   print("Hello")

# Controller DAG
controller_dag = DAG(
   dag_id="controller",
   default_args = {
      "owner":"Patrick Stump",
      "start_date":datetime.utcnow(),
   },
   schedule_interval='@once',
)

# Target DAG
target_dag = DAG(
   dag_id="target",
   default_args = {
      "owner":"Patrick Stump",
      "start_date":datetime.utcnow(),
   },
   schedule_interval=None,
)

# Controller DAG's task
controller_task = TriggerDagRunOperator(
   task_id="trigger_dag",
   trigger_dag_id="target",
   python_callable=conditionally_trigger,
   params={'condition_param':True},
   dag=controller_dag,
)

# Target DAG's task -- never scheduled!
target_task = PythonOperator(
   task_id="print_hello",
   python_callable=say_hello,
   dag=target_dag,
)

提前感谢:)

转载请注明出处:http://www.400tyeyaji.com/article/20230526/1968679.html