服务器端程序设计范式

服务器端程序设计范式

1,迭代服务器

完全处理完某个客户程序才进行下一个客户

1
2
3
4
5
6
7
8
9
for ( ; ; ) {
connfd = Accept(listenfd, (SA *) NULL, NULL);

ticks = time(NULL);
snprintf(buff, sizeof(buff), "%.24s\r\n", ctime(&ticks));
Write(connfd, buff, strlen(buff));

Close(connfd);
}

2,并发服务器

2.1,为每个客户请求fork一个进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for ( ; ; ) {
clilen = addrlen;
if ( (connfd = accept(listenfd, cliaddr, &clilen)) < 0) {
if (errno == EINTR)
continue; /* back to for() */
else
err_sys("accept error");
}

if ( (childpid = Fork()) == 0) { /* child process */
Close(listenfd); /* close listening socket */
web_child(connfd); /* process request */
exit(0);
}
Close(connfd); /* parent closes connected socket */
}
}
2.2,为每个客户请求创建一个线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
for ( ; ; ) {
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen);

Pthread_create(&tid, NULL, &doit, (void *) connfd);
}
}

void *
doit(void *arg)
{
void web_child(int);

Pthread_detach(pthread_self());
web_child((int) arg);
Close((int) arg);
return(NULL);
}

3,预先派生子进程

3.1,每个子进程调用accept,无上锁

注意惊群效应:一个客户请求会唤醒所有子进程

多个进程在引用同一个监听套接字的描述符上调用accept,只适用于Berkeley的内核,要其他内核就用上锁保护accept,使其每次只能一个进程调用accept

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/* include serv02 */
#include "unp.h"

static int nchildren;
static pid_t *pids;

int
main(int argc, char **argv)
{
int listenfd, i;
socklen_t addrlen;
void sig_int(int);
pid_t child_make(int, int, int);

if (argc == 3)
listenfd = Tcp_listen(NULL, argv[1], &addrlen);
else if (argc == 4)
listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
else
err_quit("usage: serv02 [ <host> ] <port#> <#children>");
nchildren = atoi(argv[argc-1]);
pids = Calloc(nchildren, sizeof(pid_t));

for (i = 0; i < nchildren; i++)
pids[i] = child_make(i, listenfd, addrlen); /* parent returns */

Signal(SIGINT, sig_int);

for ( ; ; )
pause(); /* everything done by children */
}
/* end serv02 */

/* include sigint */
void
sig_int(int signo)
{
int i;
void pr_cpu_time(void);

/* 4terminate all children */
for (i = 0; i < nchildren; i++)
kill(pids[i], SIGTERM);
while (wait(NULL) > 0) /* wait for all children */
;
if (errno != ECHILD)
err_sys("wait error");

pr_cpu_time();
exit(0);
}
/* end sigint */

pid_t
child_make(int i, int listenfd, int addrlen)
{
pid_t pid;
void child_main(int, int, int);

if ( (pid = Fork()) > 0)
return(pid); /* parent */

child_main(i, listenfd, addrlen); /* never returns */
}
/* end child_make */

/* include child_main */
void
child_main(int i, int listenfd, int addrlen)
{
int connfd;
void web_child(int);
socklen_t clilen;
struct sockaddr *cliaddr;

cliaddr = Malloc(addrlen);

printf("child %ld starting\n", (long) getpid());
for ( ; ; ) {
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen);

web_child(connfd); /* process the request */
Close(connfd);
}
}
/* end child_main */
3.2,文件上锁保护accept
1
见书P659
3.3,线程互斥锁保护accept
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#include	"unp.h"

static int nchildren;
static pid_t *pids;

int
main(int argc, char **argv)
{
int listenfd, i;
socklen_t addrlen;
void sig_int(int);
pid_t child_make(int, int, int);

if (argc == 3)
listenfd = Tcp_listen(NULL, argv[1], &addrlen);
else if (argc == 4)
listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
else
err_quit("usage: serv04 [ <host> ] <port#> <#children>");
nchildren = atoi(argv[argc-1]);
pids = Calloc(nchildren, sizeof(pid_t));
///////////////////////////////////////////////
my_lock_init(NULL);///比上个版本添加这行
/////////////////////////////////////////////
for (i = 0; i < nchildren; i++)
pids[i] = child_make(i, listenfd, addrlen); /* parent returns */

Signal(SIGINT, sig_int);

for ( ; ; )
pause(); /* everything done by children */
}

