RDMA learning

Ethereal Lv5

1. 安装环境

  1. 安装扩展内核模块
1
sudo apt install linux-modules-extra-$(uname -r)
  1. 安装rdma

RDMA:Soft-RoCE 环境搭建实验

  1. 运行程序

async-ucx/.github/workflows/main.yml at main · madsys-dev/async-ucx

1
2
3
4
5
6
7
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
. "$HOME/.cargo/env"
sudo apt-get -y install pkg-config gcc g++ autoconf
sudo apt-get -y install clang
sudo apt-get -y install ucx-utils libucx-dev
cargo build --all-features --all-targets
cargo test --all-features

01/service.c · wq/RDMA-EXAMPLE - 码云 - 开源中国

1
2
./async-ucx/target/debug/examples/rma
./async-ucx/target/debug/examples/rma

2. 单边与双边

RDMA单边和双边操作在代码编写上有显著差异,主要体现在操作方式、内存管理和通信模式等方面。

📊 核心差异对比

特性 单边操作(One-Sided) 双边操作(Two-Sided)
远程CPU参与 不需要 必需
操作类型 RDMA Write/Read/Atomic SEND/RECV
内存管理 需要交换内存元数据 不需要内存元数据交换
通信模式 单向直接内存访问 双向消息传递
性能 延迟更低 延迟较高
复杂性 设置更复杂 相对简单

🔧 代码层面的具体差异

1. 操作类型和语义差异

双边操作(Two-Sided) - SEND/RECV
1
2
3
4
5
6
7
8
9
10
11
12
13
// 发送方 - 发布SEND操作
struct ibv_send_wr send_wr;
memset(&send_wr, 0, sizeof(send_wr));
send_wr.wr_id = 1;
send_wr.opcode = IBV_WR_SEND; // 发送操作
send_wr.send_flags = IBV_SEND_SIGNALED;
// ... 设置sg_list等

// 接收方 - 必须发布RECV操作来匹配
struct ibv_recv_wr recv_wr;
memset(&recv_wr, 0, sizeof(recv_wr));
recv_wr.wr_id = 2;
// ... 设置接收缓冲区
单边操作(One-Sided) - RDMA WRITE
1
2
3
4
5
6
7
8
9
// 发起方 - RDMA WRITE操作
struct ibv_send_wr send_wr;
memset(&send_wr, 0, sizeof(send_wr));
send_wr.wr_id = 1;
send_wr.opcode = IBV_WR_RDMA_WRITE; // 单边写入操作
send_wr.send_flags = IBV_SEND_SIGNALED;
send_wr.wr.rdma.remote_addr = remote_addr; // 远程内存地址
send_wr.wr.rdma.rkey = remote_rkey; // 远程内存密钥
// ... 不需要远程端发布任何操作

2. 内存管理差异

双边操作 - 简单的缓冲区注册
1
2
3
4
5
6
7
8
// 双边操作只需要注册本地使用的内存
char* send_buffer = new char[BUFFER_SIZE];
char* recv_buffer = new char[BUFFER_SIZE];

struct ibv_mr* send_mr = ibv_reg_mr(pd, send_buffer, BUFFER_SIZE,
IBV_ACCESS_LOCAL_WRITE);
struct ibv_mr* recv_mr = ibv_reg_mr(pd, recv_buffer, BUFFER_SIZE,
IBV_ACCESS_LOCAL_WRITE);
单边操作 - 需要内存元数据交换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 单边操作需要交换内存元数据
struct MemoryMetadata {
uint64_t address; // 内存地址
uint32_t rkey; // 远程密钥
uint32_t length; // 内存长度
};

// 服务器注册并发布内存供客户端写入
char* server_buffer = new char[BUFFER_SIZE];
struct ibv_mr* server_mr = ibv_reg_mr(pd, server_buffer, BUFFER_SIZE,
IBV_ACCESS_LOCAL_WRITE |
IBV_ACCESS_REMOTE_WRITE); // 允许远程写入

// 通过双边操作交换内存元数据
MemoryMetadata server_metadata;
server_metadata.address = (uint64_t)server_buffer;
server_metadata.rkey = server_mr->rkey;
server_metadata.length = BUFFER_SIZE;

// 发送内存元数据给对端
send_metadata_to_peer(server_metadata);

3. 完整的代码示例对比

