亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

使用libevent進行多線程socket編程demo

系統(tǒng) 2389 0

最近要對一個用libevent寫的C/C++項目進行修改,要改成多線程的,故做了一些學習和研究。

libevent是一個用C語言寫的開源的一個庫。它對socket編程里的epoll/select等功能進行了封裝,并且使用了一些設(shè)計模式(比如反應堆模式),用事件機制來簡化了socket編程。libevent的好處網(wǎng)上有很多,但是初學者往往都看不懂。我打個比方吧, 1) 假設(shè)有N個客戶端同時往服務端通過socket寫數(shù)據(jù),用了libevent之后,你的server程序里就不用再使用epoll或是select來判斷都哪些socket的緩沖區(qū)里已經(jīng)收到了客戶端寫來的數(shù)據(jù)。當某個socket的緩沖區(qū)里有可讀數(shù)據(jù)時,libevent會自動觸發(fā)一個“讀事件”,通過這個“讀事件”來調(diào)用相應的代碼來讀取socket緩沖區(qū)里的數(shù)據(jù)即可。換句話說,libevent自己調(diào)用select()或是epoll的函數(shù)來判斷哪個緩沖區(qū)可讀了,只要可讀了,就自動調(diào)用相應的處理程序。 2) 對于“寫事件”,libevent會監(jiān)控某個socket的緩沖區(qū)是否可寫(一般情況下,只要緩沖區(qū)沒滿就可寫),只要可寫,就會觸發(fā)“寫事件”,通過“寫事件”來調(diào)用相應的函數(shù),將數(shù)據(jù)寫到socket里。

以上兩個例子分別從“讀”和“寫”兩方面簡介了一下,可能不十分準確(但十分準確的描述往往會讓人看不懂)。

以下兩個鏈接關(guān)于libevent的剖析比較詳細,想學習libevent最好看一下。

  1) sparkliang的專欄 ? ? ? ? 2) 魚思故淵的專欄

=========關(guān)于libevent使用多線程的討論=========================

網(wǎng)上很多資料說libevent不支持多線程,也有很多人說libevent可以支持多線程。究竟值不支持呢?我的答案是: 得看你的多線程是怎么寫的,如何跟libevent結(jié)合的。

1)可以肯定的是,libevent的 信號事件 是不支持多線程的(因為源碼里用了個全局變量)。可以看這篇文章(http://blog.csdn.net/sparkliang/article/details/5306809)。(注:libevent里有“超時事件”,“IO事件”,“信號事件”。)

2)對于不同的線程,使用不同的base,是可以的。

3)如果不同的線程使用相同的base呢?——如果在不同的線程里的事件都注冊到同一個base上,會有問題嗎?

  (http://www.cnblogs.com/zzyoucan/p/3970578.html)這篇博客里提到說,不行!即使加鎖也不行。我最近稍微看了部分源碼,我的答案是:不加鎖會有并發(fā)問題,但如果對每個event_add(),event_del()等這些操作event的動作都用同一個臨界變量來加鎖,應該是沒問題的。——貌似也有點問題,如果某個事件沒有用event_set()設(shè)置為EV_PERSIST,當事件發(fā)生時,會被自動刪除。有可能線程a在刪除事件的時候,線程b卻在添加事件,這樣還是會出現(xiàn)并發(fā)問題。 最后的結(jié)論是——不行!

========本次實驗代碼邏輯的說明==========================

我采取的方案是對于不同的線程,使用不同的base。——即每個線程對應一個base,將線程里的事件注冊到線程的base上,而不是所有線程里的事件都用同一個base。

一 實驗需求描述:

  1)寫一個client和server程序。多個client可以同時連接一個server;

  2)client接收用戶在標準輸入的字符,發(fā)往server端;

  3)server端收到后,再把收到的數(shù)據(jù)處理一下,返回給client;

  4)client收到server返回的數(shù)據(jù)后,將其打印在終端上。

二 設(shè)計方案:

1. client:

  1)  client采用兩個線程,主線程接收用戶在終端上的輸入,并通過socket將用戶的輸入發(fā)往server。

  2)  派生一個子線程,接收server返回來的數(shù)據(jù),如果收到數(shù)據(jù),就打印出來。

2. server:

  在主線程里監(jiān)聽client有沒有連接連過來,如果有,立馬accept出一個socket,并創(chuàng)建一個子線程,在子線程里接收client傳過來的數(shù)據(jù),并對數(shù)據(jù)進行一些修改,然后將修改后的數(shù)據(jù)寫回到client端。

三 代碼實現(xiàn)

1. client代碼如下:

        
            1
        
         #include <iostream>


        
            2
        
         #include <sys/
        
          select
        
        .h>


        
            3
        
         #include <sys/socket.h>


        
            4
        
         #include <unistd.h>


        
            5
        
         #include <pthread.h>


        
            6
        
         #include <stdio.h>


        
            7
        
         #include <stdlib.h>


        
            8
        
         #include <sys/types.h>


        
            9
        
         #include <netinet/
        
          in
        
        .h>


        
           10
        
         #include <arpa/inet.h>


        
           11
        
         #include <
        
          string
        
        >


        
           12
        
         #include <
        
          string
        
        .h>


        
           13
        
         #include <
        
          event
        
        .h>


        
           14
        
        
          using
        
        
          namespace
        
        
           std;


        
        
           15
        
        
           16
        
        
          #define
        
         BUF_SIZE 1024


        
           17
        
        
           18
        
        
          /*
        
        
          *


        
        
           19
        
        
           * 連接到server端,如果成功,返回fd,如果失敗返回-1


        
        
           20
        
        
          */
        
        
           21
        
        
          int
        
         connectServer(
        
          char
        
        * ip, 
        
          int
        
        
           port){


        
        
           22
        
        
          int
        
         fd = socket( AF_INET, SOCK_STREAM, 
        
          0
        
        
           );


        
        
           23
        
             cout<<
        
          "
        
        
          fd= 
        
        
          "
        
        <<fd<<
        
          endl;


        
        
           24
        
        
          if
        
        (-
        
          1
        
         ==
        
           fd){


        
        
           25
        
                 cout<<
        
          "
        
        
          Error, connectServer() quit
        
        
          "
        
        <<
        
          endl;


        
        
           26
        
        
          return
        
         -
        
          1
        
        
          ;


        
        
           27
        
        
              }


        
        
           28
        
        
          struct
        
         sockaddr_in remote_addr; 
        
          //
        
        
          服務器端網(wǎng)絡(luò)地址結(jié)構(gòu)體
        
        
           29
        
             memset(&remote_addr,
        
          0
        
        ,
        
          sizeof
        
        (remote_addr)); 
        
          //
        
        
          數(shù)據(jù)初始化--清零
        
        
           30
        
             remote_addr.sin_family=AF_INET; 
        
          //
        
        
          設(shè)置為IP通信
        
        
           31
        
             remote_addr.sin_addr.s_addr=inet_addr(ip);
        
          //
        
        
          服務器IP地址
        
        
           32
        
             remote_addr.sin_port=htons(port); 
        
          //
        
        
          服務器端口號
        
        
           33
        
        
          int
        
         con_result = connect(fd, (
        
          struct
        
         sockaddr*) &remote_addr, 
        
          sizeof
        
        (
        
          struct
        
        
           sockaddr));


        
        
           34
        
        
          if
        
        (con_result < 
        
          0
        
        
          ){


        
        
           35
        
                 cout<<
        
          "
        
        
          Connect Error!
        
        
          "
        
        <<
        
          endl;


        
        
           36
        
        
                  close(fd);


        
        
           37
        
        
          return
        
         -
        
          1
        
        
          ;


        
        
           38
        
        
              }


        
        
           39
        
             cout<<
        
          "
        
        
          con_result=
        
        
          "
        
        <<con_result<<
        
          endl;


        
        
           40
        
        
          return
        
        
           fd;


        
        
           41
        
        
          }


        
        
           42
        
        
           43
        
        
          void
        
         on_read(
        
          int
        
         sock, 
        
          short
        
        
          event
        
        , 
        
          void
        
        *
        
           arg)


        
        
           44
        
        
          {


        
        
           45
        
        
          char
        
        * buffer = 
        
          new
        
        
          char
        
        
          [BUF_SIZE];


        
        
           46
        
             memset(buffer, 
        
          0
        
        , 
        
          sizeof
        
        (
        
          char
        
        )*
        
          BUF_SIZE);


        
        
           47
        
        
          //
        
        
          --本來應該用while一直循環(huán),但由于用了libevent,只在可以讀的時候才觸發(fā)on_read(),故不必用while了
        
        
           48
        
        
          int
        
         size =
        
           read(sock, buffer, BUF_SIZE);


        
        
           49
        
        
          if
        
        (
        
          0
        
         == size){
        
          //
        
        
          說明socket關(guān)閉
        
        
           50
        
                 cout<<
        
          "
        
        
          read size is 0 for socket:
        
        
          "
        
        <<sock<<
        
          endl;


        
        
           51
        
        
          struct
        
        
          event
        
        * read_ev = (
        
          struct
        
        
          event
        
        *
        
          )arg;


        
        
           52
        
        
          if
        
        (NULL !=
        
           read_ev){


        
        
           53
        
        
                      event_del(read_ev);


        
        
           54
        
        
                      free(read_ev);


        
        
           55
        
        
                  }


        
        
           56
        
        
                  close(sock);


        
        
           57
        
        
          return
        
        
          ;


        
        
           58
        
        
              }


        
        
           59
        
             cout<<
        
          "
        
        
          Received from server---
        
        
          "
        
        <<buffer<<
        
          endl;


        
        
           60
        
        
              delete[]buffer;


        
        
           61
        
        
          }


        
        
           62
        
        
           63
        
        
          void
        
        * init_read_event(
        
          void
        
        *
        
           arg){


        
        
           64
        
        
          long
        
         long_sock = (
        
          long
        
        
          )arg;


        
        
           65
        
        
          int
        
         sock = (
        
          int
        
        
          )long_sock;


        
        
           66
        
        
          //
        
        
          -----初始化libevent,設(shè)置回調(diào)函數(shù)on_read()------------
        
        
           67
        
        
          struct
        
         event_base* 
        
          base
        
         =
        
           event_base_new();


        
        
           68
        
        
          struct
        
        
          event
        
        * read_ev = (
        
          struct
        
        
          event
        
        *)malloc(
        
          sizeof
        
        (
        
          struct
        
        
          event
        
        ));
        
          //
        
        
          發(fā)生讀事件后,從socket中取出數(shù)據(jù)
        
        
           69
        
             event_set(read_ev, sock, EV_READ|
        
          EV_PERSIST, on_read, read_ev);


        
        
           70
        
             event_base_set(
        
          base
        
        
          , read_ev);


        
        
           71
        
        
              event_add(read_ev, NULL);


        
        
           72
        
             event_base_dispatch(
        
          base
        
        
          );


        
        
           73
        
        
          //
        
        
          --------------
        
        
           74
        
             event_base_free(
        
          base
        
        
          );


        
        
           75
        
        
          }


        
        
           76
        
        
          /*
        
        
          *


        
        
           77
        
        
           * 創(chuàng)建一個新線程,在新線程里初始化libevent讀事件的相關(guān)設(shè)置,并開啟event_base_dispatch


        
        
           78
        
        
          */
        
        
           79
        
        
          void
        
         init_read_event_thread(
        
          int
        
        
           sock){


        
        
           80
        
        
              pthread_t thread;


        
        
           81
        
             pthread_create(&thread,NULL,init_read_event,(
        
          void
        
        *
        
          )sock);


        
        
           82
        
        
              pthread_detach(thread);


        
        
           83
        
        
          }


        
        
           84
        
        
          int
        
        
           main() {


        
        
           85
        
             cout << 
        
          "
        
        
          main started
        
        
          "
        
         << endl; 
        
          //
        
        
           prints Hello World!!!
        
        
           86
        
             cout << 
        
          "
        
        
          Please input server IP:
        
        
          "
        
        <<
        
          endl;


        
        
           87
        
        
          char
        
         ip[
        
          16
        
        
          ];


        
        
           88
        
             cin >>
        
           ip;


        
        
           89
        
             cout << 
        
          "
        
        
          Please input port:
        
        
          "
        
        <<
        
          endl;


        
        
           90
        
        
          int
        
        
           port;


        
        
           91
        
             cin >>
        
           port;


        
        
           92
        
             cout << 
        
          "
        
        
          ServerIP is 
        
        
          "
        
        <<ip<<
        
          "
        
        
           ,port=
        
        
          "
        
        <<port<<
        
          endl;


        
        
           93
        
        
          int
        
         socket_fd =
        
           connectServer(ip, port);


        
        
           94
        
             cout << 
        
          "
        
        
          socket_fd=
        
        
          "
        
        <<socket_fd<<
        
          endl;


        
        
           95
        
        
              init_read_event_thread(socket_fd);


        
        
           96
        
        
          //
        
        
          --------------------------
        
        
           97
        
        
          char
        
        
           buffer[BUF_SIZE];


        
        
           98
        
        
          bool
        
         isBreak = 
        
          false
        
        
          ;


        
        
           99
        
        
          while
        
        (!
        
          isBreak){


        
        
          100
        
                 cout << 
        
          "
        
        
          Input your data to server(\'q\' or \"quit\" to exit)
        
        
          "
        
        <<
        
          endl;


        
        
          101
        
                 cin >>
        
           buffer;


        
        
          102
        
        
          if
        
        (strcmp(
        
          "
        
        
          q
        
        
          "
        
        , buffer)==
        
          0
        
         || strcmp(
        
          "
        
        
          quit
        
        
          "
        
        , buffer)==
        
          0
        
        
          ){


        
        
          103
        
                     isBreak=
        
          true
        
        
          ;


        
        
          104
        
        
                      close(socket_fd);


        
        
          105
        
        
          break
        
        
          ;


        
        
          106
        
        
                  }


        
        
          107
        
                 cout << 
        
          "
        
        
          Your input is 
        
        
          "
        
        <<buffer<<
        
          endl;


        
        
          108
        
        
          int
        
         write_num =
        
           write(socket_fd, buffer, strlen(buffer));


        
        
          109
        
                 cout << write_num <<
        
          "
        
        
           characters written
        
        
          "
        
        <<
        
          endl;


        
        
          110
        
                 sleep(
        
          2
        
        
          );


        
        
          111
        
        
              }


        
        
          112
        
             cout<<
        
          "
        
        
          main finished
        
        
          "
        
        <<
        
          endl;


        
        
          113
        
        
          return
        
        
          0
        
        
          ;


        
        
          114
        
         }
      
client端的代碼

  1)在main()里先調(diào)用init_read_event_thread()來生成一個子線程,子線程里調(diào)用init_read_event()來將socket的讀事件注冊到libevent的base上,并調(diào)用libevent的event_base_dispatch()不斷地進行輪詢。一旦socket可讀,libevent就調(diào)用“讀事件”上綁定的on_read()函數(shù)來讀取數(shù)據(jù)。

  2)在main()的主線程里,通過一個while循環(huán)來接收用戶從終端的輸入,并通過socket將用戶的輸入寫到server端。

