做人呢,最紧要就系开心啦

Linux多线程2:多线程同步

644次阅读
没有评论

1 互斥锁

1.1 pthread mutex

mutex 用 pthread_mutex_t 类型的变量表示,可以这样初始化和销毁:

#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex, 
                       const pthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

int pthread_mutex_destroy(pthread_mutex_t *mutex);

返回值:成功返回 0,失败返回错误号。

pthread_mutex_init 函数对 Mutex 做初始化,参数 attr 设定 Mutex 的属性,如果 attr 为 NULL 则表示缺省属性;

用 pthread_mutex_init 函数初始化的 Mutex 可以用 pthread_mutex_destroy 销毁。

如果 Mutex 变量是静态分配的(全局变量或 static 变量),也可以用宏定义 PTHREAD_MUTEX_INITIALIZER 来初始化,相当于用 pthread_mutex_init 初始化并且 attr 参数为 NULL。

mutex 的加锁和解锁操作可以用下列函数:

#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

返回值:成功返回 0,失败返回错误号。

一个线程可以调用 pthread_mutex_lock 获得 Mutex,如果这时另一个线程已经调用 pthread_mutex_lock 获得了该 Mutex,则当前线程需要挂起等待,直到另一个线程调用 pthread_mutex_unlock 释放 Mutex,当前线程被唤醒,才能获得该 Mutex 并继续执行。

如果一个线程既想获得锁,又不想挂起等待,可以调用 pthread_mutex_trylock,如果 Mutex 已经被另一个线程获得,这个函数会失败返回 EBUSY,而不会使线程挂起等待。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NLOOP 5000

int counter; /* incremented by threads */

pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;

void *doit(void *);

int main(int argc, char **argv)
{ 
        pthread_t tidA, tidB; 
        pthread_create(&tidA, NULL, doit, NULL); 
        pthread_create(&tidB, NULL, doit, NULL);/*wait for both threads to terminate */ 
        pthread_join(tidA, NULL);
        pthread_join(tidB, NULL);
        return 0;
}
void *doit(void *vptr){
        int i, val; /* * Each thread fetches, prints, and increments the counter NLOOP times. * The value of the counter should increase monotonically. */ 
        for (i = 0; i < NLOOP; i++) 
        {pthread_mutex_lock(&counter_mutex); 
                 val = counter; 
                 printf("%x: %d\n", (unsigned int)pthread_self(), val + 1); 
                 counter = val + 1; 
                 pthread_mutex_unlock(&counter_mutex); 
        } 
        return NULL;
}

这样运行结果就正常了,每次运行都能数到 10000。

Mutex 的两个基本操作 lock 和 unlock 是如何实现的呢?

lock 和 unlock 的伪代码如下:

lock: 
if(mutex > 0)
{ 
    mutex = 0; 
    return 0; 
} else 
挂起等待; 
goto lock;

unlock: 
mutex = 1; 
唤醒等待 Mutex 的线程;
    return 0

为了实现互斥锁操作,大多数体系结构都提供了 swap 或 exchange 指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。现在我们把 lock 和 unlock 的伪代码改一下(以 x86 的 xchg 指令为例):

lock: 
movb $0, %al 
xchgb %al, mutex 
if(al 寄存器的内容 > 0)
{return 0;} 
else 挂起等待;
goto lock;

unlock:
    movb $1, mutex 
唤醒等待 Mutex 的线程; 
return 0;

unlock 中的释放锁操作同样只用一条指令实现,以保证它的原子性。

“挂起等待”和“唤醒等待线程”的操作如何实现?

每个 Mutex 有一个等待队列,一个线程要在 Mutex 上挂起等待,首先在把自己加入等待队列中,然后置线程状态为睡眠,然后调用调度器函数切换到别的线程。

一个线程要唤醒等待队列中的其它线程,只需从等待队列中取出一项,把它的状态从睡眠改为就绪,加入就绪队列,那么下次调度器函数执行时就有可能切换到被唤醒的线程。

写程序时应该尽量避免同时获得多个锁,如果一定有必要这么做,则有一个原则:如果所有线程在需要多个锁时都按相同的先后顺序(常见的是按 Mutex 变量的地址顺序)获得锁,则不会出现死锁。

mutex 等待时,涉及线程切换,spin_lock 不会做线程切换,就在原地等;

1.2 用户空间 spin_lock:

#include <pthread.h>
int pthread_spin_lock(pthread_spinlock_t * lock);
int pthread_spin_trylock(pthread_spinlock_t * lock);
int pthread_spin_unlock(pthread_spinlock_t * lock);

使用场景
锁住的区间短;
区间经常发生;
区间可能成为性能瓶颈;
锁住大区间可能导致很高的 CPU 利用率和性能下降;
在较短区间,用 spin_lock 效率更高;

spin_lock 自旋会消耗 CPU;

内核的 spin_lock 会屏蔽线程抢占,其主要作用在多核场景;
用户态的 spin_lock 不会屏蔽抢占;

2 Condition Variable

线程间的同步还有这样一种情况:线程 A 需要等某个条件成立才能继续往下执行,现在这个条件不成立,线程 A 就阻塞等待,而线程 B 在执行过程中使这个条件成立了,就唤醒线程 A 继续执行。
在 pthread 库中通过条件变量(Condition Variable)来阻塞等待一个条件,或者唤醒等待这个条件的线程。Condition Variable 用 pthread_cond_t 类型的变量表示,可以这样初始化和销毁:

#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond, 
                      const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

返回值:成功返回 0,失败返回错误号。

和 Mutex 的初始化和销毁类似,pthread_cond_init 函数初始化一个 Condition Variable,attr 参数为 NULL 则表示缺省属性;

pthread_cond_destroy 函数销毁一个 Condition Variable。

如果 Condition Variable 是静态分配的,也可以用宏定义 PTHEAD_COND_INITIALIZER 初始化,相当于用 pthread_cond_init 函数初始化并且 attr 参数为 NULL。Condition Variable 的操作可以用下列函数:

#include <pthread.h>
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

int pthread_cond_broadcast(pthread_cond_t *cond);

int pthread_cond_signal(pthread_cond_t *cond);

返回值:成功返回 0,失败返回错误号。

一个 Condition Variable 总是和一个 Mutex 搭配使用的。一个线程可以调用 pthread_cond_wait 在一个 Condition Variable 上阻塞等待,这个函数做以下三步操作:

  1. 释放 Mutex
  2. 阻塞等待
  3. 当被唤醒时,重新获得 Mutex 并返回

pthread_cond_timedwait 函数还有一个额外的参数可以设定等待超时,如果到达了 abstime 所指定的时刻仍然没有别的线程来唤醒当前线程,就返回 ETIMEDOUT。
一个线程可以调用 pthread_cond_signal 唤醒在某个 Condition Variable 上等待的另一个线程,也可以调用 pthread_cond_broadcast 唤醒在这个 Condition Variable 上等待的所有线程。

下面的程序演示了一个生产者 - 消费者的例子,生产者生产一个结构体串在链表的表头上,消费者从表头取走结构体。

#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

struct msg { 
    struct msg *next; 
    int num;
    int flag;
};

struct msg *head;
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t has_consumer= PTHREAD_COND_INITIALIZER;

int get_count(struct msg *head)
{
    int count_data=0;
    for (struct msg *it=head;it!=NULL;){if (head) {
            count_data++;
            if (it->next==NULL) {break;} else {it = it->next;}
        }
    }
    printf("has count:%d\n",count_data);

    return count_data;
}
void *consumer(void *p)
{
    struct msg *mp;
    for (;;) {pthread_mutex_lock(&lock);
        while (get_count(head) == 0) {printf("wait condition...\n");
            printf("---main pid:%d, tid:%lu\n",getpid(), pthread_self());
            pthread_cond_wait(&has_product, &lock);
            printf("get condition...\n");
        }
        struct msg *last_it=NULL;
        for (struct msg *it=head;it!=NULL;){if (it->next==NULL) {
                mp = it;
                printf("main pid:%d, tid:%lu,Consume %d\n",getpid(), pthread_self(),mp->num);
                if (get_count(head) == 1) {head = NULL;}
                free(mp);
                mp = NULL;
                if (last_it) {last_it->next = NULL;}
                break;
            } else {
                last_it = it;
                it = it->next;   
            }
        }

        pthread_mutex_unlock(&lock);

        sleep(1);//(rand() % 2);
    }
}

void *producer(void *p)
{
    struct msg *mp;
    for (;;) {pthread_mutex_lock(&lock);
        for (int k=0; k<10;k++) {mp = malloc(sizeof(struct msg));
            mp->num = rand() % 1000 + 1;
            mp->flag = 0;
            printf("Produce %d\n", mp->num);
            if (head == NULL) {
                head = mp;
                mp->next = NULL;
            } else {for (struct msg *it=head;it!=NULL;){if (it->next==NULL) {
                        it->next = mp;
                        mp->next=NULL;
                        break;
                    } else {it = it->next;}
                }
            }
        }

        pthread_mutex_unlock(&lock);
        //pthread_cond_signal(&has_product);
        pthread_cond_broadcast(&has_product);

        int process_flag=1;
        while(process_flag) {sleep(1);//(rand() % 1);
            //pthread_mutex_lock(&lock);
            if (get_count(head) == 0) {
                process_flag = 0;
                printf("do it ok.\n");
            sleep(1);//(rand() % 1);
            } else {printf("has %d data, will wait...\n",get_count(head));
            }
            //pthread_mutex_unlock(&lock);

        }
    }
}

