应用层实现RCU机制

本文最后更新于:2026年5月1日 晚上

内核我们知道有个RCU机制,读多写少的场景,性能很出色,内核协议栈中经常能见到它的身影。应用层能不能实现一个相同的机制呢?
答案是可以的,而且代码很简短,对于我们了解它的原理很有用。

简易实现源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <pthread.h>
#include <unistd.h>

#define MAX_THREADS 128

struct rcu_reader {
atomic_int active;
};

static struct rcu_reader readers[MAX_THREADS];
static atomic_int reader_count = 0;
static __thread int rcu_id = -1;

static void rcu_register_thread(void)
{
if (rcu_id >= 0)
return;

int id = atomic_fetch_add(&reader_count, 1);
if (id >= MAX_THREADS) {
fprintf(stderr, "too many threads\n");
abort();
}

rcu_id = id;
atomic_store(&readers[id].active, 0);
}

static void rcu_read_lock(void)
{
rcu_register_thread();

// 标记自己正在 RCU 读临界区
atomic_store_explicit(&readers[rcu_id].active, 1, memory_order_release);
}

static void rcu_read_unlock(void)
{
// 标记自己已经离开 RCU 读临界区
atomic_store_explicit(&readers[rcu_id].active, 0, memory_order_release);
}

static void synchronize_rcu(void)
{
int n = atomic_load_explicit(&reader_count, memory_order_acquire);

for (;;) {
int all_quiet = 1;

for (int i = 0; i < n; i++) {
if (atomic_load_explicit(&readers[i].active, memory_order_acquire)) {
all_quiet = 0;
break;
}
}

if (all_quiet)
break;

sched_yield();
}
}

/* ================= 测试demo ================= */

struct config {
int version;
char name[64];
};

static _Atomic(struct config *) g_cfg;

static void *reader_thread(void *arg)
{
long id = (long)arg;
rcu_register_thread();

while (1) {
rcu_read_lock();

struct config *cfg = atomic_load_explicit(&g_cfg, memory_order_acquire);

if (cfg) {
printf("reader %ld: version=%d, name=%s\n", id, cfg->version, cfg->name);
}

rcu_read_unlock();
usleep(300*1000);
}

return NULL;
}

static void *writer_thread(void *arg)
{
(void)arg;

for (int i = 1; ; i++) {
struct config *new_cfg = malloc(sizeof(*new_cfg));
new_cfg->version = i;
snprintf(new_cfg->name, sizeof(new_cfg->name), "config-%d", i);

struct config *old_cfg = atomic_exchange_explicit(&g_cfg, new_cfg, memory_order_acq_rel);

printf("writer: updated config to version=%d, name=%s\n", new_cfg->version, new_cfg->name);

// 打印耗时
struct timespec start, end;
clock_gettime(CLOCK_MONOTONIC, &start);

// 等待所有可能正在使用 old_cfg 的读者退出
synchronize_rcu();

clock_gettime(CLOCK_MONOTONIC, &end);
long elapsed_us = (end.tv_sec - start.tv_sec) * 1000*1000 + (end.tv_nsec - start.tv_nsec) / 1000;
printf("writer: synchronize_rcu took %ld us\n", elapsed_us);

free(old_cfg);
sleep(2);
}

return NULL;
}

int main(void)
{
pthread_t r1, r2, r3, w;

struct config *init = malloc(sizeof(*init));
init->version = 0;
snprintf(init->name, sizeof(init->name), "init");

atomic_store(&g_cfg, init);

pthread_create(&r1, NULL, reader_thread, (void *)1);
pthread_create(&r2, NULL, reader_thread, (void *)2);
pthread_create(&r3, NULL, reader_thread, (void *)3);
pthread_create(&w, NULL, writer_thread, NULL);

pthread_join(r1, NULL);
pthread_join(r2, NULL);
pthread_join(r3, NULL);
pthread_join(w, NULL);

return 0;
}

然后编译,运行,可以看到3个读者一直打印数据内容,一个写者定期更新数据后,读者也能正常读到更新后的数据。更新的延迟都是0us。

虽然核心实现很短,但要理解它们,还是需要一翻功夫。
下面对一些不熟的语法做一翻解释:

原子操作

c11就在标准库引入了原子操作

1
2
3
4
5
6
7
8
9
static atomic_int reader_count = 0;  //定义一个int原子变量
int id = atomic_fetch_add(&reader_count, 1); //原子变量+1后结果
atomic_store(&readers[id].active, 0); //给原子变量赋值0

atomic_store_explicit(&readers[rcu_id].active, 1, memory_order_release); //给原子变量赋值1
int n = atomic_load_explicit(&reader_count, memory_order_acquire); // 获取原子变量的值
// 后面的memory_order_acquire表示内存循序,另外的文章来展开讲

