多进程&多线程 echo server实现

多进程&多线程 echo server实现

本文为 高性能网络编程学习 系列文章的第1篇

 

多进程版本

本文承接上文的单进程 echo server 建立一个多进程的echo server。代码的改动不多,下面贴一下完整的代码

/*************************************************************************
  > File Name: server.c
  > Author: VOID_133
  > A very simple tcp echo server illustrate use of socket
  > Ver1 only accept single connection, no multiple connection allowed
  > Mail: ################### 
  > Created Time: Wed 21 Dec 2016 04:03:25 PM HKT
 ************************************************************************/
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include<string.h>
#include<sys/errno.h>

#define MAX_BUF_SIZE 1000

void usage(char *proc_name) {
    printf("%s [port]\n", proc_name);
    exit(1);
}

int main(int argc, char** argv) {
    int sockfd, clifd;

    //sockaddr_in is used to describe internet(IPV4) socket address
    struct sockaddr_in server_in_addr;
    char buf[MAX_BUF_SIZE];
    int ret = 0;
    int port = 8080;
    pid_t child_pid;

    if(argv[1] == NULL) {
        usage(argv[0]);
    }
    port = atoi(argv[1]);


    //create the socket flide
    //AF_INET is the address family for internet, SOCK_STREAM means TCP connections
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0)
    {
        perror("socket()");
        exit(-1);
    }
    //Set all bytes to zero
    bzero(&server_in_addr, sizeof(server_in_addr));
    server_in_addr.sin_family = AF_INET;

    //inet_aton convert string based address to binary data
    //Do not use inet_addr, for more info see man page
    ret = inet_aton("0.0.0.0", &server_in_addr.sin_addr);

    //You can also choose the code below to let socket
    //listen to all interface
    //code: server_in_addr.sin_addr.s_addr =  htonl(INADDR_ANY);

    //Convert unsigned short to on-wire data
    server_in_addr.sin_port = htons(port);
    if(ret < 0) {
        perror("inet_aton()");
        exit(-1);
    } 

    //Let socket bind to the server address and port
    ret = bind(sockfd, (const struct sockaddr *)&server_in_addr, sizeof(struct sockaddr_in));
    if(ret < 0) {
        perror("bind()");
        exit(-1);
    }

    //Listen for incoming connections
    ret = listen(sockfd, 50);
    if(ret < 0) {
        perror("listen()");
        exit(-1);
    }
    while(1) {
        //accept will block until a client connect to the server
        clifd = accept(sockfd, NULL, NULL);
        if(clifd < 0) {
            perror("accept()");
            exit(-1);
        }
        //Here we should fork a process to handle the connection
        child_pid = fork();
        if(child_pid < 0) {
            perror("fork()");
            exit(-1);
        }
        //parent process, continue accept connections
        if(child_pid != 0) {
            close(clifd);
            continue;
        }

        printf("Connect fd = %d\n", clifd);
        memset(buf, 0, sizeof(buf));

        //read from the client until client close/or send EOF
        while((ret = read(clifd, buf, (size_t)MAX_BUF_SIZE)) && ret != EOF) {
            if(ret < 0) {
                perror("read()");
                exit(-1);
            }
            //printf("Get data %s\n", buf);

            //write back to the client
            ret = write(clifd, buf, strlen(buf) * sizeof(char));
            if(ret < 0) {
                perror("write()");
                exit(-1);
            }
        }
        ret = close(clifd);
        if(ret) {
            perror("close()");
        }
        exit(0);
    }
    return 0;
}

首先来查看一下fork的整个流程,document_2016-12-24_17-31-43

如图,fork之后子进程会调用read/write进行IO操作,而父进程会再次转到执行accept,使得每一个client都会fork一个进程来执行IO操作。下面记录并描述一下在写这个代码的时候遇到的几个问题

在使用fork的时候可能大家也遇到过这个现象,就是自己的进程莫名其妙的变成了后台进程,比如下面这段代码:

#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>

int main(void) {
    pid_t i = fork();
    if(i == 0) {
        while(1);
    }
    exit(0);
}

这段代码fork了两个进程,其中一个执行while(1)死循环在那里,另一个调用exit(0)退出,执行这段代码之后, 用ps可以查看到此进程的PPID(parent pid)变为了1。 其实这个问题很好解释,注意看 if 的判断,执行while(1)的并不是父进程,而是父进程fork出来的子进程,而父进程在子进程退出前就退出了,因而子进程变为孤儿进程,被init(pid=1)收养,所以就出现了上述的PPID=1的效果

 

第二个问题要注意的是关闭socket的操作,要在父子进程均关闭,只关闭子进程的socket对父进程同样fd的socket是没有影响的。

运行一下我们的benchmark程序,效果如下

./benchmark -S 127.0.0.1:8080 -cno 1000 -rno 1000

Opening 1000 goroutines to connect to server 127.0.0.1:8080 ...All clients quit, time used is 3.632001309s
1000 Clients successfully finished 973210 communicate finally

可以看到,此性能相比单进程的已经好了很多,不过fork进程有个很致命的问题,当进程数过多的时候,fork会失败掉;以及进程切换开销会很大,fork进程数不受我们控制,导致fork进程过多系统load变高,系统性能下降

 

多线程版本

和多进程的类似,不过有一些地方需要注意,代码如下

/*************************************************************************
  > File Name: server.c
  > Author: VOID_133
  > A very simple tcp echo server illustrate use of socket > Ver1 only accept single connection, no multiple connection allowed
  > Mail: ################### 
  > Created Time: Wed 21 Dec 2016 04:03:25 PM HKT
 ************************************************************************/
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include<string.h>
#include<sys/errno.h>

#define MAX_BUF_SIZE 1000

typedef struct thread_args {
    int clifd;
} ThreadArgs;


void usage(char *proc_name) {
    printf("%s [port]\n", proc_name);
    exit(1);
}

// Handle connections per thread
void *handle_conn(void *args) {
    ThreadArgs *targs = args;
    //printf("fd = %d\n", targs->clifd);
    //printf("args = %x\n", args);
    int ret = 0;
    char buf[MAX_BUF_SIZE];
    memset(buf, 0, sizeof(buf));
    while(ret = read(targs->clifd, buf,  (size_t)MAX_BUF_SIZE) && ret != EOF) {
        //printf("Client message: %s\n", buf);
        ret = write(targs->clifd, buf, sizeof(char) * (strlen(buf) + 1));
        if(ret < 0) {
            perror("write()");
            break;
        }
        memset(buf, 0, sizeof(buf));
    }
    close(targs->clifd);
    free(args);
    return NULL;
}

