Migrating Airflow to Argo

Ethereal

1. Motivation for migrating

  1. Workflows are managed in a distributed way (they no longer all have to be defined in a single Python file)

  2. Easy to replicate and deploy

2. Drawbacks of migrating

  1. A hard dependency on Kubernetes: every task has to start a pod, which is inefficient for short-lived tasks

3. Migration steps

  1. Data migration

  2. Workflow migration

    1. Task migration

    2. Task-dependency migration

  3. Re-create the schedules (e.g. with a K8s CronJob)

    1. Alternatively, use Argo's Cron Workflows
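If the schedule is re-created with a plain Kubernetes CronJob rather than Argo's native Cron Workflows, the CronJob can simply run the Argo CLI to submit the workflow. A minimal sketch, assuming a hypothetical `argo-submit` ServiceAccount with RBAC permission to create Workflows and a hypothetical `download-bing` WorkflowTemplate already installed in the cluster:

```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: submit-download-bing            # hypothetical name
spec:
  schedule: "0 * * * *"                 # hourly, matching the CronWorkflow in section 5.3
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: argo-submit   # assumed to be allowed to create workflows.argoproj.io resources
          restartPolicy: OnFailure
          containers:
            - name: submit
              image: argoproj/argocli:latest
              args: ["submit", "--from", "workflowtemplate/download-bing"]
```

In practice the native CronWorkflow (section 5.3) is simpler, since it needs no extra RBAC or CLI image.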

4. Migration caveats

4.1 Getting the trigger time

Because the APIs differ, the way a task reads its trigger time (for example from environment variables) also differs. In Argo it can be obtained with:

{{= sprig.dateModify('-168h', sprig.toDate('2006-01-02T15:04:05Z07:00', sprig.date('2006-01-02T15:04:05Z07:00', workflow.creationTimestamp))) }}
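Here `2006-01-02T15:04:05Z07:00` is Go's reference time layout (sprig wraps Go's date functions), and `-168h` shifts `workflow.creationTimestamp` back 168 hours, i.e. 7 days. The same arithmetic in plain Python, for intuition (the timestamp is a made-up example):

```python
from datetime import datetime, timedelta

# Parse an RFC 3339 timestamp like workflow.creationTimestamp,
# then shift it back 168 hours (7 days), as the sprig expression does.
created = datetime.fromisoformat("2025-05-29T15:31:04+00:00")
shifted = created - timedelta(hours=168)
print(shifted.isoformat())  # 2025-05-22T15:31:04+00:00
```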

4.2 Task definition

Because every task has to be defined in a YAML file, a lot of repetitive YAML may pile up. Templates or inline templates can be used for reuse (see Inline Templates in the Argo Workflows documentation).

4.3 Data migration

The dbt tool can be used for the data transformations.

Issues:

  • RDDs fail
  • Python models do not support incremental tables
    • Use a Python model for the latest table and a SQL model for the incremental one
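The SQL side of that split might look like the following dbt incremental model sketch (the model and column names here are hypothetical, not from the original migration):

```sql
-- models/events_incremental.sql (hypothetical model)
{{ config(materialized='incremental', unique_key='event_id') }}

select event_id, payload, updated_at
from {{ ref('events_latest') }}          -- hypothetical upstream "latest" model

{% if is_incremental() %}
  -- on incremental runs, only pick up rows newer than what the target already has
  where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
```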

4.4 Other caveats

  • Cannot run alongside Kubeflow

5. Migration example

5.1 Original Airflow code

from datetime import datetime
import time
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.operators.bash import BashOperator
from kubernetes.client import models as k8s
import os

with DAG(
    dag_id='download_bing',
    tags=['test'],  # DAG tags, used for grouping
) as dag:
    def request():
        print('hello')
        print(time.time())
        return ''
        # Unreachable: the real request below is stubbed out by the early return
        import requests
        resp = requests.get("https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US")
        return resp

    def save():
        print('hello2')
        print(time.time())
        return
        # Unreachable: the real save below is stubbed out by the early return
        with open("/opt/airflow/results.txt", "a+", encoding="utf-8") as f:
            f.write(text)

    # Run Python functions via PythonOperator
    request_task = PythonOperator(
        task_id="request_task",
        python_callable=request
    )

    save_task = PythonOperator(
        task_id="save_task",
        executor="KubernetesExecutor",
        python_callable=save
    )

    k = KubernetesPodOperator(
        task_id="example_task",
        name="example_pod",
        namespace="default",
        executor="KubernetesExecutor",
        image="ubuntu",
        cmds=["bash", "-cx"],
        arguments=["date +'%Y-%m-%d %H:%M:%S.%3N'"],
        labels={"example": "label"},
        get_logs=True,
        is_delete_operator_pod=True,
    )

    b = BashOperator(
        task_id="my_task_in_its_own_pod",
        executor="KubernetesExecutor",
        bash_command="date +'%Y-%m-%d %H:%M:%S.%3N'",
    )

    executor_config_sidecar = {
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                    ),
                    k8s.V1Container(
                        name="sidecar",
                        image="ubuntu",
                        args=["date +'%Y-%m-%d %H:%M:%S.%3N' > /shared/test.txt"],
                        command=["bash", "-cx"],
                        volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                    ),
                ],
                volumes=[
                    k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
                ],
            )
        ),
    }

    @task(executor="KubernetesExecutor", executor_config=executor_config_sidecar)
    def test_sharedvolume_mount():
        """
        Tests whether the volume has been mounted.
        """
        print("hello world")
        print(time.time())
        for i in range(5):
            try:
                return_code = os.system("cat /shared/test.txt")
                if return_code != 0:
                    raise ValueError(f"Error when checking volume mount. Return code {return_code}")
                else:
                    with open("/shared/test.txt", 'r') as f:
                        print(f.readlines())
            except ValueError as e:
                if i >= 4:  # re-raise only on the final attempt
                    raise e

    sidecar_task = test_sharedvolume_mount()

    request_task >> [save_task, sidecar_task, b, k]

    # save_task >> [sidecar_task, b, k]

5.2 Argo workflow

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: download-bing-
spec:
  entrypoint: main
  volumes:
    - name: shared-empty-dir
      emptyDir: {}

  templates:
    - name: main
      steps:
        - - name: request-task
            template: request-task
        - - name: parallel-tasks
            template: parallel-tasks

    - name: parallel-tasks
      steps:
        - - name: save-task
            template: save-task
          - name: sidecar-task
            template: sidecar-task
          - name: my-task-in-its-own-pod
            template: bash-task
          - name: example-task
            template: k8s-pod-task

    # Request task
    - name: request-task
      container:
        image: python:3.9-slim
        command: [python, -c]
        args:
          - |
            import time
            print('hello')
            print(time.time())
            # The real request code is commented out
            # import requests
            # resp = requests.get("https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US")
            # print(resp.text)

    # Save task
    - name: save-task
      container:
        image: python:3.9-slim
        command: [python, -c]
        args:
          - |
            import time
            print('hello2')
            print(time.time())
            # The real save code is commented out
            # with open("/results.txt", "a+", encoding="utf-8") as f:
            #     f.write("sample text")

    # Bash task
    - name: bash-task
      container:
        image: ubuntu
        command: [bash, -c]
        args: ["date +'%Y-%m-%d %H:%M:%S.%3N'"]

    # Kubernetes pod task
    - name: k8s-pod-task
      container:
        image: ubuntu
        command: [bash, -cx]
        args: ["date +'%Y-%m-%d %H:%M:%S.%3N'"]

    # Sidecar task
    - name: sidecar-task
      script:
        image: python:3.9-slim
        command: [python]
        source: |
          import os
          import time
          print("hello world")
          print(time.time())
          for i in range(5):
              try:
                  return_code = os.system("cat /shared/test.txt")
                  if return_code != 0:
                      raise ValueError(f"Error when checking volume mount. Return code {return_code}")
                  else:
                      with open("/shared/test.txt", 'r') as f:
                          print(f.readlines())
              except ValueError as e:
                  if i >= 4:  # re-raise only on the final attempt
                      raise e
        volumeMounts:
          - name: shared-empty-dir
            mountPath: /shared
      sidecars:
        - name: sidecar
          image: ubuntu
          command: [bash, -cx]
          args: ["date +'%Y-%m-%d %H:%M:%S.%3N' > /shared/test.txt"]
          volumeMounts:
            - name: shared-empty-dir
              mountPath: /shared

5.3 Argo cron workflow

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: download-bing-cron  # name of the scheduled workflow
spec:
  schedule: "0 * * * *"  # run hourly (adjust as needed)
  concurrencyPolicy: Replace  # replace the previous run if it has not finished
  startingDeadlineSeconds: 60  # deadline for starting a missed run
  workflowSpec:
    entrypoint: main
    volumes:
      - name: shared-empty-dir
        emptyDir: {}
    templates:
      - name: main
        steps:
          - - name: request-task
              template: request-task
          - - name: parallel-tasks
              template: parallel-tasks

      - name: parallel-tasks
        steps:
          - - name: save-task
              template: save-task
            - name: sidecar-task
              template: sidecar-task
            - name: my-task-in-its-own-pod
              template: bash-task
            - name: example-task
              template: k8s-pod-task

      # Request task
      - name: request-task
        container:
          image: python:3.9-slim
          command: [python, -c]
          args:
            - |
              import time
              print('hello')
              print(time.time())
              # The real request code is commented out
              # import requests
              # resp = requests.get("https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US")
              # print(resp.text)

      # Save task
      - name: save-task
        container:
          image: python:3.9-slim
          command: [python, -c]
          args:
            - |
              import time
              print('hello2')
              print(time.time())
              # The real save code is commented out
              # with open("/results.txt", "a+", encoding="utf-8") as f:
              #     f.write("sample text")

      # Bash task
      - name: bash-task
        container:
          image: ubuntu
          command: [bash, -c]
          args: ["date +'%Y-%m-%d %H:%M:%S.%3N'"]

      # Kubernetes pod task
      - name: k8s-pod-task
        container:
          image: ubuntu
          command: [bash, -cx]
          args: ["date +'%Y-%m-%d %H:%M:%S.%3N'"]

      # Sidecar task
      - name: sidecar-task
        script:
          image: python:3.9-slim
          command: [python]
          source: |
            import os
            import time
            print("hello world")
            print(time.time())
            for i in range(5):
                try:
                    return_code = os.system("cat /shared/test.txt")
                    if return_code != 0:
                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
                    else:
                        with open("/shared/test.txt", 'r') as f:
                            print(f.readlines())
                except ValueError as e:
                    if i >= 4:  # re-raise only on the final attempt
                        raise e
          volumeMounts:
            - name: shared-empty-dir
              mountPath: /shared
        sidecars:
          - name: sidecar
            image: ubuntu
            command: [bash, -cx]
            args: ["date +'%Y-%m-%d %H:%M:%S.%3N' > /shared/test.txt"]
            volumeMounts:
              - name: shared-empty-dir
                mountPath: /shared

5.4 Templates

5.4.1 Define a template, then reference it

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: shared-templates
spec:
  templates:
    - name: common-task
      inputs:
        parameters:
          - name: message
      container:
        image: python:3.8
        command: [python, -c]
        args: ["print('{{inputs.parameters.message}}')"]
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: use-shared-template-
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: use-shared-template
            templateRef:
              name: shared-templates  # name of the WorkflowTemplate resource
              template: common-task  # name of the template inside it
            arguments:
              parameters:
                - name: message
                  value: "Reused template!"

Note: Kubeflow does not support this template configuration style.

5.4.2 Define templates and the workflow together

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: inline-template-example-
spec:
  entrypoint: main-dag
  templates:
    - name: main-dag
      dag:
        tasks:
          - name: step1
            template: say-hello
            arguments:
              parameters:
                - name: message
                  value: "Hello from Step 1!"
          - name: step2
            template: say-hello
            arguments:
              parameters:
                - name: message
                  value: "Hello from Step 2!"
            depends: step1

    # Template declared directly in the workflow
    - name: say-hello
      inputs:
        parameters:
          - name: message
      container:
        image: alpine:latest
        command: [sh, -c]
        args: ["echo '{{inputs.parameters.message}}' && sleep 1"]  # print the parameter, then wait 1 second

Note: Kubeflow only supports this template configuration style.

5.4.3 Inline templates

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-inline-
  labels:
    workflows.argoproj.io/test: "true"
  annotations:
    workflows.argoproj.io/description: |
      This workflow demonstrates running steps with inline templates.
    workflows.argoproj.io/version: ">= 3.2.0"
spec:
  entrypoint: main
  templates:
    - name: main
      steps:
        - - name: a
            inline:
              container:
                image: argoproj/argosay:v2

Note: Kubeflow does not support this template configuration style.

References

Exploring a data-processing-pipeline migration to Argo Workflows - freee Developers Hub

Inline Templates - Argo Workflows - The workflow engine for Kubernetes

From Airflow to Argo Workflows and dbt Python models | Mercari Engineering

The story of migrating data pipelines to Argo Workflows - enechain tech blog

Kubeflow and other options | 那些遇到过的问题

Argo Workflows - The workflow engine for Kubernetes

Airflow to Argo migration at GitHub

A cloud-native tool for offline workflow orchestration: distributed Argo Workflows clusters - OSCHINA

Cron Workflows - Argo Workflows - The workflow engine for Kubernetes

Reusing templates/steps across different Argo workflows - Tencent Cloud Developer Community

ArgoWorkflow tutorial (5): the many ways to trigger a Workflow - manual, cron, and event-driven

blog.gitcode.com/7dcffb189bd35f4c9c4a5be14ffc80fa.html

  • Title: migrating airflow to argo
  • Author: Ethereal
  • Created at: 2025-05-29 15:31:04
  • Updated at: 2025-05-29 18:12:04
  • Link: https://ethereal-o.github.io/2025/05/29/migrating-airflow-to-argo/
  • License: This work is licensed under CC BY-NC-SA 4.0.