最近要對一個用libevent寫的C/C++項目進行修改,要改成多線程的,故做了一些學習和研究。
libevent是一個用C語言寫的開源的一個庫。它對socket編程里的epoll/select等功能進行了封裝,并且使用了一些設(shè)計模式(比如反應堆模式),用事件機制來簡化了socket編程。libevent的好處網(wǎng)上有很多,但是初學者往往都看不懂。我打個比方吧, 1) 假設(shè)有N個客戶端同時往服務端通過socket寫數(shù)據(jù),用了libevent之后,你的server程序里就不用再使用epoll或是select來判斷都哪些socket的緩沖區(qū)里已經(jīng)收到了客戶端寫來的數(shù)據(jù)。當某個socket的緩沖區(qū)里有可讀數(shù)據(jù)時,libevent會自動觸發(fā)一個“讀事件”,通過這個“讀事件”來調(diào)用相應的代碼來讀取socket緩沖區(qū)里的數(shù)據(jù)即可。換句話說,libevent自己調(diào)用select()或是epoll的函數(shù)來判斷哪個緩沖區(qū)可讀了,只要可讀了,就自動調(diào)用相應的處理程序。 2) 對于“寫事件”,libevent會監(jiān)控某個socket的緩沖區(qū)是否可寫(一般情況下,只要緩沖區(qū)沒滿就可寫),只要可寫,就會觸發(fā)“寫事件”,通過“寫事件”來調(diào)用相應的函數(shù),將數(shù)據(jù)寫到socket里。
以上兩個例子分別從“讀”和“寫”兩方面簡介了一下,可能不十分準確(但十分準確的描述往往會讓人看不懂)。
以下兩個鏈接關(guān)于libevent的剖析比較詳細,想學習libevent最好看一下。
1) sparkliang的專欄 ? ? ? ? 2) 魚思故淵的專欄
=========關(guān)于libevent使用多線程的討論=========================
網(wǎng)上很多資料說libevent不支持多線程,也有很多人說libevent可以支持多線程。究竟值不支持呢?我的答案是: 得看你的多線程是怎么寫的,如何跟libevent結(jié)合的。
1)可以肯定的是,libevent的 信號事件 是不支持多線程的(因為源碼里用了個全局變量)。可以看這篇文章(http://blog.csdn.net/sparkliang/article/details/5306809)。(注:libevent里有“超時事件”,“IO事件”,“信號事件”。)
2)對于不同的線程,使用不同的base,是可以的。
3)如果不同的線程使用相同的base呢?——如果在不同的線程里的事件都注冊到同一個base上,會有問題嗎?
(http://www.cnblogs.com/zzyoucan/p/3970578.html)這篇博客里提到說,不行!即使加鎖也不行。我最近稍微看了部分源碼,我的答案是:不加鎖會有并發(fā)問題,但如果對每個event_add(),event_del()等這些操作event的動作都用同一個臨界變量來加鎖,應該是沒問題的。——貌似也有點問題,如果某個事件沒有用event_set()設(shè)置為EV_PERSIST,當事件發(fā)生時,會被自動刪除。有可能線程a在刪除事件的時候,線程b卻在添加事件,這樣還是會出現(xiàn)并發(fā)問題。 最后的結(jié)論是——不行! 。
========本次實驗代碼邏輯的說明==========================
我采取的方案是對于不同的線程,使用不同的base。——即每個線程對應一個base,將線程里的事件注冊到線程的base上,而不是所有線程里的事件都用同一個base。
一 實驗需求描述:
1)寫一個client和server程序。多個client可以同時連接一個server;
2)client接收用戶在標準輸入的字符,發(fā)往server端;
3)server端收到后,再把收到的數(shù)據(jù)處理一下,返回給client;
4)client收到server返回的數(shù)據(jù)后,將其打印在終端上。
二 設(shè)計方案:
1. client:
1) client采用兩個線程,主線程接收用戶在終端上的輸入,并通過socket將用戶的輸入發(fā)往server。
2) 派生一個子線程,接收server返回來的數(shù)據(jù),如果收到數(shù)據(jù),就打印出來。
2. server:
在主線程里監(jiān)聽client有沒有連接連過來,如果有,立馬accept出一個socket,并創(chuàng)建一個子線程,在子線程里接收client傳過來的數(shù)據(jù),并對數(shù)據(jù)進行一些修改,然后將修改后的數(shù)據(jù)寫回到client端。
三 代碼實現(xiàn)
1. client代碼如下:

1 #include <iostream> 2 #include <sys/ select .h> 3 #include <sys/socket.h> 4 #include <unistd.h> 5 #include <pthread.h> 6 #include <stdio.h> 7 #include <stdlib.h> 8 #include <sys/types.h> 9 #include <netinet/ in .h> 10 #include <arpa/inet.h> 11 #include < string > 12 #include < string .h> 13 #include < event .h> 14 using namespace std; 15 16 #define BUF_SIZE 1024 17 18 /* * 19 * 連接到server端,如果成功,返回fd,如果失敗返回-1 20 */ 21 int connectServer( char * ip, int port){ 22 int fd = socket( AF_INET, SOCK_STREAM, 0 ); 23 cout<< " fd= " <<fd<< endl; 24 if (- 1 == fd){ 25 cout<< " Error, connectServer() quit " << endl; 26 return - 1 ; 27 } 28 struct sockaddr_in remote_addr; // 服務器端網(wǎng)絡(luò)地址結(jié)構(gòu)體 29 memset(&remote_addr, 0 , sizeof (remote_addr)); // 數(shù)據(jù)初始化--清零 30 remote_addr.sin_family=AF_INET; // 設(shè)置為IP通信 31 remote_addr.sin_addr.s_addr=inet_addr(ip); // 服務器IP地址 32 remote_addr.sin_port=htons(port); // 服務器端口號 33 int con_result = connect(fd, ( struct sockaddr*) &remote_addr, sizeof ( struct sockaddr)); 34 if (con_result < 0 ){ 35 cout<< " Connect Error! " << endl; 36 close(fd); 37 return - 1 ; 38 } 39 cout<< " con_result= " <<con_result<< endl; 40 return fd; 41 } 42 43 void on_read( int sock, short event , void * arg) 44 { 45 char * buffer = new char [BUF_SIZE]; 46 memset(buffer, 0 , sizeof ( char )* BUF_SIZE); 47 // --本來應該用while一直循環(huán),但由于用了libevent,只在可以讀的時候才觸發(fā)on_read(),故不必用while了 48 int size = read(sock, buffer, BUF_SIZE); 49 if ( 0 == size){ // 說明socket關(guān)閉 50 cout<< " read size is 0 for socket: " <<sock<< endl; 51 struct event * read_ev = ( struct event * )arg; 52 if (NULL != read_ev){ 53 event_del(read_ev); 54 free(read_ev); 55 } 56 close(sock); 57 return ; 58 } 59 cout<< " Received from server--- " <<buffer<< endl; 60 delete[]buffer; 61 } 62 63 void * init_read_event( void * arg){ 64 long long_sock = ( long )arg; 65 int sock = ( int )long_sock; 66 // -----初始化libevent,設(shè)置回調(diào)函數(shù)on_read()------------ 67 struct event_base* base = event_base_new(); 68 struct event * read_ev = ( struct event *)malloc( sizeof ( struct event )); // 發(fā)生讀事件后,從socket中取出數(shù)據(jù) 69 event_set(read_ev, sock, EV_READ| EV_PERSIST, on_read, read_ev); 70 event_base_set( base , read_ev); 71 event_add(read_ev, NULL); 72 event_base_dispatch( base ); 73 // -------------- 74 event_base_free( base ); 75 } 76 /* * 77 * 創(chuàng)建一個新線程,在新線程里初始化libevent讀事件的相關(guān)設(shè)置,并開啟event_base_dispatch 78 */ 79 void init_read_event_thread( int sock){ 80 pthread_t thread; 81 pthread_create(&thread,NULL,init_read_event,( void * )sock); 82 pthread_detach(thread); 83 } 84 int main() { 85 cout << " main started " << endl; // prints Hello World!!! 86 cout << " Please input server IP: " << endl; 87 char ip[ 16 ]; 88 cin >> ip; 89 cout << " Please input port: " << endl; 90 int port; 91 cin >> port; 92 cout << " ServerIP is " <<ip<< " ,port= " <<port<< endl; 93 int socket_fd = connectServer(ip, port); 94 cout << " socket_fd= " <<socket_fd<< endl; 95 init_read_event_thread(socket_fd); 96 // -------------------------- 97 char buffer[BUF_SIZE]; 98 bool isBreak = false ; 99 while (! isBreak){ 100 cout << " Input your data to server(\'q\' or \"quit\" to exit) " << endl; 101 cin >> buffer; 102 if (strcmp( " q " , buffer)== 0 || strcmp( " quit " , buffer)== 0 ){ 103 isBreak= true ; 104 close(socket_fd); 105 break ; 106 } 107 cout << " Your input is " <<buffer<< endl; 108 int write_num = write(socket_fd, buffer, strlen(buffer)); 109 cout << write_num << " characters written " << endl; 110 sleep( 2 ); 111 } 112 cout<< " main finished " << endl; 113 return 0 ; 114 }
1)在main()里先調(diào)用init_read_event_thread()來生成一個子線程,子線程里調(diào)用init_read_event()來將socket的讀事件注冊到libevent的base上,并調(diào)用libevent的event_base_dispatch()不斷地進行輪詢。一旦socket可讀,libevent就調(diào)用“讀事件”上綁定的on_read()函數(shù)來讀取數(shù)據(jù)。
2)在main()的主線程里,通過一個while循環(huán)來接收用戶從終端的輸入,并通過socket將用戶的輸入寫到server端。
-------------------------------------------------------------
2. server端代碼如下:

1 #include <iostream> 2 #include <sys/ select .h> 3 #include <sys/socket.h> 4 #include <stdio.h> 5 #include <unistd.h> 6 #include <pthread.h> 7 #include <stdio.h> 8 #include <sys/types.h> 9 #include <netinet/ in .h> 10 #include <arpa/inet.h> 11 #include < string > 12 #include < string .h> 13 #include < event .h> 14 #include <stdlib.h> 15 using namespace std; 16 17 #define SERVER_IP "127.0.0.1" 18 #define SERVER_PORT 9090 19 #define BUF_SIZE 1024 20 21 struct sock_ev_write{ // 用戶寫事件完成后的銷毀,在on_write()中執(zhí)行 22 struct event * write_ev; 23 char * buffer; 24 }; 25 struct sock_ev { // 用于讀事件終止(socket斷開)后的銷毀 26 struct event_base* base ; // 因為socket斷掉后,讀事件的loop要終止,所以要有base指針 27 struct event * read_ev; 28 }; 29 30 /* * 31 * 銷毀寫事件用到的結(jié)構(gòu)體 32 */ 33 void destroy_sock_ev_write( struct sock_ev_write* sock_ev_write_struct){ 34 if (NULL != sock_ev_write_struct){ 35 // event_del(sock_ev_write_struct->write_ev); // 因為寫事件沒用EV_PERSIST,故不用event_del 36 if (NULL != sock_ev_write_struct-> write_ev){ 37 free(sock_ev_write_struct-> write_ev); 38 } 39 if (NULL != sock_ev_write_struct-> buffer){ 40 delete[]sock_ev_write_struct-> buffer; 41 } 42 free(sock_ev_write_struct); 43 } 44 } 45 46 47 /* * 48 * 讀事件結(jié)束后,用于銷毀相應的資源 49 */ 50 void destroy_sock_ev( struct sock_ev* sock_ev_struct){ 51 if (NULL == sock_ev_struct){ 52 return ; 53 } 54 event_del(sock_ev_struct-> read_ev); 55 event_base_loopexit(sock_ev_struct-> base , NULL); // 停止loop循環(huán) 56 if (NULL != sock_ev_struct-> read_ev){ 57 free(sock_ev_struct-> read_ev); 58 } 59 event_base_free(sock_ev_struct-> base ); 60 // destroy_sock_ev_write(sock_ev_struct->sock_ev_write_struct); 61 free(sock_ev_struct); 62 } 63 int getSocket(){ 64 int fd =socket( AF_INET, SOCK_STREAM, 0 ); 65 if (- 1 == fd){ 66 cout<< " Error, fd is -1 " << endl; 67 } 68 return fd; 69 } 70 71 void on_write( int sock, short event , void * arg) 72 { 73 cout<< " on_write() called, sock= " <<sock<< endl; 74 if (NULL == arg){ 75 cout<< " Error! void* arg is NULL in on_write() " << endl; 76 return ; 77 } 78 struct sock_ev_write* sock_ev_write_struct = ( struct sock_ev_write* )arg; 79 80 char buffer[BUF_SIZE]; 81 sprintf(buffer, " fd=%d, received[%s] " , sock, sock_ev_write_struct-> buffer); 82 // int write_num0 = write(sock, sock_ev_write_struct->buffer, strlen(sock_ev_write_struct->buffer)); 83 // int write_num = write(sock, sock_ev_write_struct->buffer, strlen(sock_ev_write_struct->buffer)); 84 int write_num = write(sock, buffer, strlen(buffer)); 85 destroy_sock_ev_write(sock_ev_write_struct); 86 cout<< " on_write() finished, sock= " <<sock<< endl; 87 } 88 89 void on_read( int sock, short event , void * arg) 90 { 91 cout<< " on_read() called, sock= " <<sock<< endl; 92 if (NULL == arg){ 93 return ; 94 } 95 struct sock_ev* event_struct = ( struct sock_ev*) arg; // 獲取傳進來的參數(shù) 96 char * buffer = new char [BUF_SIZE]; 97 memset(buffer, 0 , sizeof ( char )* BUF_SIZE); 98 // --本來應該用while一直循環(huán),但由于用了libevent,只在可以讀的時候才觸發(fā)on_read(),故不必用while了 99 int size = read(sock, buffer, BUF_SIZE); 100 if ( 0 == size){ // 說明socket關(guān)閉 101 cout<< " read size is 0 for socket: " <<sock<< endl; 102 destroy_sock_ev(event_struct); 103 close(sock); 104 return ; 105 } 106 struct sock_ev_write* sock_ev_write_struct = ( struct sock_ev_write*)malloc( sizeof ( struct sock_ev_write)); 107 sock_ev_write_struct->buffer = buffer; 108 struct event * write_ev = ( struct event *)malloc( sizeof ( struct event )); // 發(fā)生寫事件(也就是只要socket緩沖區(qū)可寫)時,就將反饋數(shù)據(jù)通過socket寫回客戶端 109 sock_ev_write_struct->write_ev = write_ev; 110 event_set(write_ev, sock, EV_WRITE, on_write, sock_ev_write_struct); 111 event_base_set(event_struct-> base , write_ev); 112 event_add(write_ev, NULL); 113 cout<< " on_read() finished, sock= " <<sock<< endl; 114 } 115 116 117 /* * 118 * main執(zhí)行accept()得到新socket_fd的時候,執(zhí)行這個方法 119 * 創(chuàng)建一個新線程,在新線程里反饋給client收到的信息 120 */ 121 void * process_in_new_thread_when_accepted( void * arg){ 122 long long_fd = ( long )arg; 123 int fd = ( int )long_fd; 124 if (fd< 0 ){ 125 cout<< " process_in_new_thread_when_accepted() quit! " << endl; 126 return 0 ; 127 } 128 // -------初始化base,寫事件和讀事件-------- 129 struct event_base* base = event_base_new(); 130 struct event * read_ev = ( struct event *)malloc( sizeof ( struct event )); // 發(fā)生讀事件后,從socket中取出數(shù)據(jù) 131 132 // -------將base,read_ev,write_ev封裝到一個event_struct對象里,便于銷毀--------- 133 struct sock_ev* event_struct = ( struct sock_ev*)malloc( sizeof ( struct sock_ev)); 134 event_struct-> base = base ; 135 event_struct->read_ev = read_ev; 136 // -----對讀事件進行相應的設(shè)置------------ 137 event_set(read_ev, fd, EV_READ| EV_PERSIST, on_read, event_struct); 138 event_base_set( base , read_ev); 139 event_add(read_ev, NULL); 140 // --------開始libevent的loop循環(huán)----------- 141 event_base_dispatch( base ); 142 cout<< " event_base_dispatch() stopped for sock( " <<fd<< " ) " << " in process_in_new_thread_when_accepted() " << endl; 143 return 0 ; 144 } 145 146 /* * 147 * 每當accept出一個新的socket_fd時,調(diào)用這個方法。 148 * 創(chuàng)建一個新線程,在新線程里與client做交互 149 */ 150 void accept_new_thread( int sock){ 151 pthread_t thread; 152 pthread_create(&thread,NULL,process_in_new_thread_when_accepted,( void * )sock); 153 pthread_detach(thread); 154 } 155 156 /* * 157 * 每當有新連接連到server時,就通過libevent調(diào)用此函數(shù)。 158 * 每個連接對應一個新線程 159 */ 160 void on_accept( int sock, short event , void * arg) 161 { 162 struct sockaddr_in remote_addr; 163 int sin_size= sizeof ( struct sockaddr_in); 164 int new_fd = accept(sock, ( struct sockaddr*) &remote_addr, (socklen_t*)& sin_size); 165 if (new_fd < 0 ){ 166 cout<< " Accept error in on_accept() " << endl; 167 return ; 168 } 169 cout<< " new_fd accepted is " <<new_fd<< endl; 170 accept_new_thread(new_fd); 171 cout<< " on_accept() finished for fd= " <<new_fd<< endl; 172 } 173 174 int main(){ 175 int fd = getSocket(); 176 if (fd< 0 ){ 177 cout<< " Error in main(), fd<0 " << endl; 178 } 179 cout<< " main() fd= " <<fd<< endl; 180 // ----為服務器主線程綁定ip和port------------------------------ 181 struct sockaddr_in local_addr; // 服務器端網(wǎng)絡(luò)地址結(jié)構(gòu)體 182 memset(&local_addr, 0 , sizeof (local_addr)); // 數(shù)據(jù)初始化--清零 183 local_addr.sin_family=AF_INET; // 設(shè)置為IP通信 184 local_addr.sin_addr.s_addr=inet_addr(SERVER_IP); // 服務器IP地址 185 local_addr.sin_port=htons(SERVER_PORT); // 服務器端口號 186 int bind_result = bind(fd, ( struct sockaddr*) &local_addr, sizeof ( struct sockaddr)); 187 if (bind_result < 0 ){ 188 cout<< " Bind Error in main() " << endl; 189 return - 1 ; 190 } 191 cout<< " bind_result= " <<bind_result<< endl; 192 listen(fd, 10 ); 193 // -----設(shè)置libevent事件,每當socket出現(xiàn)可讀事件,就調(diào)用on_accept()------------ 194 struct event_base* base = event_base_new(); 195 struct event listen_ev; 196 event_set(&listen_ev, fd, EV_READ| EV_PERSIST, on_accept, NULL); 197 event_base_set( base , & listen_ev); 198 event_add(& listen_ev, NULL); 199 event_base_dispatch( base ); 200 // ------以下語句理論上是不會走到的--------------------------- 201 cout<< " event_base_dispatch() in main() finished " << endl; 202 // ----銷毀資源------------- 203 event_del(& listen_ev); 204 event_base_free( base ); 205 cout<< " main() finished " << endl; 206 }
1)在main()里(運行在主線程中),先設(shè)置服務端的socket,然后為主線程生成一個libevent的base,并將一個“讀事件”注冊到base上。“讀事件”綁定了一個on_accept(),每當client有新連接連過來時,就會觸發(fā)這個“讀事件”,進而調(diào)用on_accept()方法。
2)在on_accept()里(運行在主線程中),每當有新連接連過來時,就會accept出一個新的new_fd,并調(diào)用accept_new_thread()來創(chuàng)建一個新的子線程。子線程里會調(diào)用process_in_new_thread_when_accepted()方法。
3)process_in_new_thread_when_accepted()方法里(運行在子線程中),創(chuàng)建一個子線程的base,并創(chuàng)建一個“讀事件”,注冊到“子線程的base”上。并調(diào)用event_base_dispatch(base)進入libevent的loop中。當發(fā)現(xiàn)new_fd的socket緩沖區(qū)中有數(shù)據(jù)可讀時,就觸發(fā)了這個“讀事件”,繼而調(diào)用on_read()方法。
4)on_read()方法里(運行在子線程中),從socket緩沖區(qū)里讀取數(shù)據(jù)。讀完數(shù)據(jù)之后,將一個“寫事件”注冊到“子線程的base”上。一旦socket可寫,就調(diào)用on_write()函數(shù)。
5)on_write()方法(運行在子線程中),對數(shù)據(jù)進行修改,然后通過socket寫回到client端。
注:其實可以不用注冊“寫事件”——在on_read()方法中直接修改數(shù)據(jù),然后寫回到client端也是可以的——但這有個問題。就是如果socket的寫緩沖區(qū)是滿的,那么這時候?write(sock, buffer, strlen(buffer))會阻塞的。這會導致整個on_read()方法阻塞掉,而無法讀到接下來client傳過來的數(shù)據(jù)了。而用了libevent的”寫事件“之后,雖然?write(sock, buffer, strlen(buffer))仍然會阻塞,但是只要socket緩沖區(qū)不可以寫就不會觸發(fā)這個“寫事件”,所以程序就不會阻塞,也就不會影響on_read()函數(shù)里的流程了。
更多文章、技術(shù)交流、商務合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