int main(int argc, char *argv[])
{
    pthread_t pid, cid,did,eid,fid,gid;
    srand(time(NULL));

    pthread_create(&cid, NULL, consumer, "ccc");
    pthread_create(&did, NULL, consumer, "dddd");
    pthread_create(&eid, NULL, consumer, "eeee");
    pthread_create(&fid, NULL, consumer, "ffff");
    pthread_create(&gid, NULL, consumer, "ggg");
    sleep(1);
    pthread_create(&pid, NULL, producer, NULL);
    pthread_join(pid, NULL);
    pthread_join(cid, NULL);
    pthread_join(did, NULL);
    pthread_join(eid, NULL);
    pthread_join(fid, NULL);
    pthread_join(gid, NULL);
    return 0;
}

执行结果如下:

(base) leon@sys:~/multi_thread/$ ./a.out 
has count:0
wait condition...
---main pid:921279, tid:140152548423424
has count:0
wait condition...
---main pid:921279, tid:140152540030720
has count:0
wait condition...
---main pid:921279, tid:140152523245312
has count:0
wait condition...
---main pid:921279, tid:140152389027584
has count:0
wait condition...
---main pid:921279, tid:140152531638016
Produce 135
Produce 383
Produce 524
Produce 53
Produce 115
Produce 625
Produce 959
Produce 783
Produce 649
Produce 129
get condition...
has count:10
main pid:921279, tid:140152523245312,Consume 129
has count:10
get condition...
has count:9
main pid:921279, tid:140152531638016,Consume 649
has count:9
get condition...
has count:8
main pid:921279, tid:140152540030720,Consume 783
has count:8
get condition...
has count:7
main pid:921279, tid:140152548423424,Consume 959
has count:7
get condition...
has count:6
main pid:921279, tid:140152389027584,Consume 625
has count:6
has count:5
has count:5
has 5 data, will wait...
has count:5
main pid:921279, tid:140152523245312,Consume 115
has count:5
has count:4
main pid:921279, tid:140152531638016,Consume 53
has count:4
has count:3
main pid:921279, tid:140152540030720,Consume 524
has count:3
has count:2
main pid:921279, tid:140152548423424,Consume 383
has count:2
has count:1
main pid:921279, tid:140152389027584,Consume 135
has count:1
has count:0
do it ok.

3 信号量 Semaphone

信号量(Semaphore)和 Mutex 类似,表示可用资源的数量,和 Mutex 不同的是这个数量可以大于 1。
本节介绍的是 POSIX semaphore 库函数,详见 sem_overview(7),这种信号量不仅可用于同一进程的线程间同步,也可用于不同进程间的同步。

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t * sem);
int sem_destroy(sem_t * sem);

semaphore 变量的类型为 sem_t,sem_init() 初始化一个 semaphore 变量,value 参数表示可用资源的数量,pshared 参数为 0 表示信号量用于同一进程的线程间同步,这里只介绍这种情况。

在用完 semaphore 变量之后应该调用 sem_destroy() 释放与 semaphore 相关的资源。

调用 sem_wait() 可以获得资源,使 semaphore 的值减 1,如果调用 sem_wait() 时 semaphore 的值已经是 0,则挂起等待。如果不希望挂起等待,可以调用 sem_trywait()。

调用 sem_post() 可以释放资源,使 semaphore 的值加 1,同时唤醒挂起等待的线程。

上一节生产者-消费者的例子是基于链表的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序:

#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
#define NUM 5
int queue[NUM];
sem_t blank_number, product_number;
void *producer(void *arg) 
{ 
        int p = 0; 
        while (1) {sem_wait(&blank_number); 
                queue[p] = rand() % 1000 + 1; 
                printf("Produce %d\n", queue[p]); 
                sem_post(&product_number); 
                p = (p+1)%NUM; 
                sleep(rand()%5); 
        }
}
void *consumer(void *arg) 
{ 
        int c = 0; 
        while (1) {sem_wait(&product_number); 
                printf("Consume %d\n", queue[c]); queue[c] = 0; 
                sem_post(&blank_number); 
                c = (c+1)%NUM; 
                sleep(rand()%5); 
        }
}
int main(int argc, char *argv[]) 
{ 
        pthread_t pid, cid; 
        sem_init(&blank_number, 0, NUM); 
        sem_init(&product_number, 0, 0); 
        pthread_create(&pid, NULL, producer, NULL); 
        pthread_create(&cid, NULL, consumer, NULL); 
        pthread_join(pid, NULL); 
        pthread_join(cid, NULL); 
        sem_destroy(&blank_number); 
        sem_destroy(&product_number); 
        return 0;
}