双边操作完整示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 双边操作 - 完整的SEND/RECV流程
class TwoSidedRDMA {
private:
struct ibv_qp* qp;
struct ibv_mr* send_mr, *recv_mr;
char* send_buf, *recv_buf;

public:
void setup() {
// 注册内存 - 相对简单
send_buf = new char[BUFFER_SIZE];
recv_buf = new char[BUFFER_SIZE];

send_mr = ibv_reg_mr(pd, send_buf, BUFFER_SIZE, IBV_ACCESS_LOCAL_WRITE);
recv_mr = ibv_reg_mr(pd, recv_buf, BUFFER_SIZE, IBV_ACCESS_LOCAL_WRITE);
}

void post_receive() {
// 必须预先发布接收请求
struct ibv_recv_wr wr, *bad_wr;
struct ibv_sge sge;

sge.addr = (uintptr_t)recv_buf;
sge.length = BUFFER_SIZE;
sge.lkey = recv_mr->lkey;

wr.wr_id = 0;
wr.next = NULL;
wr.sg_list = &sge;
wr.num_sge = 1;

ibv_post_recv(qp, &wr, &bad_wr);
}

void send_message(const char* message) {
// 复制数据到发送缓冲区
strcpy(send_buf, message);

struct ibv_send_wr wr, *bad_wr;
struct ibv_sge sge;

sge.addr = (uintptr_t)send_buf;
sge.length = strlen(message) + 1;
sge.lkey = send_mr->lkey;

wr.wr_id = 1;
wr.next = NULL;
wr.sg_list = &sge;
wr.num_sge = 1;
wr.opcode = IBV_WR_SEND; // 双边发送操作
wr.send_flags = IBV_SEND_SIGNALED;

ibv_post_send(qp, &wr, &bad_wr);
}
};
单边操作完整示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 单边操作 - 完整的RDMA WRITE流程
class OneSidedRDMA {
private:
struct ibv_qp* qp;
struct ibv_mr* local_mr, *remote_mr;
char* local_buf, *remote_buf;
MemoryMetadata remote_metadata; // 需要存储远程内存信息

public:
void setup() {
// 注册本地内存
local_buf = new char[BUFFER_SIZE];
local_mr = ibv_reg_mr(pd, local_buf, BUFFER_SIZE,
IBV_ACCESS_LOCAL_WRITE);

// 在实际应用中,需要通过其他方式获取远程内存信息
exchange_memory_metadata();
}

void exchange_memory_metadata() {
// 这是关键差异:需要交换内存元数据
// 通常使用双边操作或带内数据来完成

// 注册供远程访问的内存
remote_buf = new char[BUFFER_SIZE];
remote_mr = ibv_reg_mr(pd, remote_buf, BUFFER_SIZE,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);

// 创建并发送内存元数据
MemoryMetadata my_metadata;
my_metadata.address = (uint64_t)remote_buf;
my_metadata.rkey = remote_mr->rkey;
my_metadata.length = BUFFER_SIZE;

// 使用双边操作发送元数据
send_metadata_via_two_sided(my_metadata);

// 接收远程内存元数据
remote_metadata = receive_metadata_via_two_sided();
}

void rdma_write(const char* data) {
// 复制数据到本地缓冲区
strcpy(local_buf, data);

struct ibv_send_wr wr, *bad_wr;
struct ibv_sge sge;

sge.addr = (uintptr_t)local_buf;
sge.length = strlen(data) + 1;
sge.lkey = local_mr->lkey;

wr.wr_id = 1;
wr.next = NULL;
wr.sg_list = &sge;
wr.num_sge = 1;
wr.opcode = IBV_WR_RDMA_WRITE; // 单边写入操作
wr.send_flags = IBV_SEND_SIGNALED;

// 关键:设置远程内存信息
wr.wr.rdma.remote_addr = remote_metadata.address;
wr.wr.rdma.rkey = remote_metadata.rkey;

ibv_post_send(qp, &wr, &bad_wr);
}

// 检查远程端写入的数据
const char* get_remote_data() {
return remote_buf; // 直接访问本地映射的缓冲区
}
};

4. 连接建立和QP设置的差异

双边操作的QP访问标志
1
2
// 双边操作 - 相对简单的访问权限
qp_attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE;
单边操作的QP访问标志
1
2
3
4
// 单边操作 - 需要更广泛的访问权限
qp_attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE |
IBV_ACCESS_REMOTE_READ |
IBV_ACCESS_REMOTE_WRITE;

5. 完成事件处理的差异

双边操作完成处理
1
2
3
4
5
6
7
8
9
10
11
void handle_two_sided_completion(struct ibv_wc* wc) {
if (wc->opcode == IBV_WC_RECV) {
// 接收到消息
std::cout << "接收到消息,长度: " << wc->byte_len << std::endl;
// 需要重新发布RECV请求
post_receive();
} else if (wc->opcode == IBV_WC_SEND) {
// 发送完成
std::cout << "发送操作完成" << std::endl;
}
}
单边操作完成处理
1
2
3
4
5
6
7
8
9
10
void handle_one_sided_completion(struct ibv_wc* wc) {
if (wc->opcode == IBV_WC_RDMA_WRITE) {
// RDMA写入完成
std::cout << "RDMA写入操作完成" << std::endl;
} else if (wc->opcode == IBV_WC_RDMA_READ) {
// RDMA读取完成,数据现在在本地缓冲区
std::cout << "RDMA读取操作完成,数据已就绪" << std::endl;
}
// 注意:远程端不会有完成事件,因为其CPU未参与
}

