airflow optimizing


1. Scheduling Optimization

1.1 Understanding the Concepts

1.1.1 Executor

The executor is part of Airflow's deployment mode; after changing it, Airflow must be restarted for the change to take effect.

  • Local executors

    • Airflow tasks run locally, inside the scheduler process.
      Pros: very easy to use, fast, extremely low latency, minimal setup requirements.
      Cons: limited in functionality, and they share resources with the Airflow scheduler.
  • Remote executors

    • Queued/batch executors

      • CeleryExecutor

        • In k8s, a worker node and a redis node (acting as the broker) are deployed; the scheduler only pushes tasks onto the queue, and the workers poll to pick them up.

        • The scheduler polls the DAG state, marks the previously finished task as complete, and starts execution of the next task.

        Therefore, before a task starts, the following steps take place:

        1. The parser analyzes the DAG graph (main factor: parsing)

        2. The scheduler polls for tasks that are ready to start and puts them into the queue (main time factor: polling)

        3. The queue updates the DAG state through a database lock (main time factor: locking)

        4. The worker polls for new tasks that can be executed (main time factor: polling)

        5. The task is fetched (main time factor: downloading the task)

        6. Execution starts

        7. The database is notified that the task is complete and the state is updated

        _images/run_task_on_celery_executor.png (figure from the Airflow documentation: task execution flow with the Celery executor)

      • BatchExecutor

        • Executes tasks via Docker
      • EdgeExecutor (experimental pre-release)

        • Involves edge devices
    • Containerized executors

  • Running multiple executors in parallel

    • Parameter setting:

      • executor: CeleryKubernetesExecutor (in the Helm chart) (deprecated)
    • New approach:

      • executor: "CeleryExecutor,KubernetesExecutor"

1.1.2 Parser

Parses DAG files and updates the DAG definitions.

Parameter settings

  • parsing_processes (AIRFLOW__DAG_PROCESSOR__PARSING_PROCESSES): The DAG processor can run multiple processes in parallel to parse DAGs. This defines how many such processes will run.
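
A minimal sketch, assuming the [dag_processor] config section implied by the environment variable above, for checking the value actually in effect:

# Minimal sketch: read the effective DAG-processor parallelism from inside an
# Airflow environment (AIRFLOW__DAG_PROCESSOR__PARSING_PROCESSES overrides airflow.cfg).
from airflow.configuration import conf

print(conf.getint("dag_processor", "parsing_processes"))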

1.1.3 Scheduler

Parameter settings:

  • [core] parallelism (AIRFLOW__CORE__PARALLELISM): The maximum number of task instances that can run concurrently per Airflow scheduler, regardless of the number of workers. Generally, this value multiplied by the number of schedulers in the cluster is the maximum number of task instances in the running state in the metadata database. The value must be greater than or equal to 1.

  • [scheduler] dag_stale_not_seen_duration (AIRFLOW__SCHEDULER__DAG_STALE_NOT_SEEN_DURATION): DAGs that have not been updated by the DAG processor for this many seconds are deactivated.

  • [scheduler] max_dagruns_per_loop_to_schedule (AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE): How many DagRuns the scheduler should examine (and lock) when scheduling and queuing tasks.

  • [scheduler] max_dagruns_to_create_per_loop (AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP): The maximum number of DAGs for which DagRuns are created in each scheduler loop.

  • [scheduler] max_tis_per_query (AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY): How many task instances are evaluated for scheduling in each scheduler loop. Set it to 0 to use the value of [core] parallelism.

  • [scheduler] num_runs (AIRFLOW__SCHEDULER__NUM_RUNS): The number of times to try to schedule each DAG file; -1 means unlimited.

  • [scheduler] orphaned_tasks_check_interval (AIRFLOW__SCHEDULER__ORPHANED_TASKS_CHECK_INTERVAL): How often (in seconds) the scheduler should check for orphaned tasks and SchedulerJobs.

  • [scheduler] scheduler_heartbeat_sec (AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC): The scheduler constantly tries to trigger new tasks (see the scheduler section of the documentation for more information). This defines how often the scheduler should run, in seconds.

  • [scheduler] scheduler_idle_sleep_time (AIRFLOW__SCHEDULER__SCHEDULER_IDLE_SLEEP_TIME): Controls how long the scheduler sleeps between loops, but only when there was nothing to do in the loop; that is, if it scheduled something, it starts the next loop iteration immediately.

  • [scheduler] task_queued_timeout (AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT): How long a task can remain in the queued state before it is retried or set to failed.
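
A minimal sketch for checking which of these values are actually in effect, since environment variables of the form AIRFLOW__SECTION__KEY override airflow.cfg:

# Minimal sketch: inspect the effective scheduler-related settings from inside
# an Airflow environment (env vars override airflow.cfg).
from airflow.configuration import conf

print("parallelism:", conf.getint("core", "parallelism"))
print("max_tis_per_query:", conf.getint("scheduler", "max_tis_per_query"))
print("scheduler_heartbeat_sec:", conf.getint("scheduler", "scheduler_heartbeat_sec"))
print("scheduler_idle_sleep_time:", conf.getfloat("scheduler", "scheduler_idle_sleep_time"))
print("task_queued_timeout:", conf.getfloat("scheduler", "task_queued_timeout"))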

1.1.4 Broker

Broker type | Characteristics | Suitable scenarios
RabbitMQ | Highly reliable, supports the AMQP protocol, feature-rich (e.g. priority queues) | First choice for production; high requirements on message reliability
Redis | Simple and lightweight, high performance, supports Pub/Sub | Small-to-medium clusters with higher performance requirements
Others | e.g. Amazon SQS, Kafka (requires customization) | Special scenarios (e.g. cloud-native or big-data architectures)
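
A minimal sketch (hostnames, credentials and database numbers are placeholders): switching brokers is only a matter of the connection URL. In Airflow the same URL goes into [celery] broker_url (AIRFLOW__CELERY__BROKER_URL).

# Minimal sketch: the broker is selected by the connection URL alone.
# Hostnames, credentials and database numbers are placeholder assumptions.
from celery import Celery

app_rabbitmq = Celery("proj", broker="amqp://user:password@rabbitmq:5672//")  # RabbitMQ (AMQP)
app_redis = Celery("proj", broker="redis://redis:6379/0")                     # Redis, db 0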

1.1.5 Worker

Parameter settings: are there no Celery-specific parameters here? Celery has a bundle_refresh_check_interval parameter that specifies a polling interval; presumably the following parameter plays a similar role: min_heartbeat_interval.

  • [workers] min_heartbeat_interval (AIRFLOW__WORKERS__MIN_HEARTBEAT_INTERVAL): The minimum interval, in seconds, at which a worker checks a task instance's heartbeat with the API server to confirm that it is still alive.

1.2 Tests and Analysis

Printing timestamps shows that the latency splits into two main parts:

  • Time from issuing the command to the task being queued (determined by the interval at which the scheduler polls and dispatches new tasks)

    • Shortening the heartbeat reduces the time from one task finishing to the next entering the queue: scheduler_heartbeat_sec
  • Time from queued to executing (determined by the interval at which the worker starts new tasks)

    • No way to reduce the queued time has been found yet (AIRFLOW__WORKERS__MIN_HEARTBEAT_INTERVAL had no effect)

1.3 Optimization Attempts

1.3.1 Optimization approach

Shorten the polling intervals and increase parallelism:

AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC: 1
AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4
# [core] set parallelism and dag_concurrency
AIRFLOW__CORE__PARALLELISM: 4
AIRFLOW__CORE__DAG_CONCURRENCY: 4
AIRFLOW__CELERY__CELERYD_CONSUMER_POLL_INTERVAL: 0.2
AIRFLOW__WORKERS__MIN_HEARTBEAT_INTERVAL: 1

1.3.2 Run logs

Printed timestamps:

[2025-05-08 10:40:28,959: INFO/MainProcess] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[cdb45479-493d-4328-ae0c-5cb793c76a62] received
[2025-05-08 10:40:28,974: INFO/ForkPoolWorker-15] [cdb45479-493d-4328-ae0c-5cb793c76a62] Executing command in Celery: ['airflow', 'tasks', 'run', 'download_bing', 'save_task', 'manual__2025-05-08T10:40:16.745026+00:00', '--local', '--subdir', 'DAGS_FOLDER/test.py']
[2025-05-08 10:40:29,139: INFO/ForkPoolWorker-15] Filling up the DagBag from /opt/airflow/dags/test.py
[2025-05-08 10:40:29,375: INFO/ForkPoolWorker-15] Running <TaskInstance: download_bing.save_task manual__2025-05-08T10:40:16.745026+00:00 [queued]> on host airflow-worker-0.airflow-worker.airflow.svc.cluster.local
[2025-05-08 10:40:40,264: INFO/ForkPoolWorker-15] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[cdb45479-493d-4328-ae0c-5cb793c76a62] succeeded in 11.302474641997833s: None
[2025-05-08 10:41:03,570: INFO/MainProcess] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[f0164707-0bc2-401e-931c-855c7fc9a0c4] received
[2025-05-08 10:41:03,583: INFO/ForkPoolWorker-15] [f0164707-0bc2-401e-931c-855c7fc9a0c4] Executing command in Celery: ['airflow', 'tasks', 'run', 'download_bing', 'request_task', 'manual__2025-05-08T10:41:03.285683+00:00', '--local', '--subdir', 'DAGS_FOLDER/test.py']
[2025-05-08 10:41:03,719: INFO/ForkPoolWorker-15] Filling up the DagBag from /opt/airflow/dags/test.py
[2025-05-08 10:41:03,923: INFO/ForkPoolWorker-15] Running <TaskInstance: download_bing.request_task manual__2025-05-08T10:41:03.285683+00:00 [queued]> on host airflow-worker-0.airflow-worker.airflow.svc.cluster.local
[2025-05-08 10:41:14,857: INFO/ForkPoolWorker-15] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[f0164707-0bc2-401e-931c-855c7fc9a0c4] succeeded in 11.284130732004996s: None
[2025-05-08 10:41:15,644: INFO/MainProcess] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[e8a51a0d-d621-4895-aa5a-ad2d1cc416f2] received
[2025-05-08 10:41:15,657: INFO/ForkPoolWorker-15] [e8a51a0d-d621-4895-aa5a-ad2d1cc416f2] Executing command in Celery: ['airflow', 'tasks', 'run', 'download_bing', 'save_task', 'manual__2025-05-08T10:41:03.285683+00:00', '--local', '--subdir', 'DAGS_FOLDER/test.py']
[2025-05-08 10:41:15,794: INFO/ForkPoolWorker-15] Filling up the DagBag from /opt/airflow/dags/test.py
[2025-05-08 10:41:15,989: INFO/ForkPoolWorker-15] Running <TaskInstance: download_bing.save_task manual__2025-05-08T10:41:03.285683+00:00 [queued]> on host airflow-worker-0.airflow-worker.airflow.svc.cluster.local
[2025-05-08 10:41:26,872: INFO/ForkPoolWorker-15] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[e8a51a0d-d621-4895-aa5a-ad2d1cc416f2] succeeded in 11.225932340006693s: None
[2025-05-08, 10:41:04 UTC] {logging_mixin.py:188} INFO - 1746700864.6873255
[2025-05-08, 10:41:16 UTC] {logging_mixin.py:188} INFO - 1746700876.7167912

A new task enters the queue (10:41:03,570) -> the worker starts executing it (10:41:03,923) -> the user code actually begins (10:41:04).
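
A quick way to turn the log timestamps above into the two latency components (values copied from the log excerpt; the datetime construction is just for illustration):

# Quick sketch: compute queueing and startup latency from the timestamps above.
from datetime import datetime, timezone

received = datetime(2025, 5, 8, 10, 41, 3, 570000, tzinfo=timezone.utc)    # Celery "received"
running = datetime(2025, 5, 8, 10, 41, 3, 923000, tzinfo=timezone.utc)     # "Running <TaskInstance ...>"
user_code = datetime.fromtimestamp(1746700864.6873255, tz=timezone.utc)    # print(time.time()) in the task

print("queue -> running:", (running - received).total_seconds(), "s")      # ~0.353 s
print("running -> user code:", (user_code - running).total_seconds(), "s") # ~0.764 s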

[2025-05-09 05:19:09,662: INFO/MainProcess] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[34b1c9f8-106f-4a57-b5db-e5372fd4d586] received
[2025-05-09 05:19:09,662: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x78c712290a60> (args:('airflow.providers.celery.executors.celery_executor_utils.execute_command', '34b1c9f8-106f-4a57-b5db-e5372fd4d586', {'lang': 'py', 'task': 'airflow.providers.celery.executors.celery_executor_utils.execute_command', 'id': '34b1c9f8-106f-4a57-b5db-e5372fd4d586', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '34b1c9f8-106f-4a57-b5db-e5372fd4d586', 'parent_id': None, 'argsrepr': "[['airflow', 'tasks', 'run', 'download_bing', 'request_task', 'manual__2025-05-09T05:19:09.480045+00:00', '--local', '--subdir', 'DAGS_FOLDER/test.py']]", 'kwargsrepr': '{}', 'origin': 'gen7@airflow-scheduler-7d48d8794c-jg52n', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '34b1c9f8-106f-4a57-b5db-e5372fd4d586', 'reply_to': 'ce14b94e-9598-36d8-a3f1-efeefc50cd54', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'default'}, 'priority': 0,... kwargs:{})
[2025-05-09 05:19:09,665: DEBUG/MainProcess] Task accepted: airflow.providers.celery.executors.celery_executor_utils.execute_command[34b1c9f8-106f-4a57-b5db-e5372fd4d586] pid:82
[2025-05-09 05:19:09,676: INFO/ForkPoolWorker-15] [34b1c9f8-106f-4a57-b5db-e5372fd4d586] Executing command in Celery: ['airflow', 'tasks', 'run', 'download_bing', 'request_task', 'manual__2025-05-09T05:19:09.480045+00:00', '--local', '--subdir', 'DAGS_FOLDER/test.py']
[2025-05-09 05:19:09,707: DEBUG/ForkPoolWorker-15] Calling callbacks: [<function default_action_log at 0x78c7167a0b80>]
[2025-05-09 05:19:09,738: DEBUG/ForkPoolWorker-15] Loading plugins
[2025-05-09 05:19:09,738: DEBUG/ForkPoolWorker-15] Loading plugins from directory: /opt/airflow/plugins
[2025-05-09 05:19:09,739: DEBUG/ForkPoolWorker-15] Loading plugins from entrypoints
[2025-05-09 05:19:09,739: DEBUG/ForkPoolWorker-15] Importing entry_point plugin openlineage
[2025-05-09 05:19:09,812: DEBUG/ForkPoolWorker-15] Loading 1 plugin(s) took 0.07 seconds
[2025-05-09 05:19:09,813: INFO/ForkPoolWorker-15] Filling up the DagBag from /opt/airflow/dags/test.py
[2025-05-09 05:19:09,815: DEBUG/ForkPoolWorker-15] Importing /opt/airflow/dags/test.py
[2025-05-09 05:19:09,823: DEBUG/ForkPoolWorker-15] Loaded DAG <DAG: download_bing>
[2025-05-09 05:19:10,003: DEBUG/ForkPoolWorker-15] Plugins are already loaded. Skipping.
[2025-05-09 05:19:10,003: DEBUG/ForkPoolWorker-15] Integrate DAG plugins
[2025-05-09 05:19:10,008: DEBUG/ForkPoolWorker-15] previous_execution_date was called
[2025-05-09 05:19:10,018: INFO/ForkPoolWorker-15] Running <TaskInstance: download_bing.request_task manual__2025-05-09T05:19:09.480045+00:00 [queued]> on host airflow-worker-0.airflow-worker.airflow.svc.cluster.local
[2025-05-09 05:19:10,018: DEBUG/ForkPoolWorker-15] Disposing DB connection pool (PID 694)
[2025-05-09 05:19:10,019: DEBUG/ForkPoolWorker-15] Setting up DB connection pool (PID 694)
[2025-05-09 05:19:10,019: DEBUG/ForkPoolWorker-15] settings.prepare_engine_args(): Using NullPool
[2025-05-09 05:19:10,383: DEBUG/MainProcess] pidbox received method enable_events() [reply_to:None ticket:None]
[2025-05-09 05:19:15,384: DEBUG/MainProcess] pidbox received method enable_events() [reply_to:None ticket:None]
[2025-05-09 05:19:20,384: DEBUG/MainProcess] pidbox received method enable_events() [reply_to:None ticket:None]
[2025-05-09 05:19:20,843: DEBUG/MainProcess] pidbox received method ping() [reply_to:{'exchange': 'reply.celery.pidbox', 'routing_key': 'e39109c1-1692-3932-8384-c1cccb7baf1a'} ticket:5bb21c32-2bae-415d-9d34-ff1b00e7270e]
[2025-05-09 05:19:20,981: DEBUG/ForkPoolWorker-15] Calling callbacks: []
[2025-05-09 05:19:21,008: INFO/ForkPoolWorker-15] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[34b1c9f8-106f-4a57-b5db-e5372fd4d586] succeeded in 11.343424211023375s: None
[2025-05-09 05:19:21,100: INFO/MainProcess] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[6291141f-dbc2-4fd8-a518-81e017134130] received
[2025-05-09 05:19:21,101: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x78c712290a60> (args:('airflow.providers.celery.executors.celery_executor_utils.execute_command', '6291141f-dbc2-4fd8-a518-81e017134130', {'lang': 'py', 'task': 'airflow.providers.celery.executors.celery_executor_utils.execute_command', 'id': '6291141f-dbc2-4fd8-a518-81e017134130', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '6291141f-dbc2-4fd8-a518-81e017134130', 'parent_id': None, 'argsrepr': "[['airflow', 'tasks', 'run', 'download_bing', 'save_task', 'manual__2025-05-09T05:19:09.480045+00:00', '--local', '--subdir', 'DAGS_FOLDER/test.py']]", 'kwargsrepr': '{}', 'origin': 'gen7@airflow-scheduler-7d48d8794c-jg52n', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '6291141f-dbc2-4fd8-a518-81e017134130', 'reply_to': 'ce14b94e-9598-36d8-a3f1-efeefc50cd54', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'default'}, 'priority': 0, 'body_encoding':... kwargs:{})
[2025-05-09 05:19:21,103: DEBUG/MainProcess] Task accepted: airflow.providers.celery.executors.celery_executor_utils.execute_command[6291141f-dbc2-4fd8-a518-81e017134130] pid:82
[2025-05-09 05:19:21,119: INFO/ForkPoolWorker-15] [6291141f-dbc2-4fd8-a518-81e017134130] Executing command in Celery: ['airflow', 'tasks', 'run', 'download_bing', 'save_task', 'manual__2025-05-09T05:19:09.480045+00:00', '--local', '--subdir', 'DAGS_FOLDER/test.py']
[2025-05-09 05:19:21,149: DEBUG/ForkPoolWorker-15] Calling callbacks: [<function default_action_log at 0x78c7167a0b80>]
[2025-05-09 05:19:21,189: DEBUG/ForkPoolWorker-15] Loading plugins
[2025-05-09 05:19:21,190: DEBUG/ForkPoolWorker-15] Loading plugins from directory: /opt/airflow/plugins
[2025-05-09 05:19:21,190: DEBUG/ForkPoolWorker-15] Loading plugins from entrypoints
[2025-05-09 05:19:21,190: DEBUG/ForkPoolWorker-15] Importing entry_point plugin openlineage
[2025-05-09 05:19:21,263: DEBUG/ForkPoolWorker-15] Loading 1 plugin(s) took 0.07 seconds
[2025-05-09 05:19:21,264: INFO/ForkPoolWorker-15] Filling up the DagBag from /opt/airflow/dags/test.py
[2025-05-09 05:19:21,265: DEBUG/ForkPoolWorker-15] Importing /opt/airflow/dags/test.py
[2025-05-09 05:19:21,273: DEBUG/ForkPoolWorker-15] Loaded DAG <DAG: download_bing>
[2025-05-09 05:19:21,448: DEBUG/ForkPoolWorker-15] Plugins are already loaded. Skipping.
[2025-05-09 05:19:21,449: DEBUG/ForkPoolWorker-15] Integrate DAG plugins
[2025-05-09 05:19:21,453: DEBUG/ForkPoolWorker-15] previous_execution_date was called
[2025-05-09 05:19:21,462: INFO/ForkPoolWorker-15] Running <TaskInstance: download_bing.save_task manual__2025-05-09T05:19:09.480045+00:00 [queued]> on host airflow-worker-0.airflow-worker.airflow.svc.cluster.local
[2025-05-09 05:19:21,462: DEBUG/ForkPoolWorker-15] Disposing DB connection pool (PID 729)
[2025-05-09 05:19:21,463: DEBUG/ForkPoolWorker-15] Setting up DB connection pool (PID 729)
[2025-05-09 05:19:21,463: DEBUG/ForkPoolWorker-15] settings.prepare_engine_args(): Using NullPool
[2025-05-09 05:19:25,384: DEBUG/MainProcess] pidbox received method enable_events() [reply_to:None ticket:None]
[2025-05-09 05:19:30,383: DEBUG/MainProcess] pidbox received method enable_events() [reply_to:None ticket:None]
[2025-05-09 05:19:32,355: DEBUG/ForkPoolWorker-15] Calling callbacks: []
[2025-05-09 05:19:32,387: INFO/ForkPoolWorker-15] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[6291141f-dbc2-4fd8-a518-81e017134130] succeeded in 11.284129409003071s: None
[2025-05-09, 05:19:10 UTC] {logging_mixin.py:188} INFO - 1746767950.8249474
[2025-05-09, 05:19:22 UTC] {logging_mixin.py:188} INFO - 1746767962.2026312
# airflow tasks run download_bing save_task manual__2025-05-09T05:19:09.480045+00:00
...
[2025-05-10T04:49:16.545+0000] {cli_action_loggers.py:67} DEBUG - Calling callbacks: [<function default_action_log at 0x7e4c4b906820>]
[2025-05-10T04:49:16.572+0000] {plugins_manager.py:328} DEBUG - Loading plugins
[2025-05-10T04:49:16.573+0000] {plugins_manager.py:254} DEBUG - Loading plugins from directory: /opt/***/plugins
[2025-05-10T04:49:16.573+0000] {plugins_manager.py:234} DEBUG - Loading plugins from entrypoints
[2025-05-10T04:49:16.595+0000] {plugins_manager.py:237} DEBUG - Importing entry_point plugin openlineage
[2025-05-10T04:49:16.727+0000] {plugins_manager.py:346} DEBUG - Loading 1 plugin(s) took 0.15 seconds
[2025-05-10T04:49:16.728+0000] {dagbag.py:540} INFO - Filling up the DagBag from /opt/***/dags
[2025-05-10T04:49:16.730+0000] {dagbag.py:332} DEBUG - Importing /opt/***/dags/test.py
[2025-05-10T04:49:16.752+0000] {dagbag.py:506} DEBUG - Loaded DAG <DAG: download_bing>
[2025-05-10T04:49:16.753+0000] {dagbag.py:332} DEBUG - Importing /opt/***/dags/hello.py
[2025-05-10T04:49:16.757+0000] {dagbag.py:506} DEBUG - Loaded DAG <DAG: b_hello>
[2025-05-10T04:49:16.910+0000] {plugins_manager.py:322} DEBUG - Plugins are already loaded. Skipping.
[2025-05-10T04:49:16.911+0000] {plugins_manager.py:506} DEBUG - Integrate DAG plugins
[2025-05-10T04:49:16.915+0000] {taskinstance.py:991} DEBUG - previous_execution_date was called
[2025-05-10T04:49:16.923+0000] {task_command.py:423} INFO - Running <TaskInstance: download_bing.save_task manual__2025-05-09T05:19:09.480045+00:00 [success]> on host ***-worker-0.***-worker.***.svc.cluster.local
[2025-05-10T04:49:16.923+0000] {settings.py:386} DEBUG - Disposing DB connection pool (PID 32686)
[2025-05-10T04:49:16.924+0000] {settings.py:249} DEBUG - Setting up DB connection pool (PID 32686)
[2025-05-10T04:49:16.924+0000] {settings.py:318} DEBUG - settings.prepare_engine_args(): Using NullPool
[2025-05-10T04:49:18.554+0000] {cli_action_loggers.py:85} DEBUG - Calling callbacks: []
[2025-05-10T04:49:18.554+0000] {settings.py:386} DEBUG - Disposing DB connection pool (PID 32686)

1.3.3 Changing the broker

Time cost with RabbitMQ:

  1. Before adding the optimization environment variables

    1. 1747293379.924484 -> 1747293381.4033878 (1.479)
  2. After adding the optimization environment variables

    1. 1747294250.2575343 -> 1747294251.8361712 (1.579)

    2. 1747294511.4867296 -> 1747294513.37435 (1.888)

Time cost with Redis:

  1. Before adding the optimization environment variables

    1. 1747296261.2109053 -> 1747296263.5157278 (2.305)

    2. 1747295348.6995502 -> 1747295350.9857132 (2.286)

  2. After adding the optimization environment variables

    1. 1747295391.1667778 -> 1747295393.4209657 (2.254)

    2. 1747296451.4672449 -> 1747296453.208179 (1.741)

1.3.4 Using KubernetesPodOperator

Use the official Helm chart, and note that this configuration needs to be changed:

multiNamespaceMode: true

Write the following DAG script:

from datetime import datetime
import time
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator


with DAG(
    dag_id='download_bing',
    tags=['test'],  # DAG tags, used for categorization
) as dag:
    def request():
        print('hello')
        print(time.time())
        return ''  # early return for the timing test; the request below never runs
        import requests
        resp = requests.get("https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US")
        return resp

    def save():
        print('hello2')
        print(time.time())
        return  # early return for the timing test; the write below never runs (`text` is undefined)
        with open("/opt/airflow/results.txt", "a+", encoding="utf-8") as f:
            f.write(text)

    # Run the Python functions with PythonOperator
    request_task = PythonOperator(
        task_id="request_task",
        python_callable=request
    )

    save_task = PythonOperator(
        task_id="save_task",
        python_callable=save
    )

    k = KubernetesPodOperator(
        task_id="example_task",
        name="example_pod",
        namespace="default",
        image="debian",
        cmds=["bash", "-cx"],
        arguments=["date +%Y-%m-%d' '%H:%M:%S.%3N"],
        labels={"example": "label"},
        get_logs=True,
        is_delete_operator_pod=True,
    )

    request_task >> save_task
    save_task >> k

Time cost of using KubernetesPodOperator:

Output of the preceding task: 1747749304.166512

Output of the following task (KubernetesPodOperator): 2025-05-20 13:55:07.802

Elapsed: roughly 3.636 s

1.3.5 Running multiple executors side by side: specifying KubernetesExecutor

Configuration:

# multiple executors side by side
executor: "CeleryExecutor,KubernetesExecutor"
# files/pod-template-file.kubernetes-helm-yaml must be modified as well:
- name: AIRFLOW__CORE__EXECUTOR
  value: {{ .Values.executor | quote }}

Specify the executor per task as follows:

b = BashOperator(
    task_id="my_task_in_its_own_pod",
    executor="KubernetesExecutor",
    bash_command="echo hello & sleep 10",
)

However, the logs cannot be viewed yet, so this could not be timed.

1.3.6 Using pod_override

The DAG is as follows:

from datetime import datetime
import time
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.operators.bash import BashOperator
from kubernetes.client import models as k8s


with DAG(
    dag_id='download_bing',
    tags=['test'],  # DAG tags, used for categorization
) as dag:
    def request():
        print('hello')
        print(time.time())
        return ''  # early return for the timing test; the request below never runs
        import requests
        resp = requests.get("https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US")
        return resp

    def save():
        print('hello2')
        print(time.time())
        return  # early return for the timing test; the write below never runs (`text` is undefined)
        with open("/opt/airflow/results.txt", "a+", encoding="utf-8") as f:
            f.write(text)

    # Run the Python functions with PythonOperator
    request_task = PythonOperator(
        task_id="request_task",
        python_callable=request
    )

    save_task = PythonOperator(
        task_id="save_task",
        executor="KubernetesExecutor",
        python_callable=save
    )

    k = KubernetesPodOperator(
        task_id="example_task",
        name="example_pod",
        namespace="default",
        executor="KubernetesExecutor",
        image="debian",
        cmds=["bash", "-cx"],
        arguments=["echo", "hello"],
        labels={"example": "label"},
        get_logs=True,
        is_delete_operator_pod=True,
    )

    b = BashOperator(
        task_id="my_task_in_its_own_pod",
        executor="KubernetesExecutor",
        bash_command="echo hello & sleep 10",
    )

    # Sidecar example: the first container must be named "base" (the worker container);
    # the second container writes to a shared emptyDir volume.
    executor_config_sidecar = {
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                    ),
                    k8s.V1Container(
                        name="sidecar",
                        image="ubuntu",
                        args=['echo "retrieved from mount" > /shared/test.txt'],
                        command=["bash", "-cx"],
                        volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                    ),
                ],
                volumes=[
                    k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
                ],
            )
        ),
    }

    @task(executor="KubernetesExecutor", executor_config=executor_config_sidecar)
    def test_sharedvolume_mount():
        """Tests whether the volume has been mounted."""
        print("hello world")
        # for i in range(5):
        #     try:
        #         return_code = os.system("cat /shared/test.txt")
        #         if return_code != 0:
        #             raise ValueError(f"Error when checking volume mount. Return code {return_code}")
        #     except ValueError as e:
        #         if i > 4:
        #             raise e

    sidecar_task = test_sharedvolume_mount()

    request_task >> save_task

    save_task >> sidecar_task

  • If only one container is left, the task stays in the queued state forever (it can run, but its completion cannot be written back to the database).

  • The name "base" refers to the worker container, and it must come first; if an image is specified there, it overrides the default one. No other parameters need to be set (see the sketch below).
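
A minimal pod_override sketch for the second point (the image name is an illustrative assumption): overriding only the worker container, which must be named "base" and listed first.

# Minimal sketch: override only the image of the worker container ("base").
# The image name is an illustrative assumption.
from kubernetes.client import models as k8s

executor_config_image_override = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(name="base", image="my-registry/airflow-custom:latest"),
            ]
        )
    ),
}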

1.3.7 Call chain

task_run(airflow/cli/commands/task_command.py)

-> 1. get_dag(airflow/utils/cli.py) 2. DAG::get_task(airflow/models/dag.py) 3. _get_ti(airflow/cli/commands/task_command.py) 4. TaskInstance::init_run_context(airflow-core/src/airflow/models/taskinstance.py)

-> 4. TaskInstance::get_template_context(airflow/models/taskinstance.py)

-> 4. integrate_macros_plugins(airflow/plugins_manager.py)

-> 5. reconfigure_orm(airflow/settings.py)

-> 6. _run_raw_task(airflow/models/taskinstance.py)

Observation suggests that the following function calls may be taking too long:

  1. DAG::get_task(airflow/models/dag.py)

  2. _get_ti(airflow/cli/commands/task_command.py)

  3. _run_raw_task(airflow/models/taskinstance.py)

[2025-05-10T04:49:18.554+0000] {cli_action_loggers.py:85} DEBUG - Calling callbacks: []
[2025-05-10T04:49:18.554+0000] {settings.py:386} DEBUG - Disposing DB connection pool (PID 32686)

Both lines are calls made on exit, after the program has finished executing.

1.4 Comparison of Alternatives

1.4.1 Argo

Pros

  • Supports more complex workflow designs, including loops, recursion, and conditional logic, making it suitable for complex tasks that need a high degree of control and flexibility. Its workflows are defined in YAML or JSON.

Cons

  • Uses Kubernetes CronJobs to schedule workflows, so its scheduling capability depends on Kubernetes.

1.4.2 Kubeflow

Pros

  • Kubeflow is faster for machine learning, distributed computing, and resource-intensive tasks; because it is built on k8s, it can make better use of resources.

    • Airflow, by contrast, is faster for lightweight tasks, short-lived task scheduling, and non-containerized environments.
  • Provides many ML-related packages, e.g. Kubeflow Notebooks, which is similar to Jupyter Notebook.

  • Kubeflow Pipelines is implemented on top of Argo; compared with plain Argo it offers more ML-oriented features and an enhanced persistence layer.

Cons

  • It likewise needs to start a new Pod for every task.
  • Kubeflow also does not support defining loops.


1.5 Celery Optimization

1.5.1 Broker Connection Pools (number of pooled broker connections)
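
A minimal celeryconfig-style sketch; the value is only an illustrative assumption (Celery's default pool size is 10).

# Minimal celeryconfig sketch: cap how many broker connections are kept in the pool.
# The value is an illustrative assumption (Celery's default is 10).
broker_pool_limit = 20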

1.5.2 Using Transient Queues (queues that do not need persistence)

  • by using task_routes:

    task_routes = { 'proj.tasks.add': {'queue': 'celery', 'delivery_mode': 'transient'}}

1.5.3 Prefetch Limits (prefetch count; a larger value suits short-running tasks)
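
A minimal celeryconfig-style sketch for many short-running tasks; the multiplier value is only an illustration (the default is 4).

# Minimal celeryconfig sketch: let each worker process prefetch more messages,
# which helps when tasks are short-lived. The value 64 is only an illustration.
worker_prefetch_multiplier = 64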

1.5.4 Reserve one task at a time

task_acks_late = True
worker_prefetch_multiplier = 1

1.5.5 Other concepts

1.5.5.1 Periodic Tasks

Scheduled by the celery beat scheduler process.
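
A minimal sketch of a beat schedule; the broker URL, task path, and arguments are placeholder assumptions.

# Minimal sketch: celery beat reads beat_schedule and sends proj.tasks.add every 30 s.
# Broker URL, task path and arguments are placeholder assumptions.
from celery import Celery

app = Celery('proj', broker='redis://redis:6379/0')
app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'proj.tasks.add',
        'schedule': 30.0,
        'args': (16, 16),
    },
}
# Started as a separate process:  celery -A proj beat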

1.5.5.2 Application

Similar to a Flask application object.
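
A minimal sketch (the broker URL is a placeholder): like a Flask app, the Celery application object holds configuration and registers tasks.

# Minimal sketch: the Celery application object, analogous to a Flask app.
# The broker URL is a placeholder assumption.
from celery import Celery

app = Celery('proj', broker='redis://redis:6379/0')

@app.task
def add(x, y):
    return x + y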

1.5.5.3 Queue

Specifies which queue a task is routed to (routing options):

add.apply_async(queue='priority.high')
1.5.5.4 Tasks

Optimization advice:

  1. In general, it is best to break the problem into many small tasks (see the sketch after this list).

  2. The best option is to have a copy of the data in memory (data locality).

  3. Watch out for contention.
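
A rough illustration of point 1; the broker/backend URLs and task bodies are placeholder assumptions. Many small Celery tasks are fanned out and their results combined with a chord:

# Minimal sketch: fan out many small tasks and merge their results with a chord.
# Broker/backend URLs and task bodies are placeholder assumptions.
from celery import Celery, chord

app = Celery('proj', broker='redis://redis:6379/0', backend='redis://redis:6379/1')

@app.task
def process_chunk(chunk):
    return sum(chunk)

@app.task
def merge(partials):
    return sum(partials)

# Split the work into small chunks; each chunk becomes its own task.
result = chord(process_chunk.s(c) for c in [[1, 2], [3, 4], [5, 6]])(merge.s())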

1.5.5.5 Worker
  • Concurrency: defaults to the number of CPUs on the machine -> in a k8s environment this means the number of CPUs allocated to the Pod.

  • Autoscaling: the --autoscale option.

  • Concurrency control (see the sketch after this list):

    • prefork: the default option; well suited to CPU-bound tasks and most use cases.

    • eventlet and gevent: designed for IO-bound tasks, using greenlets for high concurrency. Note that some features, such as soft_timeout, are not available in these modes.

    • solo: executes tasks sequentially in the main thread.

    • threads: uses threads for concurrency.

    • custom: allows a custom worker pool implementation to be specified via an environment variable.
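
A minimal celeryconfig-style sketch, assuming a prefork pool with a fixed concurrency (the numbers are illustrative, not recommendations); autoscaling is instead enabled on the worker command line with --autoscale=max,min.

# Minimal celeryconfig sketch: choose the pool implementation and a fixed concurrency.
# Values are illustrative assumptions; in k8s, match them to the Pod's CPU allocation.
worker_pool = 'prefork'        # alternatives: 'eventlet', 'gevent', 'solo', 'threads'
worker_concurrency = 4         # number of pool processes (defaults to the CPU count)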

1.5.5.6 Cache pool

cache_backend_options = {
    'binary': True,
    'behaviors': {'tcp_nodelay': True},
}


1.5.5.7 Detailed configuration

Configuration and defaults — Celery 5.5.2 documentation

References

Configuration Reference — Airflow Documentation

Parameters — Airflow Documentation

Scheduler — Airflow Documentation

Celery Executor — apache-airflow-providers-celery Documentation

Airflow Celery Executor architecture and task execution process — CSDN blog

CeleryKubernetes Executor — apache-airflow-providers-celery Documentation

Celery Executor Commands — apache-airflow-providers-celery Documentation

A record of an Airflow performance tuning exercise

Celery Executor — apache-airflow-providers-celery Documentation

Exploring the features of Celery, a high-performance distributed task queue — Zhihu

Manage DAG Files — Helm Chart Documentation

Executor — Airflow Documentation

Configuration and defaults — Celery 5.5.2 documentation

Code search results

Using the Command Line Interface — Airflow Documentation

Celery - Distributed Task Queue — Celery 5.5.2 documentation

Celery, the task queue powerhouse: a beginner-to-advanced guide — Tencent Cloud Developer Community

Optimizing — Celery 5.5.2 documentation

Concurrency — Celery 5.5.2 documentation


Kubeflow: the machine learning toolkit for K8s

Building a machine learning platform from scratch with Kubeflow — Tencent Cloud Developer Community

airflow/chart/docs/production-guide.rst at 570e0b9eceef21c061277163c409adba81b9033a · apache/airflow

Backends and Brokers — Celery 5.5.2 documentation

Using Redis — Celery 5.5.2 documentation

Using RabbitMQ — Celery 5.5.2 documentation

Airflow Workers Trying to Create Pods in Default Namespace · apache/airflow · Discussion #29619

[Solved] pods is forbidden: User system:serviceaccount cannot list resource

Airflow 3.0 - Airflow - Apache Software Foundation

Multiple LocalExecutors on HA Scheduler deployment? · apache/airflow · Discussion #26564

Apache Airflow® Executors | Astronomer Documentation

Task execution failure with multiple executors · Issue #48667 · apache/airflow

Change default executor in pod template to support executor parameter in task (re-uploaded) by ihnokim · Pull Request #49433 · apache/airflow

airflow/chart/files/pod-template-file.kubernetes-helm-yaml at main · apache/airflow

airflow/airflow/example_dags/example_kubernetes_executor.py at v2-10-stable · apache/airflow

Kubernetes Executor — apache-airflow-providers-cncf-kubernetes Documentation


airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py at 3b78c6a72ac17001aff437dfc7300217b728d728 · apache/airflow

airflow/providers/cncf/kubernetes/docs/kubernetes_executor.rst at 3b78c6a72ac17001aff437dfc7300217b728d728 · apache/airflow

  • Title: airflow optimizing
  • Author: Ethereal
  • Created at: 2025-05-08 17:21:54
  • Updated at: 2025-05-27 15:29:35
  • Link: https://ethereal-o.github.io/2025/05/08/airflow-optimizing/
  • License: This work is licensed under CC BY-NC-SA 4.0.