4 其它线程间同步机制

读者写者锁(Reader-Writer Lock)的概念,Reader 之间并不互斥,可以同时读共享数据,而 Writer 是独占的(exclusive),在 Writer 修改数据时其它 Reader 或 Writer 不能访问数据,可见 Reader-WriterLock 比 Mutex 具有更好的并发性。

用挂起等待的方式解决访问冲突不见得是最好的办法,因为这样毕竟会影响系统的并发性,在某些情况下解决访问冲突的问题可以尽量避免挂起某个线程,例如 Linux 内核的 Seqlock、RCU(read-copy-update)等机制。

哲学家就餐问题。

这是由计算机科学家 Dijkstra 提出的经典死锁场景。
原版的故事里有五个哲学家 (不过我们写的程序可以有 N 个哲学家),这些哲学家们只做两件事--思考和吃饭,他们思考的时候不需要任何共享资源,但是吃饭的时候就必须使用餐具,而餐桌上的餐具是有限的,原版的故事里,餐具是叉子,吃饭的时候要用两把叉子把面条从碗里捞出来。很显然把叉子换成筷子会更合理,所以:一个哲学家需要两根筷子才能吃饭。
现在引入问题的关键:这些哲学家很穷,只买得起五根筷子。他们坐成一圈,两个人的中间放一根筷子。哲学家吃饭的时候必须同时得到左手边和右手边的筷子。如果他身边的任何一位正在使用筷子,那他只有等着。
假设哲学家的编号是 A、B、C、D、E,筷子编号是 1、2、3、4、5,哲学家和筷子围成一圈如下图所示:
图 35.2. 哲学家问题
每个哲学家都是一个单独的线程,每个线程循环做以下动作:思考 rand()%10 秒,然后先拿左手边的筷子再拿右手边的筷子(筷子这种资源可以用 mutex 表示),有任何一边拿不到就一直等着,全拿到就吃饭 rand()%10 秒,然后放下筷子。
编写程序仿真哲学家就餐的场景:

Philosopher A fetches chopstick 5
Philosopher B fetches chopstick 1
Philosopher B fetches chopstick 2
Philosopher D fetches chopstick 3
Philosopher B releases chopsticks 1 2
Philosopher A fetches chopstick 1
Philosopher C fetches chopstick 2
Philosopher A releases chopsticks 5 1......

分析一下,这个过程有没有可能产生死锁?调用 usleep(3) 函数可以实现微秒级的延时,试着用 usleep(3) 加快仿真的速度,看能不能观察到死锁现象。然后修改上述算法避免产生死锁。

5 如何正确的加锁?

要考虑线程安全性,可重入性
加锁三要素:
(1) 同一把锁;
(2) 语义整体 (数据库中的事务的概念);
(2) 粒度最小; 增加并发性;

对于要加多个锁的情况,必须严格按照同样顺序加锁,这样可以避免死锁问题;

6 race condition 的调试:

自动检测竞态的工具,ThreadSanitizer 和 helgrind

1.ThreadSanitizer 引入编译选项 -fsanitize=thread

gcc simple_race.c -fsanitize=thread -g -pthread
  1. helgrind, old gcc version:
    不依赖 gcc fsanitize
gcc simple_race.c -g -lpthread
valgrind --tool=helgrind ./a.out 

Linux 多线程 2:多线程同步

http://github.com/google/sanitizers/wiki/ThreadSanitizeCppManual
用 sanitizers 内存消耗增长 5 -10 倍,执行时间增加 2 -20 倍;

https://www.valgrind.org/docs/manual/hg-manual.html

7 pthread_mutex 与优先级继承

高优先级线程等低优先级线程释放锁的过程中,中等优先级线程打断低优先级线程;
导致高优先级线程很大延迟;
设置优先级反转属性,低优先级就提搞到高优先级一样的优先级;
Linux 多线程 2:多线程同步

Int pthread_mutexattr_setprotocol(pthread_mutexattr_t *attr, int protocol);
PTHREAD_PRIO_NONE:
PTHREAD_PRIO_INHERIT: 优先级继承

