我需要的 pthread 線程集結點功能,使用同一集結點的線程將通過 rend_wait 函數等待,當集結點到達指定數量的線程后同時激發繼續執行。使用 pthread 的 mutex 和 cond 超輕量實現。下面 rend.h 是集結點實現,rendezvous.c 是測試應用。
- /*
- *rend.h
- *
- *Createdon:2009-11-14
- *Author:liuzy(lzy.dev@gmail.com)
- */
- #ifndefREND_H_
- #defineREND_H_
- #include<pthread.h>
- #include<assert.h>
- struct rend_t{
- volatile int count;
- pthread_mutex_tcount_lock;
- pthread_cond_tready;
- };
- #defineDECLARE_REND(name,count)/
- struct rend_tname={(count),PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER}
- int rend_init( struct rend_t*prend, int count){
- int ret=0;
- assert(prend);
- prend->count=count;
- if ((ret=pthread_mutex_init(&prend->count_lock,NULL)))
- return ret;
- if ((ret=pthread_cond_init(&prend->ready,NULL)))
- return ret;
- return EXIT_SUCCESS;
- }
- int rend_wait( struct rend_t*prend){
- int ret=0;
- assert(prend);
- if ((ret=pthread_mutex_lock(&prend->count_lock)))
- return ret;
- /*checkcountvalueisreadytoweakupblockcode*/
- if (prend->count==1){
- if ((ret=pthread_cond_broadcast(&prend->ready)))
- return ret;
- if ((ret=pthread_mutex_unlock(&prend->count_lock)))
- return ret;
- } else {
- prend->count--;
- ret=pthread_cond_wait(&prend->ready,&prend->count_lock);
- prend->count++;
- if (ret){
- pthread_mutex_unlock(&prend->count_lock);
- return ret;
- }
- if ((ret=pthread_mutex_unlock(&prend->count_lock)))
- return ret;
- }
- return EXIT_SUCCESS;
- }
- int rend_free( struct rend_t*prend){
- int ret=0;
- assert(prend);
- prend->count=0;
- if ((ret=pthread_mutex_destroy(&prend->count_lock)))
- return ret;
- if ((ret=pthread_cond_destroy(&prend->ready)))
- return ret;
- return EXIT_SUCCESS;
- }
- #endif/*REND_H_*/
rend 使用更簡單:
- 定義/初始化 rend_t 集結點對象。DECLARE_REND 宏用于靜態定義,rend_init 函數可以對動態創建的集結點結構初始化;
- pthread 線程通過調用 rend_wait 函數 P/V 集結狀態。集結關系的線程要 P/V 在同一個 rend_t 集結對象上;
- 釋放集結對象,rend_free 函數。
以上函數都是成功返回 0,出錯返回 errno 值(非 0)。
- /*
- ==============================
- Name:rendezvous.c
- Author:liuzy(lzy.dev@gmail.com)
- Version:0.1
- ==============================
- */
- #include<stdio.h>
- #include<stdlib.h>
- #include<stdarg.h>/*va_list*/
- #include<unistd.h>
- #include<string.h>
- #include<errno.h>/*errno*/
- #include<syslog.h>/*forsyslog(2)andlevel*/
- #include<pthread.h>
- #include"rend.h"
- static int daemon_proc=0; /*forsysloginerr_doit*/
- #defineMAXLINE4096/*maxtextlinelength*/
- void err_doit( int errnoflag, int level, const char *fmt, va_list ap){
- char buf[MAXLINE+1]={0};
- int errno_save=errno,n=0;
- #ifdefHAVE_VSNPRINTF
- vsnprintf(buf,MAXLINE,fmt,ap);
- #else
- vsprintf(buf,fmt,ap);
- #endif/*HAVE_VSNPRINTF*/
- n=strlen(buf);
- if (errnoflag)
- snprintf(buf+n,MAXLINE-n, ":%s" ,strerror(errno_save));
- strcat(buf, "/n" );
- if (daemon_proc){
- syslog(level, "%s" ,buf);
- } else {
- fflush(stdout);
- fputs(buf,stderr);
- fflush(stderr);
- }
- return ;
- }
- void err_msg( const char *fmt,...){
- va_list ap;
- va_start(ap,fmt);
- err_doit(0,LOG_INFO,fmt,ap);
- va_end(ap);
- return ;
- }
- void err_sys( const char *fmt,...){
- va_list ap;
- va_start(ap,fmt);
- err_doit(1,LOG_ERR,fmt,ap);
- va_end(ap);
- exit(EXIT_FAILURE);
- }
- #defineTHREAD_COUNT100/*rendezvoustestthreadworkers*/
- struct worker_arg{
- int worker_id;
- struct rend_t*prend;
- };
- static void *pthread_worker( void *arg){
- struct worker_arg*parg=( struct worker_arg*)arg;
- err_msg( "worker#%drunning." ,( int )parg->worker_id);
- srand(parg->worker_id*2);
- sleep(rand()%5);
- rend_wait(parg->prend); /*workersrendezvous*/
- err_msg( "worker#%dexiting." ,( int )parg->worker_id);
- return EXIT_SUCCESS;
- }
- int main( void ){
- int idx=0;
- void *exitcode=NULL;
- pthread_tthds[THREAD_COUNT];
- struct worker_argarg[THREAD_COUNT];
- DECLARE_REND(rend,THREAD_COUNT);
- err_msg( "workerscreating." );
- for (idx=0;idx<THREAD_COUNT;idx++){
- arg[idx].prend=&rend;
- arg[idx].worker_id=idx;
- if (pthread_create(thds+idx,NULL,pthread_worker,( void *)&arg[idx]))
- err_sys( "worker#%dcreateerror." ,idx);
- }
- puts( "workersexiting." );
- for (idx=0;idx<THREAD_COUNT;idx++)
- if (pthread_join(thds[idx],&exitcode)||(exitcode!=EXIT_SUCCESS))
- err_msg( "worker#%dexiterror." ,idx);
- err_msg( "alldone.exit0." );
- rend_free(&rend);
- return EXIT_SUCCESS;
- }
看了下 semaphore os syscall 及其 infrastructure,也許以后還需要進程間(非 pthread)集結時用得上。kernel 實現的超強啊,呵呵~
// 2009.11.17 14:34 添加 ////
futex在非競爭情況下可從用戶空間獲取和釋放,不需要進入內核。與信號量類似,它有一個可以原子增減的計數器,進程可以等待計數器值變為正數。用戶進程通過系統調用對資源的競爭作一個公斷。
futex 是一個用戶空間的整數值,被多個線程或進程共享。Futex的系統調用對該整數值時進行操作,仲裁競爭的訪問。 glibc中的NPTL庫封裝了futex 系統調用,對futex接口進行了抽象。用戶通過NPTL庫像傳統編程一樣地使用線程同步API函數,而不會感覺到futex的存在。
futex 的實現機制是:如果當前進程訪問臨界區時,該臨界區正被另一個進程使用,當前進程將鎖用一個值標識,表示“有一個等待者正掛起”,并且調用 sys_futex(FUTEX_WAIT)等待其他進程釋放它。內核在內部創建futex隊列,以便以后與喚醒者匹配等待者。當臨界區擁有者線程釋放了 futex,它通過變量值發出通知表示還有多個等待者在掛起,并調用系統調用sys_futex(FUTEX_WAKE)喚醒它們。一旦所有等待者已獲取資源并釋放鎖時,futex回到非競爭狀態,并沒有內核狀態與它相關。
robust futex是為了解決futex鎖崩潰而對futex進行了增強。例如:當一個進程在持有pthread_mutex_t鎖正與其他進程發生競爭時,進程因某種意外原因而提前退出,如:進程發生段錯誤,或者被用戶用shell命令kill -9-ed”強行退出,此時,需要有一種機制告訴等待者“鎖的最一個持有者已經非正常地退出”。“
為了解決此類問題,NPTL創建了robust mutex用戶空間API pthread_mutex_lock(),如果鎖的擁有者進程提前退出,pthread_mutex_lock()返回一個錯誤值,新的擁有者進程可以決定是否可以安全恢復被鎖保護的數據。
有幾點不還不理解:
- “futex 如果說是一個用戶空間的整數值,那怎么被多個進程共享?Futex 系統調用在 kernel 態怎么操作該值并仲裁競爭?這是那種直接映射到 userspace 的 kernel 地址么。 這個需要程序間通過 mmap 在共享段中訪問,與 futex 沒什么關系。
- 這個“robust futex”機制指的應該就是 SVRx 傳統 sem IPC 里的 SEM_UNDO flag 吧?
一篇不錯的文章,引發對 glibc nptl 實現源碼的探索:
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
