线程概念
在程序设计时把进程设计成某个时刻,每个线程能够处理各自独立的任务。这有很多好处:
- 为每种事件类型分配单独的处理线程,可以简化处理异步事件的代码;
- 多个进程必须使用操作系统提供的复杂机制才能实现内存和文件描述符的共享,而多个线程自动地可以访问相同的存储地址空间和文件描述符;
- 分解问题从而提高整个程序的吞吐量。若是单线程进程要完成多个任务,需要把任务串行化;若是进程控制多个线程,相互独立的任务处理可以交叉进行,只需为每个任务分配一个单独的线程;
- 交叉程序同样可以通过多线程改善响应时间,多线程可以把程序中处理用户输入输出部分和其他部分分开
即使运行在单处理上,程序也可以通过多线程进行简化。而且,即使多线程程序在串行化任务时阻塞,由于某些线程在阻塞时还有其他线程可以运行,所以多线程程序在单处理上运行还是可以改善响应时间和吞吐量的。
每个线程都包含有表示执行环境所必须的信息,其中包括:线程ID、一组寄存器值、栈、调度优先级、策略、信号屏蔽字、errno变量(每个线程拥有属于自己的局部errno,以免一个线程干扰另一个线程)、线程私有数据。一个进程的所有信息对该进程的所有线程共享,包括可执行程序的代码、程序的全局内存、堆内存、栈、文件描述符。
线程标识
每个线程都有各自的线程ID。进程ID是整个系统中唯一的,线程ID是它所属的进程下上文中才有意义。
线程ID是pthread_t数据类型表示,所有可移植操作系统不能把它作为整数处理,pthread_equal函数是用于对两个线程ID进行比较。
1 |
|
Linux3.2.0 使用无符号长整型(unsigned long int)表示pthread_t ;
Solaris 10 使用无符号整型(unsigned int)表示pthread_t;
FreeBSD 8.0和Mac OS X 10.6.8用一个指向pthread结构的指针表示pthread_t。
线程可以通过pthread_self函数获取自身线程ID。
1 |
|
当线程需要识别以线程ID为标识的数据结构时,pthread_self函数和pthread_equal可以一起使用。如:
下图为主线程控制工作队列实例。可以看到,主线程可以将新作业放进工作队列中,另外3个线程组成的线程池从队列中移出作业,当然线程不能任意从队列顶端取出作业,而是由主线程控制作业分配,主线程会在每个待处理作业的结构中标志处理该作业的线程ID,每个工作线程只能移除有自己线程ID的作业。
线程创建
在创建多个控制线程之前,程序的行为与传统进程并无区别。新增线程可以通过调用pthread_create函数创建。
1 |
|
当pthread_create成功返回时,新创建的线程ID设置在tidp指向的内存单元。attr参数用于定制不同的线程属性(为NULL时,创建一个具有默认属性的线程)。
新创建的线程从start_rtn函数的地址开始运行,arg是该函数的参数。如果要向start_rtn函数传递一个以上的参数,则可以将参数放入一个结构中,然后把这个结构的地址作为arg参数传入。
线程创建时不能保证哪个线程会先运行,新创建的线程可以访问进程的地址空间,并且继承调用线程的浮点环境和信号屏蔽字,但是该线程的挂起信号集会被清除。
注意,pthread函数在调用失败会返回错误码,因为每个线程都提供errno副本,这样就可以与使用errno的函数兼容。
注意,在pthread_create函数返回之前新线程就可能开始执行。
>打印线程ID
#include "apue.h"
#include <pthread.h>
pthread_t ntid;
void printids(const char *s)
{
pid_t pid;
pthread_t tid;
pid = getpid();
tid = pthread_self();
printf("%s pid %lu tid %lu (0x%lx)\n", s, (unsigned long)pid, (unsigned long)tid, (unsigned long)tid);
}
void *thr_fn(void *arg)
{
printids("new thread: ");
return ((void *)0);
}
int main(void)
{
int err;
err = pthread_create(&ntid, NULL, thr_fn, NULL);
if (err != 0)
err_exit(err, "can't create thread");
printids("main thread: ");
sleep(1);
exit(0);
}
在Linux下运行结果是:
main thread: pid 3037 tid 140295829989184 (0x7f992b1c2740)
new thread: pid 3037 tid 140295829985024 (0x7f992b1c1700)
可以看到,两个线程的进程ID均为3037,两个线程ID不同。
线程终止
如果进程中的任一线程调用了exit、_Exit、_exit,那么整个进程都会终止。与此类似,如果默认的动作(信号)是终止进程,那么发送到线程的信号会终止整个进程。
单个线程可以在不终止整个进程的情况下,通过3种方式退出,从而停止它的控制流:
- 线程可以简单地从启动例程中返回,返回值是线程的退出码
- 线程可以被同一进程中的其他进程取消
- 线程调用pthread_exit
1 |
|
rval_ptr是一个无类型指针,与传给启动例程的单个参数类似。进程中的其他线程也可以通过调用pthread_join函数访问该指针。
1 |
|
该函数调用线程将一直阻塞,直到指定的线程调用pthread_exit、从启动例程中返回或被取消。如果线程简单地从它的启动例程返回,rval_ptr就包含返回码。如果线程被取消,有rval_ptr指定的内存单元就设置为PTHREAD_CANCELED。
通过调用pthread_join自动把线程置于分离状态,资源就可以恢复。如果线程已经处于分离状态,pthread_join调用就会失败,返回EINVAL。
如果对线程的返回值不感兴趣,那么可以把rval_ptr设置为NULL。在这种情况下,调用pthread_join函数可以等待指定线程的终止,但不获取指定线程的终止状态。
> 获得线程退出状态
#include "apue.h"
#include <pthread.h>
void *
thr_fn1(void *arg)
{
printf("thread 1 returning\n");
return ((void *)1); //退出方式1:直接返回
}
 
void *
thr_fn2(void *arg)
{
printf("thread 2 exiting\n");
pthread_exit((void *)2); //退出方式2:调用pthread_exit()函数
}
 
int
main(void)
{
int err;
pthread_t tid1, tid2;
void *tret;
err = pthread_create(&tid1, NULL, thr_fn1, NULL);
if (err != 0)
err_exit(err, "can't create thread 1");
err = pthread_create(&tid2, NULL, thr_fn2, NULL);
if (err != 0)
err_exit(err, "can't create thread 2");
err = pthread_join(tid1, &tret);
if (err != 0)
err_exit(err, "can't join with thread 1");
printf("thread 1 exit code %ld\n", (long)tret);
err = pthread_join(tid2, &tret);
if (err != 0)
err_exit(err, "can't join with thread 2");
printf("thread 2 exit code %ld\n", (long)tret);
exit(0);
}
Linux中运行结果:
thread 2 exiting
thread 1 returning
thread 1 exit code 1
thread 2 exit code 2
可以看到,当一个线程(此处为主线程)通过调用pthread_exit退出或简单地从启动例程中返回时,进程中的其他线程可以通过调用pthread_join函数获得该线程的退出状态。
内存覆写
如果线程在自己的栈上分配了一个结构,然后把指向这个结构的指针传给pthread_exit,那么调用pthread_join的线程(下面的代码中是主线程)试图使用这个结构,这个栈有可能已经被撤销,这块内存已被另作他用。
> pthread_exit的参数不正确使用
#include "apue.h"
#include <pthread.h>
 
struct foo {
int a, b, c, d;
};
 
void
printfoo(const char *s, const struct foo *fp)
{
printf("%s", s);
printf(" structure at 0x%lx\n", (unsigned long)fp);
printf(" foo.a = %d\n", fp->a);
printf(" foo.b = %d\n", fp->b);
printf(" foo.c = %d\n", fp->c);
printf(" foo.d = %d\n", fp->d);
}
 
void *
thr_fn1(void *arg)
{
struct foo foo = {1, 2, 3, 4};
 
printfoo("thread 1: \n", &foo);
pthread_exit((void *)&foo);
}
 
void *
thr_fn2(void *arg)
{
printf("thread 2: ID is %lu\n", (unsigned long)pthread_self());
pthread_exit((void *)0);
}
 
int
main(void)
{
int err;
pthread_t tid1, tid2;
struct foo *fp;
 
err = pthread_create(&tid1, NULL, thr_fn1, NULL);
if (err != 0)
err_exit(err, "can't create thread 1");
err = pthread_join(tid1, (void *)&fp);
if (err != 0)
err_exit(err, "can't join with thread 1");
sleep(1);
printf("parent starting second thread\n");
err = pthread_create(&tid2, NULL, thr_fn2, NULL);
if (err != 0)
err_exit(err, "can't create thread 2");
sleep(1);
printfoo("parent:\n", fp);
exit(0);
}
 
Linux中的结果是:
thread 1:
structure at 0x7fbadc518ed0
foo.a = 1
foo.b = 2
foo.c = 3
foo.d = 4
parent starting second thread
thread 2: ID is 140440536979200
parent:
structure at 0x7fbadc518ed0
foo.a = 0
foo.b = 0
foo.c = -330136658
foo.d = 32766
 
可以看到,在调用thread2线程后,fp所指向的内存单元已经被改写。
线程可以通过调用pthread_cancel函数请求取消同一进程中的其他线程。
1 |
|
默认情况下,pthread_cancel函数会使tid表示的线程表现为调用了PTHREAD_CANCELED的pthread_exit函数。但是tid指示的线程可以选择忽略或者其他方式控制它被取消的方式。注意pthread_cancel并不等待线程终止,它仅仅是提出请求。
线程可以安排它退出时需要调用的函数,这与进程在退出时使用atexit函数(第7.3节)安排退出类似。这样的函数称为**线程清理处理程序(thread cleanup handler)**。一个线程可以建立多个清理处理程序,处理程序记录在栈中,也就是说,它们执行的顺序与它们注册时相反。
1 |
|
当(退出)线程执行以下动作时,清理函数rtn由pthread_cleanup_push函数调度,调用时只有一个arg参数:
- 调用pthread_exit
- 响应取消请求(pthread_cancel)
- 用非零execute参数调用pthread_cleanup_pop
如果execute参数为0,则清理函数不被调用。不管发生是上述那种情况,pthread_cleanup_pop都将删除上次pthread_cleanup_push调用建立的清理处理程序。
> 线程清理程序
#include "apue.h"
#include <pthread.h>
 
void cleanup(void *arg)
{
printf("cleanup: %s\n", (char *)arg);
}
 
void *
thr_fn1(void *arg)
{
printf("thread 1 start\n");
pthread_cleanup_push(cleanup, "thread 1 first handler");
pthread_cleanup_push(cleanup, "thread 1 second handler");
printf("thread 1 push complete\n");
if (arg)
return ((void *)1);
pthread_cleanup_pop(0);
pthread_cleanup_pop(0);
return ((void *)1);
}
 
void *
thr_fn2(void *arg)
{
printf("thread 2 start\n");
pthread_cleanup_push(cleanup, "thread 2 first handler");
pthread_cleanup_push(cleanup, "thread 2 second handler");
printf("thread 2 push complete\n");
if (arg)
pthread_exit((void *)2);
pthread_cleanup_pop(0);
pthread_cleanup_pop(0);
pthread_exit((void *)2);
}
nbsp
int
main(void)
{
int err;
pthread_t tid1, tid2;
void *tret;
 
err = pthread_create(&tid1, NULL, thr_fn1, (void *)1);
if (err != 0)
err_exit(err, "can't create thread 1");
err = pthread_create(&tid2, NULL, thr_fn2, (void *)1);
if (err != 0)
err_exit(err, "can't create thread 2");
 
err = pthread_join(tid1, &tret);
if (err != 0)
err_exit(err, "can't join with thread 1");
printf("thread 1 exit code %ld\n", (long)tret);
 
err = pthread_join(tid2, &tret);
if (err != 0)
err_exit(err, "can't join with thread 2");
printf("thread 2 exit code %ld\n", (long)tret);
exit(0);
}
 
在Linux中,其结果如下:
thread 2 start
thread 2 push complete
cleanup: thread 2 second handler
cleanup: thread 2 first handler
thread 1 start
thread 1 push complete
thread 1 exit code 1
thread 2 exit code 2
可以看到,线程1和线程2都正常启动和退出,但只有线程2的清理程序被调用。因此可以判断,若线程通过返回启动例程的方式终止,则不会调用清理程序;若线程通过pthread_exit()函数终止,则会调用清理程序。注意,清理程序的调用顺序与它们创建时相反。
在FreeBSD和Mac OS X中,上述程序会发生段异常并产生core文件。因为在这两个平台上,pthread_cleanup_push是用宏实现,宏把某些上下文存放在栈上。线程1在pthread_cleanup_push和pthread_cleanup_pop之间返回时,栈已被改写,此时清理程序会调用被改写的上下文。唯一的可移植方法时调用pthread_exit函数。
下面总结线程函数和进程函数的相似之处,如下表所示:
| 进程原语 | 线程原语 | 描述 |
|---|---|---|
| fork | pthread_create | 创建新的控制流 |
| exit | pthread_exit | 从现有的控制流中退出 |
| waitpid | pthread_join | 从控制流中得到退出状态 |
| atexit | pthread_cleanup_push | 注册在退出控制流时调用的函数 |
| getpid | pthread_self | 获取控制流的ID |
| abort | pthread_cancel | 请求控制流的非正常退出 |
在默认情况下,线程的终止状态会保持到对该线程调用pthread_join(和进程中直到父进程调用waitpid相似)。但如果线程分离,线程的底层存储资源会在线程终止时立即被收回。在线程被分离后,不能通过pthread_join等待它的终止状态,对分离状态的线程调用pthread_join会产生未定义行为。
可以通过调用pthread_detach分离线程
1 |
|
线程同步
当多个控制线程共享相同的内存时,需要确保每个线程看到一致的数据。当一个线程可以修改变量,其他线程也可以读取或修改时,则需要对这些线程进行同步,确保它们在访问变量的存储内容时不会访问到无效的值。
锁
当一个线程修改一个变量时,其他线程在读取这个变量时可能会看到不一致的值。若变量修改时间多于一个内存访问周期的的处理器结构中,当内存读与内存写这两个周期交叉时,这种不一致就会出现。
如下图所示,可以看到线程B的内存读操作在线程A的内存写操作(这里内存写需要两个周期)之间,此时线程B内存读的变量,它会与线程A内存写的变量不一致。
为了解决该问题,可以使用锁,锁保证同一时间只允许一个线程访问该变量。如下图所示,可以看到,当线程A或线程B想操作变量时均需要获取锁。锁只能一个时间由一个线程拥有,这样线程B在线程A释放锁前就不能读取该变量。
当两个或以上的线程试图在同一时间修改同一变量时,也需要进行同步。下图为考虑增量操作的情况,增量操作通常分解为3步:
- 从内存单元将变量读入寄存器
- 在寄存器中对变量做增量操作
- 把新值写回内存单元
图中可以看到,两个线程试图在同一时间对同一个变量做增量操作,结果可能出现不一致,变量可能增加1,可能增加2,这取决于第二个线程开始操作时获取的数值。
如果修改操作是原子操作,就不存在竞争。如果数据总是顺序一致出现(即1. 线程都必须按照程序顺序执行;2. 线程都只看到一个单一操作执行,每个操作都是原子执行,并立即对所有线程可见),则不需要额外的同步操作。当多个线程观察不到数据的不一致,那么操作就是顺序一致的。但在现代计算机系统中,内存访问需要多个总线周期,多处理器的总线周期通常在多个处理器上是交叉的,因此不能保证数据是顺序一致的。
互斥量
可以使用pthread的互斥接口来保护数据,确保同一时间只有一个线程访问数据。互斥量(mutex)本质上是一把锁,在访问共享资源时设置互斥量(加锁),在访问后释放互斥量(解锁)。在互斥量加锁后,任何试图再次对互斥量加锁的线程都会被阻塞,直到该线程释放该互斥锁。如果释放互斥量时由一个以上的线程阻塞,那么所有该锁上的阻塞线程变成可运行状态,第一个变为运行状态的线程对互斥量加锁,其他线程看到互斥量仍然时锁着的,则会再次进入阻塞状态直到互斥量可用。这种方式下,每次只有一个线程可以执行。
只有将所有线程都设置成遵守相同数据访问规则时,互斥机制才能正常工作。操作系统并不会为我们做数据访问的串行化。
互斥变量可以用pthread_mutex_t数据类型表示。在使用互斥变量前,需要将其进行初始化,可以把它设置为常量PTHREAD_MUTEX_INITIALIZER(只适用于静态分配的互斥量),也可以通过调用pthread_mutex_init函数进行初始化。如果是动态分配互斥量(例如,调用malloc函数),在释放内存前需要调用pthread_mutex_destroy函数。
1 |
|
attr表示互斥量的属性,需要用默认属性初始化互斥量时,可以将attr设为NULL。
对互斥量进行加锁,需要调用pthread_mutex_lock。如果互斥量已经上锁,调用线程将阻塞直到互斥量被解锁。对互斥量解锁,需要调用pthread_mutex_unlock。
1 |
|
如果希望线程不被阻塞,可以使用pthread_mutex_trylock尝试对互斥量加锁。如果调用该函数时互斥量处于未锁住状态,则该函数会锁住互斥量并返回0而不会出现阻塞,否则该函数调用失败,不能锁住互斥量,返回EBUSY。
实例
> 互斥量保护数据结构
#include "apue.h"
#include <pthread.h>
 
struct foo {
int f_count;
pthread_mutex_t f_lock;
int f_id;
/* ... more stuff here ...*/
};
 
struct foo *
foo_alloc(int id) /* allocate the object */
{
struct foo *fp;
 
if ((fp = malloc(sizeof(struct foo))) != NULL) {
fp->f_count = 1;
fp->f_id = id;
/* init mutex */
if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
free(fp);
return (NULL);
}
/* ... continue initialization ... */
}
return (fp);
}
 
void
foo_hold(struct foo *fp) /* add a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
 
void
foo_rele(struct foo *fp) /* release a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
if (--fp->f_count == 0) { /* last reference */
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
}
else {
pthread_mutex_unlock(&fp->f_lock);
}
}
在foo_alloc函数中为初始化计数器,并不需要加锁,因为在该操作前分配线程是唯一引用该对象的线程。后续如果该对象被放到一个列表中,则它可能被其他线程发现,此时需要对它加锁。
在使用该对象时,线程需要调用foo_hold为该对象的引用计数加1。当线程对使用完毕后,必须调用foo_rele释放引用。当最后一个引用被释放时,该对象所占的空间就被释放。
该例子中,忽视了线程在调用foo_hold时如何找到该对象。如果另一个线程在调用foo_hold时阻塞等待互斥锁,这时即使该对象引用计数为0,也foo_rele不应该释放该对象(的内存空间)。我们可以通过确保在释放内存之前找不到该对象来避免这个问题。
避免死锁
如果一个线程试图对一个互斥量加锁两次,那么它自身会陷入死锁状态。在使用互斥量时,其他方式也可能产生死锁。例如,程序使用一个以上的互斥量,线程1占有第一个互斥量,请求第二个互斥量时阻塞,但线程2占有第二个互斥量,请求第一个互斥量时阻塞,由于两个线程都在相互请求另一个线程拥有的资源,所以两个线程都无法运行,于是陷入死锁状态。
可以通过控制互斥量加锁顺序来避免死锁。例如,需要对两个互斥量A和B同时加锁,所有线程只有在锁住互斥量A后才能对互斥量B加锁,这样两个互斥量就避免了死锁(当然在其他资源上也可能出现死锁)。
只有当一个线程试图以另一个线程相反的顺序锁住互斥量时,才会有可能发生死锁。
有时候,应用程序的结构使得对互斥量进行排序十分困难。如果涉及太多的锁和数据结构,可用函数不能把它转换成简单层次,那么就需要采用另外的方法:线程可以使用pthread_mutex_trylock接口避免死锁。如果线程已占有某些锁,并且pthread_mutex_trylock接口返回成功,则程序继续执行;如果函数调用失败,则线程释放已占有的锁,做好清理工作,后续在重新尝试。
实例
>使用两个互斥量,hash表的操作
#include "apue.h"
#include <pthread.h>
/* 此处哈希表为拉链法,即通过链表解决哈希冲突 */
#define NHASH 29 /* 哈希表的大小 */
#define HASH(id) (((unsigned long)id) % NHASH) /* 获取哈希值 */
 
struct foo *fh[NHASH]; /* 哈希表,公共变量,会被所有线程发现 */
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER; /* 初始化哈希表锁hashlock */
 
struct foo {
int f_count;
pthread_mutex_t f_lock;
int f_id;
struct foo *f_next; /* protected by hashlock */
/* ... more stuff here ... */
};
 
struct foo*
foo_alloc(int id) /* allocate the object */
{
struct foo *fp;
int idx;
 
if ((fp = malloc(sizeof(struct foo))) != NULL) {
fp->f_count = 1;
fp->f_id = id;
if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
free(fp);
return (NULL);
}
idx = HASH(id);
/* 可以看到这里要使用两个互斥量,则先锁hashlock,后锁f_lock */
pthread_mutex_lock(&hashlock);
/* 下面两步是将foo节点插入哈希表中 */
fp->f_next = fh[idx];
fh[idx] = fp;
pthread_mutex_lock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
/* ...continue initialization ... */
pthread_mutex_unlock(&fp->f_lock);
}
return (fp);
}
 
void
foo_hold(struct foo *fp) /* add reference to the object */
{
/* 可以看到,f_count是由f_lock保护 */
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
 
struct foo *
foo_find(int id) /* find an existing object */
{
struct foo *fp;
/* 这里只对哈希表操作,而不修改foo节点的内容,因此只需要锁hashlock */
pthread_mutex_lock(&hashlock);
for (fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next) {
if (fp->f_id == id) {
foo_hold(fp);
break;
}
}
pthread_mutex_unlock(&hashlock);
return (fp);
}
 
void
foo_rele(struct foo *fp) /* release a reference to the object */
{
struct foo *tfp;
int idx;
 
pthread_mutex_lock(&fp->f_lock);
if (fp->f_count == 1) { /* last reference */
/* 当fp节点此时引用数为1,即只有最后一个引用,因此需要将引用减1之后释放该节点 */
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_lock(&hashlock);
phtread_mutex_lock(&fp->f_lock);
/* need to recheck the condition */
/* 这里对f_count进行了二次判断,因为在上一个if语句后,获取hashlock之前,其他线程可能修改f_count的值,因此需要对f_count进行二次判断 */
if (fp->f_count != 1) {
fp->f_count--;
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
return;
}
/* remove from list */
idx = HASH(fp->f_id);
tfp = fh[idx];
/* 寻找hash表中对应的foo节点并删除 */
if (tfp == fp) {
fh[idx] = fp->f_next;
}
else {
while (tfp->f_next != fp)
tfp = tfp->f_next;
tfp->f_next = fp->f_next;
}
/* 先释放hashloc,在释放f_lock */
pthread_mutex_unlock(&hashlock);
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
}
else {
/* 若fp节点引用不为1,则需简单地减1即可,不需要释放foo节点 */
fp->f_count--;
pthread_mutex_unlock(&fp->f_lock);
}
}
上述程序,展现了两个互斥量的操作,这里通过让它们以相同的顺序加锁(可以看到,当需要使用两个互斥量时,先锁hashlock,在锁f_lock),这样就可以避免死锁。hashlock互斥量用于保护散列表fh和foo结构体中的散列字段f_next。f_lock互斥量保护foo结构体中的其他字段。
foo_find函数先锁住散列表锁hashlock,然后在散列表中寻找对应的foo节点,找到后调用foo_hold函数,锁住foo节点对应的锁f_lock,然后将对应节点的f_count加1。
foo_rele函数相比于一个互斥量的情况,实现更加的复杂。代码相应的解释已在上述代码中描述。
可以看到这种锁方法十分复杂。我们可以把散列表锁(hashlock)来保护结构引用计数,结构锁(f_lock)用于保护foo结构中的所有其他内容。
>简化的锁
#include <stdlib.h>
#include <pthread.h>
 
#define NHASH 29
#define HASH(id) (((unsigned int)id) % NHASH)
 
struct foo *fh[NHASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
 
struct foo {
int f_count; /* protected by hashlock */
pthread_mutex_t f_lock;
int f_id;
struct foo *f_next; /* protected by hashlock */
/* ... more stuff here ... */
};
 
struct foo *
foo_alloc(int id) /* allocate the object */
{
struct foo *fp;
int idx;
 
if ((fp = malloc(sizeof(struct foo))) != NULL) {
fp-> f_count = 1;
fp->f_id = id;
if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
free(fp);
return (NULL);
}
idx = HASH(id);
pthread_mutex_lock(&hashlock);
fp->f_next = fh[idx];
fh[idx] = fp;
pthread_mutex_lock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
/* ...continue initialization ... */
pthread_mutex_unlock(&fp->f_lock);
}
return (fp);
}
 
void
foo_hold(struct foo *fp) { /* add a reference to the object */
/* 可以看到,这里通过散列锁hashlock保护f_count */
pthread_mutex_lock(&hashlock);
fp->f_count++;
pthread_mutex_unlock(&hashlock);
}
 
struct foo *
foo_find(int id) /* find an existing object */
{
struct foo *fp;
 
pthread_mutex_lock(&hashlock);
for (fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next) {
if (fp->f_id == id) {
fp->f_count++;
break;
}
}
pthread_mutex_unlock(&hashlock);
return (fp);
}
 
void
foo_rele(struct foo *fp) /* release a reference to the object */
{
struct foo *tfp;
int idx;
 
pthread_mutex_lock(&hashlock);
if (--fp->f_count == 0) { /* last reference,remove from list */
idx = HASH(fp->f_id);
tfp = fh[idx];
if (tfp == fp) {
fh[idx] = fp->f_next;
}
else {
while (tfp->f_next != fp)
tfp = tfp->f_next;
tfp->f_next = fp->f_next;
}
pthread_mutex_unlock(&hashlock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
}
else {
pthread_mutex_unlock(&hashlock);
}
}
可以看到,这里的程序比上一个程序简单许多,在这里f_count也通过hashlock保护,因此涉及f_count的操作都不需要结构锁f_lock,围绕散列表和引用计数的锁排序问题就不存在了。但是,这里也存在一个问题,都使用hashlock时,导致该锁的粒度太粗,导致程序性能不佳。
在多线程程序中,如果锁的粒度太粗,就会出现很多线程阻塞等待相同的锁,导致不能改善并发性。如果锁的粒度太细,那么过多的锁开销会使系统性能收到影响,而且代码变得更加复杂。因此需要在满足锁需求的情况下,在代码复杂度和性能之间找到正确的平衡。
函数pthread_mutex_timedlock
pthread_mutex_timedlock可以绑定线程阻塞时间。pthread_mutex_timedlock和pthread_mutex_lock基本等价,但是当pthread_mutex_timedlock超过设定的时间值后,其不会对互斥量进行加锁而阻塞,而是返回错误码ETIMEDOUT。
1 |
|
这里的tsptr值指定的绝对时间(与相对时间相反,这里指我们愿意阻塞到时间X,而不是愿意阻塞Y秒)。这个超时时间就是timespec结构表示。可以通过该函数避免线程永久阻塞。
实例
>使用pthread_mutex_timedlock
#include "apue.h"
#include <pthread.h>
 
int
main(void)
{
int err;
struct timespec tout;
struct tm *tmp;
char buf[64];
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
 
pthread_mutex_lock(&lock); /* 主线程锁住lock */
printf("mutex is locked\n");
 
/* 获取时钟CLOCK_REALTIME的当前值并将其存储在tout中。 */
clock_gettime(CLOCK_REALTIME, &tout);
/* 将tout.tv_sec转换成struct tm格式 */
tmp = localtime(&tout.tv_sec);
/* 将struct tm格式的tmp转换成字符串 */
strftime(buf, sizeof(buf), "%r", tmp);
printf("current time is %s\n", buf);
tout.tv_sec += 10;
/* caution: this could lead to deadlock */
/*主线程试图去获取lock,但之前lock已被主线程锁定,因此会进入死锁状态 */
err = pthread_mutex_timedlock(&lock, &tout);
clock_gettime(CLOCK_REALTIME, &tout);
tmp = localtime(&tout.tv_sec);
strftime(buf, sizeof(buf), "%r", tmp);
printf("the time is now %s\n", buf);
if (err == 0)
printf("mutex locked again!\n");
else
printf("can't lock mutex again: %s\n", strerror(err));
exit(0);
}
 
Linux中的输出结果为:
mutex is locked
current time is 08:28:51 PM
the time is now 08:29:01 PM
can't lock mutex again: Connection timed out
这个程序故意对已有互斥量进行加锁,演示pthread_mutex_timedlock的工作。不推荐在实际中使用这种策略,这会导致死锁。
Mac OS没有支持pthread_mutex_timedlock,FreeBSD 8.0和Linux3.2.0以及Solaris 10支持该函数。
读写锁
读写锁(reader-writer lock)与互斥量类似,不过读写锁允许更高的并行性。互斥锁要么是加锁状态,要么是不加锁状态,而且一次只有一个线程可以对其加锁。读写锁可以有3种状态:(1)读模式下加锁状态、(2)写模式下加锁状态、(3)不加锁状态。一次只有一个线程可以占有写模式的读写锁,但是多个线程可以同时占有读模式的读写锁。
当读写锁为写加锁状态时,在解锁之前,所有试图多该锁加锁的线程都会被阻塞。当读写锁处于读加锁状态时,所有试图以读模式对其加锁的线程都可以获得访问权,但是任何写模式加锁的线程都会阻塞,直到所有线程释放读锁为止。
当读写锁处于读模式加锁状态时,若有一个线程试图以写模式获取读写锁时,读写锁会阻塞后续的读锁请求,以避免读锁长期占用,而等待的写锁一直得不到满足。
读写锁非常适用于对数据结构读的次数远大于写的情况。在读写锁的写模式下,它所保护的数据结构可以安全修改,因为一次只有一个线程可以在写模式下获取该锁。当读写锁处于读模式下,只要线程获取了读模式的锁,该锁保护的数据结构就可以被多个获得读模式锁的线程读取。
读写锁也被称为共享互斥锁(shared-exclusive lock)。读模式下,称为以共享模式锁住;写模式下,称为互斥模式锁住。
与互斥量相比,读写锁在使用之前必须初始化,在释放它们底层的内存之前必须销毁。
1 |
|
读写锁通过调用pthread_rwlock_init进行初始化,如果希望读写锁拥有默认属性,可以传一个null指针给attr。
PTHREAD_RWLOCK_INITIALIZER常量(默认属性)也可用于对静态分配的读写锁进行初始化。
在释放读写锁占用的内存之前,需要调用pthread_rwlock_destroy做清理工作。如果pthread_rwlock_init为读写锁分配资源,pthread_rwlock_destroy将释放这些资源。
在读模式下锁定读写锁,需要调用pthread_rwlock_rdlock。在写模式下锁定读写锁,需要调用pthread_rwlock_wrlock。不管以何种方式获取读写锁,都可以调用pthread_rwlock_unlock释放锁。
1 |
|
各种实现会对共享模式下的读写锁次数进行限制,所以需要检查pthread_rwlock_rdlock的返回值。错误返回值的定义只是针对不正确使用读写锁的情况(如未初始化锁),或者试图获取已拥有的锁导致可能产生死锁的情况。
Single UNIX Specification还定义了读写锁原语的条件版本。
1 |
|
获取锁时,若成功返回0;否则,返回EBUSY。
读写锁的使用
>读写锁的使用
#include <stdlib.h>
#include <pthread.h>
struct job {
struct job *j_next;
struct job *j_prev;
pthread_t j_id; /* tell which thread handles this jon */
/* ...more stuff here */
};
struct queue {
struct job *q_head;
struct job *q_tail;
pthread_rwlock_t q_lock;
};
/*
* Initialize a queue
*/
int
queue_init(struct queue *qp)
{
int err;
 
qp->q_head = NULL;
qp->q_tail = NULL;
err = pthread_rwlock_init(&qp->q_lock, NULL);
if (err != 0)
return (err);
/* ...continue initialization ... */
return (0);
}
/*
* Insert a job at the head of the queue.
*/
void
job_insert(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
jp->j_next = qp->q_head;
jp->j_prev = NULL;
if (qp->q_head != NULL)
qp->q_head->j_prev = jp;
else
qp->q_tail = jp; /* list was empty */
qp->q_head = jp;
pthread_rwlock_unlock(&qp->q_lock);
}
 
/*
* Append a job on the tail of the queue
*/
void
job_append(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
jp->j_next = NULL;
jp->j_prev = qp->q_tail;
if (qp->q_tail != NULL)
qp->q_tail->j_next = jp;
else
qp->q_head = jp; /* list was empty */
qp->q_tail = jp;
pthread_rwlock_unlock(&qp->q_lock);
}
 
/*
* Remove the given job from a queue
*/
void
job_remove(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
/* 双向链表的操作,如果删除节点是头节点 */
if (jp == qp->q_head) {
qp->q_head = jp->j_next;
if (qp->q_tail == jp)
qp->q_tail = NULL;
else
jp->j_next->j_prev = jp->j_prev;
}
/* 如果删除节点是尾节点 */
else if (jp == qp->q_tail) {
qp->q_tail = jp->j_prev;
jp->j_prev->j_next = jp->j_next;
}
/* 如果删除节点是中间节点 */
else {
jp->j_prev->j_next = jp->j_next;
jp->j_next->j_prev = jp->j_prev;
}
pthread_rwlock_unlock(&qp->q_lock);
}
 
/*
* Find a job for the given thread ID.
*/
struct job*
job_find(struct queue *qp, pthread_t id)
{
struct job *jp;
if (phtread_rwlock_rdlock(&qp->q_lock) != 0)
return (NULL);
 
for (jp = qp->q_head; jp != NULL; jp = jp->j_next)
if (pthread_equeal(jp->j_id, id))
break;
 
pthread_rwlock_unlock(&qp->q_lock);
return (jp);
}
在上述例子,凡是需要想队列中增加作业或者从队列中删除作业时,都需要采用写模式来锁住队列的读写锁。对于搜索队列,可以采用读模式锁去获取读写锁,从而允许所有工作线程并发地搜索队列。只有在线程搜索作业的频率远远高于增加或删除作业时,使用读写锁才能改善心性能。
工作线程只能从队列中读取与它们的线程ID匹配的作业。
带有超时的读写锁
与互斥量相同,Single UNIX Specification提供了带有超时的读写锁加锁函数,使应用程序在获取读写锁时避免陷入永久阻塞状态。
1 |
|
这两个函数的行为与不计时的版本类似,tsptr参数只想timespec结构,表示线程应该停止阻塞的时间。如果他们不能获取锁,那么超时到期后,这两个函数返回ETIMEDOUT错误。与pthread_mutex_timedlock函数类似,超时指定的是绝对时间,不是相对时间。
条件变量
条件变量与互斥量一起使用时,允许线程以无竞争方式等待特定的条件发生。
条件本身由互斥量保护。线程在改变条件状态之前必须首先锁住互斥量。其他线程直到它们获取互斥量之前都不会察觉到这种变化,因为只有锁住互斥锁才能计算条件变量。
在使用条件变量之前,需要进行初始化。条件变量有两种初始化方式,可以把常量PTHREAD_COND_INITIALIZER复制给静态分配的条件变量,如果条件变量是动态分配的,则需要使用pthread_cond_init函数对它进行初始化。
在释放条件变量的内存之前,需要使用pthread_cond_destroy函数对条件变量进行反初始化(deinitialize)。
1 |
|
attr表示条件变量的属性,设为NULL表示默认的条件变量。
我们可以使用pthread_cond_wait等待条件变量变为真。(pthread_cond_timedwait())如果在给定的时间内条件不能满足,则会返回一个错误码。
1 |
|
传递给 pthread_cond_wait 的互斥量保护条件变量。 调用者将互斥量传递给函数,然后函数以原子方式将调用线程放在等待条件的线程列表中并解锁互斥锁。 这会关闭检查条件的时间和线程进入睡眠等待条件更改的时间之间的窗口,以便线程不会错过条件的更改(如果不这么做,当在检查条件和线程进入睡眠之间的时间窗口内条件变量改变,如果后续条件不再发生改变,这会导致线程一直阻塞)。 当 pthread_cond_wait 返回时,互斥锁再次被锁定。
对于tsptr参数,这是一个timespec结构,表示愿意等待的时间。可以使用clock_gettime函数获取timespec结构表示当前时间。但不是所有平台都支持该函数,可以通过gettimeofday获取timeval结构表示当前时间,然后把这个时间转换成timespec结构。
要获得超时值的绝对时间,可以使用下面的函数(假设阻塞的最大时间使用分来表示)。
1 |
|
如果超时后条件还没有出现,pthread_cond_timewait将重新获取互斥量,然后返回错误ETIMEDOUT。从pthread_cond_wait或phtread_cond_timedwait调用成功返回是,线程需要检查条件,因为另一个线程可能已经在运行并改变了条件。
有两个函数可以用户通知线程条件以满足。pthread_cond_signal函数至少能唤醒一个等待该条件的线程。而pthread_cond_broadcast函数则能够唤醒等待该条件的所有线程。
1 |
|
在调用pthread_cond_signal或者pthread_cond_broadcast时,表示在给线程或者条件发信号。必须注意,一定要在改变条件状态以后在给线程发信号。
实例
下面给出了如何使用条件变量和互斥量对线程进行同步:
>使用条件变量
#include <pthread.h>
 
struct msg {
struct msg *m_next;
/* ...more stuff here ... */
};
 
struct msg *workq;
 
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
 
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
 
void
process_msg(void)
{
struct msg *mp;
 
for (;;) {
/* 在使用条件变量前,先使用互斥量qlock锁住 */
pthread_mutex_lock(&qlock);
/* 当工作链表为NULL则等待 */
while (workq == NULL) /*若当线程醒来后,发现队列仍为空,可以继续等待 */
pthread_cond_wait(&qready, &qlock);
mp = workq;
workq = mp->m_next;
pthread_mutex_unlock(&qlock);
/* now process the message mp */
}
}
 
void
enqueue_msg(struct msg *mp)
{
pthread_mutex_lock(&qlock);
mp->m_next = workq;
workq = mp;
pthread_mutex_unlock(&qlock);
/* 在给等待线程发信号时,不需要占有互斥量 */
pthread_cond_signal(&qready);
}
自旋锁
自旋锁与互斥量类似,但它不是通过休眠使线程阻塞,而是在获取锁之前一直处于忙等(自旋)阻塞状态。适用于:锁被持有的时间短,而且线程不希望在重新调度上花费太多的成本。
自旋锁通常作为底层原语用于实现其他类型的锁。
在用户层,自旋锁并不非常有用。运行在分时调度中的用户线程在两种情况下会被取消调度:1. 当它们的时间片到期;2. 具有更优先级的线程就绪变成可运行状态。在这些情况下,拥有自旋锁的线程会进入休眠状态,阻塞在锁上的其他线程自旋的时间可能比预期的时间要长。
互斥量的实现十分高效,以至于应用程序采用互斥锁的性能与自旋锁相当。事实上,有些互斥量的实现在试图获取互斥量的时候会自旋一段时间,只有在自旋计数到达某一阈值时才会休眠。这些因素,加上现代处理器的进步,使得上下文切换越来越快,也使得自旋锁只在某些特定的情况下有用。
自旋锁与互斥锁的借口类似。可以使用pthread_spin_init函数初始化自旋锁。使用pthread_spin_destroy函数对自旋锁去初始化。
1 |
|
pshared参数表示进程共享属性,表示自旋锁是如何获取的。 如果它设置为 PTHREAD_PROCESS_SHARED,那么可以访问自旋锁底层内存的线程可以获得自旋锁,即使这些线程来自不同的进程。若pshared 参数设置为 PTHREAD_PROCESS_PRIVATE 并且只能从初始化它的进程中的线程访问自旋锁。
可以使用pthread_spin_lock或pthread_spin_trylock对自旋锁进行加锁,前者获取锁之前一直自旋,后者如果不能获取锁,就立即返回EBUSY错误。注意,pthread_spin_trylock不能自旋。不管以何种方式加锁,自旋锁都可以调用pthread_spin_unlock函数解锁。
1 |
|
注意,如果自旋锁当前在解锁状态时,pthread_spin_lock函数不需要自旋就可以对它加锁。如果线程已经对其加锁,则结果未定义。调用pthread_spin_lock会返回EDEADLK错误(或其他错误),或者调用可能永久自旋,这取决于具体实现。试图对未加锁的自旋锁解锁,结果也是未定义。
需要注意,不要调用在持有自旋锁的情况下可能会进入休眠状态的函数,如果调用了这些函数,会浪费CPU资源,因为其他线程需要获取自旋锁需要等待的时间就延长了。
屏障
屏障(barrier)是用户协调多个线程并行工作的同步机制。屏障允许多个线程等待,直到所有的合作线程都达到某一点,然后从该点继续执行。我们已经看到一种屏障,pthread_join函数就是一种屏障,允许一个线程等待,直到另一个线程退出。
但屏障的概念更广,它允许任意数量的线程等待,直到所有线程完成处理工作,而线程不需要退出。所有线程到达屏障后可接着工作。
可以使用pthread_barrier_init函数对屏障进行初始化,用pthread_barrier_destroy函数去初始化屏障。
1 |
|
参数count指定必须到达屏障的线程数目,参数attr指定屏障对象的属性。
pthread_barrier_wait函数表明,线程已完成工作,准备等待所有其他线程赶上来。
1 |
|
调用pthread_barrier_wait的线程在屏障计数(phtread_barrier_init中的count参数)为满足条件时,会进入休眠状态。如果该线程是最后一个调用pthread_barrier_wait的线程,就满足了屏障计数,则所有线程会被唤醒。
对于一个任意线程,pthread_barrier_wait函数返回了PTHREAD_BARRIER_SERIAL_THREAD,剩下的其他线程看到的返回值为0.这使得一个线程可以作为主线程对所有其他线程完成的工作的结果进行操作。
一旦达到屏障计数值,而且线程处于非阻塞状态,屏障就可以被重用。除非是调用了pthread_barrier_destroy后,又调用了pthread_barrier_init函数对计数用另外的值进行初始化。否则屏障计数不会改变。
实例
使用8个线程来对800万数字进行排序工作,每个线程用堆排序算法堆100万个数进行排序,然后主线程调用merge函数对结果进行合并。
这里并不需要使用PHTREAD_BARRIER_SERIAL_THREAD来决定哪个线程执行合并操作,这由主线程完成,这也是把屏障值设为工作线程加1的原因,主线程也是其中一个候选线程。
在单线程程序中,耗时12.14秒。在8核处理器系统上,8个线程并行和1个线程合并,相同的数字耗时只要1.91秒,速度提升了6倍。
在我的Linux系统中,耗时为2.6秒
>使用屏障
#include "apue.h"
#include <pthread.h>
#include <limits.h>
#include <sys/time.h>
 
#define NTHR 8 /* number of threads */
#define NUMNUM 8000000L /* number of numbers to sort */
#define TNUM (NUMNUM/NTHR) /* number to sort per thread */
 
long nums[NUMNUM];
long snums[NUMNUM];
 
pthread_barrier_t b;
 
#ifdef SOLARIS
#define heapsort qsort
#else
extern int heapsort(void *, size_t, size_t, int (*)(const void *, const void *));
#endif
 
/*
* Compare two long integers (helper function for heapsort)
*/
int
complong(const void *arg1, const void *arg2)
{
long l1 = *(long *)arg1;
long l2 = *(long *)arg2;
 
if (l1 == l2)
return 0;
else if (l1 < l2)
return -1;
else
return 1;
}
 
/*
* Worker thread to sort a portion of the set of numbers
*/
void *
thr_fn(void *arg)
{
long idx = (long)arg;
 
heapsort(&nums[idx], TNUM, sizeof(long), complong);
pthread_barrier_wait(&b);
 
/*
* Go off and perform more work ...
*/
return ((void *)0);
}
 
/*
* Merge the results of the individual sorted ranges
*/
void
merge()
{
long idx[NTHR];
long i, minidx, sidx, num;
 
for (i = 0; i < NTHR; i++)
idx[i] = i * TNUM;
for (sidx = 0; sidx < NUMNUM; sidx++) {
num = LONG_MAX;
/* 对各线程负责的数组范围内,比较第一个值并找出最小值 */
for (i = 0; i < NTHR; i++) {
if ((idx[i] < (i + 1) * TNUM) && (nums[idx[i]] < num)) {
num = nums[idx[i]];
minidx = i;
}
}
/* 将最小值赋值给snums,并对相应的线程索引+1 */
snums[sidx] = nums[idx[minidx]];
idx[minidx]++;
}
}
 
int
main()
{
unsigned long i;
struct timeval start, end;
long long startusec, endusec;
double elapsed;
int err;
pthread_t tid;
 
/*
* Create the initial set of numbers to sort.
*/
srandom(1);
for (i = 0; i < NUMNUM; i++)
nums[i] = random();
 
/*
* Create 8 threads to sort the numbers.
*/
gettimeofday(&start, NULL);
pthread_barrier_init(&b, NULL, NTHR + 1);
for (i = 0; i < NTHR; i++) {
/* 创建线程来执行相应范围的数字排序 */
err = pthread_create(&tid, NULL, thr_fn, (void *)(i * TNUM));
if (err != 0)
err_exit(err, "can't create thread");
}
/* 主线程和其他线程等待所有线程完成排序工作 */
pthread_barrier_wait(&b);
merge();
gettimeofday(&end, NULL);
 
/*
* Print the sorted list.
*/
startusec = start.tv_sec * 1000000 + start.tv_usec;
endusec = end.tv_sec * 1000000 + end.tv_usec;
elapsed = (double)(endusec - startusec) / 1000000.0;
printf("sort took %.4f seconds \n", elapsed);
for (i = 0; i < NUMNUM; i++)
printf("%ld\n", snums[i]);
exit(0);
}
小结
本章介绍了线程的概念,介绍了5个线程同步机制:互斥量、读写锁、条件变量、自旋锁、屏障。
习题
这部分是对APUE书本上的习题进行解答
11.1
修改图11-4所示的实例代码,正确地在两个线程之间传递结构。
11-4的实例代码如下:
>11-4 pthread_exit参数的不正确使用
#include "apue.h" #include <pthread.h>   struct foo { int a, b, c, d; };   void printfoo(const char *s, const struct foo *fp) { printf("%s", s); printf(" structure at 0x%lx\n", (unsigned long)fp); printf(" foo.a = %d\n", fp->a); printf(" foo.b = %d\n", fp->b); printf(" foo.c = %d\n", fp->c); printf(" foo.d = %d\n", fp->d); }   void * thr_fn1(void *arg) { struct foo foo = {1, 2, 3, 4};   printfoo("thread 1:\n", &foo); pthread_exit((void *)&foo); }   void * thr_fn2(void *arg) { printf("thread 2: ID is %lu\n", (unsigned long)pthread_self()); pthread_exit((void *)0); }   int main(void) { int err; pthread_t tid1, tid2; struct foo *fp;   err = pthread_create(&tid1, NULL, thr_fn1, NULL); if (err != 0) err_exit(err, "can't create thread 1"); err = pthread_join(tid1, (void *)&fp); if (err != 0) err_exit(err, "can't join with thread 1"); sleep(1); printf("parent starting second thread\n"); err = pthread_create(&tid2, NULL, thr_fn2, NULL); if (err != 0) err_exit(err, "can't create thread 2"); sleep(1); printfoo("parent:\n", fp); exit(0); }
可以把struct foo以动态内存分配的方式创建内存。即:
1 | void *thr_fn1(void *arg) |
11.2
在图11-14所示的实例代码中,需要另外添加什么同步(如果需要的话)可以使得主线程改变与挂起作业关联的线程ID?这会对job_remove函数产生什么影响?
11-14的代码如下:
>使用读写锁
#include <stdlib.h>
#include <pthread.h>
 
struct job {
struct job *j_next;
struct job *j_prev;
pthread_t j_id; /* tells which thread handles this job */
/* ... more stuff here ... */
};
 
struct queue {
struct job *q_head;
struct job *q_tail;
pthread_rwlock_t q_lock;
};
 
/*
* Initialize a queue.
*/
int
queue_init(struct queue *qp)
{
int err;
 
qp->q_head = NULL;
qp->q_tail = NULL;
err = pthread_rwlock_init(&qp->q_lock, NULL);
if (err != 0)
return(err);
/* ... continue initialization ... */
return(0);
}
 
/*
* Insert a job at the head of the queue.
*/
void
job_insert(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
jp->j_next = qp->q_head;
jp->j_prev = NULL;
if (qp->q_head != NULL)
qp->q_head->j_prev = jp;
else
qp->q_tail = jp; /* list was empty */
qp->q_head = jp;
pthread_rwlock_unlock(&qp->q_lock);
}
 
/*
* Append a job on the tail of the queue.
*/
void
job_append(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
jp->j_next = NULL;
jp->j_prev = qp->q_tail;
if (qp->q_tail != NULL)
qp->q_tail->j_next = jp;
else
qp->q_head = jp; /* list was empty */
qp->q_tail = jp;
pthread_rwlock_unlock(&qp->q_lock);
}
 
/*
* Remove the given job from a queue.
*/
void
job_remove(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
if (jp == qp->q_head) {
qp->q_head = jp->j_next;
if (qp->q_tail == jp)
qp->q_tail = NULL;
else
jp->j_next->j_prev = jp->j_prev;
} else if (jp == qp->q_tail) {
qp->q_tail = jp->j_prev;
jp->j_prev->j_next = jp->j_next;
} else {
jp->j_prev->j_next = jp->j_next;
jp->j_next->j_prev = jp->j_prev;
}
pthread_rwlock_unlock(&qp->q_lock);
}
 
/*
* Find a job for the given thread ID.
*/
struct job *
job_find(struct queue *qp, pthread_t id)
{
struct job *jp;
 
if (pthread_rwlock_rdlock(&qp->q_lock) != 0)
return(NULL);
 
for (jp = qp->q_head; jp != NULL; jp = jp->j_next)
if (pthread_equal(jp->j_id, id))
break;
 
pthread_rwlock_unlock(&qp->q_lock);
return(jp);
}
此时存在的问题:在job_find和job_remove之间的时间内可以改动作业ID,这会引起一个作业通过线程ID被找到并分配给了线程1,但在remove之前,这个job中的线程ID被改变成线程2。这个问题可以通过在job结构中增加引用计数和互斥量,然后find中增加引用计数的方式解决,当该引用计数不为0时,主线程不得更改该job的线程ID。
11.3
把图11-15(使用条件变量)中的技术运用到工作线程实例(图11-1(工作队列)和图11-14(使用读写锁))中实现工作线程函数。不要忘记更新queue_init函数对条件变量进行初始,修改job_insert和job_append函数给工作线程发信号。会出现什么困难?
修改的代码:
1 |
|
遇到的问题:
原代码是通过读写锁实现,若更换为条件变量时,需要搭配互斥锁进行保护条件变量。可以看到在job_find和job_remove都添加了条件变量和互斥锁,这导致了所有工作线程都等待一个条件满足(即queue中插入一个工作),此时若有很多工作线程时,当唤醒多个线程后又没有工作可做时,出现惊群效应问题,导致CPU资源浪费,并且增加了锁的争夺。
11-4
下列哪个步骤序列是正确的?
- 对互斥量加锁
- 改变互斥量保护的条件
- 给等待条件的线程发信号
- 对互斥量解锁
或
- 多互斥量加锁
- 改变互斥量保护的条件
- 对互斥量解锁
- 给等待条件的线程发信号
两种情况都可能是正确的,都有不足
第一种情况:等待线程会在收到信号后唤醒,但由于互斥锁被其他线程持有,则该线程马上又会进入阻塞状态
第二种情况:等待线程可能在步骤3和步骤4中获得互斥锁,但是条件判断失败,又会释放锁。当步骤4执行后,唤醒等待线程,等待线程判断条件并退出while循环(不能仅仅因为pthread_cond_wait返回即判断为真,因为其他线程也有可能消耗条件资源,从而导致程序错误)。
11-5
实现屏障需要什么同步原语?给出pthread_barrier_wait函数的一个实现。
我认为可以通过条件变量来实现,设定一个共享值,当线程到达barrier_wait时,增加该共享值,若共享值小于屏障init时设定的count,则线程调用pthread_cond_wait进入等待队列,并释放互斥锁;直到一个线程到达barrier_wait,增加共享值后等于count时,该线程通过pthread_cond_broadcast唤醒所有等待线程。