Linux线程

如果把聊天软件看作一个进程,则线程就是发送聊天信息这一个单一动作的其中一个,聊天的过程可以细分为若干的线程,这就是线程。

一、线程函数

1.原型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include<pthread.h>
//比较两个线程ID,相等返回0
int pthread_equal(pthread_t tid1,pthread_t tid2);
//获取自身线程ID
pthread_t pthread_self(void);

//创建线程:线程ID,属性,动作函数,动作函数的参数
//注意:不保证哪个线程会先运行
int pthread_create(pthread_t *restrict tidp
const pthread_attr_t *restrict attr,
void *(*start_rtn)(void *),
void *restrict arg);

//终止线程:1.简单返回,rval_ptr包含返回码
// 2.线程取消,rval_ptr指定内存单元设置为PTHREAD_CANCELED
void pthread_exit(void *rval_ptr);

//请求取消同一进程中的其他线程
int pthread_cancel(pthread_t tid);

//线程清理处理函数
void pthread_cleanup_push(void(*rtn)(void*),void *arg);
void pthread_cleanup_pop(int execute);

//线程状态分为可结合的(joinable)或则分离的(detached)
//分离线程,使线程资源在终止时立刻被系统回收,并获得退出码(存在rval_ptr指向的单元)
int pthread_join(pthread_t thread,void **rval_ptr);
//分离线程,使线程资源在终止时立刻被系统释放
int pthread_detach(pthread_t tid);

二、线程同步

1.互斥量

原型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include<pthread.h>
//使用互斥量前要把他设为常量PTHREAD_MUTEX_INITIALIZER
//或者使用pthread_mutex_init初始化
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
//动态分配互斥量后(如使用malloc)要调用pthread_mutex_destroy
int pthread_mutex_destroy(pthread_mutex_t *mutex);

//加锁,对已经上锁的则阻塞
int pthread_mutex_lock(pthread_mutex_t *mutex);
//尝试加锁,对已经上锁的不阻塞,返回EBUSY
int pthread_mutex_trylock(pthread_mutex_t *mutex);
//解锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);

#include<time.h>
#include<pthread.h>
//等同pthread_mutex_lock加上时间限制,超过时间则不加锁,返回ETIMEDOUT
int pthread_mutex_timedlock(phread_mutx_t *restrict mutex,
const struct timespec *restrict tsptr);

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <stdlib.h>
#include <pthread.h>

struct foo {
int f_count;
pthread_mutex_t f_lock;
int f_id;
/* ... more stuff here ... */
};

struct foo *
foo_alloc(int id) /* allocate the object */
{
struct foo *fp;

if ((fp = malloc(sizeof(struct foo))) != NULL) {
fp->f_count = 1;
fp->f_id = id;
if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
free(fp);
return(NULL);
}
/* ... continue initialization ... */
}
return(fp);
}

void
foo_hold(struct foo *fp) /* add a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}

void
foo_rele(struct foo *fp) /* release a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
if (--fp->f_count == 0) { /* last reference */
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destroy(&fp->f_lock);
free(fp);
} else {
pthread_mutex_unlock(&fp->f_lock);
}
}

2.读写锁

读写锁适合对于数据结构读的次数远大于写的情况。写模式下锁住单一进程进行写动作,读模式下可被多线程进行读取动作。

原型:

1
2
3
4
5
6
7
8
9
10
11
12
13
#include<pthread.h>
//初始化
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr);
//释放资源
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

//读模式下锁定
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
//写模式下锁定
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
//解锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#include <stdlib.h>
#include <pthread.h>

struct job {
struct job *j_next;
struct job *j_prev;
pthread_t j_id; /* tells which thread handles this job */
/* ... more stuff here ... */
};

struct queue {
struct job *q_head;
struct job *q_tail;
pthread_rwlock_t q_lock;
};