#define _GNU_SOURCE             /* See feature_test_macros(7) */
#include <sched.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>
#include <sched.h>

#define USE_PRI_INHERIT /* high-priority task only wait LOW_SPIN time */
#if 1
#undef USE_PRI_INHERIT /* high-priority task wait LOW+MID_SPIN time */
#endif

/*
   A simple demonstration of a 3-thread priority inversion that can
   be improved with priority inheritance.

   Written by Kevin Dankwardt, k@kcomputing.com

 */

sem_t low_go, mid_go, high_go, high_ready, mid_ready;
pthread_mutex_t shared_mutex;
const int minutes = 60;
const int seconds = 1;
pthread_mutexattr_t mutex_attr;

#define LOW_SPIN 2
#define MID_SPIN 5

int gettime ()
{return time(NULL);
}

void spin_for (int n)
{int now = gettime();
        int counter=0;
        while (( gettime() - now) < n)
        { counter++;
                //if ((counter % 1000000) == 0) printf("gettime()-now = %d n=%d\n",gettime()-now,n);
        }
        //printf("done spinning for %d seconds\n",n);
}

void *low (void *n)
{int now = gettime();
        struct sched_param the_priority;

        the_priority.sched_priority  = 1;
        pthread_setschedparam(pthread_self(), SCHED_FIFO, &the_priority);
        sem_wait(&low_go);
        pthread_mutex_lock(&shared_mutex);
        sem_wait(&mid_ready);
        sem_wait(&high_ready);
        sem_post(&high_go);
        sem_post(&mid_go);
        spin_for(LOW_SPIN*seconds);
        pthread_mutex_unlock(&shared_mutex);
        printf ("low took %d seconds wanted about %d (critical section + mid time)\n",gettime() - now,LOW_SPIN+MID_SPIN);
        return NULL;
}

void *mid (void *n)
{
        struct sched_param the_priority;
        int now;

        the_priority.sched_priority  = 25;
        pthread_setschedparam(pthread_self(), SCHED_FIFO, &the_priority);
        sem_post(&mid_ready);
        sem_wait(&mid_go);
        now = gettime();
        spin_for(MID_SPIN*seconds);
        printf ("mid took %d seconds wanted about %d\n",gettime() - now,MID_SPIN);
        return NULL;
}

void *high (void *n)
{
        int now ;
        struct sched_param the_priority;

        the_priority.sched_priority  = 50;
        pthread_setschedparam(pthread_self(), SCHED_FIFO, &the_priority);
        sem_post(&high_ready);
        sem_wait(&high_go);
        now=gettime();
        pthread_mutex_lock(&shared_mutex);
        pthread_mutex_unlock(&shared_mutex);
        printf ("high took %d seconds wanted about %d (low critical section)\n",gettime() - now,LOW_SPIN);
        return NULL;
}

int main ()
{
        pthread_t tid1, tid2, tid3;
        cpu_set_t thecpus;

        CPU_ZERO(&thecpus);
        CPU_SET(0, &thecpus);
        if (sched_setaffinity(getpid(), sizeof(cpu_set_t), &thecpus)< 0)
                perror("set affinity");

        if (pthread_mutexattr_init(&mutex_attr)) 
        {perror("mutex init"); exit(1);}

#if defined(_POSIX_THREAD_PRIO_INHERIT) && _POSIX_THREAD_PRIO_INHERIT != -1 && defined(USE_PRI_INHERIT)
        printf("Using priority inheritance\n");
        if (pthread_mutexattr_setprotocol(&mutex_attr, PTHREAD_PRIO_INHERIT))
        {perror("mutex init"); exit(1);}
#else
        printf("Not Using priority inheritance\n");

#endif

        if (pthread_mutex_init(&shared_mutex, &mutex_attr))
        {perror("mutex init"); exit(1);}
        // all initialized to zero. Must wait on a post
        sem_init (&low_go,0,0);
        sem_init (&mid_go,0,0);
        sem_init (&high_go,0,0);
        sem_init (&high_ready,0,0); 
        sem_init (&mid_ready,0,0); 

        pthread_create(&tid1, NULL, low, NULL);
        pthread_create(&tid2, NULL, mid, NULL);
        pthread_create(&tid3, NULL, high, NULL);

        sem_post(&low_go);
        pthread_join(tid1, NULL);
        pthread_join(tid2, NULL);
        pthread_join(tid3, NULL);

        return 0;
}

正文完
 1
admin
版权声明:本站原创文章,由 admin 2022-01-13发表,共计13823字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)