🎯 选择指南

使用双边操作的场景:

  • 需要明确的消息确认
  • 通信模式类似于传统网络编程
  • 远程端需要知道每次通信的发生
  • 开发复杂度要求较低

使用单边操作的场景:

  • 对延迟和CPU开销极其敏感
  • 需要频繁的大数据量传输
  • 远程端不需要知道每次内存访问
  • 可以实现更复杂的通信模式(如分布式共享内存)

💡 实际建议

  1. 从双边操作开始:如果你刚开始学习RDMA,建议先掌握双边操作
  2. 渐进式迁移:在理解双边操作的基础上,逐步过渡到单边操作
  3. 混合使用:在实际应用中,经常混合使用两种模式:
    • 使用双边操作交换控制信息和内存元数据
    • 使用单边操作进行高效的数据传输

理解这些差异有助于你根据具体应用需求选择合适的RDMA操作模式。

3. 双边

RDMA(远程直接内存访问)技术允许计算机直接访问另一台计算机的内存,而无需对方处理器的参与,从而显著降低通信延迟并提高吞吐量。下面是一个基于RDMA的SEND/RECV操作模式的简单示例,它演示了如何建立连接并进行基本的数据传输。

🔧 RDMA通信基础示例

以下C++代码示例展示了使用RDMA进行数据传输的基本框架。请注意,这只是一个简化示例,实际应用中需要更完善的错误处理和资源管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <rdma/rdma_cma.h>

#define MAX_MSG_SIZE 1024

// RDMA连接上下文
struct RDMAConnectionContext {
struct ibv_context* context;
struct ibv_pd* protection_domain;
struct ibv_cq* completion_queue;
struct ibv_comp_channel* comp_channel;
struct ibv_qp* queue_pair;

// 内存区域
struct ibv_mr* send_mr;
struct ibv_mr* recv_mr;

// 缓冲区
char* send_region;
char* recv_region;

// 完成事件处理线程
pthread_t comp_thread;
};

// 完成事件处理线程
void* completion_thread(void* arg) {
RDMAConnectionContext* ctx = (RDMAConnectionContext*)arg;

while (true) {
struct ibv_cq* cq;
void* cq_context;

// 等待完成事件
if (ibv_get_cq_event(ctx->comp_channel, &cq, &cq_context) == 0) {
ibv_ack_cq_events(cq, 1);
ibv_req_notify_cq(cq, 0);

// 处理工作完成
struct ibv_wc wc;
int comp_count = ibv_poll_cq(cq, 1, &wc);

if (comp_count > 0) {
if (wc.status == IBV_WC_SUCCESS) {
if (wc.opcode & IBV_WC_RECV) {
std::cout << "接收到消息: " << ctx->recv_region << std::endl;

// 重新投递接收请求
struct ibv_recv_wr wr, *bad_wr;
struct ibv_sge sge;

memset(&wr, 0, sizeof(wr));
wr.wr_id = (uintptr_t)ctx->recv_region;
wr.next = NULL;
wr.sg_list = &sge;
wr.num_sge = 1;

sge.addr = (uintptr_t)ctx->recv_region;
sge.length = MAX_MSG_SIZE;
sge.lkey = ctx->recv_mr->lkey;

ibv_post_recv(ctx->queue_pair, &wr, &bad_wr);
} else if (wc.opcode == IBV_WC_SEND) {
std::cout << "发送操作完成" << std::endl;
}
}
}
}
}
return NULL;
}

