目录

线程

记录自己所学的所有有关于线程的知识。

基本知识

线程跟语言关系不大,它与操作系统的实现是密切结合在一起的,操作系统提供的线程机制决定了线程的使用。linux中线程的实现是内核线程与用户线程按照1:1实现的,所以每增加一条用户线程,其实是映射到一条内核线程上。线程实体在内核处,而唯一标识它的就是thread_t类型的变量(其实就是线程tid)。

多线程最大优势在于:可以通过全局变量来共享信息,最大的风险也在于多个线程对共享的全局变量同时访问不可控

struct pthread_t;       // 线程ID
struct pthread_attr_t;  // 线程属性

int       pthread_create(pthread_t *tid, pthread_attr_t *attr, void *(*start)(void *), void *arg);
pthread_t pthread_self(void);           // 获取自身线程ID
void      pthread_exit( void *retval ); // 线程退出

int pthreat_equal(pthread_t tid_1, pthread_t tid_2); // 两个线程相等? 是 则返回 0
int pthread_join(pthread_t tid, void **retval);// 阻塞,直到获得某线程的退出状态
int pthread_cancel(pthread_t tid); // 取消同一进程中的其他线程

int pthread_detach(pthread_t tid); // 线程返回后,自己自动清理,不返回数据给join

互斥量

互斥量mutex用于实现线程对共享资源的独占式使用。使用共享资源的代码片段称为“代码临界区”,区内代码片段的执行必须是原子的,不能被其他线程中断。

mutex有机器语言级别的实现,它有两种状态:LOCKEDUNLOCK。线程实现约定,任何时候只能有一个线程可以锁定同一个mutex变量,其他无法锁定mutex的线程将会阻塞在pthread_mutex_lock调用处。所以通过对mutex实体加锁、然后执行“临界区代码”,然后解锁,就可以实现对共享资源的保护。

struct pthread_mutex_t;     // mutex
struct pthread_mutexattr_t; // mutex 属性

pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;                           // 声明一个 mutex 编译器完成
int pthread_mutex_init(pthread_mutex_t *mutex, pthread_mutexattr_t *attr); // 声明一个 mutex 运行期完成
int pthread_mutex_lock(pthread_mutex_t *mutex);                            // 锁定一个 mutex
int pthread_mutex_unlock(pthread_mutex_t *mutex);                          // 解锁一个 mutex
int pthread_mutex_destory(pthread_mutex_t *mutex);                         // 销毁一个 mutex
long            global_cnt          = 0;                          // 全局共享变量,所有线程共享
pthread_mutex_t global_cnt_mutex    = PTHREAD_MUTEX_INITIALIZER;  // 用于保护全局共享变量

static void *func(void *arg)
{
    for(long i = 0, tmp = 0; i < 10000; i++ )
    {
        pthread_mutex_lock( &global_cnt_mutex );    // 加锁,保护开始,只允许当前线程执行后面代码
        tmp = global_cnt; // 读取 全局变量
        tmp = tmp + 1;
        global_cnt = tmp; // 存储 全局变量
        pthread_mutex_unlock( &global_cnt_mutex );  // 解锁,保护结束,其余线程可以加锁后执行上面代码了
    }
}

int main(int argc, char *argv[])
{
    pthread_t threads[5]; // 预先设计好 5 个线程
    pthread_create( &threads[0], NULL, func, "why so serials!" );
    pthread_create( &threads[1], NULL, func, "why so serials! 2" );
    pthread_join( threads[0], NULL );
    pthread_join( threads[1], NULL );
    printf("global cnt %ld\n", global_cnt);
    return 0;
}

信号量

Semaphore信号量是一种特殊的变量,它的值为自然数,只支持两种操作:P(进入临界区)和V(退出临界区)。对于信号量sem有:

  • P(sem) : 如果sem > 0,则sem--; 如果sem = 0 则阻塞

  • V(sem) : 如果有其他进程因为P(sem)而挂起,则唤醒它;如果没有,则sem++

mutex只允许一个线程进入临界区,而信号量允许多个线程同时进入临界区,“同时”就又意味着不可预期。mutex的作用仅限于,对共享资源的独占式访问。

考虑这样一种情况:多个消费者线程 读取 任务队列 (共享资源),然后处理任务。当所有任务都被执行完毕时,消费者线程会停下么?

答案是:并不会,消费者线程在分配到时间片后,仍然会“加锁,进入临界区,解锁”运行,白白浪费 CPU 资源。对于这种场景,我们希望:在任务队列为空时,所有消费者线程进入阻塞态,当生产者线程产生 1 个或多个任务后,消费者线程被唤醒,处理任务,完毕后又再次沉睡(阻塞)。

信号量 API 的 “阻塞” 和 “唤醒” 机制正好适合实现: 生产者线程 + 消费者线程 这样一种模式。

struct sem_t; // 信号量 类型
sem_init( sem_t *sem, 0, 0 );   // 线程信号量 初始化
sem_wait( sem_t *sem );         // P 操作
sem_post( sem_t *sem );         // V 操作
sem_destory( sem_t *sem );      // 销毁信号量
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>

#define MAXSIZE (20)
typedef struct task{
    short status;
    long num;
}task_t;

task_t task_list[MAXSIZE];
pthread_mutex_t task_list_mutex = PTHREAD_MUTEX_INITIALIZER;
sem_t task_list_sem;

void *producer(void *arg)
{
    for( int i = 0; i < MAXSIZE; i++ )
    {
        sleep(1);

        pthread_mutex_lock( &task_list_mutex );
        task_list[i].status = 1;
        task_list[i].num    = i;
        sem_post( &task_list_sem );
        pthread_mutex_unlock( &task_list_mutex );
    }
}

void *comsumer(void *arg)
{
    while( 1 )
    {
        sem_wait( &task_list_sem ); // 假如没有这句,那么消费者线程在无任务的时候,也会疯狂空转

        // 当 task_list_sem >= 2 多个线程被唤醒,所以还需要 mutex 控制
        pthread_mutex_lock( &task_list_mutex );
        printf("thread %ld run\n", pthread_self());
        for( int i = 0; i < MAXSIZE; i++ )
        {
            if( task_list[i].status == 1 )
            {
                int num = task_list[i].num;
                printf(" thread process : %d x %d = %d\n", num, num, num * num );
                task_list[i].status = 0;
                break;
            }
        }
        pthread_mutex_unlock( &task_list_mutex );
    }
}

int main(int argc, char *argv[])
{
    sem_init( &task_list_sem, 0, 0 );

    pthread_t producer_tid, comsumer_tid_1, comsumer_tid_2;
    pthread_create( &comsumer_tid_1, NULL, comsumer, NULL ); // 消费者线程 1
    pthread_create( &comsumer_tid_2, NULL, comsumer, NULL ); // 消费者线程 2

    pthread_create( &producer_tid, NULL, producer, NULL );   // 生产者线程 3

    pthread_join( producer_tid, NULL );
    pthread_join( comsumer_tid_1, NULL );
    pthread_join( comsumer_tid_2, NULL );

    sem_destroy( &task_list_sem );
    pthread_mutex_destroy( &task_list_mutex );

    return 0;
}

条件变量

线程使用条件变量cond来相互通知:共享资源的状态发生了变化。

pthread_cont_t cond = PTHREAD_COND_INITIALIZER;                        // 初始化一个条件变量
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *attr); // 动态初始化条件变量
int pthread_cond_signal(pthread_cond_t *cond);                         // 只通知一条阻塞的线程
int pthread_cond_broadcast(pthread_cond_t *cond);                      // 确保通知所有阻塞的线程
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);   // 阻塞,等待通知
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, struct timespec *abstime);

条件变量的“阻塞”和“主动唤醒”机制,也可以实现:消费者线程 + 生产者线程 这一模式。

#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>

#define MAXSIZE 20

typedef struct task{
    short status;
    long num;
}task_t;

task_t task_list[MAXSIZE];
int task_cnt = 0;
pthread_mutex_t task_list_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  task_list_cond  = PTHREAD_COND_INITIALIZER;

void *producer(void *arg)
{
    for( int i = 0; i < MAXSIZE; i++ )  // 模拟每秒产生一个任务
    {
        sleep(1);

        pthread_mutex_lock( &task_list_mutex );
        printf("producer thread %ld run\n", pthread_self());
        task_list[i].status = 1;
        task_list[i].num    = i;
        task_cnt ++;
        pthread_mutex_unlock( &task_list_mutex );
        pthread_cond_signal( &task_list_cond );
    }
}

void *comsumer(void *arg)
{
    while( 1 )
    {
        pthread_mutex_lock( &task_list_mutex );
        // 注意: cond_wait 调用要处于 mutex_lock 调用后面
        while( task_cnt == 0 )  // 使用 while 而不是 if , 防止虚假唤醒
            pthread_cond_wait( &task_list_cond, &task_list_mutex);

        printf("comsumer thread %ld run\n", pthread_self());
        for( int i = 0; i < MAXSIZE; i++ )
        {
            if( task_list[i].status == 1 )
            {
                int num = task_list[i].num;
                printf("process : %d x %d = %d\n", num, num, num * num );
                task_list[i].status = 0;
                task_cnt --;
                break;
            }
        }
        pthread_mutex_unlock( &task_list_mutex );
    }
}

int main(int argc, char *argv[])
{
    pthread_t producer_tid, comsumer_tid_1, comsumer_tid_2;
    pthread_create( &comsumer_tid_1, NULL, comsumer, NULL ); // 消费者线程 1
    pthread_create( &comsumer_tid_2, NULL, comsumer, NULL ); // 消费者线程 2

    pthread_create( &producer_tid, NULL, producer, NULL );   // 生产者线程 3

    pthread_join( producer_tid, NULL );
    pthread_join( comsumer_tid_1, NULL );
    pthread_join( comsumer_tid_2, NULL );

    pthread_cond_destroy( &task_list_cond );
    pthread_mutex_destroy( &task_list_mutex );

    return 0;
}

虚假唤醒

pthread_mutex_lock( &task_list_mutex );
// 注意: cond_wait 调用要处于 mutex_lock 调用后面
while( task_cnt == 0 )  // 使用 while 而不是 if , 防止虚假唤醒
pthread_cond_wait( &task_list_cond, &task_list_mutex);

上面三句代码中,pthread_cond_wait调用的效果时,先解锁task_list_mutex,然后根据cond状态决定是否阻塞,收到其他线程cond_signal解除阻塞后,立即再次锁上task_list_mutex。如果此时是if,那么就继续往下处理任务了。问题就在这里,由于cond_signal信号是发给 “至少一个” 阻塞线程,所以可能对于当前线程来说,任务已经被别的线程抢先解决了,所以需要使用while再次判断下任务个数,再决定是否往下处理任务。

mutex、sem 和 cond 对比

mutex可以使用一个二值sem(0 , 1)来实现:

  • sem使用sem_init初始化为1,然后同一个线程内对某个sem先调用sem_wait再调用sem_post,两次调用中间就可以写上临界区代码了。

cond也可以使用sem来实现:

  • 例如在消费者线程中调用sem_wait, 而在生产者线程中调用sem_post

其他不同:

  • mutex必须是同一个线程获取以及释放, 否则会死锁.而条件变量和信号量则不必

  • sem的递增与减少会被系统自动记住, 系统内部有一个计数器实现,不必担心会丢失, 而唤醒一个cond时,如果没有相应的线程在等待该条件变量, 这次唤醒将被丢失

只能使用互斥量而不能使用信号量的: