线程
记录自己所学的所有有关于线程的知识。
基本知识
线程跟语言关系不大,它与操作系统的实现是密切结合在一起的,操作系统提供的线程机制决定了线程的使用。linux
中线程的实现是内核线程与用户线程按照1:1
实现的,所以每增加一条用户线程,其实是映射到一条内核线程上。线程实体在内核处,而唯一标识它的就是thread_t
类型的变量(其实就是线程tid
)。
多线程最大优势在于:可以通过全局变量来共享信息,最大的风险也在于多个线程对共享的全局变量同时访问不可控
struct pthread_t; // 线程ID
struct pthread_attr_t; // 线程属性
int pthread_create(pthread_t *tid, pthread_attr_t *attr, void *(*start)(void *), void *arg);
pthread_t pthread_self(void); // 获取自身线程ID
void pthread_exit( void *retval ); // 线程退出
int pthreat_equal(pthread_t tid_1, pthread_t tid_2); // 两个线程相等? 是 则返回 0
int pthread_join(pthread_t tid, void **retval);// 阻塞,直到获得某线程的退出状态
int pthread_cancel(pthread_t tid); // 取消同一进程中的其他线程
int pthread_detach(pthread_t tid); // 线程返回后,自己自动清理,不返回数据给join
互斥量
互斥量mutex
用于实现线程对共享资源的独占式使用。使用共享资源的代码片段称为“代码临界区”,区内代码片段的执行必须是原子的,不能被其他线程中断。
mutex
有机器语言级别的实现,它有两种状态:LOCKED
和 UNLOCK
。线程实现约定,任何时候只能有一个线程可以锁定同一个mutex
变量,其他无法锁定mutex
的线程将会阻塞在pthread_mutex_lock
调用处。所以通过对mutex
实体加锁、然后执行“临界区代码”,然后解锁,就可以实现对共享资源的保护。
struct pthread_mutex_t; // mutex
struct pthread_mutexattr_t; // mutex 属性
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; // 声明一个 mutex 编译器完成
int pthread_mutex_init(pthread_mutex_t *mutex, pthread_mutexattr_t *attr); // 声明一个 mutex 运行期完成
int pthread_mutex_lock(pthread_mutex_t *mutex); // 锁定一个 mutex
int pthread_mutex_unlock(pthread_mutex_t *mutex); // 解锁一个 mutex
int pthread_mutex_destory(pthread_mutex_t *mutex); // 销毁一个 mutex
long global_cnt = 0; // 全局共享变量,所有线程共享
pthread_mutex_t global_cnt_mutex = PTHREAD_MUTEX_INITIALIZER; // 用于保护全局共享变量
static void *func(void *arg)
{
for(long i = 0, tmp = 0; i < 10000; i++ )
{
pthread_mutex_lock( &global_cnt_mutex ); // 加锁,保护开始,只允许当前线程执行后面代码
tmp = global_cnt; // 读取 全局变量
tmp = tmp + 1;
global_cnt = tmp; // 存储 全局变量
pthread_mutex_unlock( &global_cnt_mutex ); // 解锁,保护结束,其余线程可以加锁后执行上面代码了
}
}
int main(int argc, char *argv[])
{
pthread_t threads[5]; // 预先设计好 5 个线程
pthread_create( &threads[0], NULL, func, "why so serials!" );
pthread_create( &threads[1], NULL, func, "why so serials! 2" );
pthread_join( threads[0], NULL );
pthread_join( threads[1], NULL );
printf("global cnt %ld\n", global_cnt);
return 0;
}
信号量
Semaphore
信号量是一种特殊的变量,它的值为自然数,只支持两种操作:P
(进入临界区)和V
(退出临界区)。对于信号量sem
有:
P(sem)
: 如果sem > 0
,则sem--
; 如果sem = 0
则阻塞V(sem)
: 如果有其他进程因为P(sem)
而挂起,则唤醒它;如果没有,则sem++
mutex
只允许一个线程进入临界区,而信号量允许多个线程同时进入临界区,“同时”就又意味着不可预期。mutex
的作用仅限于,对共享资源的独占式访问。
考虑这样一种情况:多个消费者线程 读取 任务队列 (共享资源),然后处理任务。当所有任务都被执行完毕时,消费者线程会停下么?
答案是:并不会,消费者线程在分配到时间片后,仍然会“加锁,进入临界区,解锁”运行,白白浪费 CPU 资源。对于这种场景,我们希望:在任务队列为空时,所有消费者线程进入阻塞态,当生产者线程产生 1 个或多个任务后,消费者线程被唤醒,处理任务,完毕后又再次沉睡(阻塞)。
信号量 API 的 “阻塞” 和 “唤醒” 机制正好适合实现: 生产者线程 + 消费者线程 这样一种模式。
struct sem_t; // 信号量 类型
sem_init( sem_t *sem, 0, 0 ); // 线程信号量 初始化
sem_wait( sem_t *sem ); // P 操作
sem_post( sem_t *sem ); // V 操作
sem_destory( sem_t *sem ); // 销毁信号量
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>
#define MAXSIZE (20)
typedef struct task{
short status;
long num;
}task_t;
task_t task_list[MAXSIZE];
pthread_mutex_t task_list_mutex = PTHREAD_MUTEX_INITIALIZER;
sem_t task_list_sem;
void *producer(void *arg)
{
for( int i = 0; i < MAXSIZE; i++ )
{
sleep(1);
pthread_mutex_lock( &task_list_mutex );
task_list[i].status = 1;
task_list[i].num = i;
sem_post( &task_list_sem );
pthread_mutex_unlock( &task_list_mutex );
}
}
void *comsumer(void *arg)
{
while( 1 )
{
sem_wait( &task_list_sem ); // 假如没有这句,那么消费者线程在无任务的时候,也会疯狂空转
// 当 task_list_sem >= 2 多个线程被唤醒,所以还需要 mutex 控制
pthread_mutex_lock( &task_list_mutex );
printf("thread %ld run\n", pthread_self());
for( int i = 0; i < MAXSIZE; i++ )
{
if( task_list[i].status == 1 )
{
int num = task_list[i].num;
printf(" thread process : %d x %d = %d\n", num, num, num * num );
task_list[i].status = 0;
break;
}
}
pthread_mutex_unlock( &task_list_mutex );
}
}
int main(int argc, char *argv[])
{
sem_init( &task_list_sem, 0, 0 );
pthread_t producer_tid, comsumer_tid_1, comsumer_tid_2;
pthread_create( &comsumer_tid_1, NULL, comsumer, NULL ); // 消费者线程 1
pthread_create( &comsumer_tid_2, NULL, comsumer, NULL ); // 消费者线程 2
pthread_create( &producer_tid, NULL, producer, NULL ); // 生产者线程 3
pthread_join( producer_tid, NULL );
pthread_join( comsumer_tid_1, NULL );
pthread_join( comsumer_tid_2, NULL );
sem_destroy( &task_list_sem );
pthread_mutex_destroy( &task_list_mutex );
return 0;
}
条件变量
线程使用条件变量cond
来相互通知:共享资源的状态发生了变化。
pthread_cont_t cond = PTHREAD_COND_INITIALIZER; // 初始化一个条件变量
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *attr); // 动态初始化条件变量
int pthread_cond_signal(pthread_cond_t *cond); // 只通知一条阻塞的线程
int pthread_cond_broadcast(pthread_cond_t *cond); // 确保通知所有阻塞的线程
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); // 阻塞,等待通知
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, struct timespec *abstime);
条件变量的“阻塞”和“主动唤醒”机制,也可以实现:消费者线程 + 生产者线程 这一模式。
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>
#define MAXSIZE 20
typedef struct task{
short status;
long num;
}task_t;
task_t task_list[MAXSIZE];
int task_cnt = 0;
pthread_mutex_t task_list_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t task_list_cond = PTHREAD_COND_INITIALIZER;
void *producer(void *arg)
{
for( int i = 0; i < MAXSIZE; i++ ) // 模拟每秒产生一个任务
{
sleep(1);
pthread_mutex_lock( &task_list_mutex );
printf("producer thread %ld run\n", pthread_self());
task_list[i].status = 1;
task_list[i].num = i;
task_cnt ++;
pthread_mutex_unlock( &task_list_mutex );
pthread_cond_signal( &task_list_cond );
}
}
void *comsumer(void *arg)
{
while( 1 )
{
pthread_mutex_lock( &task_list_mutex );
// 注意: cond_wait 调用要处于 mutex_lock 调用后面
while( task_cnt == 0 ) // 使用 while 而不是 if , 防止虚假唤醒
pthread_cond_wait( &task_list_cond, &task_list_mutex);
printf("comsumer thread %ld run\n", pthread_self());
for( int i = 0; i < MAXSIZE; i++ )
{
if( task_list[i].status == 1 )
{
int num = task_list[i].num;
printf("process : %d x %d = %d\n", num, num, num * num );
task_list[i].status = 0;
task_cnt --;
break;
}
}
pthread_mutex_unlock( &task_list_mutex );
}
}
int main(int argc, char *argv[])
{
pthread_t producer_tid, comsumer_tid_1, comsumer_tid_2;
pthread_create( &comsumer_tid_1, NULL, comsumer, NULL ); // 消费者线程 1
pthread_create( &comsumer_tid_2, NULL, comsumer, NULL ); // 消费者线程 2
pthread_create( &producer_tid, NULL, producer, NULL ); // 生产者线程 3
pthread_join( producer_tid, NULL );
pthread_join( comsumer_tid_1, NULL );
pthread_join( comsumer_tid_2, NULL );
pthread_cond_destroy( &task_list_cond );
pthread_mutex_destroy( &task_list_mutex );
return 0;
}
虚假唤醒
pthread_mutex_lock( &task_list_mutex );
// 注意: cond_wait 调用要处于 mutex_lock 调用后面
while( task_cnt == 0 ) // 使用 while 而不是 if , 防止虚假唤醒
pthread_cond_wait( &task_list_cond, &task_list_mutex);
上面三句代码中,pthread_cond_wait
调用的效果时,先解锁task_list_mutex
,然后根据cond
状态决定是否阻塞,收到其他线程cond_signal
解除阻塞后,立即再次锁上task_list_mutex
。如果此时是if
,那么就继续往下处理任务了。问题就在这里,由于cond_signal
信号是发给 “至少一个” 阻塞线程,所以可能对于当前线程来说,任务已经被别的线程抢先解决了,所以需要使用while
再次判断下任务个数,再决定是否往下处理任务。
mutex、sem 和 cond 对比
mutex
可以使用一个二值sem
(0 , 1)来实现:
sem
使用sem_init
初始化为1
,然后同一个线程内对某个sem
先调用sem_wait
再调用sem_post
,两次调用中间就可以写上临界区代码了。
cond
也可以使用sem
来实现:
例如在消费者线程中调用
sem_wait
, 而在生产者线程中调用sem_post
其他不同:
mutex
必须是同一个线程获取以及释放, 否则会死锁.而条件变量和信号量则不必sem
的递增与减少会被系统自动记住, 系统内部有一个计数器实现,不必担心会丢失, 而唤醒一个cond
时,如果没有相应的线程在等待该条件变量, 这次唤醒将被丢失
只能使用互斥量而不能使用信号量的: