我的新浪微博: http://weibo.com/freshairbrucewoo 。
歡迎大家相互交流,共同提高技術。
?
第三節、 rpc 通信過程分析
前面兩個小節分別對rpc服務端和客戶端的建立流程做了詳細的分析,也就是說rpc客戶端和服務器端已經能夠進行正常的通信了(rpc客戶端已經通過connect鏈接上rpc服務器了),那么這一小節主要根據一個實際的例子來分析一個完整的rpc通信過程。
下面以客戶端創建邏輯卷(volume)為例來分析rpc的通信過程,就以下面這個客戶端的命令開始:
gluster?volume?create?test-volume?server3:/exp3?server4:/exp4
先簡單看看glusterfs的客戶端是怎樣開始提交rpc請求的,提交準備過程流程圖如下:
?
從上面的流程圖可以看出真正開始提交 rpc 請求調用還是從具體命令的回調函數開始發起的,上面的流程圖主要展示的是準備工作,下面從具體命令的回調函數開始分析,這里分析的實例是創建邏輯卷的命令,執行的函數是 cli_cmd_volume_create_cbk ,主要實現代碼如下:
?
1 proc = &cli_rpc_prog->proctable[GLUSTER_CLI_CREATE_VOLUME]; // 從rpc程序表中選擇對應函數 2 3 frame = create_frame (THIS, THIS->ctx->pool); // 創建幀 4 5 ret = cli_cmd_volume_create_parse (words, wordcount, &options); // 創建邏輯卷的命令解析 6 7 if (proc-> fn) { 8 9 ret = proc->fn (frame, THIS, options); // 執行命令的回調函數 10 11 } 12 13 if (ret) { 14 15 cli_cmd_sent_status_get (&sent); // 得到命令發送狀態 16 17 if ((sent == 0 ) && (parse_error == 0 )) 18 19 cli_out ( " Volume create failed " ); // 如果失敗,錯誤提示 20 21 }
?
首先選擇對應命令的 rpc 客戶端創建邏輯卷的命令函數,然后解析命令以后執行相應的創建邏輯卷的 rpc 函數,下面是對應的函數存放表項:
1 [GLUSTER_CLI_CREATE_VOLUME] = { " CREATE_VOLUME " , gf_cli3_1_create_volume}
?
所以真正的提交函數是 gf_cli3_1_create_volume 函數,繼續分析這個函數,主要實現代碼如下:
1 ret = cli_cmd_submit (& req, frame, cli_rpc_prog, GLUSTER_CLI_CREATE_VOLUME, NULL, 2 3 gf_xdr_from_cli_create_vol_req, this , gf_cli3_1_create_volume_cbk);
?
主要代碼也只有一行,其余代碼就是為了這一行的函數調用做相應參數準備的,這一行的這個函數就是所有客戶端命令提交 rpc 請求服務的實現函數,只是提交的數據不同而已!下面重點分析這個函數,還是先看看主要代碼:
1 cli_cmd_lock (); // 命令對象加鎖 2 3 cmd_sent = 0 ; // 初始化命令發送狀態標示為0 4 5 ret = cli_submit_request (req, frame, prog, procnum, NULL, sfunc, this , cbkfn); // 提交請求 6 7 if (! ret) { 8 9 cmd_sent = 1 ; // 標示已經發送 10 11 ret = cli_cmd_await_response (); // 等待響應 12 13 } else 14 15 cli_cmd_unlock (); // 不成功解鎖
?
在發送具體的 rpc 請求以前先鎖住命令對象,然后調用函數 cli_submit_request? 把 rpc 請求發送出去(應該是異步的),然后設置命令以發送標志,并調用函數 cli_cmd_await_response 等待響應。繼續看提交 rpc 請求的函數:
1 iobuf = iobuf_get ( this ->ctx->iobuf_pool); // 從緩沖池取一個io緩存 2 3 if (! iobref) { 4 5 iobref = iobref_new (); // 新建一個iobuf引用池 6 7 new_iobref = 1 ; // 標志 8 9 } 10 11 iobref_add (iobref, iobuf); // 把io緩存加入io緩存引用池 12 13 iov.iov_base = iobuf->ptr; // io向量基地址(供用戶使用的內存) 14 15 iov.iov_len = 128 * GF_UNIT_KB; // 大小 16 17 if (req && sfunc) { 18 19 ret = sfunc (iov, req); // 序列化為xdr格式數據(表示層數據格式) 20 21 iov.iov_len = ret; // 序列化以后的長度 22 23 count = 1 ; // 計數初始化為1 24 25 } 26 27 ret = rpc_clnt_submit (global_rpc, prog, procnum, cbkfn, &iov, count, // 提交客戶端rpc請求 28 29 NULL, 0 , iobref, frame, NULL, 0 , NULL, 0 , NULL);
?
Xdr 數據格式的轉換是調用函數庫實現的,不具體分析,需要明白的是經過 sfunc? 函數調用以后就是 xdr 格式的數據了,最后根據轉化后的數據調用 rpc_clnt_submit 提交客戶端的 rpc 請求。繼續深入函數:
1 rpcreq = mem_get (rpc->reqpool); // 重rpc對象的請求對象池得到一個請求對象 2 3 if (! iobref) { 4 5 iobref = iobref_new (); // 如果io緩存引用池為null就新建一個 6 7 new_iobref = 1 ; // 新建標志 8 9 } 10 11 callid = rpc_clnt_new_callid (rpc); // 新建一個rpc調用的id號 12 13 conn = &rpc->conn; // 從rpc對象中取得鏈接對象 14 15 rpcreq->prog = prog; // 賦值rpc請求對象的程序 16 17 rpcreq->procnum = procnum; // 程序號 18 19 rpcreq->conn = conn; // 鏈接對象 20 21 rpcreq->xid = callid; // 調用id號 22 23 rpcreq->cbkfn = cbkfn; // 回調函數 24 25 if (proghdr) { // 程序頭不為空 26 27 proglen += iov_length (proghdr, proghdrcount); // 計算頭部長度加入程序消息總長度 28 29 } 30 31 if (progpayload) { 32 33 proglen += iov_length (progpayload, progpayloadcount); // 計算io向量的長度加入總長度 34 35 } 36 37 request_iob = rpc_clnt_record (rpc, frame, prog, procnum, proglen, &rpchdr, callid); // 建立rpc記錄 38 39 iobref_add (iobref, request_iob); // 添加rpc記錄的io緩存區到io緩存引用池 40 41 req.msg.rpchdr = &rpchdr; // rpc請求消息頭部 42 43 req.msg.rpchdrcount = 1 ; // 頭部數量 44 45 req.msg.proghdr = proghdr; // 程序頭部 46 47 req.msg.proghdrcount = proghdrcount; // 程序頭部數量 48 49 req.msg.progpayload = progpayload; // xdr格式數據 50 51 req.msg.progpayloadcount = progpayloadcount; // 數量 52 53 req.msg.iobref = iobref; // io緩存引用池 54 55 req.rsp.rsphdr = rsphdr; // 響應頭部 56 57 req.rsp.rsphdr_count = rsphdr_count; // 數量 58 59 req.rsp.rsp_payload = rsp_payload; // 負載 60 61 req.rsp.rsp_payload_count = rsp_payload_count; // 數量 62 63 req.rsp.rsp_iobref = rsp_iobref; // 響應緩存引用池 64 65 req.rpc_req = rpcreq; // rpc請求 66 67 pthread_mutex_lock (&conn-> lock ); // 加鎖 68 69 { 70 71 if (conn->connected == 0 ) { // 還沒有建立連接 72 73 rpc_transport_connect (conn->trans, conn->config.remote_port); // 建立連接 74 75 } 76 77 ret = rpc_transport_submit_request (rpc->conn.trans, &req); // 提交傳輸層rpc請求 78 79 if ((ret >= 0 ) && frame) { 80 81 gettimeofday (&conn->last_sent, NULL); // 設置最后發送時間 82 83 __save_frame (rpc, frame, rpcreq); // 保存幀到隊列 84 85 } 86 87 } 88 89 pthread_mutex_unlock (&conn-> lock ); // 解鎖
?
經過上面的代碼,現在數據已經到達傳輸層,所以現在就開始調用傳輸層的 rpc 請求發送函數 rpc_transport_submit_request ,代碼如下:
1 ret = this ->ops->submit_request ( this , req);
?
這里采用函數指針的方式進行調用的,具體的傳輸協議調用具體的傳輸函數,這些函數都是在裝載協議庫的時候已經賦值具體函數的實現了,分析的是 tcp ,所以看看 tcp 的發送函數:
1 struct rpc_transport_ops tops = { 2 3 ...... 4 5 .submit_request = socket_submit_request, 6 7 ...... 8 9 };
?
從上面可以看出 tcp 的發送函數是 socket_submit_request ,主要實現代碼如下:
1 pthread_mutex_lock (&priv-> lock ); // 加鎖 2 3 { 4 5 priv->submit_log = 0 ; // 提交標志初始化為0 6 7 entry = __socket_ioq_new ( this , &req->msg); // 根據請求對象的消息新建一個io請求隊列 8 9 if (list_empty (&priv->ioq)) { // 判斷提交io請求隊列是否為空 10 11 ret = __socket_ioq_churn_entry ( this , entry); // 開始依次提交傳輸層的io請求 12 13 if (ret == 0 ) 14 15 need_append = 0 ; // 需要添加到entry鏈表 16 17 if (ret > 0 ) 18 19 need_poll_out = 1 ; // 需要注冊可寫事件 20 21 } 22 23 if (need_append) { 24 25 list_add_tail (&entry->list, &priv->ioq); // 添加到entry的鏈表 26 27 } 28 29 if (need_poll_out) { // 注冊可寫事件 30 31 priv->idx = event_select_on (ctx->event_pool, priv->sock, priv->idx, - 1 , 1 ); 32 33 } 34 35 } 36 37 pthread_mutex_unlock (&priv-> lock ); // 解鎖
?
這段加鎖的代碼就是完成整個 rpc 請求信息的發送,如果沒有發送完畢就在注冊一個可寫事件啟動下一次請求,到此客戶端的 rpc 請求已經發送完畢,就開始等待服務器的響應。
下面就看看 rpc 服務器端怎么響應客戶端的請求,并根據相應的請求命令做怎樣的處理。在分析 rpc 服務啟動的時候知道注冊了監聽事件,監聽事件的處理函數是 socket_server_event_handler ,它的主要實現代碼如下:
?
1 pthread_mutex_lock (&priv-> lock ); 2 3 { 4 5 if (poll_in) { // 連接到來是可讀事件 6 7 new_sock = accept (priv->sock, SA (&new_sockaddr), &addrlen); // 接收客戶端連接 8 9 if (!priv->bio) { // 設置非阻塞 10 11 ret = __socket_nonblock (new_sock); 12 13 } 14 15 if (priv->nodelay) { // 設置無延遲發送 16 17 ret = __socket_nodelay (new_sock); 18 19 } 20 21 if (priv->keepalive) { // 設置保持連接 22 23 ret = __socket_keepalive (new_sock, priv->keepaliveintvl, priv-> keepaliveidle); 24 25 } 26 27 // 為連接對象 28 29 new_trans = GF_CALLOC ( 1 , sizeof (* new_trans), gf_common_mt_rpc_trans_t); 30 31 new_trans->name = gf_strdup ( this ->name); // 賦值名稱 32 33 memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, addrlen); // 賦值地址信息 34 35 new_trans->peerinfo.sockaddr_len = addrlen; // 長度 36 37 new_trans->myinfo.sockaddr_len = sizeof (new_trans-> myinfo.sockaddr); 38 39 ret = getsockname (new_sock, SA (&new_trans-> myinfo.sockaddr), 40 41 &new_trans->myinfo.sockaddr_len); // 得到新socket的地址信息 42 43 get_transport_identifiers (new_trans); 44 45 socket_init (new_trans); // 初始化新的傳輸層對象(新的socket) 46 47 pthread_mutex_lock (&new_priv-> lock ); 48 49 { 50 51 new_priv->connected = 1 ; // 連接已經建立 52 53 rpc_transport_ref (new_trans); // 傳輸對象引用計數加1 54 55 new_priv->idx = event_register (ctx->event_pool, new_sock, // 注冊可讀事件 56 57 socket_event_handler, new_trans, 1 , 0 ); 58 59 } 60 61 pthread_mutex_unlock (&new_priv-> lock ); 62 63 // 執行傳輸對象注冊的通知函數,通知已經接受客戶端連接請求 64 65 ret = rpc_transport_notify ( this , RPC_TRANSPORT_ACCEPT, new_trans); 66 67 } 68 69 } 70 71 pthread_mutex_unlock (&priv-> lock );
?
上面的代碼主要就是處理客戶端的連接請求,然后在新的 socket 上注冊可讀事件(準備讀取客戶端發送來的 rpc 請求信息),并且執行通知函數做相應的處理。注冊的可讀事件的處理函數是 socket_event_handler ,主要是實現代碼如下:
?
1 if (!priv->connected) { // 如果連接還沒有完成就繼續完成連接,因為連接是異步的可能沒有立即完成 2 3 ret = socket_connect_finish ( this ); 4 5 } 6 7 if (!ret && poll_out) { // 處理可寫事件 8 9 ret = socket_event_poll_out ( this ); 10 11 } 12 13 if (!ret && poll_in) { // 處理可讀事件 14 15 ret = socket_event_poll_in ( this ); 16 17 }
?
客戶端的連接請求對于服務器來說是可讀事件,所以執行的 socket_event_poll_in 函數,當服務器需要發送響應信息到 rpc 客戶端的時候就會執行可寫事件處理函數。繼續分析接收客戶端請求信息的處理函數 socket_event_poll_in 主要代碼如下:
??
1 ret = socket_proto_state_machine ( this , &pollin); // 根據rpc服務記錄的狀態做相應處理 2 3 if (pollin != NULL) { 4 5 ret = rpc_transport_notify ( this , RPC_TRANSPORT_MSG_RECEIVED, pollin); // 執行通知函數 6 7 rpc_transport_pollin_destroy (pollin); // 完成處理就銷毀資源 8 9 }
?
上面的代碼主要還是調用其它函數繼續處理 rpc 客戶端的請求信息,然后執行通知函數通知傳輸對象消息已經被接收,最后銷毀傳輸層相關不在需要的資源。處理具體請求信息的實現是在函數 socket_proto_state_machine ,而這個函數又調用 __socket_proto_state_machine 來處理,所以看看這個函數實現功能的主要代碼:
1 while (priv->incoming.record_state != SP_STATE_COMPLETE) { // 直到rpc服務記錄狀態完成為止 2 3 switch (priv->incoming.record_state) { // 根據現在rpc服務記錄的狀態做相應處理 4 5 case SP_STATE_NADA: // 開始狀態 6 7 iobuf = iobuf_get ( this ->ctx->iobuf_pool); // 取得一個io緩存 8 9 priv->incoming.record_state = SP_STATE_READING_FRAGHDR; // 改變狀態為讀取頭部 10 11 case SP_STATE_READING_FRAGHDR: // 讀取頭部信息 12 13 ret = __socket_readv ( this , priv->incoming.pending_vector, 1 , // 讀取信息 14 15 &priv-> incoming.pending_vector, 16 17 &priv-> incoming.pending_count, NULL); 18 19 if (ret > 0 ) { // 讀取了部分頭部信息 20 21 } 22 23 if (ret == 0 ) { // 讀取了所有頭部信息,繼續下一步的處理 24 25 priv->incoming.record_state = SP_STATE_READ_FRAGHDR; // 改變為下一步 26 27 } 28 29 case SP_STATE_READ_FRAGHDR: // 處理已經讀取的頭部信息 30 31 priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr); // 轉換頭部信息為主機字節 32 33 priv->incoming.record_state = SP_STATE_READING_FRAG; // 轉化為讀取幀數據狀態 34 35 priv->incoming.total_bytes_read += RPC_FRAGSIZE(priv->incoming.fraghdr); // 字節數 36 37 case SP_STATE_READING_FRAG: // 讀取所有的數據 38 39 ret = __socket_read_frag ( this ); // 讀取所有幀數據 40 41 priv->incoming.frag.bytes_read = 0 ; 42 43 if (!RPC_LASTFRAG (priv->incoming.fraghdr)) { // 是否為最后一幀數據 44 45 priv->incoming.record_state = SP_STATE_READING_FRAGHDR; // 不是 46 47 break ; // 退出循環,從新讀取頭部信息 48 49 } 50 51 if (pollin != NULL) { 52 53 int count = 0 ; // 計數 54 55 priv->incoming.iobuf_size = priv-> incoming.total_bytes_read 56 57 - priv->incoming.payload_vector.iov_len; // 計算io緩存大小 58 59 memset (vector, 0 , sizeof (vector)); // io向量清零 60 61 if (priv->incoming.iobref == NULL) { // io緩存引用池為null就新建一個 62 63 priv->incoming.iobref = iobref_new (); 64 65 } 66 67 vector[count].iov_base = iobuf_ptr (priv->incoming.iobuf); // 取io緩存基地址 68 69 vector[count].iov_len = priv->incoming.iobuf_size; // io緩存長度 70 71 iobref = priv->incoming.iobref; // io緩存引用池 72 73 count++; // 計數加1 74 75 if (priv->incoming.payload_vector.iov_base != NULL) { // 負載向量不為null 76 77 vector[count] = priv->incoming.payload_vector; // 保存負載io向量 78 79 count++; // 計數加1 80 81 } 82 83 // 新建一個傳輸層可取對象 84 85 *pollin = rpc_transport_pollin_alloc ( this , vector, count, priv-> incoming.iobuf, 86 87 iobref, priv-> incoming.request_info); 88 89 iobuf_unref (priv->incoming.iobuf); // io緩存引用計算減1 90 91 priv->incoming.iobuf = NULL; // 清零 92 93 if (priv->incoming.msg_type == REPLY) // 消息類型是回復 94 95 (*pollin)->is_reply = 1 ; // 設置回復標志 96 97 priv->incoming.request_info = NULL; // 請求信息清零 98 99 } 100 101 priv->incoming.record_state = SP_STATE_COMPLETE; // 設置為完成狀態 102 103 break ; 104 105 } 106 107 } 108 109 if (priv->incoming.record_state == SP_STATE_COMPLETE) { // 如果rpc請求記錄為完成狀態 110 111 priv->incoming.record_state = SP_STATE_NADA; // 重新初始化為開始狀態 112 113 __socket_reset_priv (priv); // 復位私有數據對象 114 115 }
?
整個處理過程分為了幾個階段,而且每一個階段只處理相應的事情,然后就進入下一個階段,因為前幾個階段 case 語言都是不帶 break 的,所以直接進入下一個階段,最終達到完成狀態就退出循環,一個完成的處理過程其實就只需要一次循環就解決了。當所有 rpc 請求消息都已經接收以后就調用通知函數(在傳輸對象上注冊的通知函數)通知傳輸對象消息已經接收,由 rpc 服務器的初始化過程我們知道注冊的傳輸對象通知函數是 rpcsvc_notify? ,這個函數主要實現代碼如下:
?
1 switch ( event ) { 2 3 case RPC_TRANSPORT_ACCEPT: // rpc請求已經被接收處理 4 5 new_trans = data; 6 7 ret = rpcsvc_accept (svc, trans, new_trans); // 處理函數 8 9 break ; 10 11 case RPC_TRANSPORT_DISCONNECT: // 斷開連接消息 12 13 ret = rpcsvc_handle_disconnect (svc, trans); // 處理函數 14 15 break ; 16 17 case RPC_TRANSPORT_MSG_RECEIVED: // 消息已經接收 18 19 msg = data; 20 21 ret = rpcsvc_handle_rpc_call (svc, trans, msg); // rpc調用處理函數 22 23 break ; 24 25 case RPC_TRANSPORT_MSG_SENT: // 消息已經發生,不需要處理 26 27 break ; 28 29 case RPC_TRANSPORT_CONNECT: // 已經連接 30 31 break ; 32 33 case RPC_TRANSPORT_CLEANUP: // 清零消息 34 35 listener = rpcsvc_get_listener (svc, - 1 , trans->listener); // 得到對應的監聽器對象 36 37 rpcsvc_program_notify (listener, RPCSVC_EVENT_TRANSPORT_DESTROY, trans); // 通知上層 38 39 break ; 40 41 case RPC_TRANSPORT_MAP_XID_REQUEST: 42 43 break ; 44 45 }
?
傳輸對象注冊的通知函數會根據傳遞過來的信息類型做相應的處理,這里傳遞過來的消息是消息已經接收,它的處理就是開始執行 rpc 調用了,執行的函數是 rpcsvc_handle_rpc_call ,它的主要實現代碼如下:
?
1 req = rpcsvc_request_create (svc, trans, msg); // 創建一個rpc服務請求對象 2 3 if (!rpcsvc_request_accepted (req)) // 判斷rpc請求是否被接受 4 5 ; 6 7 actor = rpcsvc_program_actor (req); // 根據請求對象取得rpc過程調用對象 8 9 if (actor && (req->rpc_err == SUCCESS)) { // rpc過程調用對象不為null并且請求信息是成功的 10 11 THIS = svc->mydata; // 取得xlator對象 12 13 if (req->count == 2 ) { // 請求的數量等于2 14 15 if (actor->vector_actor) { // 向量過程不為null,就執行向量處理函數 16 17 ret = actor->vector_actor (req, &req->msg[ 1 ], 1 , req-> iobref); 18 19 } else { 20 21 rpcsvc_request_seterr (req, PROC_UNAVAIL); // 出錯,不可用的函數 22 23 ret = RPCSVC_ACTOR_ERROR; // 調用過程出錯 24 25 } 26 27 } else if (actor-> actor) { 28 29 ret = actor->actor (req); // 調用rpc請求函數 30 31 } 32 33 } 34 35 if (ret == RPCSVC_ACTOR_ERROR) { // 出錯 36 37 ret = rpcsvc_error_reply (req); // 回復客戶端rpc請求處理出錯 38 39 }
?
上面代碼首先根據接收到的信息建立一個請求對象,然后根據建立的請求對象判斷是都已經成功接納此次 rpc 請求調用,如果是就繼續執行函數 rpcsvc_program_actor ,這個函數會根據程序號、函數號等信息查找對應的 rpc 請求的遠程過程調用,如果找到就執行相應的函數調用。我們分析的是客戶端發送一條創建邏輯卷的命令道服務器端,根據服務器端在啟動初始化的過程中注冊的程序集中我們能夠找到如下一條對應的函數信息:
1 [GLUSTER_CLI_CREATE_VOLUME] = { " CLI_CREATE_VOLUME " , GLUSTER_CLI_CREATE_VOLUME, glusterd_handle_create_volume, NULL,NULL},
?
所以服務器端就會調用函數 glusterd_handle_create_volume ,如果在處理 rpc 請求的過程中遇到錯誤就會向客戶端發送一個錯誤信息的相應消息。當然如果調用成功的話也同樣會返回給客戶端一個相應的結果信息。客戶端會接收服務器端的回復,然后根據消息內容做相應的處理,如:創建成功等提示信息。這樣一次完整的 rpc 通信就完成了。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