int main(int argc, char** argv) {
    int sockfd, clifd;

    //sockaddr_in is used to describe internet(IPV4) socket address
    struct sockaddr_in server_in_addr;
    int ret = 0;
    int port = 8080;
    ThreadArgs *targs = NULL;
    pid_t child_pid;

    if(argv[1] == NULL) {
        usage(argv[0]);
    }
    port = atoi(argv[1]);


    //create the socket flide
    //AF_INET is the address family for internet, SOCK_STREAM means TCP connections
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0)
    {
        perror("socket()");
        exit(-1);
    }
    //Set all bytes to zero
    bzero(&server_in_addr, sizeof(server_in_addr));
    server_in_addr.sin_family = AF_INET;

    //inet_aton convert string based address to binary data
    //Do not use inet_addr, for more info see man page
    ret = inet_aton("0.0.0.0", &server_in_addr.sin_addr);

    //You can also choose the code below to let socket
    //listen to all interface
    //code: server_in_addr.sin_addr.s_addr =  htonl(INADDR_ANY);

    //Convert unsigned short to on-wire data
    server_in_addr.sin_port = htons(port);
    if(ret < 0) {
        perror("inet_aton()");
        exit(-1);
    } 

    //Let socket bind to the server address and port
    ret = bind(sockfd, (const struct sockaddr *)&server_in_addr, sizeof(struct sockaddr_in));
    if(ret < 0) {
        perror("bind()");
        exit(-1);
    }

    //Listen for incoming connections
    ret = listen(sockfd, 50);
    if(ret < 0) {
        perror("listen()");
        exit(-1);
    }
    // Initialize thread_list variable
    int thread_cnt = 0;
    pthread_t *thread_list = NULL;
    while(1) {
        //accept will block until a client connect to the server
        //Use pthread_create to create thread for handling connections
        clifd = accept(sockfd, NULL, NULL);
        if (clifd < 0) {
            perror("accept()");
            exit(-1);
        }
        printf("Create thread with fd = %d\n", clifd);
        pthread_attr_t attr;
        pthread_attr_init(&attr);
        thread_cnt++;
        targs = (ThreadArgs *)malloc(sizeof(ThreadArgs));
        targs->clifd = clifd;
        thread_list = (pthread_t *)realloc((void *)thread_list, thread_cnt * sizeof(pthread_t));
        ret = pthread_create(&thread_list[thread_cnt - 1], &attr, handle_conn, (void *)targs);
        if(ret < 0) {
            perror("pthread_create()");
            exit(-1);
        }
    }
    return 0;
}

相对于多进程版, 我们把子进程交互IO的逻辑拿到了handle_conn这个函数里进行处理,通过pthread_create创建线程,这里我们需要为每一个线程分配pthread_t结构,pthread_create创建的进程所拿到的函数参数是由(void *)指向的地址传递的。

在编写代码的时候遇到这样几个问题,

write error bad file descriptor
return code 141
fd is not incremental from 3 to 53 when benchmark open 50 connections

 

其实都是因为同一个原因导致的。下面把出问题的代码片段post一下

while(1) {
        //accept will block until a client connect to the server
        //Use pthread_create to create thread for handling connections
        clifd = accept(sockfd, NULL, NULL);
        if (clifd < 0) {
            perror("accept()");
            exit(-1);
        }
        printf("Create thread with fd = %d\n", clifd);
        pthread_attr_t attr;
        pthread_attr_init(&attr);
        thread_cnt++;
        //-- CORRECT: targs = (ThreadArgs *)malloc(sizeof(ThreadArgs));
        targs.clifd = clifd; // WRONG
        //-- CORRECT: targs->clifd = clifd;
        thread_list = (pthread_t *)realloc((void *)thread_list, thread_cnt * sizeof(pthread_t)); //WRONG
        //-- CORRECT: ret = pthread_create(&thread_list[thread_cnt - 1], &attr, handle_conn, (void *)targs);
        ret = pthread_create(&thread_list[thread_cnt - 1], &attr, handle_conn, (void *)&targs);
        if(ret < 0) {
            perror("pthread_create()");
            exit(-1);
        }
    }

 

上面的代码就是把targs作为一个Stack上的变量,每一次循环不改变targs的地址,只改变targs的值。而多线程之间是共享同一个内存空间的,因而,每一次改变的都是同一个targs,所以不同线程的handl_conn看到的targs都是同一个,这不是我们期望要的效果,这里和多进程就不一样了,如果是多进程改变一个同名的变量,是通过COW技术修改同名变量的副本,而不是直接对父进程上的同一个变量进行修改。

 

运行上述正确的代码的benchmark如下

./benchmark -S 127.0.0.1:8080 -cno 1000 -rno 1000
Opening 1000 goroutines to connect to server 127.0.0.1:8080 ...All clients quit, time used is 3.42811072s
1000 Clients successfully finished 984447 communicate finally

 

Leave a Reply

Your email address will not be published. Required fields are marked *

seven + eighteen =

This site uses Akismet to reduce spam. Learn how your comment data is processed.