-------------------------------------------------------------

2. server端代碼如下:

        
            1
        
         #include <iostream>


        
            2
        
         #include <sys/
        
          select
        
        .h>


        
            3
        
         #include <sys/socket.h>


        
            4
        
         #include <stdio.h>


        
            5
        
         #include <unistd.h>


        
            6
        
         #include <pthread.h>


        
            7
        
         #include <stdio.h>


        
            8
        
         #include <sys/types.h>


        
            9
        
         #include <netinet/
        
          in
        
        .h>


        
           10
        
         #include <arpa/inet.h>


        
           11
        
         #include <
        
          string
        
        >


        
           12
        
         #include <
        
          string
        
        .h>


        
           13
        
         #include <
        
          event
        
        .h>


        
           14
        
         #include <stdlib.h>


        
           15
        
        
          using
        
        
          namespace
        
        
           std;


        
        
           16
        
        
           17
        
        
          #define
        
         SERVER_IP "127.0.0.1"


        
           18
        
        
          #define
        
         SERVER_PORT 9090


        
           19
        
        
          #define
        
         BUF_SIZE 1024


        
           20
        
        
           21
        
        
          struct
        
         sock_ev_write{
        
          //
        
        
          用戶寫事件完成后的銷毀,在on_write()中執(zhí)行
        
        
           22
        
        
          struct
        
        
          event
        
        *
        
           write_ev;


        
        
           23
        
        
          char
        
        *
        
           buffer;


        
        
           24
        
        
          };


        
        
           25
        
        
          struct
        
         sock_ev {
        
          //
        
        
          用于讀事件終止(socket斷開)后的銷毀
        
        
           26
        
        
          struct
        
         event_base* 
        
          base
        
        ;
        
          //
        
        
          因為socket斷掉后,讀事件的loop要終止,所以要有base指針
        
        
           27
        
        
          struct
        
        
          event
        
        *
        
           read_ev;


        
        
           28
        
        
          };


        
        
           29
        
        
           30
        
        
          /*
        
        
          *


        
        
           31
        
        
           * 銷毀寫事件用到的結(jié)構(gòu)體


        
        
           32
        
        
          */
        
        
           33
        
        
          void
        
         destroy_sock_ev_write(
        
          struct
        
         sock_ev_write*
        
           sock_ev_write_struct){


        
        
           34
        
        
          if
        
        (NULL !=
        
           sock_ev_write_struct){


        
        
           35
        
        
          //
        
        
                  event_del(sock_ev_write_struct->write_ev);
        
        
          //
        
        
          因為寫事件沒用EV_PERSIST,故不用event_del
        
        
           36
        
        
          if
        
        (NULL != sock_ev_write_struct->
        
          write_ev){


        
        
           37
        
                     free(sock_ev_write_struct->
        
          write_ev);


        
        
           38
        
        
                  }


        
        
           39
        
        
          if
        
        (NULL != sock_ev_write_struct->
        
          buffer){


        
        
           40
        
                     delete[]sock_ev_write_struct->
        
          buffer;


        
        
           41
        
        
                  }


        
        
           42
        
        
                  free(sock_ev_write_struct);


        
        
           43
        
        
              }


        
        
           44
        
        
          }


        
        
           45
        
        
           46
        
        
           47
        
        
          /*
        
        
          *


        
        
           48
        
        
           * 讀事件結(jié)束后,用于銷毀相應的資源


        
        
           49
        
        
          */
        
        
           50
        
        
          void
        
         destroy_sock_ev(
        
          struct
        
         sock_ev*
        
           sock_ev_struct){


        
        
           51
        
        
          if
        
        (NULL ==
        
           sock_ev_struct){


        
        
           52
        
        
          return
        
        
          ;


        
        
           53
        
        
              }


        
        
           54
        
             event_del(sock_ev_struct->
        
          read_ev);


        
        
           55
        
             event_base_loopexit(sock_ev_struct->
        
          base
        
        , NULL);
        
          //
        
        
          停止loop循環(huán)
        
        
           56
        
        
          if
        
        (NULL != sock_ev_struct->
        
          read_ev){


        
        
           57
        
                 free(sock_ev_struct->
        
          read_ev);


        
        
           58
        
        
              }


        
        
           59
        
             event_base_free(sock_ev_struct->
        
          base
        
        
          );


        
        
           60
        
        
          //
        
        
              destroy_sock_ev_write(sock_ev_struct->sock_ev_write_struct);
        
        
           61
        
        
              free(sock_ev_struct);


        
        
           62
        
        
          }


        
        
           63
        
        
          int
        
        
           getSocket(){


        
        
           64
        
        
          int
        
         fd =socket( AF_INET, SOCK_STREAM, 
        
          0
        
        
           );


        
        
           65
        
        
          if
        
        (-
        
          1
        
         ==
        
           fd){


        
        
           66
        
                 cout<<
        
          "
        
        
          Error, fd is -1
        
        
          "
        
        <<
        
          endl;


        
        
           67
        
        
              }


        
        
           68
        
        
          return
        
        
           fd;


        
        
           69
        
        
          }


        
        
           70
        
        
           71
        
        
          void
        
         on_write(
        
          int
        
         sock, 
        
          short
        
        
          event
        
        , 
        
          void
        
        *
        
           arg)


        
        
           72
        
        
          {


        
        
           73
        
             cout<<
        
          "
        
        
          on_write() called, sock=
        
        
          "
        
        <<sock<<
        
          endl;


        
        
           74
        
        
          if
        
        (NULL ==
        
           arg){


        
        
           75
        
                 cout<<
        
          "
        
        
          Error! void* arg is NULL in on_write()
        
        
          "
        
        <<
        
          endl;


        
        
           76
        
        
          return
        
        
          ;


        
        
           77
        
        
              }


        
        
           78
        
        
          struct
        
         sock_ev_write* sock_ev_write_struct = (
        
          struct
        
         sock_ev_write*
        
          )arg;


        
        
           79
        
        
           80
        
        
          char
        
        
           buffer[BUF_SIZE];


        
        
           81
        
             sprintf(buffer, 
        
          "
        
        
          fd=%d, received[%s]
        
        
          "
        
        , sock, sock_ev_write_struct->
        
          buffer);


        
        
           82
        
        
          //
        
        
              int write_num0 = write(sock, sock_ev_write_struct->buffer, strlen(sock_ev_write_struct->buffer));


        
        
           83
        
        
          //
        
        
              int write_num = write(sock, sock_ev_write_struct->buffer, strlen(sock_ev_write_struct->buffer));
        
        
           84
        
        
          int
        
         write_num =
        
           write(sock, buffer, strlen(buffer));


        
        
           85
        
        
              destroy_sock_ev_write(sock_ev_write_struct);


        
        
           86
        
             cout<<
        
          "
        
        
          on_write() finished, sock=
        
        
          "
        
        <<sock<<
        
          endl;


        
        
           87
        
        
          }


        
        
           88
        
        
           89
        
        
          void
        
         on_read(
        
          int
        
         sock, 
        
          short
        
        
          event
        
        , 
        
          void
        
        *
        
           arg)


        
        
           90
        
        
          {


        
        
           91
        
             cout<<
        
          "
        
        
          on_read() called, sock=
        
        
          "
        
        <<sock<<
        
          endl;


        
        
           92
        
        
          if
        
        (NULL ==
        
           arg){


        
        
           93
        
        
          return
        
        
          ;


        
        
           94
        
        
              }


        
        
           95
        
        
          struct
        
         sock_ev* event_struct = (
        
          struct
        
         sock_ev*) arg;
        
          //
        
        
          獲取傳進來的參數(shù)
        
        
           96
        
        
          char
        
        * buffer = 
        
          new
        
        
          char
        
        
          [BUF_SIZE];


        
        
           97
        
             memset(buffer, 
        
          0
        
        , 
        
          sizeof
        
        (
        
          char
        
        )*
        
          BUF_SIZE);


        
        
           98
        
        
          //
        
        
          --本來應該用while一直循環(huán),但由于用了libevent,只在可以讀的時候才觸發(fā)on_read(),故不必用while了
        
        
           99
        
        
          int
        
         size =
        
           read(sock, buffer, BUF_SIZE);


        
        
          100
        
        
          if
        
        (
        
          0
        
         == size){
        
          //
        
        
          說明socket關(guān)閉
        
        
          101
        
                 cout<<
        
          "
        
        
          read size is 0 for socket:
        
        
          "
        
        <<sock<<
        
          endl;


        
        
          102
        
        
                  destroy_sock_ev(event_struct);


        
        
          103
        
        
                  close(sock);


        
        
          104
        
        
          return
        
        
          ;


        
        
          105
        
        
              }


        
        
          106
        
        
          struct
        
         sock_ev_write* sock_ev_write_struct = (
        
          struct
        
         sock_ev_write*)malloc(
        
          sizeof
        
        (
        
          struct
        
        
           sock_ev_write));


        
        
          107
        
             sock_ev_write_struct->buffer =
        
           buffer;


        
        
          108
        
        
          struct
        
        
          event
        
        * write_ev = (
        
          struct
        
        
          event
        
        *)malloc(
        
          sizeof
        
        (
        
          struct
        
        
          event
        
        ));
        
          //
        
        
          發(fā)生寫事件(也就是只要socket緩沖區(qū)可寫)時,就將反饋數(shù)據(jù)通過socket寫回客戶端
        
        
          109
        
             sock_ev_write_struct->write_ev =
        
           write_ev;


        
        
          110
        
        
              event_set(write_ev, sock, EV_WRITE, on_write, sock_ev_write_struct);


        
        
          111
        
             event_base_set(event_struct->
        
          base
        
        
          , write_ev);


        
        
          112
        
        
              event_add(write_ev, NULL);


        
        
          113
        
             cout<<
        
          "
        
        
          on_read() finished, sock=
        
        
          "
        
        <<sock<<
        
          endl;


        
        
          114
        
        
          }


        
        
          115
        
        
          116
        
        
          117
        
        
          /*
        
        
          *


        
        
          118
        
        
           * main執(zhí)行accept()得到新socket_fd的時候,執(zhí)行這個方法


        
        
          119
        
        
           * 創(chuàng)建一個新線程,在新線程里反饋給client收到的信息


        
        
          120
        
        
          */
        
        
          121
        
        
          void
        
        * process_in_new_thread_when_accepted(
        
          void
        
        *
        
           arg){


        
        
          122
        
        
          long
        
         long_fd = (
        
          long
        
        
          )arg;


        
        
          123
        
        
          int
        
         fd = (
        
          int
        
        
          )long_fd;


        
        
          124
        
        
          if
        
        (fd<
        
          0
        
        
          ){


        
        
          125
        
                 cout<<
        
          "
        
        
          process_in_new_thread_when_accepted() quit!
        
        
          "
        
        <<
        
          endl;


        
        
          126
        
        
          return
        
        
          0
        
        
          ;


        
        
          127
        
        
              }


        
        
          128
        
        
          //
        
        
          -------初始化base,寫事件和讀事件--------
        
        
          129
        
        
          struct
        
         event_base* 
        
          base
        
         =
        
           event_base_new();


        
        
          130
        
        
          struct
        
        
          event
        
        * read_ev = (
        
          struct
        
        
          event
        
        *)malloc(
        
          sizeof
        
        (
        
          struct
        
        
          event
        
        ));
        
          //
        
        
          發(fā)生讀事件后,從socket中取出數(shù)據(jù)


        
        
          131
        
        
          132
        
        
          //
        
        
          -------將base,read_ev,write_ev封裝到一個event_struct對象里,便于銷毀---------
        
        
          133
        
        
          struct
        
         sock_ev* event_struct = (
        
          struct
        
         sock_ev*)malloc(
        
          sizeof
        
        (
        
          struct
        
        
           sock_ev));


        
        
          134
        
             event_struct->
        
          base
        
         = 
        
          base
        
        
          ;


        
        
          135
        
             event_struct->read_ev =
        
           read_ev;


        
        
          136
        
        
          //
        
        
          -----對讀事件進行相應的設(shè)置------------
        
        
          137
        
             event_set(read_ev, fd, EV_READ|
        
          EV_PERSIST, on_read, event_struct);


        
        
          138
        
             event_base_set(
        
          base
        
        
          , read_ev);


        
        
          139
        
        
              event_add(read_ev, NULL);


        
        
          140
        
        
          //
        
        
          --------開始libevent的loop循環(huán)-----------
        
        
          141
        
             event_base_dispatch(
        
          base
        
        
          );


        
        
          142
        
             cout<<
        
          "
        
        
          event_base_dispatch() stopped for sock(
        
        
          "
        
        <<fd<<
        
          "
        
        
          )
        
        
          "
        
        <<
        
          "
        
        
           in process_in_new_thread_when_accepted()
        
        
          "
        
        <<
        
          endl;


        
        
          143
        
        
          return
        
        
          0
        
        
          ;


        
        
          144
        
        
          }


        
        
          145
        
        
          146
        
        
          /*
        
        
          *


        
        
          147
        
        
           * 每當accept出一個新的socket_fd時,調(diào)用這個方法。


        
        
          148
        
        
           * 創(chuàng)建一個新線程,在新線程里與client做交互


        
        
          149
        
        
          */
        
        
          150
        
        
          void
        
         accept_new_thread(
        
          int
        
        
           sock){


        
        
          151
        
        
              pthread_t thread;


        
        
          152
        
             pthread_create(&thread,NULL,process_in_new_thread_when_accepted,(
        
          void
        
        *
        
          )sock);


        
        
          153
        
        
              pthread_detach(thread);


        
        
          154
        
        
          }


        
        
          155
        
        
          156
        
        
          /*
        
        
          *


        
        
          157
        
        
           * 每當有新連接連到server時,就通過libevent調(diào)用此函數(shù)。


        
        
          158
        
        
           *    每個連接對應一個新線程


        
        
          159
        
        
          */
        
        
          160
        
        
          void
        
         on_accept(
        
          int
        
         sock, 
        
          short
        
        
          event
        
        , 
        
          void
        
        *
        
           arg)


        
        
          161
        
        
          {


        
        
          162
        
        
          struct
        
        
           sockaddr_in remote_addr;


        
        
          163
        
        
          int
        
         sin_size=
        
          sizeof
        
        (
        
          struct
        
        
           sockaddr_in);


        
        
          164
        
        
          int
        
         new_fd = accept(sock,  (
        
          struct
        
         sockaddr*) &remote_addr, (socklen_t*)&
        
          sin_size);


        
        
          165
        
        
          if
        
        (new_fd < 
        
          0
        
        
          ){


        
        
          166
        
                 cout<<
        
          "
        
        
          Accept error in on_accept()
        
        
          "
        
        <<
        
          endl;


        
        
          167
        
        
          return
        
        
          ;


        
        
          168
        
        
              }


        
        
          169
        
             cout<<
        
          "
        
        
          new_fd accepted is 
        
        
          "
        
        <<new_fd<<
        
          endl;


        
        
          170
        
        
              accept_new_thread(new_fd);


        
        
          171
        
             cout<<
        
          "
        
        
          on_accept() finished for fd=
        
        
          "
        
        <<new_fd<<
        
          endl;


        
        
          172
        
        
          }


        
        
          173
        
        
          174
        
        
          int
        
        
           main(){


        
        
          175
        
        
          int
        
         fd =
        
           getSocket();


        
        
          176
        
        
          if
        
        (fd<
        
          0
        
        
          ){


        
        
          177
        
                 cout<<
        
          "
        
        
          Error in main(), fd<0
        
        
          "
        
        <<
        
          endl;


        
        
          178
        
        
              }


        
        
          179
        
             cout<<
        
          "
        
        
          main() fd=
        
        
          "
        
        <<fd<<
        
          endl;


        
        
          180
        
        
          //
        
        
          ----為服務器主線程綁定ip和port------------------------------
        
        
          181
        
        
          struct
        
         sockaddr_in local_addr; 
        
          //
        
        
          服務器端網(wǎng)絡(luò)地址結(jié)構(gòu)體
        
        
          182
        
             memset(&local_addr,
        
          0
        
        ,
        
          sizeof
        
        (local_addr)); 
        
          //
        
        
          數(shù)據(jù)初始化--清零
        
        
          183
        
             local_addr.sin_family=AF_INET; 
        
          //
        
        
          設(shè)置為IP通信
        
        
          184
        
             local_addr.sin_addr.s_addr=inet_addr(SERVER_IP);
        
          //
        
        
          服務器IP地址
        
        
          185
        
             local_addr.sin_port=htons(SERVER_PORT); 
        
          //
        
        
          服務器端口號
        
        
          186
        
        
          int
        
         bind_result = bind(fd, (
        
          struct
        
         sockaddr*) &local_addr, 
        
          sizeof
        
        (
        
          struct
        
        
           sockaddr));


        
        
          187
        
        
          if
        
        (bind_result < 
        
          0
        
        
          ){


        
        
          188
        
                 cout<<
        
          "
        
        
          Bind Error in main()
        
        
          "
        
        <<
        
          endl;


        
        
          189
        
        
          return
        
         -
        
          1
        
        
          ;


        
        
          190
        
        
              }


        
        
          191
        
             cout<<
        
          "
        
        
          bind_result=
        
        
          "
        
        <<bind_result<<
        
          endl;


        
        
          192
        
             listen(fd, 
        
          10
        
        
          );


        
        
          193
        
        
          //
        
        
          -----設(shè)置libevent事件,每當socket出現(xiàn)可讀事件,就調(diào)用on_accept()------------
        
        
          194
        
        
          struct
        
         event_base* 
        
          base
        
         =
        
           event_base_new();


        
        
          195
        
        
          struct
        
        
          event
        
        
           listen_ev;


        
        
          196
        
             event_set(&listen_ev, fd, EV_READ|
        
          EV_PERSIST, on_accept, NULL);


        
        
          197
        
             event_base_set(
        
          base
        
        , &
        
          listen_ev);


        
        
          198
        
             event_add(&
        
          listen_ev, NULL);


        
        
          199
        
             event_base_dispatch(
        
          base
        
        
          );


        
        
          200
        
        
          //
        
        
          ------以下語句理論上是不會走到的---------------------------
        
        
          201
        
             cout<<
        
          "
        
        
          event_base_dispatch() in main() finished
        
        
          "
        
        <<
        
          endl;


        
        
          202
        
        
          //
        
        
          ----銷毀資源-------------
        
        
          203
        
             event_del(&
        
          listen_ev);


        
        
          204
        
             event_base_free(
        
          base
        
        
          );


        
        
          205
        
             cout<<
        
          "
        
        
          main() finished
        
        
          "
        
        <<
        
          endl;


        
        
          206
        
         }
      
server端的代碼

  1)在main()里(運行在主線程中),先設(shè)置服務端的socket,然后為主線程生成一個libevent的base,并將一個“讀事件”注冊到base上。“讀事件”綁定了一個on_accept(),每當client有新連接連過來時,就會觸發(fā)這個“讀事件”,進而調(diào)用on_accept()方法。

  2)在on_accept()里(運行在主線程中),每當有新連接連過來時,就會accept出一個新的new_fd,并調(diào)用accept_new_thread()來創(chuàng)建一個新的子線程。子線程里會調(diào)用process_in_new_thread_when_accepted()方法。

  3)process_in_new_thread_when_accepted()方法里(運行在子線程中),創(chuàng)建一個子線程的base,并創(chuàng)建一個“讀事件”,注冊到“子線程的base”上。并調(diào)用event_base_dispatch(base)進入libevent的loop中。當發(fā)現(xiàn)new_fd的socket緩沖區(qū)中有數(shù)據(jù)可讀時,就觸發(fā)了這個“讀事件”,繼而調(diào)用on_read()方法。

  4)on_read()方法里(運行在子線程中),從socket緩沖區(qū)里讀取數(shù)據(jù)。讀完數(shù)據(jù)之后,將一個“寫事件”注冊到“子線程的base”上。一旦socket可寫,就調(diào)用on_write()函數(shù)。

  5)on_write()方法(運行在子線程中),對數(shù)據(jù)進行修改,然后通過socket寫回到client端。

  注:其實可以不用注冊“寫事件”——在on_read()方法中直接修改數(shù)據(jù),然后寫回到client端也是可以的——但這有個問題。就是如果socket的寫緩沖區(qū)是滿的,那么這時候?write(sock, buffer, strlen(buffer))會阻塞的。這會導致整個on_read()方法阻塞掉,而無法讀到接下來client傳過來的數(shù)據(jù)了。而用了libevent的”寫事件“之后,雖然?write(sock, buffer, strlen(buffer))仍然會阻塞,但是只要socket緩沖區(qū)不可以寫就不會觸發(fā)這個“寫事件”,所以程序就不會阻塞,也就不會影響on_read()函數(shù)里的流程了。