/*
* Initialize a queue.
*/
int
queue_init(struct queue *qp)
{
int err;

qp->q_head = NULL;
qp->q_tail = NULL;
err = pthread_rwlock_init(&qp->q_lock, NULL);
if (err != 0)
return(err);
/* ... continue initialization ... */
return(0);
}

/*
* Insert a job at the head of the queue.
*/
void
job_insert(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
jp->j_next = qp->q_head;
jp->j_prev = NULL;
if (qp->q_head != NULL)
qp->q_head->j_prev = jp;
else
qp->q_tail = jp; /* list was empty */
qp->q_head = jp;
pthread_rwlock_unlock(&qp->q_lock);
}

/*
* Append a job on the tail of the queue.
*/
void
job_append(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
jp->j_next = NULL;
jp->j_prev = qp->q_tail;
if (qp->q_tail != NULL)
qp->q_tail->j_next = jp;
else
qp->q_head = jp; /* list was empty */
qp->q_tail = jp;
pthread_rwlock_unlock(&qp->q_lock);
}

/*
* Remove the given job from a queue.
*/
void
job_remove(struct queue *qp, struct job *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
if (jp == qp->q_head) {
qp->q_head = jp->j_next;
if (qp->q_tail == jp)
qp->q_tail = NULL;
else
jp->j_next->j_prev = jp->j_prev;
} else if (jp == qp->q_tail) {
qp->q_tail = jp->j_prev;
jp->j_prev->j_next = jp->j_next;
} else {
jp->j_prev->j_next = jp->j_next;
jp->j_next->j_prev = jp->j_prev;
}
pthread_rwlock_unlock(&qp->q_lock);
}

/*
* Find a job for the given thread ID.
*/
struct job *
job_find(struct queue *qp, pthread_t id)
{
struct job *jp;

if (pthread_rwlock_rdlock(&qp->q_lock) != 0)
return(NULL);

for (jp = qp->q_head; jp != NULL; jp = jp->j_next)
if (pthread_equal(jp->j_id, id))
break;

pthread_rwlock_unlock(&qp->q_lock);
return(jp);
}

3.条件变量

条件变量和互斥量仪器使用时,允许线程以无竞争的方式等待特定的条件发生。具体表现为调用者把锁住的互斥量传给函数,函数自动把调用线程放到等待条件的线程列表上,对互斥量解锁。满足条件后实施动作并返回时(即pthread_cond_wait返回时),互斥量再次被锁住。

原型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include<pthread.h>
//初始化
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
//释放资源
int pthread_cond_destroy(pthread_cond_t *cond);

//等待条件为真,限定时间不能满足条件则返回错误码
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
//同上,但可以设置愿意等待的时间
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict tsptr);

//唤醒等待该条件的一个线程
int pthread_cond_signal(pthread_cond_t *cond);
//唤醒等待该条件的所有进程
int pthread_cond_broadcast(pthread_cond_t *cond);

例子:

条件变量和互斥量一起同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <pthread.h>

struct msg {
struct msg *m_next;
/* ... more stuff here ... */
};

struct msg *workq;

pthread_cond_t qready = PTHREAD_COND_INITIALIZER;

pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

void
process_msg(void)
{
struct msg *mp;

for (;;) {
pthread_mutex_lock(&qlock);
while (workq == NULL)
pthread_cond_wait(&qready, &qlock);
mp = workq;
workq = mp->m_next;
pthread_mutex_unlock(&qlock);
/* now process the message mp */
}
}

void
enqueue_msg(struct msg *mp)
{
pthread_mutex_lock(&qlock);
mp->m_next = workq;
workq = mp;
pthread_mutex_unlock(&qlock);
pthread_cond_signal(&qready);
}

4.自旋锁

自旋锁和互斥量类似,但他不是通过休眠使进程阻塞,而是在获取锁之前一直处于忙等(自旋)阻塞状态。自旋锁适用于:持锁时间短,而且线程不希望在重新调度上花费太多的成本。

原型:

1
2
3
4
5
6
7
8
9
10
11
12
#include<pthread.h>
//初始化
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
//释放资源
int pthread_spin_destroy(pthread_spinlock_t *lock);

//加锁,对已经上锁的则结果未定义:返回EDEADLK或者永久自旋
int pthread_spin_lock(pthread_spinlock_t *lock);
//尝试加锁
int pthread_spin_trylock(pthread_spinlock_t *lock);
//解锁
int pthread_spin_unlock(pthread_spinlock_t *lock);

5.屏障

屏障允许每个线程等待,直到所有的合作线程都到达某一点,然后从这点继续执行。

原型:

1
2
3
4
5
6
7
8
#include<pthread.h>
int pthread_barrier_init(pthread_barrier_t *redtrict barrier,
const pthread_barrierattr_t *restrict attr,
unsigned int count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);

//本线程完成工作,等待其他线程完成
int pthread_barrier_wait(pthread_barrier_t *barrier);

例子:

使用8个线程分解800万个数的排序,每个线程使用堆排序,最后合并

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#include "apue.h"
#include <pthread.h>
#include <limits.h>
#include <sys/time.h>

#define NTHR 8 /* number of threads */
#define NUMNUM 8000000L /* number of numbers to sort */
#define TNUM (NUMNUM/NTHR) /* number to sort per thread */

long nums[NUMNUM];
long snums[NUMNUM];

pthread_barrier_t b;

#ifdef SOLARIS
#define heapsort qsort
#else
extern int heapsort(void *, size_t, size_t,
int (*)(const void *, const void *));
#endif

/*
* Compare two long integers (helper function for heapsort)
*/
int
complong(const void *arg1, const void *arg2)
{
long l1 = *(long *)arg1;
long l2 = *(long *)arg2;

if (l1 == l2)
return 0;
else if (l1 < l2)
return -1;
else
return 1;
}

/*
* Worker thread to sort a portion of the set of numbers.
*/
void *
thr_fn(void *arg)
{
long idx = (long)arg;

heapsort(&nums[idx], TNUM, sizeof(long), complong);
pthread_barrier_wait(&b);

/*
* Go off and perform more work ...
*/
return((void *)0);
}

/*
* Merge the results of the individual sorted ranges.
*/
void
merge()
{
long idx[NTHR];
long i, minidx, sidx, num;

for (i = 0; i < NTHR; i++)
idx[i] = i * TNUM;
for (sidx = 0; sidx < NUMNUM; sidx++) {
num = LONG_MAX;
for (i = 0; i < NTHR; i++) {
if ((idx[i] < (i+1)*TNUM) && (nums[idx[i]] < num)) {
num = nums[idx[i]];
minidx = i;
}
}
snums[sidx] = nums[idx[minidx]];
idx[minidx]++;
}
}

int
main()
{
unsigned long i;
struct timeval start, end;
long long startusec, endusec;
double elapsed;
int err;
pthread_t tid;

/*
* Create the initial set of numbers to sort.
*/
srandom(1);
for (i = 0; i < NUMNUM; i++)
nums[i] = random();

/*
* Create 8 threads to sort the numbers.
*/
gettimeofday(&start, NULL);
pthread_barrier_init(&b, NULL, NTHR+1);
for (i = 0; i < NTHR; i++) {
err = pthread_create(&tid, NULL, thr_fn, (void *)(i * TNUM));
if (err != 0)
err_exit(err, "can't create thread");
}
pthread_barrier_wait(&b);
merge();
gettimeofday(&end, NULL);

/*
* Print the sorted list.
*/
startusec = start.tv_sec * 1000000 + start.tv_usec;
endusec = end.tv_sec * 1000000 + end.tv_usec;
elapsed = (double)(endusec - startusec) / 1000000.0;
printf("sort took %.4f seconds\n", elapsed);
for (i = 0; i < NUMNUM; i++)
printf("%ld\n", snums[i]);
exit(0);
}