LunarRoom

Linux多线程

字数统计: 3.1k阅读时长: 13 min
2020/05/21 Share

Linux多线程

[TOC]

1. 多线程的优势

在多进程编程中,程序每处理一个任务,都需要创建一个进程进行处理,而每个进程在创建时都需要复制父进程的进程上下文,且有自己独立的地址空间,当只需要并发处理很小的任务时(如并发服务器处理客户端的请求),这种开销是很不划算的,且每个进程之间的变量并不共享,使得进程间的通信也很麻烦。

这时候多线程的优势就体现了出来,线程也有自己的上下文(thread context),但是只是一些如线程ID、栈、栈指针、程序计数器、通用寄存器之类的东西,所以创建线程的开销较小。这些一般都是在线程运行的函数里被指定,所以在每个线程中都是独立的。而所有线程依旧在这个进程内共享一片地址空间,包括它的代码、堆、共享库和打开的文件等。

2. 线程的创建

在下面的线程相关函数中都默认导入了<pthread.h>头文件,且在编译时链接了pthread库文件。

pthread即遵守Posix标准的多线程,它是C语言处理线程的一个标准接口。

Linux中通过pthread_create函数来创建线程,其定义为

int pthread_create(pthread_t* tidp, const pthread_attr_t* attr, (void*)(*start_run)(void*), void* arg);

创建线程函数无非做了两件事(对程序员来说,对操作系统来说做了很多),让操作系统在当前进程创建一个线程,将线程的相关信息写入第一个参数指向的pthread_t类型的线程类型中,这个类型在Linux内部的定义实际是unsigned long int类型,代表线程ID;attr参数用于指定线程的一些属性;后两个参数则是做的另一件事——让创建后的线程去做什么事,start_run对应要做的事的函数指针,arg则是需要传入的参数,注意函数指针的参数和返回值都必须定义为void*类型。

3. 线程的退出

线程的退出中主要有两个函数配套使用,pthread_exitpthread_join

在讲这两个函数前,先讲一下主线程的概念,一般进程创建时默认只有一条线程,这条线程一般叫做主线程,其他线程都是通过主线程创建出来的,然而不同于进程,被创建出来的线程与主线程是平级的。主线程可以等待其它线程的退出,其它线程也可以等待主线程的退出,如果主线程退出后还存在其它线程,则进程依旧不会退出。所以主线程的概念只是一个虚的概念,没有哪一条线程是真正为“主”的。

void pthread_exit(void* retval);

pthread_exit在需要退出的线程中使用,使得线程显式地退出,当然线程也可以通过函数的return关键字隐式地退出。线程退出时允许携带一个返回值,这个值将被调用pthread_join的线程接收到。

在讲pthread_join函数前,需要讲一下Linux的线程状态。Linux线程状态分为joinable状态和unjoinable状态。如果是joinable状态的线程,则线程所占用的资源即使在线程return或pthread_exit时都不会释放,只有在其它线程中调用pthread_join函数才会释放。而unjoinable状态的线程的资源在线程退出时就会自动释放,不需要pthread_join函数进行手动释放。所以pthread_join操作的线程必须是joinable的。

至于线程创建时具体是什么状态可以通过pthread_createattr属性进行指定,默认是joinable的。

更深入的讲,pthread_join执行的其实是线程的合并,将调用函数的线程与指定的线程进行合并,然后调用线程会等待退出线程的退出,退出后进行资源的释放,这一点从函数名的join中或许可以看出来。

int pthread_join(pthread_t* thread, void** retval);

pthread_join在等待线程退出的线程中使用,thread参数代表等待的线程的ID。需要声明的是这个函数执行后是阻塞的,目的是等待thread的退出,如果指定的线程已经退出了的话就会立刻返回。

int pthread_detach(pthread* thread);

该函数不会将调用线程与退出线程合并,也不会阻塞调用线程,只是在等待退出线程退出后将资源回收,效果与将线程设置为unjoinable相同。

4. 线程同步

4.1 互斥锁

在同一个进程中,不同线程的运行先后顺序对程序员来说是不确定的。在操作系统内部,内核在很短的时间内不断地在不同的线程上下文间进行切换。因此,当有多个线程操作一个临界资源时,需要程序员进行线程的同步。

这时就诞生了互斥锁(mutex),互斥锁允许同时有一个或多个线程拥有它(一般都是一个),当多个线程需要同时操作某临界资源时,它们需要先抢占互斥锁,抢到了互斥锁的线程就可以继续运行,没抢到的则被阻塞,直到有线程释放了互斥锁,则继续抢占。所以在线程抢占了互斥锁操作完临界资源后应该尽早释放互斥锁,避免其它线程阻塞过久。

互斥锁实现的方式有多种,程序员可以自己定义一个互斥锁,这里主讲pthread.h中提供的互斥锁接口。

互斥锁的主要操作函数有三个,首先定义一个互斥锁

pthread_mutex_t mutex;

pthread_mutex_init(pthread_mutex_t* mutex);

初始化一个互斥锁

pthread_mutex_lock(pthread_mutex_t* mutex);

抢占互斥锁,如果没有抢到则被阻塞

pthread_mutex_unlock(pthread_mutex_t* mutex);

释放互斥锁

4.2 条件变量

考虑这样一种场景,某一条线程需要临界资源满足某一种条件才对临界资源进行操作,如抢票系统要在票卖完后进行加票。则加票的线程需要一直进行互斥锁的抢占,抢占后只是检查票是否卖完了,但大部分时间是没卖完的,所以多了很多不必要的抢占。而在票真正卖完后,加票线程又可能需要抢占多轮才能抢到互斥锁,导致程序的延误。这些都是只用互斥锁的程序很难解决的,于是Linux提供了条件变量接口。

于是就产生了条件变量(condition variable),系统会将使用条件变量的线程阻塞,直到满足条件时才唤醒,唤醒后互斥锁将被当前线程锁定。

定义条件变量:pthread_cond_t cond;

int pthread_cond_init(pthread_cond_t* cond, pthread_condattr_t* cond_attr);

初始化条件变量,cond_attr表示需要赋予条件变量的属性,在Linux man pages中有一段话

The LinuxThreads implementation supports no attributes for conditions, hence the cond_attr parameter is actually ignored.

所以一般置为NULL就好了。

int pthread_cond_signal(pthread_cond_t* cond);

唤醒一个正在阻塞的使用条件变量的线程,如果有多个正在阻塞的线程,依旧只唤醒一条线程,但是无法确定是哪一条。

int pthread_cond_broadcast(pthread_cond_t* cond);

唤醒所有正在阻塞的线程,如果有多个正在阻塞的线程,则这些线程需要再次抢占互斥锁。

int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex);

阻塞线程,等待条件变量响应后唤醒,唤醒后得到相应的互斥锁mutex(如果有多个阻塞线程需要抢占)。但是线程离开阻塞状态不一定是因为条件变量满足,也有可能遇到了中断信号或出现错误,这一点很重要。

int pthread_cond_timewait(pthread_cond_t* cond, pthread_mutex_t* mutex, const struct timespec* abstime);

pthread_cond_wait的基础上加入了阻塞时限。

int pthread_cond_destroy(pthread_cond_t* cond);

释放条件变量。

5. 线程池模板

最后提供一个线程池模板程序,我已在一个web server项目中成功使用。

由于线程的创建和销毁都需要消耗时间,在需要持续处理高并发任务时,通常线程在处理一个任务并不会直接退出,而是阻塞或者接着执行下一个任务,这就是线程池。

本线程池模板使用任务队列的架构,即线程池将所有接收到的任务放到一个任务队列中,然后所有线程领取任务队列中的任务进行执行。当然任务的领取需要处理线程的同步问题。

源码:

头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/*
* FILE: threadpool.h
* Copyright (C) Lunar Eclipse
* Copyright (C) Railgun
*/

#ifndef THREADPOOL_H
#define THREADPOOL_H

#ifdef __cplusplus
extern "C" {
#endif

#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdint.h>
#include "debug.h"

#define THREAD_NUM 8

typedef struct task_s {
void (*func)(void*); //task function pointer
void *arg; //function arguments
struct task_s* next; //points to the next task in the task queue
} task_t;

typedef struct {
pthread_mutex_t lock; //mutex
pthread_cond_t cond; //condition variable
pthread_t* threads; //thread_t type array

task_t* head;
int thread_count; //thread number in the threadpool
int queue_size; //task number in the task queue
int shutdown; /*indicate if the threadpool is shutdown. Shutdown fall into two categories[immediate_shutdown, graceful_shutdown], immediate_shutdown means the threadpool has to shutdown no matter if there are tasks or not, graceful_shutdown will wait until all tasks are executed. */
int started; //number of threads started
} threadpool_t;

typedef enum {
tp_invalid = -1,
tp_lock_fail = -2,
tp_already_shutdown = -3,
tp_cond_broadcast = -4,
tp_thread_fail = -5,
} threadpool_error_t;

threadpool_t* threadpool_init(int thread_num);

int threadpool_add(threadpool_t* pool, void (*func)(void*), void* arg);

int threadpool_destory(threadpool_t* pool, int graceful);

#ifdef __cplusplus
}
#endif

#endif

源文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
/*
* FILE: threadpool.c
* Copyright (C) Lunar Eclipse
* Copyright (C) Railgun
*/

#include "threadpool.h"

typedef enum {
immediate_shutdown = 1,
graceful_shutdown = 2
} threadpool_st_t;

static int threadpool_free(threadpool_t* pool);
static void* threadpool_worker(void* arg);

threadpool_t* threadpool_init(int thread_num) {
if (thread_num <= 0) {
LOG_ERR("the arg of the threadpool_init must greater than 0");
return NULL;
}

threadpool_t* pool;
if ((pool = (threadpool_t*)malloc(sizeof(threadpool_t))) == NULL) {
goto ERR;
}

pool->thread_count = 0;
pool->queue_size = 0;
pool->shutdown = 0;
pool->started = 0;
pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thread_num);
pool->head = (task_t*)malloc(sizeof(task_t)); //dummy head