使用libevent進行多線程socket編程demo


更多文章、技術(shù)交流、商務合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 久久精品爱 | 无遮挡一级毛片性视频不卡 | 四虎精品成在线播放 | 久久精品国产一区二区小说 | 最新中文字幕一区二区乱码 | 欧美成人精品一区二区三区 | 成人午夜影视全部免费看 | 日韩小视频 | www.777奇米| 久久无码精品一区二区三区 | 国产精品久久久久国产精品 | 狠狠色丁香婷婷综合最新地址 | 久久美女网| 亚洲qingse中文久久网 | 久久这里有精品 | 久久精品蜜芽亚洲国产a | 国产精品久久久久国产精品 | 色国产在线 | 国产精品中文字幕在线 | 一级床上爽高清播放 | 超高清欧美videos360 | 成年人黄视频大全 | 精产国品一二二区视 | 99视频免费看 | 亚洲一区二区三区四区五区 | 亚洲欧洲视频 | 精品国产免费久久久久久 | 欧美成人精品高清在线播放 | 日韩精品一区二区三区国语自制 | 国产免费不卡v片在线观看 国产免费不卡视频 | a极毛片| 久久久婷婷| 亚洲爱爱久久精品 | 九九色影院 | 在线观看年轻的母亲 | a拍拍男女免费看全片 | 欧美另类性视频在线看 | 真人特级毛片免费视频 | 亚欧成人毛片一区二区三区四区 | 色爱区综合激情五月综合激情 | 亚洲精品中文字幕一区 |