// 初始化RDMA连接上下文
RDMAConnectionContext* init_rdma_connection() {
RDMAConnectionContext* ctx = new RDMAConnectionContext();

// 获取RDMA设备列表
struct ibv_device** device_list = ibv_get_device_list(NULL);
if (!device_list) {
std::cerr << "未找到RDMA设备" << std::endl;
delete ctx;
return nullptr;
}

// 打开第一个设备
ctx->context = ibv_open_device(device_list[0]);
if (!ctx->context) {
std::cerr << "无法打开RDMA设备" << std::endl;
ibv_free_device_list(device_list);
delete ctx;
return nullptr;
}

// 分配保护域
ctx->protection_domain = ibv_alloc_pd(ctx->context);
if (!ctx->protection_domain) {
std::cerr << "无法分配保护域" << std::endl;
ibv_close_device(ctx->context);
ibv_free_device_list(device_list);
delete ctx;
return nullptr;
}

// 创建完成队列
ctx->comp_channel = ibv_create_comp_channel(ctx->context);
ctx->completion_queue = ibv_create_cq(ctx->context, 10, NULL, ctx->comp_channel, 0);

// 注册内存区域
ctx->send_region = new char[MAX_MSG_SIZE];
ctx->recv_region = new char[MAX_MSG_SIZE];

ctx->send_mr = ibv_reg_mr(ctx->protection_domain, ctx->send_region, MAX_MSG_SIZE,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ);
ctx->recv_mr = ibv_reg_mr(ctx->protection_domain, ctx->recv_region, MAX_MSG_SIZE,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ);

// 创建队列对
struct ibv_qp_init_attr qp_init_attr;
memset(&qp_init_attr, 0, sizeof(qp_init_attr));
qp_init_attr.send_cq = ctx->completion_queue;
qp_init_attr.recv_cq = ctx->completion_queue;
qp_init_attr.qp_type = IBV_QPT_RC;

qp_init_attr.cap.max_send_wr = 10;
qp_init_attr.cap.max_recv_wr = 10;
qp_init_attr.cap.max_send_sge = 1;
qp_init_attr.cap.max_recv_sge = 1;

ctx->queue_pair = ibv_create_qp(ctx->protection_domain, &qp_init_attr);

// 启动完成事件处理线程
pthread_create(&ctx->comp_thread, NULL, completion_thread, ctx);
ibv_req_notify_cq(ctx->completion_queue, 0);

ibv_free_device_list(device_list);
return ctx;
}

// 发送消息
int rdma_send_message(RDMAConnectionContext* ctx, const char* message) {
if (strlen(message) >= MAX_MSG_SIZE) {
std::cerr << "消息过长" << std::endl;
return -1;
}

// 复制消息到发送区域
strcpy(ctx->send_region, message);

// 准备发送工作请求
struct ibv_send_wr wr, *bad_wr;
struct ibv_sge sge;

memset(&wr, 0, sizeof(wr));
wr.wr_id = (uintptr_t)ctx->send_region;
wr.next = NULL;
wr.sg_list = &sge;
wr.num_sge = 1;
wr.opcode = IBV_WR_SEND;
wr.send_flags = IBV_SEND_SIGNALED;

sge.addr = (uintptr_t)ctx->send_region;
sge.length = strlen(message) + 1;
sge.lkey = ctx->send_mr->lkey;

return ibv_post_send(ctx->queue_pair, &wr, &bad_wr);
}

// 投递接收请求
int post_receive_request(RDMAConnectionContext* ctx) {
struct ibv_recv_wr wr, *bad_wr;
struct ibv_sge sge;

memset(&wr, 0, sizeof(wr));
wr.wr_id = (uintptr_t)ctx->recv_region;
wr.next = NULL;
wr.sg_list = &sge;
wr.num_sge = 1;

sge.addr = (uintptr_t)ctx->recv_region;
sge.length = MAX_MSG_SIZE;
sge.lkey = ctx->recv_mr->lkey;

return ibv_post_recv(ctx->queue_pair, &wr, &bad_wr);
}

// 清理资源
void cleanup_rdma_connection(RDMAConnectionContext* ctx) {
if (!ctx) return;

pthread_cancel(ctx->comp_thread);
pthread_join(ctx->comp_thread, NULL);

if (ctx->queue_pair) ibv_destroy_qp(ctx->queue_pair);
if (ctx->send_mr) ibv_dereg_mr(ctx->send_mr);
if (ctx->recv_mr) ibv_dereg_mr(ctx->recv_mr);
if (ctx->completion_queue) ibv_destroy_cq(ctx->completion_queue);
if (ctx->comp_channel) ibv_destroy_comp_channel(ctx->comp_channel);
if (ctx->protection_domain) ibv_dealloc_pd(ctx->protection_domain);
if (ctx->context) ibv_close_device(ctx->context);

delete[] ctx->send_region;
delete[] ctx->recv_region;

delete ctx;
}

int main() {
std::cout << "初始化RDMA连接..." << std::endl;

RDMAConnectionContext* ctx = init_rdma_connection();
if (!ctx) {
std::cerr << "RDMA连接初始化失败" << std::endl;
return -1;
}

// 投递初始接收请求
if (post_receive_request(ctx) != 0) {
std::cerr << "无法投递接收请求" << std::endl;
cleanup_rdma_connection(ctx);
return -1;
}

std::cout << "RDMA连接已就绪" << std::endl;

// 演示发送消息
const char* test_message = "Hello, RDMA!";
if (rdma_send_message(ctx, test_message) == 0) {
std::cout << "消息已发送: " << test_message << std::endl;
} else {
std::cerr << "发送消息失败" << std::endl;
}

// 等待一段时间以接收可能的回复
sleep(5);

cleanup_rdma_connection(ctx);
std::cout << "RDMA连接已关闭" << std::endl;

return 0;
}

💻 编译与运行