if ((pool->threads == NULL) || (pool->head == NULL)) {
goto ERR;
}

pool->head->func = NULL;
pool->head->arg = NULL;
pool->head->next = NULL;

if (pthread_mutex_init(&(pool->lock), NULL) != 0) {
goto ERR;
}

if (pthread_cond_init(&(pool->cond), NULL) != 0) {
pthread_mutex_destroy(&(pool->lock));
goto ERR;
}

int i;
for (i = 0; i < thread_num; i++) {
if (pthread_create(&(pool->threads[i]), NULL, threadpool_worker, (void*)pool) != 0) {
threadpool_destroy(pool, 0);
return NULL;
}
//output the thread id as an 8-bit hexadecimal number
LOG_INFO("thread: %08x started", (uint32_t)pool->threads[i]);

pool->thread_count++;
pool->started++;
}
return pool;

ERR:
if (pool) {
threadpool_free(pool);
}
return NULL;
}

//add a task to the threadpool, not thread
int threadpool_add(threadpool_t* pool, void (*func)(void*), void* arg) {
int res, err = 0;
if (pool == NULL || func == NULL) {
LOG_ERR("pool = NULL or func = NULL");
return -1;
}

if (pthread_mutex_lock(&(pool->lock)) != 0) {
LOG_ERR("pthread_mutex_lock");
return -1;
}

if (pool->shutdown) {
err = tp_already_shutdown;
goto OUT;
}

//TODO: use a memory pool
task_t* task = (task_t*)malloc(sizeof(task_t));
if (task == NULL) {
LOG_ERR("malloc task fail");
goto OUT;
}

//TODO: use a memory pool
task->func = func;
task->arg = arg;
task->next = pool->head->next;
pool->head->next = task;

pool->queue_size++;

res = pthread_cond_signal(&(pool->cond));
CHECK(res == 0, "pthread_cond_signal");

OUT:
if (pthread_mutex_unlock(&pool->lock) != 0) {
LOG_ERR("pthread_mutex_unlock");
return -1;
}

return err;
}

//free all threads in the threadpool
int threadpool_free(threadpool_t* pool) {
if (pool == NULL || pool->started > 0) {
return -1;
}

if (pool->threads) {
free(pool->threads);
}

task_t* node;
while (pool->head->next) {
node = pool->head->next;
pool->head->next = pool->head->next->next;
free(node);
}

return 0;
}

//destroy the threadpool
int threadpool_destroy(threadpool_t* pool, int graceful) {
int err = 0;

if (pool == NULL) {
LOG_ERR("pool is NULL");
return tp_invalid; //tp_invalid is in enum
}

if (pthread_mutex_lock(&(pool->lock)) != 0) {
return tp_lock_fail;
}

do {
if (pool->shutdown) {
err = tp_already_shutdown;
break;
}

pool->shutdown = (graceful) ? graceful_shutdown : immediate_shutdown;

if (pthread_cond_broadcast(&(pool->cond)) != 0) {
err = tp_cond_broadcast;
break;
}

if (pthread_mutex_unlock(&(pool->lock)) != 0) {
err = tp_lock_fail;
break;
}

int i;
for (i = 0; i < pool->thread_count; i++) {
if (pthread_join(pool->threads[i], NULL) != 0) {
err = tp_thread_fail;
}
LOG_INFO("thread %08x exit", (uint32_t)pool->threads[i]);
}
} while (0);

if (!err) {
pthread_mutex_destroy(&(pool->lock));
pthread_cond_destroy(&(pool->cond));
threadpool_free(pool);
}

return err;
}

static void* threadpool_worker(void* arg) {
if (arg == NULL) {
LOG_ERR("arg should be type threadpool_t");
return NULL;
}

threadpool_t* pool = (threadpool_t*)arg;
task_t* task;

while (1) {
pthread_mutex_lock(&(pool->lock));

//wait on condition variable, check for fake wakeups
while ((pool->queue_size == 0) && !(pool->shutdown)) {
pthread_cond_wait(&(pool->cond), &(pool->lock));
}

if (pool->shutdown == immediate_shutdown) {
break;
}
else if ((pool->shutdown == graceful_shutdown) && pool->queue_size == 0) {
break;
}

task = pool->head->next;
if (task == NULL) {
pthread_mutex_unlock(&(pool->lock));
continue;
}

pool->head->next = task->next;
pool->queue_size--;

pthread_mutex_unlock(&(pool->lock));

(*(task->func))(task->arg);
//TODO: memory pool
free(task);
}

pool->started--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);

return NULL;
}

参考资料

1. Linux man-pages

2. Linux中pthread_join和pthread_detach详解

3. Debian Linux man-pages

CATALOG
  1. 1. Linux多线程
    1. 1.0.1. 1. 多线程的优势
    2. 1.0.2. 2. 线程的创建
    3. 1.0.3. 3. 线程的退出
    4. 1.0.4. 4. 线程同步
    5. 1.0.5. 5. 线程池模板
    6. 1.0.6. 参考资料