airflow optimizing
1. 调度优化
1.1 概念理解
1.1.1 Executor
是airflow部署的模式,更改后必须重启airflow才可以生效。
本地Executor
- Airflow 任务在调度器进程内部本地运行。
优点: 非常易于使用,速度快,延迟极低,设置要求少。
缺点: 功能有限,并与 Airflow 调度器共享资源。
- Airflow 任务在调度器进程内部本地运行。
远程Executor
队列/批处理 Executor
-
在k8s中,会部署一个worker节点和一个redis节点(作为broker),scheduler只管推送至queue中,然后worker轮询方式获取任务。
scheduler会轮询检查DAG状态,将上一个已经完成的任务标记为完成并启动下一个任务的执行。
因此,在启动任务前,存在以下步骤:
parser分析DAG流图(主要因素:分析)
scheduler轮询检查是否有可以开始的任务,并放入queue内(主要时间因素:轮询)
queue通过数据库锁更新DAG状态(主要时间因素:锁)
worker轮询检查是否有新的任务可以被执行(主要时间因素:轮询)
获取任务(主要时间因素:下载任务)
启动执行
向数据库通知任务完成,更新状态
-
- 通过docker执行任务
EdgeExecutor (实验性预发布)
- 涉及边设备
-
容器化 Executor
-
通过Kubernetes来调度任务,所有的任务都被拆分成pod粒度
必须关闭flower, workers和redis
有用的场景
有长时间运行的任务时,如果在任务运行时进行部署,任务将一直运行直到完成(或超时等)。但使用 CeleryExecutor,如果你设置了宽限期,任务只会运行到宽限期结束,届时任务将被终止。
任务在资源需求或镜像方面不是很统一
-
- Amazon Elastic Container Service
CeleryKubenetesExecutor
- celery和kubenetes的结合,根据queue决定是使用celery还是Kubenetes
-
并行使用多个Executor
参数设置:
- executor: CeleryKubernetesExecutor(helm内)(已经废弃)
新的方法:
- executor: “CeleryExecutor,KubernetesExecutor”
1.1.2 Parser
解析并更新DAG设置
参数设置
- parsing_processes(AIRFLOW__DAG_PROCESSOR__PARSING_PROCESSES):DAG processor 可以并行运行多个进程来解析 dags。这定义了将运行的进程数量。
1.1.3 Scheduler
参数设置:
[core] parallelism(AIRFLOW__CORE__PARALLELISM):这定义了每个 Airflow 调度器可以同时运行的最大任务实例数量,与 worker 数量无关。通常,此值乘以集群中调度器的数量,就是元数据数据库中处于 running 状态的任务实例的最大数量。该值必须大于或等于 1。
[scheduler] dag_stale_not_seen_duration(AIRFLOW__SCHEDULER__DAG_STALE_NOT_SEEN_DURATION):经过此秒数后,未被 Dag 处理器更新的 DAG 将被停用。
[scheduler] max_dagruns_per_loop_to_schedule(AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE):调度程序在调度和排队任务时应检查(并锁定)多少个 DagRun。
[scheduler] max_dagruns_to_create_per_loop(AIRFLOW__SCHEDULER__MAX_DAGRUNS_TO_CREATE_PER_LOOP):每个调度程序循环中为多少个 DAG 创建 DagRun 的最大数量。
[scheduler] max_tis_per_query(AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY):此项确定在每个调度程序循环中评估多少个任务实例进行调度。将其设置为 0 以使用
[core] parallelism
的值。[scheduler] num_runs(AIRFLOW__SCHEDULER__NUM_RUNS):尝试调度每个 DAG 文件的次数 -1 表示无限次数。
[scheduler] orphaned_tasks_check_interval(AIRFLOW__SCHEDULER__ORPHANED_TASKS_CHECK_INTERVAL):调度程序应多久(以秒为单位)检查一次孤立任务和 SchedulerJobs。
[scheduler] scheduler_heartbeat_sec(AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC):调度器不断尝试触发新任务(有关更多信息,请参阅文档中的调度器部分)。这定义了调度器应多久运行一次(以秒为单位)。
[scheduler] scheduler_idle_sleep_time(AIRFLOW__SCHEDULER__SCHEDULER_IDLE_SLEEP_TIME):控制调度器在循环之间休眠多长时间,但前提是循环中没有可做的事情。也就是说,如果它调度了某个任务,那么它将立即开始下一个循环迭代。
[scheduler] task_instance_heartbeat_sec(AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT):任务在被重试或设置为失败之前可以处于排队状态的时间量。
1.1.3 Broker
Broker类型 | 特点 | 适用场景 |
---|---|---|
RabbitMQ | 高可靠、支持AMQP协议、功能丰富(如优先级队列) | 生产环境首选,对消息可靠性要求高 |
Redis | 简单轻量、性能高、支持Pub/Sub模式 | 中小规模集群,对性能要求较高 |
其他 | 如Amazon SQS、Kafka(需定制) | 特殊场景(如云原生、大数据架构) |
1.1.4 Wroker
参数设置:没有celery本身参数?celery中存在参数bundle_refresh_check_interval指定轮询间隔:猜测以下参数类似:min_heartbeat_interval
- [worker] min_heartbeat_interval(AIRFLOW__WORKERS__MIN_HEARTBEAT_INTERVAL):工作节点与 API 服务器检查任务实例心跳状态以确认其仍然处于活动状态的最小间隔(以秒为单位)。
1.2 测试分析
通过打印时间发现主要分成两部分:
从下发命令到排队时间(取决于scheduler轮询并开始下发新任务间隔)
- 通过缩短心跳时间可以减少结束到queue时间:scheduler_heartbeat_sec
从排队到执行时间(取决于worker启动新任务间隔)
- queue时间还没有发现更改方式(AIRFLOW__WORKERS__MIN_HEARTBEAT_INTERVAL没有效果)
1.3 尝试优化
1.3.1 优化方式
缩短轮询时间,增加并行度
1 | AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC: 1 |
1.3.2 运行日志
打印时间
1 | [2025-05-08 10:40:28,959: INFO/MainProcess] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[cdb45479-493d-4328-ae0c-5cb793c76a62] received |
1 | [2025-05-08, 10:41:04 UTC] {logging_mixin.py:188} INFO - 1746700864.6873255 |
新任务进入排队(10:41:03,570)->worker开始执行(10:41:03,923)->真正开始执行(10:41:04)
1 | [2025-05-09 05:19:09,662: INFO/MainProcess] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[34b1c9f8-106f-4a57-b5db-e5372fd4d586] received |
1 | [2025-05-09, 05:19:10 UTC] {logging_mixin.py:188} INFO - 1746767950.8249474 |
1 | airflow tasks run download_bing save_task manual__2025-05-09T05:19:09.480045+00:00 |
1.3.3 修改Broker
rabbitmq时间损耗:
加入环境变量优化前
- 1747293379.924484->1747293381.4033878(1.479)
加入环境变量优化后
1747294250.2575343->1747294251.8361712(1.579)
1747294511.4867296->1747294513.37435(1.888)
redis时间损耗:
加入环境变量优化前
1747296261.2109053->1747296263.5157278(2.305)
1747295348.6995502->1747295350.9857132(2.286)
加入环境变量优化后
1747295391.1667778->1747295393.4209657(2.26)
1747296451.4672449->1747296453.208179(1.741)
1.3.4 使用KubernetesPodOperator
使用官方版本helm,并且注意要将配置修改:
1 | multiNamespaceMode: true |
编写如下脚本:
1 | from datetime import datetime |
使用KubernetesPodOperator消耗:
前一个输出:1747749304.166512
后一个输出(KubernetesPodOperator):2025-05-20 13:55:07.802
大约耗费:3.636s
1.3.5 使用多Executor并存:指定KubernetesExecutor
配置:
1 | 多execotor并存 |
使用以下方式指定:
1 | b = BashOperator( |
但是目前还无法看到log,所以无法计时。
1.3.6 使用pod_override
dag如下:
1 | from datetime import datetime |
如果container只剩一个,那么就会一直在排队状态(可以执行,但是无法把完成写入到数据库中)
base名字是指worker container,并且必须置于第一个,如果这里有image,那么就会覆盖。不需要设定其他任何参数。
1.3.7 调用链
task_run(airflow/cli/commands/task_command.py)
-> 1. get_dag(airflow/utils/cli.py) 2. DAG::get_task(airflow/models/dag.py) 3. _get_ti(airflow/cli/commands/task_command.py) 4. TaskInstance::init_run_context(airflow-core/src/airflow/models/taskinstance.py)
-> 4. TaskInstance::get_template_context(airflow/models/taskinstance.py)
-> 4. integrate_macros_plugins(airflow/plugins_manager.py)
-> 5. reconfigure_orm(airflow/settings.py)
-> 6. _run_raw_task(airflow/models/taskinstance.py)
观察发现可能是以下几个函数调用时间过长:
DAG::get_task(airflow/models/dag.py)
_get_ti(airflow/cli/commands/task_command.py)
_run_raw_task(airflow/models/taskinstance.py)
1 | [2025-05-10T04:49:18.554+0000] {cli_action_loggers.py:85} DEBUG - Calling callbacks: [] |
都是执行程序结束后的退出时的调用
1.4 对比选型
1.4.1 argo
优点
- 支持更复杂的工作流设计,包括循环、递归和条件逻辑,适合需要高度控制和灵活性的复杂任务。其工作流以 YAML 或 JSON 格式定义
缺点
- 使用 Kubernetes 的 CronJob 来调度工作流,调度能力依赖于 Kubernetes
1.4.2 kubeflow
优点
Kubeflow 在 机器学习、分布式计算、资源密集型任务 中更快。因为依赖于k8s,可以更好利用资源。
- Airflow 在 轻量级任务、短时任务调度、非容器化环境 中更快。
提供很多ML相关包,例如类似jupter notebook的kubeflow notebooks
其中kubeflow pipeline基于argo实现,相比于argo能有更多ML内容,增强了持久化层。
缺点
- 同样需要每个任务启动新的Pod
- kubeflow也不支持定义循环
1.5 Celery优化
1.5.1 Broker Connection Pools(连接池数目)
1.5.2 Using Transient Queues(不需要持久化的队列)
by using
task_routes
:1
task_routes = { 'proj.tasks.add': {'queue': 'celery', 'delivery_mode': 'transient'}}
1.5.3 Prefetch Limits(预取数量,对于任务执行时间短的可以设置较大数值)
1.5.4 Reserve one task at a time(留存任务)
1 | task_acks_late = True |
1.5.5 其他概念
1.5.5.1 定时任务(Periodic Tasks)
通过celery beat调度程序调度
1.5.5.2 应用(Application)
类似flask
1.5.5.3 队列(Queue)
指定任务调度的队列Routing options
1 | add.apply_async(queue='priority.high') |
1.5.5.4 任务(Tasks)
优化建议:
一般来说,最好将问题分解为许多小任务
最好的办法是在内存中有一份副本
注意竞争
1.5.5.5 工作节点(Worker)
并发:默认为机器上的CPU数量->k8s环境中指Pod分配到的CPU数量
自动缩放:
--autoscale
选项并发控制
prefork:默认选项,非常适合CPU受限的任务和大多数用例。
eventlet和gevent:这些模型专为IO受限的任务而设计,使用greenlets实现高并发性。请注意,某些功能,如soft_timeout,在这些模式下不可用。
solo:在主线程中按顺序执行任务。
threads:利用线程实现并发。
custom:允许通过环境变量指定自定义工作池实现。
1.5.5.6 缓存池
1 | cache_backend_options = { |
1.5.5.7 详细配置
Configuration and defaults — Celery 5.5.2 documentation
参考
配置参考 — Airflow 文档 - Airflow 工作流管理平台
参数 — Airflow 文档 - Airflow 工作流管理平台
调度器 — Airflow 文档 - Airflow 工作流管理平台
Celery Executor — apache-airflow-providers-celery 文档 - Airflow 工作流管理平台
Airflow Celery Executor 架构与任务执行过程-CSDN博客
CeleryKubernetes 执行器 — apache-airflow-providers-celery 文档 - Airflow 工作流管理平台
Celery 执行器命令 — apache-airflow-providers-celery 文档 - Airflow 工作流管理平台
Celery Executor — apache-airflow-providers-celery Documentation
管理 DAG 文件 — helm-chart 文档 - Airflow 工作流管理平台
Executor — Airflow 文档 - Airflow 工作流管理平台
Configuration and defaults — Celery 5.5.2 documentation
Using the Command Line Interface — Airflow Documentation
Celery - Distributed Task Queue — Celery 5.5.2 documentation
任务队列神器:Celery 入门到进阶指南-腾讯云开发者社区-腾讯云
Optimizing — Celery 5.5.2 documentation
Concurrency — Celery 5.5.2 documentation
Configuration and defaults — Celery 5.5.2 documentation
从零搭建机器学习平台Kubeflow-腾讯云开发者社区-腾讯云
airflow/chart/docs/production-guide.rst at 570e0b9eceef21c061277163c409adba81b9033a · apache/airflow
Backends and Brokers — Celery 5.5.2 documentation
Using Redis — Celery 5.5.2 documentation
Using RabbitMQ — Celery 5.5.2 documentation
Airflow Workers Trying to Create Pods in Default Namespace · apache/airflow · Discussion #29619
[Solved] pods is forbidden: User system:serviceaccount cannot list resource
Airflow 3.0 - Airflow - Apache Software Foundation
Multiple LocalExecutors on HA Scheduler deployment? · apache/airflow · Discussion #26564
Apache Airflow® Executors | Astronomer Documentation
Task execution failure with multiple executors · Issue #48667 · apache/airflow
airflow/chart/files/pod-template-file.kubernetes-helm-yaml at main · apache/airflow
airflow/airflow/example_dags/example_kubernetes_executor.py at v2-10-stable · apache/airflow
Kubernetes Executor — apache-airflow-providers-cncf-kubernetes Documentation
airflow/chart/files/pod-template-file.kubernetes-helm-yaml at main · apache/airflow
- Title: airflow optimizing
- Author: Ethereal
- Created at: 2025-05-08 17:21:54
- Updated at: 2025-05-27 15:29:35
- Link: https://ethereal-o.github.io/2025/05/08/airflow-optimizing/
- License: This work is licensed under CC BY-NC-SA 4.0.