要编译此代码,你需要安装RDMA开发库(在Ubuntu上通常是librdmacm-devlibibverbs-dev包)。然后使用以下命令编译:

1
g++ -std=c++11 -o rdma_demo rdma_demo.cpp -lrdmacm -libverbs -lpthread

🔍 代码关键点说明

  1. RDMA操作模式:此代码使用了SEND/RECV操作模式,这是RDMA的一种基本通信方式,要求发送和接收端都明确参与。

  2. 核心组件

    • 保护域(Protection Domain):隔离RDMA资源
    • 完成队列(Completion Queue):记录已完成的操作
    • 队列对(Queue Pair):包含发送和接收队列
    • 内存区域(Memory Region):注册用于RDMA操作的内存
  3. 数据传输:通过ibv_post_sendibv_post_recv函数提交发送和接收请求,由硬件异步处理。

🛠 实际应用考虑

在实际项目中,你可能还需要:

  • 实现队列对状态机管理(从RESET到INIT、RTR、RTS状态)
  • 添加更完善的错误处理和超时机制
  • 实现连接管理(使用rdma_cma API)
  • 考虑使用RDMA READ/WRITE操作以实现单边通信
  • 实施内存固定技术以提高性能

4. 单边

RDMA单边操作(One-Sided Operations)包括RDMA Write、RDMA Read和原子操作,这些操作不需要远程端的CPU参与。下面是一个使用RDMA Write的单边通信示例:

🔧 RDMA单边写入示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <rdma/rdma_cma.h>
#include <rdma/rdma_verbs.h>
#include <arpa/inet.h>

#define BUFFER_SIZE 4096
#define PORT "12345"

// RDMA连接上下文
struct RDMAContext {
struct ibv_context* ctx;
struct ibv_pd* pd;
struct ibv_cq* cq;
struct ibv_qp* qp;
struct ibv_mr* mr;

char* buffer;
uint32_t rkey;
uint64_t remote_addr;
uint32_t remote_rkey;

struct rdma_event_channel* ec;
struct rdma_cm_id* id;
};

// 内存区域信息交换结构
struct MemoryInfo {
uint64_t addr;
uint32_t rkey;
uint32_t size;
};

// 完成事件处理
int poll_completion(RDMAContext* ctx) {
struct ibv_wc wc;
int ret;

do {
ret = ibv_poll_cq(ctx->cq, 1, &wc);
} while (ret == 0);

if (ret < 0) {
std::cerr << "轮询完成队列失败" << std::endl;
return -1;
}

if (wc.status != IBV_WC_SUCCESS) {
std::cerr << "工作请求失败,状态: " << wc.status << std::endl;
return -1;
}

return 0;
}

// 建立QP到RTS状态
int setup_qp(RDMAContext* ctx) {
struct ibv_qp_attr attr;
int flags;
int ret;

// 重置到INIT状态
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_INIT;
attr.port_num = 1;
attr.pkey_index = 0;
attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ |
IBV_ACCESS_REMOTE_WRITE;

flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS;
ret = ibv_modify_qp(ctx->qp, &attr, flags);
if (ret) {
std::cerr << "修改QP到INIT状态失败" << std::endl;
return ret;
}

// 转换到RTR状态
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTR;
attr.path_mtu = IBV_MTU_1024;
attr.dest_qp_num = ctx->qp->qp_num;
attr.rq_psn = 0;
attr.max_dest_rd_atomic = 1;
attr.min_rnr_timer = 12;

attr.ah_attr.is_global = 0;
attr.ah_attr.dlid = 1; // 在实际系统中需要正确设置
attr.ah_attr.sl = 0;
attr.ah_attr.src_path_bits = 0;
attr.ah_attr.port_num = 1;

flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN |
IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER;

ret = ibv_modify_qp(ctx->qp, &attr, flags);
if (ret) {
std::cerr << "修改QP到RTR状态失败" << std::endl;
return ret;
}

// 转换到RTS状态
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTS;
attr.timeout = 14;
attr.retry_cnt = 7;
attr.rnr_retry = 7;
attr.sq_psn = 0;
attr.max_rd_atomic = 1;

flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT |
IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;

ret = ibv_modify_qp(ctx->qp, &attr, flags);
if (ret) {
std::cerr << "修改QP到RTS状态失败" << std::endl;
return ret;
}

return 0;
}