static _Atomic(struct config *) g_cfg; // 一个指针类型的原子变量

为何共享的数据指针要设置为_Atomic类型
(1)避免编译器优化到寄存器,每次都从内存读
(2)设置为原子类型,后面atomic_exchange_explicit才能设置内存顺序,避乱乱序执行
(3)避免撕裂读,指针赋值不一定是原子的,32位系统64位指针就会被拆分为2条语句。

线程变量

1
static __thread int rcu_id = -1;

语义为该变量每个线程都会有一个独立的,互不影响。

sched_yield

主动让出CPU

测试更新延迟

注释掉reader的打印和usleep,观察更新操作的延迟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
writer: updated config to version=1, name=config-1
writer: synchronize_rcu took 614 us
writer: updated config to version=2, name=config-2
writer: synchronize_rcu took 48 us
writer: updated config to version=3, name=config-3
writer: synchronize_rcu took 183 us
writer: updated config to version=4, name=config-4
writer: synchronize_rcu took 2 us
writer: updated config to version=5, name=config-5
writer: synchronize_rcu took 15 us
writer: updated config to version=6, name=config-6
writer: synchronize_rcu took 37 us
writer: updated config to version=7, name=config-7
writer: synchronize_rcu took 374 us

更新的延迟最大有几百微秒。

而如果我们把读者的打印打开,就会发现写着就无法更新了,一直等在synchronize_rcu

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void *reader_thread(void *arg)
{
long id = (long)arg;
rcu_register_thread();

while (1) {
rcu_read_lock();

struct config *cfg = atomic_load_explicit(&g_cfg, memory_order_acquire);

if (cfg) {
printf("reader %ld: version=%d, name=%s\n", id, cfg->version, cfg->name);
}

rcu_read_unlock();
// usleep(300*1000);
}

return NULL;
}

这是为何?

因为注释掉printf时,reader会因为时间片用完发生调度,此时reader线程可能退出了临界区,也可能没有退出临界区。这时writer被调度唤醒,有一定概率线程都退出临界区,能够完成更新。我们在synchronize_rcu中添加sched_yield次数统计,发现等待数百微秒的更新都是轮了上千次sched_yield。即writer可能被调度了上千次才发现一次何时的机会。
而添加printf呢,printf属于慢语句,执行周期很长,导致writer被唤醒后,更大概率reader在printf中,此时还持有rcu锁,所以长时间无法更新。
根据我的测试,去掉printf,中间加一个for循环100次,writer更新就很慢了。代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static void *reader_thread(void *arg)
{
long id = (long)arg;
rcu_register_thread();

while (1) {
rcu_read_lock();

struct config *cfg = atomic_load_explicit(&g_cfg, memory_order_acquire);

for(int i = 0; i < 100; i++); // 模拟读操作耗时

rcu_read_unlock();
// usleep(300*1000);
}
return NULL;
}

打印如下,sched_yield等了十几万次所有线程才退出临界区,更新耗时长达数秒。

1
2
3
4
5
6
7
8
9
writer: updated config to version=1, name=config-1
synchronize_rcu: waited 7399576 iterations
writer: synchronize_rcu took 4198469 us
writer: updated config to version=2, name=config-2
synchronize_rcu: waited 2855759 iterations
writer: synchronize_rcu took 1617254 us
writer: updated config to version=3, name=config-3
synchronize_rcu: waited 5264813 iterations
writer: synchronize_rcu took 2996109 us

对比mutex性能

因为RCU适用于读多写少,所以我们在每次writer更新后,都延迟10ms,模拟每秒100次写入进行测试。
测试时间每轮持续5s。
测试结果如下:

readers mutex read_qps rcu read_qps mutex write_qps rcu write_qps
1 187,598,247 183,430,272 99.6 99.4
2 53,570,533 80,424,114 99.6 98.4
4 51,458,082 67,714,799 99.6 33.2
8 39,332,508 64,436,950 99.6 4.0
16 30,526,263 69,822,479 99.4 12.8
32 30,699,191 93,329,954 99.2 0.4

即便是上面简陋的RCU,多线程的情况下依然是完爆互斥锁。但是线程多了之后,rcu的写几乎要被饿死了。
要想写入不被饿死,那么就不能使用上面的简易机制,要在writer后就阻止新reader进入,writer等到所有进行中的reader退出即可完成更新。


人生苦短,远离bug Leon, 2026-05-01

应用层实现RCU机制
https://leon0625.github.io/2026/05/01/80e1deb29df3/
作者
leon.liu
发布于
2026年5月1日
许可协议