pid_t
child_make(int i, int listenfd, int addrlen)
{...}

void
child_main(int i, int listenfd, int addrlen)
{
int connfd;
void web_child(int);
socklen_t clilen;
struct sockaddr *cliaddr;

cliaddr = Malloc(addrlen);

printf("child %ld starting\n", (long) getpid());
for ( ; ; ) {
clilen = addrlen;
////////////////////////////////////////////////////
my_lock_wait();
connfd = Accept(listenfd, cliaddr, &clilen);
my_lock_release();
//////////////////////////////////////////////////////

web_child(connfd); /* process the request */
Close(connfd);
}
}



/* include my_lock_init */
#include "unpthread.h"
#include <sys/mman.h>

static pthread_mutex_t *mptr; /* actual mutex will be in shared memory */

void
my_lock_init(char *pathname)
{
int fd;
pthread_mutexattr_t mattr;

fd = Open("/dev/zero", O_RDWR, 0);

mptr = Mmap(0, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
Close(fd);

Pthread_mutexattr_init(&mattr);
Pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
Pthread_mutex_init(mptr, &mattr);
}
/* end my_lock_init */

/* include my_lock_wait */
void
my_lock_wait()
{
Pthread_mutex_lock(mptr);
}

void
my_lock_release()
{
Pthread_mutex_unlock(mptr);
}
/* end my_lock_wait */
3.4,父进程向子进程传递套接字描述符

绕过为所有子进程的accept调用提供上锁保护的可能需求,不过需要从父进程到子进程的某种形式的描述符传递

1
见书P663

4,预先创建线程

4.1,以互斥锁上锁的方式保护accept
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
/* include serv07 */
#include "unpthread.h"
#include "pthread07.h"

pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER;

int
main(int argc, char **argv)
{
int i;
void sig_int(int), thread_make(int);

if (argc == 3)
listenfd = Tcp_listen(NULL, argv[1], &addrlen);
else if (argc == 4)
listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
else
err_quit("usage: serv07 [ <host> ] <port#> <#threads>");
nthreads = atoi(argv[argc-1]);
tptr = Calloc(nthreads, sizeof(Thread));

for (i = 0; i < nthreads; i++)
thread_make(i); /* only main thread returns */

Signal(SIGINT, sig_int);

for ( ; ; )
pause(); /* everything done by threads */
}
/* end serv07 */

void
sig_int(int signo)
{
int i;
void pr_cpu_time(void);

pr_cpu_time();

for (i = 0; i < nthreads; i++)
printf("thread %d, %ld connections\n", i, tptr[i].thread_count);

exit(0);
}

/////////////////////////////////////////////////////////////////

typedef struct {
pthread_t thread_tid; /* thread ID */
long thread_count; /* # connections handled */
} Thread;
Thread *tptr; /* array of Thread structures; calloc'ed */

int listenfd, nthreads;
socklen_t addrlen;
pthread_mutex_t mlock;


///////////////////////////////////////////////////////////////////
#include "unpthread.h"
#include "pthread07.h"
void
thread_make(int i)
{
void *thread_main(void *);

Pthread_create(&tptr[i].thread_tid, NULL, &thread_main, (void *) i);
return; /* main thread returns */
}

void *
thread_main(void *arg)
{
int connfd;
void web_child(int);
socklen_t clilen;
struct sockaddr *cliaddr;

cliaddr = Malloc(addrlen);

printf("thread %d starting\n", (int) arg);
for ( ; ; ) {
clilen = addrlen;
Pthread_mutex_lock(&mlock);
connfd = Accept(listenfd, cliaddr, &clilen);
Pthread_mutex_unlock(&mlock);
tptr[(int) arg].thread_count++;

web_child(connfd); /* process request */
Close(connfd);
}
}
4.2,由主线程调用accept

在程序启动阶段创建一个线程池之后只让主线程调用accept并把每个客户连接传递给池中某个可用线程,这一点类似描述符传递的版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/* include serv08 */
#include "unpthread.h"
#include "pthread08.h"

static int nthreads;
pthread_mutex_t clifd_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t clifd_cond = PTHREAD_COND_INITIALIZER;

int
main(int argc, char **argv)
{
int i, listenfd, connfd;
void sig_int(int), thread_make(int);
socklen_t addrlen, clilen;
struct sockaddr *cliaddr;

if (argc == 3)
listenfd = Tcp_listen(NULL, argv[1], &addrlen);
else if (argc == 4)
listenfd = Tcp_listen(argv[1], argv[2], &addrlen);
else
err_quit("usage: serv08 [ <host> ] <port#> <#threads>");
cliaddr = Malloc(addrlen);

nthreads = atoi(argv[argc-1]);
tptr = Calloc(nthreads, sizeof(Thread));
iget = iput = 0;

/* 4create all the threads */
for (i = 0; i < nthreads; i++)
thread_make(i); /* only main thread returns */

Signal(SIGINT, sig_int);

for ( ; ; ) {
clilen = addrlen;
connfd = Accept(listenfd, cliaddr, &clilen);

Pthread_mutex_lock(&clifd_mutex);
clifd[iput] = connfd;
if (++iput == MAXNCLI)
iput = 0;
if (iput == iget)
err_quit("iput = iget = %d", iput);
Pthread_cond_signal(&clifd_cond);
Pthread_mutex_unlock(&clifd_mutex);
}
}
/* end serv08 */

void
sig_int(int signo)
{
int i;
void pr_cpu_time(void);

pr_cpu_time();

for (i = 0; i < nthreads; i++)
printf("thread %d, %ld connections\n", i, tptr[i].thread_count);

exit(0);
}

typedef struct {
pthread_t thread_tid; /* thread ID */
long thread_count; /* # connections handled */
} Thread;
Thread *tptr; /* array of Thread structures; calloc'ed */

#define MAXNCLI 32
int clifd[MAXNCLI], iget, iput;
pthread_mutex_t clifd_mutex;
pthread_cond_t clifd_cond;


#include "unpthread.h"
#include "pthread08.h"

void
thread_make(int i)
{
void *thread_main(void *);

Pthread_create(&tptr[i].thread_tid, NULL, &thread_main, (void *) i);
return; /* main thread returns */
}

void *
thread_main(void *arg)
{
int connfd;
void web_child(int);

printf("thread %d starting\n", (int) arg);
for ( ; ; ) {
Pthread_mutex_lock(&clifd_mutex);
while (iget == iput)
Pthread_cond_wait(&clifd_cond, &clifd_mutex);
connfd = clifd[iget]; /* connected socket to service */
if (++iget == MAXNCLI)
iget = 0;
Pthread_mutex_unlock(&clifd_mutex);
tptr[(int) arg].thread_count++;

web_child(connfd); /* process request */
Close(connfd);
}
}

总结:

1,当系统负载较轻时,每来一个客户请求现场派生一个子进程为之服务的传统并发服务器程序模型就够了。这个模型可以与inetd结合使用,也就是inetd处理每个连接的接受。接下来的意见是针对重负荷服务器而言的。

2,相对传统的每个客户fork一次设计范式,预先创建一个子进程池或一个线程池的设计范式能够把进程控制CPU时间降低10倍以上。上面的例子还可以优化:监视闲置子进程的个数,随着服务客户数的动态变化而增加或减少。

3,某些实现允许多个子进程或线程阻塞在同一个accept调用中,另一些实现却要求包绕accept调用设置某种类型的锁加以保护。文件上锁或Pthread互斥锁都可以使用。

4,让所有子进程或线程自行调用accept通常比让父进程或主线程独自调用accept并把描述符传递给子进程或线程来的简单快捷。

5,由于潜在select冲突的原因,让所有子进程或线程阻塞在同一个accept调用中比让他们阻塞在同一个select调用中更可取。

6,使用线程比通常比使用进程快。不过还是取决于系统,比如accept客户连接的服务器调用fork和exec,那么fork一个单线程的进程比fork一个多线程的进程快。