// 服务器端代码
void run_server() {
RDMAContext ctx = {0};
struct rdma_addrinfo* res = NULL;
struct rdma_addrinfo hints;
int ret;

std::cout << "启动RDMA服务器..." << std::endl;

// 创建事件通道
ctx.ec = rdma_create_event_channel();
if (!ctx.ec) {
std::cerr << "创建事件通道失败" << std::endl;
return;
}

// 创建CM ID
ret = rdma_create_id(ctx.ec, &ctx.id, NULL, RDMA_PS_TCP);
if (ret) {
std::cerr << "创建CM ID失败" << std::endl;
goto cleanup;
}

// 解析地址
memset(&hints, 0, sizeof(hints));
hints.ai_port_space = RDMA_PS_TCP;
ret = rdma_getaddrinfo(NULL, PORT, &hints, &res);
if (ret) {
std::cerr << "获取地址信息失败" << std::endl;
goto cleanup;
}

// 绑定地址并监听
ret = rdma_bind_addr(ctx.id, res->ai_src_addr);
if (ret) {
std::cerr << "绑定地址失败" << std::endl;
goto cleanup;
}

ret = rdma_listen(ctx.id, 1);
if (ret) {
std::cerr << "监听失败" << std::endl;
goto cleanup;
}

std::cout << "服务器监听在端口 " << PORT << std::endl;

// 等待连接
struct rdma_cm_event* event;
ret = rdma_get_cm_event(ctx.ec, &event);
if (ret) {
std::cerr << "获取CM事件失败" << std::endl;
goto cleanup;
}

if (event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
std::cerr << "意外事件: " << event->event << std::endl;
rdma_ack_cm_event(event);
goto cleanup;
}

// 接受连接请求
ctx.id = event->id;
rdma_ack_cm_event(event);

// 建立QP等资源
ctx.ctx = ctx.id->verbs;
ctx.pd = ibv_alloc_pd(ctx.ctx);
ctx.cq = ibv_create_cq(ctx.ctx, 10, NULL, NULL, 0);

struct ibv_qp_init_attr qp_attr;
memset(&qp_attr, 0, sizeof(qp_attr));
qp_attr.cap.max_send_wr = 10;
qp_attr.cap.max_recv_wr = 10;
qp_attr.cap.max_send_sge = 1;
qp_attr.cap.max_recv_sge = 1;
qp_attr.send_cq = ctx.cq;
qp_attr.recv_cq = ctx.cq;
qp_attr.qp_type = IBV_QPT_RC;

ret = rdma_create_qp(ctx.id, ctx.pd, &qp_attr);
if (ret) {
std::cerr << "创建QP失败" << std::endl;
goto cleanup;
}
ctx.qp = ctx.id->qp;

// 注册内存区域
ctx.buffer = new char[BUFFER_SIZE];
ctx.mr = ibv_reg_mr(ctx.pd, ctx.buffer, BUFFER_SIZE,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);

// 准备连接信息
struct rdma_conn_param cm_params;
memset(&cm_params, 0, sizeof(cm_params));
cm_params.responder_resources = 1;
cm_params.initiator_depth = 1;
cm_params.rnr_retry_count = 7;

// 接受连接
ret = rdma_accept(ctx.id, &cm_params);
if (ret) {
std::cerr << "接受连接失败" << std::endl;
goto cleanup;
}

// 等待连接建立
ret = rdma_get_cm_event(ctx.ec, &event);
if (ret) {
std::cerr << "获取CM事件失败" << std::endl;
goto cleanup;
}

if (event->event != RDMA_CM_EVENT_ESTABLISHED) {
std::cerr << "连接建立失败,事件: " << event->event << std::endl;
rdma_ack_cm_event(event);
goto cleanup;
}
rdma_ack_cm_event(event);

std::cout << "RDMA连接已建立" << std::endl;

// 等待客户端写入数据
std::cout << "等待客户端写入数据..." << std::endl;
sleep(5);

std::cout << "服务器接收到的数据: " << ctx.buffer << std::endl;

cleanup:
if (res) rdma_freeaddrinfo(res);
if (ctx.mr) ibv_dereg_mr(ctx.mr);
if (ctx.buffer) delete[] ctx.buffer;
if (ctx.qp) rdma_destroy_qp(ctx.id);
if (ctx.cq) ibv_destroy_cq(ctx.cq);
if (ctx.pd) ibv_dealloc_pd(ctx.pd);
if (ctx.id) rdma_destroy_id(ctx.id);
if (ctx.ec) rdma_destroy_event_channel(ctx.ec);
}

// 客户端代码
void run_client() {
RDMAContext ctx = {0};
struct rdma_addrinfo* res = NULL;
struct rdma_addrinfo hints;
int ret;

std::cout << "启动RDMA客户端..." << std::endl;

// 创建事件通道
ctx.ec = rdma_create_event_channel();
if (!ctx.ec) {
std::cerr << "创建事件通道失败" << std::endl;
return;
}

// 创建CM ID
ret = rdma_create_id(ctx.ec, &ctx.id, NULL, RDMA_PS_TCP);
if (ret) {
std::cerr << "创建CM ID失败" << std::endl;
goto cleanup;
}

// 解析服务器地址
memset(&hints, 0, sizeof(hints));
hints.ai_port_space = RDMA_PS_TCP;
ret = rdma_getaddrinfo("localhost", PORT, &hints, &res);
if (ret) {
std::cerr << "获取地址信息失败" << std::endl;
goto cleanup;
}

// 建立QP等资源
ctx.ctx = ctx.id->verbs;
ctx.pd = ibv_alloc_pd(ctx.ctx);
ctx.cq = ibv_create_cq(ctx.ctx, 10, NULL, NULL, 0);

struct ibv_qp_init_attr qp_attr;
memset(&qp_attr, 0, sizeof(qp_attr));
qp_attr.cap.max_send_wr = 10;
qp_attr.cap.max_recv_wr = 10;
qp_attr.cap.max_send_sge = 1;
qp_attr.cap.max_recv_sge = 1;
qp_attr.send_cq = ctx.cq;
qp_attr.recv_cq = ctx.cq;
qp_attr.qp_type = IBV_QPT_RC;

ret = rdma_create_qp(ctx.id, ctx.pd, &qp_attr);
if (ret) {
std::cerr << "创建QP失败" << std::endl;
goto cleanup;
}
ctx.qp = ctx.id->qp;

// 解析路由
ret = rdma_resolve_addr(ctx.id, NULL, res->ai_dst_addr, 2000);
if (ret) {
std::cerr << "解析地址失败" << std::endl;
goto cleanup;
}

// 等待地址解析完成
struct rdma_cm_event* event;
ret = rdma_get_cm_event(ctx.ec, &event);
if (ret) {
std::cerr << "获取CM事件失败" << std::endl;
goto cleanup;
}

if (event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
std::cerr << "地址解析失败,事件: " << event->event << std::endl;
rdma_ack_cm_event(event);
goto cleanup;
}
rdma_ack_cm_event(event);

// 解析路由
ret = rdma_resolve_route(ctx.id, 2000);
if (ret) {
std::cerr << "解析路由失败" << std::endl;
goto cleanup;
}

ret = rdma_get_cm_event(ctx.ec, &event);
if (ret) {
std::cerr << "获取CM事件失败" << std::endl;
goto cleanup;
}

if (event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
std::cerr << "路由解析失败,事件: " << event->event << std::endl;
rdma_ack_cm_event(event);
goto cleanup;
}
rdma_ack_cm_event(event);

// 建立QP状态
ret = setup_qp(&ctx);
if (ret) {
std::cerr << "设置QP失败" << std::endl;
goto cleanup;
}

// 准备连接信息
struct rdma_conn_param cm_params;
memset(&cm_params, 0, sizeof(cm_params));
cm_params.responder_resources = 1;
cm_params.initiator_depth = 1;
cm_params.rnr_retry_count = 7;

// 连接服务器
ret = rdma_connect(ctx.id, &cm_params);
if (ret) {
std::cerr << "连接服务器失败" << std::endl;
goto cleanup;
}

// 等待连接建立
ret = rdma_get_cm_event(ctx.ec, &event);
if (ret) {
std::cerr << "获取CM事件失败" << std::endl;
goto cleanup;
}

if (event->event != RDMA_CM_EVENT_ESTABLISHED) {
std::cerr << "连接建立失败,事件: " << event->event << std::endl;
rdma_ack_cm_event(event);
goto cleanup;
}
rdma_ack_cm_event(event);

std::cout << "RDMA连接已建立" << std::endl;

// 在实际系统中,这里需要通过其他方式交换内存信息
// 为演示目的,我们假设已经知道远程内存信息
ctx.remote_addr = 0; // 实际应从服务器获取
ctx.remote_rkey = 0; // 实际应从服务器获取

// 准备要写入的数据
char* local_buffer = new char[BUFFER_SIZE];
const char* message = "Hello from RDMA Write!";
strcpy(local_buffer, message);

// 注册本地内存(用于RDMA操作)
struct ibv_mr* local_mr = ibv_reg_mr(ctx.pd, local_buffer, BUFFER_SIZE,
IBV_ACCESS_LOCAL_WRITE);

std::cout << "执行RDMA Write操作..." << std::endl;

// 执行RDMA Write操作
struct ibv_send_wr wr, *bad_wr;
struct ibv_sge sge;

memset(&wr, 0, sizeof(wr));
wr.wr_id = 1;
wr.next = NULL;
wr.sg_list = &sge;
wr.num_sge = 1;
wr.opcode = IBV_WR_RDMA_WRITE;
wr.send_flags = IBV_SEND_SIGNALED;

// 设置远程内存信息(在实际系统中从服务器获取)
wr.wr.rdma.remote_addr = ctx.remote_addr;
wr.wr.rdma.rkey = ctx.remote_rkey;

sge.addr = (uintptr_t)local_buffer;
sge.length = strlen(message) + 1;
sge.lkey = local_mr->lkey;

ret = ibv_post_send(ctx.qp, &wr, &bad_wr);
if (ret) {
std::cerr << "提交RDMA Write失败" << std::endl;
goto cleanup;
}

// 等待操作完成
if (poll_completion(&ctx) == 0) {
std::cout << "RDMA Write操作完成" << std::endl;
}

// 清理本地内存
if (local_mr) ibv_dereg_mr(local_mr);
delete[] local_buffer;

cleanup:
if (res) rdma_freeaddrinfo(res);
if (ctx.qp) rdma_destroy_qp(ctx.id);
if (ctx.cq) ibv_destroy_cq(ctx.cq);
if (ctx.pd) ibv_dealloc_pd(ctx.pd);
if (ctx.id) rdma_destroy_id(ctx.id);
if (ctx.ec) rdma_destroy_event_channel(ctx.ec);
}

int main(int argc, char* argv[]) {
if (argc != 2) {
std::cout << "用法: " << argv[0] << " [server|client]" << std::endl;
return 1;
}

if (strcmp(argv[1], "server") == 0) {
run_server();
} else if (strcmp(argv[1], "client") == 0) {
run_client();
} else {
std::cout << "无效参数。用法: " << argv[0] << " [server|client]" << std::endl;
return 1;
}

return 0;
}

🔍 RDMA单边操作关键特点

📋 RDMA单边操作类型

  1. RDMA Write - 将数据从本地内存写入远程内存
  2. RDMA Read - 从远程内存读取数据到本地内存
  3. 原子操作 - 在远程内存上执行原子操作(比较交换、获取添加等)

🎯 单边操作的优势

  • 零CPU参与:远程端CPU完全不需要知道或处理操作
  • 低延迟:绕过远程端操作系统内核
  • 高吞吐量:直接内存访问,无数据拷贝

🔄 内存信息交换

单边操作的关键挑战是如何安全地交换内存信息:

1
2
3
4
5
6
7
8
9
10
11
// 内存信息交换结构
struct MemoryMetadata {
uint64_t address; // 内存虚拟地址
uint32_t rkey; // 远程键
uint32_t length; // 内存区域长度
};

// 交换方式:
// 1. 使用传统的SEND/RECV操作
// 2. 使用带内数据(inline data)
// 3. 使用外部协调机制(如共享文件、数据库等)

🛠 编译与运行

编译此代码需要RDMA开发库:

1
2
3
4
5
6
7
8
# Ubuntu/Debian
sudo apt-get install librdmacm-dev libibverbs-dev

# CentOS/RHEL
sudo yum install rdma-core-devel

# 编译
g++ -std=c++11 -o rdma_one_sided rdma_one_sided.cpp -lrdmacm -libverbs

运行示例:

1
2
3
4
5
# 终端1 - 启动服务器
./rdma_one_sided server

# 终端2 - 启动客户端
./rdma_one_sided client

💡 实际应用考虑

在实际RDMA单边通信应用中,需要注意:

  1. 内存注册:确保内存是固定的且已正确注册
  2. 内存保护:使用适当的访问权限标志
  3. 错误处理:实现完善的错误检测和恢复机制
  4. 并发控制:处理多个并发的单边操作
  5. 内存信息交换:安全地交换内存地址和rkey

参考

(22 封私信 / 80 条消息) 使用 C++ 20 协程封装 RDMA 操作 - 知乎

深入解析RDMA:从原理到C++实践_rdma c++-CSDN博客

使用Soft-RoCE实践RDMA_softroce-CSDN博客

RDMA-Programming/docs/编程流程.md at master · zzh-wisdom/RDMA-Programming

RDMA:Soft-RoCE 环境搭建实验

(22 封私信 / 80 条消息) 15. RDMA之RoCE & Soft-RoCE - 知乎

01/service.c · wq/RDMA-EXAMPLE - 码云 - 开源中国

RDMA+UCX_openucx-CSDN博客

async-ucx/.github/workflows/main.yml at main · madsys-dev/async-ucx

丐版RDMA环境搭建(SoftRoCE) | 魔趣典藏阁

No rdma_rxe · Issue #57 · SoftRoCE/rxe-dev

cargo build - Cargo 手册 中文版

openucx/ucx: Unified Communication X (mailing list - https://elist.ornl.gov/mailman/listinfo/ucx-group)

async-ucx/examples/rma.rs at main · madsys-dev/async-ucx

(25 封私信 / 80 条消息) async Rust 封装 UCX 通信库 - 知乎

  • Title: RDMA learning
  • Author: Ethereal
  • Created at: 2025-08-06 13:06:09
  • Updated at: 2025-11-16 20:24:01
  • Link: https://ethereal-o.github.io/2025/08/06/RDMA-learning/
  • License: This work is licensed under CC BY-NC-SA 4.0.
 Comments