亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

多核中的動態任務調度

系統 1553 0

注:本文主要內容摘自筆者所著的《多核計算與程序設計》一書,略有修改,后續還會繼續發布系列文章,如有需要,可以考慮將一下地址加入到您的瀏覽器收藏夾中:http://software.intel.com/zh-cn/blogs/category/multicore/。

1、基本思想

動態任務調度可以將一系列分解好的任務進行并行運行,并取得一定程度的負載均衡。動態任務調度的最大作用就是用它來做并行計算。動態任務調度有多種方法,一般可以使用分布式隊列【1】來實現,下面講解一種最簡單的嵌套型任務調度的實現方法。

對于嵌套型任務,通常都有一個或多個開始任務,其他任務的產生都源于這些開始任務。

調度的方法為,每個線程由一個本地隊列,另外由一個所有線程共享的隊列。當每個線程產生n個新任務后,先檢查本地隊列是否為空,如果為空,則放入一個任務到本地隊列中。然后檢查共享隊列是否滿,如果未滿則將其他任務放入共享隊列中,否則放入到本地隊列中。

上面這個調度方法實際上和CDistributeQueue【1】中的進隊操作方法是一樣的,因此可以使用CDistributeQueue來實現嵌套型動態任務的調度。

一般來說,嵌套型動態任務調度會遇到以下一些問題

  • 1、 初始時可能只有一個任務運行,此種情況下只能有一個線程運行,其他線程必須掛起。當動態任務產生后,需要喚醒掛起的線程進行執行。
  • 2、 由于每個任務中會產生新的任務,因此每個任務既是消費者,同時也是生產者。在操作本地隊列時,比非嵌套型任務調度更加方便,如何將本地隊列的操作最大化是首要考慮的問題。

根據上面的思想,下面設計一個CNestTaskScheduler類來實現對嵌套型動態任務的調度。

2、CNestTaskScheduler類的設計和實現

CNestTaskScheduler類的定義如下:

class CNestTaskScheduler {

private:

??? CThreadPool???? m_ThreadPool;//(TaskScheduler_StartFunc, NULL, 0);

??? CDistributedQueue<TASK, CLocalQueue<TASK>, CStealQueue<TASK>> m_DQueue;

??? THREADFUNC????? m_StartFunc;? //為線程池使用的線程入口函數指針

??? LONG? volatile? m_lTaskId;??? //Task Id,用于判斷是否喚醒對應的線程

?

public:

??? CNestTaskScheduler();

??? virtual ~CNestTaskScheduler();

?

??? //下面兩個函數為調度器本身直接使用

??? void SetStartFunc(THREADFUNC StartFunc);

int GetTask(TASK &Task);

??? CThreadPool & GetThreadPool();

??? LONG AtomicIncrementTaskId();

?

??? //下面三個函數為調度器的使用者使用

??? void SpawnLocalTask(TASK &Task);

??? void SpawnTask(TASK &Task);

??? void BeginRootThread(TASK &Task);

};

類中的主要三個接口為

??? void SpawnLocalTask(TASK &Task);

??? void SpawnTask(TASK &Task);

void BeginRootThread(TASK &Task);

?

SpawnLocalTask()的主要作用是將動態生成的任務放入線程的本地隊列中;SpawnTask()的作用是將動態產生的任務放入分布式隊列中,當然任務有可能被放入本地隊列,也有可能被放入共享隊列中;BeginRootThread()的作用是啟動初始的任務。

1) BeginRootTask() 的處理流程

BeginRootTask() 的處理流程較簡單,它先創建線程池,接著將一個原始任務放入到第 0 個線程的本地隊列中,然后執行第 0 個線程,最后等待所有線程執行完。處理流程如下圖所示:

?

?

? 1 嵌套型任務 BeginRootTask() 處理流程圖

BeginRootTask() 的代碼如下:

/** ?? 嵌套任務調度的開始根線程函數

?

???????? @param ? TASK &Task - 要執行的最初任務

???????? @return ? void - ?

*/

void CNestTaskScheduler::BeginRootThread(TASK &Task)

{

??? m_lTaskId = 0;

?

??? m_ThreadPool . CreateThreadPool ( m_StartFunc , this, 0);

??? m_DQueue . PushToLocalQueue ( Task , 0);

?? ? m_ThreadPool . ExecThread ( 0 ); ?

??? m_ThreadPool . WaitAllThread ();

}

?

BeginRootTask() 執行后,只有第 0 個線程被執行了,線程池中的其他線程都是處于掛起狀態。實際上在第 0 個線程的處理過程中,它會繼續調用 SpawnTask() SpawnTask() 中需要判斷是否有線程被掛起,如果有則需要喚醒掛起的線程,下面就來看看 SpawnTask() 的詳細處理過程。

2) SpawnTask() 的處理流程

SpawnTask() 的功能主要是將任務放入到分布式隊列中。由于在 BeginRootThread() 中只執行了第 0 個線程,其他線程都處于掛起狀態,因此這個函數中還需要喚醒其他被掛起的線程,整個處理流程如下圖所示:

?

? 2 嵌套型任務 SpawnLocalTask() 處理流程圖

根據上面的處理流程, SpawnLocalTask() 的代碼實現如下:

/** ?? 嵌套任務調度的生成任務函數

??? 生成的任務被放入到分布式隊列中

?

???????? @param ? TASK &Task - 待執行的任務

???????? @return ? void - ?

*/

void CNestTaskScheduler::SpawnTask(TASK &Task)

{

??? if ( m_lTaskId < m_ThreadPool . GetThreadCount () )

??? {

??????? // 依次喚醒各個掛起的線程

??????? LONG Id = AtomicIncrement (& m_lTaskId );

??????? if ( Id < m_ThreadPool . GetThreadCount () )

??????? {

??????????? // 下面之所以可以對其他線程的本地隊列進行無同步的操作,是因為

??????????? // 訪問這些隊列的線程在進隊操作之后才開始運行

??????????? m_DQueue . PushToLocalQueue ( Task , Id );

??????????? m_ThreadPool . ExecThread ( Id );

??????? }

??????? else

??????? {

??????????? m_DQueue . EnQueue ( Task );

??????? }

??? }

??? else

??? {

??????? // 先判斷偷取隊列是否滿,如果未滿則放入偷取隊列中

??????? // 如果滿了則放入本地隊列中

??????? m_DQueue . EnQueue ( Task );

??? }

};

在處理喚醒其他線程的過程中,采用了原子操作來實現,當變量 m_lTaskId 的值小于給定線程數量時,表明還有線程被掛起,因此將任務放入對應被掛起線程的本地隊列中,然后再喚醒并執行對應被掛起的線程。

當任務被放入分布式隊列后,線程池中的各個線程是如何處理分布式隊列中的任務的呢?下面就來看看線程池的入口函數的處理過程。

?

?

?

?

3、 CNestTaskScheduler 使用方法

注:完整的 CNestTaskScheduler 的源代碼,請到 CAPI 開源項目進行下載,下載地址為: http://gforge.osdn.net.cn/projects/capi

?

下面以一個區間遞歸分拆為例講解如何使用 CNestTaskScheduler 。首先需要寫一個任務處理入口函數,代碼如下:

struct RANGE {

??? int begin ;

??? int end ;

};

?

CNestTaskScheduler ? * pTaskSched = NULL ;

?

/** ?? 任務處理入口函數

???????? 將一個大的區間均分成兩個更小的區間

?

???????? @param ? void *args - 參數,實際為 RANGE 類型 ??????

???????? @return ? unsigned int WINAPI - 總是返回 CAPI_SUCCESS ???

*/

unsigned int WINAPI RootTask (void * args )

{

??? RANGE ? * p = ( RANGE *) args ;

??? if ( p != NULL )

??? {

????? ? ? printf ( "Range: %ld - %ld\n" , p -> begin , p -> end );

??????? if ( p -> end - p -> begin < 128 )

??????? {

??????????? // 當區間大小小于時,不再進行分拆

??????????? delete p ;

??????????? return 0;

??????? }

??????? int mid = ( p -> begin + p -> end + 1) / 2;

??????? RANGE * range1 , * range2 ;

?

??????? range1 = new RANGE ;

??????? range2 = new RANGE ;

?

????? ?? range1 -> begin = p -> begin ;

??????? range1 -> end = mid - 1;

??????? range2 -> begin = mid ;

??????? range2 -> end = p -> end ;

?

??????? TASK t1 , t2 ;

??????? t1 . pArg = range1 ;

??????? t2 . pArg = range2 ;

??????? t1 . func = RootTask ;

??????? t2 . func = RootTask ;

?

?????? ? pTaskSched -> SpawnLocalTask ( t1 );

??????? pTaskSched -> SpawnTask ( t2 );

?

??????? delete p ;

??? }

??? return 1;

}

?

任務處理函數 RootTask() 中,先將一個大區間拆分成兩個更小的區間,然后將每個區間看成一個新的任務,得到兩個新的任務 t1 t2 ,然后調用 SpawnLocalTask() 將任務 t1 放進任務調度器的分布式隊列的本地隊列中。如果拆分后的區間小于給定的大小,就不再分拆。

下面的代碼演示了如何調用 CNestTaskScheduler 類來對一個 0 1023 的區間進行并行拆分。

void main ( void )

{

?

??? TASK ??? task ;

??? RANGE ?? * pRange = new RANGE ;

?

??? pRange -> begin = 0;

??? pRange -> end = 1023;

?

??? task.func = RootTask ;

??? task . pArg = pRange ;

?

??? pTaskSched = new CNestTaskScheduler ;

???

??? pTaskSched -> BeginRootThread ( task );

?

??? delete pTaskSched ;

?

}

上面程序執行后,打印的結果如下,從打印結果可以看出整個程序執行中進行的分拆過程。

Range: 0 - 1023

Range: 0 - 511

Range: 512 - 1023

Range: 0 - 255

Range: 512 - 767

Range: 0 - 127

Range: 512 - 639

Range: 256 - 511

Range: 768 - 1023

Range: 256 - 383

Range: 768 - 895

Range: 128 - 255

Range: 640 - 767

Range: 384 - 511

Range: 896 – 1023

?

當然,我們需要用任務調度來實現并行計算,下面就來講一個具體的用任務調度進行并行快速排序的實例。

?

?

3) 線程池入口函數處理流程

線程池入口函數的處理在一個循環中進行,每次循環中,從分布式隊列中獲取任務,然后執行任務的啟動入口函數,如果從分布式隊列中獲取任務失敗,則認為所有任務被處理完,此時需要判斷是否還有掛起的線程,有則需要將掛起線程執行起來讓其退出,然后退出循環并結束當前線程。

?

? 3 ? 線程池入口函數處理流程圖

?

/** ?? 嵌套任務調度的獲取任務函數

?

???????? @param ? TASK &Task - 接收從分布式隊列中獲取的任務

???????? @return ? int - 成功返回 CAPI_SUCCESS, 失敗返回 CAPI_FAILED. ??????

*/

int CNestTaskScheduler::GetTask(TASK &Task)

{

??? // 先從本地隊列獲取任務

??? // 本地獲取任務失敗后從偷取隊列獲取任務

??? return m_DQueue . DeQueue ( Task );

};

?

/** ?? 嵌套任務調度的線程池入口函數

?

???????? @param ? void *pArgs - CNestTaskScheduler 類型的參數 ?????

???????? @return ? unsigned int WINAPI - 返回 ???

*/

unsigned int WINAPI NestTaskScheduler_StartFunc(void *pArgs)

{

??? CNestTaskScheduler ? * pSched = ( CNestTaskScheduler *) pArgs ;

?

??? TASK ??? Task ;

??? int ???? nRet ;

?

??? for ( ;; )

??? {

??????? nRet = pSched -> GetTask ( Task );

??????? if ( nRet == CAPI_FAILED )

??????? {

??????????? CThreadPool &ThreadPool = pSched->GetThreadPool();

???????????

??????????? // 喚醒一個掛起的線程 , 防止任務數量小于 CPU 核數時,

??????????? // 仍然有任務處于掛起狀態 , 從而導致 WaitAllThread() 處于死等狀態

??????????? // 這個喚醒過程是一個串行的過程,被喚醒的任務會繼續喚醒一個掛起線程

??????????? LONG Id = pSched->AtomicIncrementTaskId();

??????????? if ( Id < ThreadPool.GetThreadCount() )

??????????? {

??????????????? ThreadPool.ExecThread(Id);

??????????? }

??????? ???? break;

??????? }

??????? (*( Task . func ))( Task . pArg );

??? }

??? return 0;

}

?

在上面的線程入口處理函數 NestTaskScheduler_StartFunc() 中,當獲取任務失敗時,表明所有任務都處理完畢。此時需要考慮一種特殊情況,即任務總數量小于線程數量的情況。由于線程池 CThreadPool 采用預創建線程的方法,所有預創建的線程初始處于掛起狀態,獲取任務失敗后,可能還有若干線程沒有被分配到任務,仍然處于掛起狀態。必須將這些掛起的任務恢復執行讓其退出,否則 WaitAllThread() 函數將處于死等狀態。

NestTaskScheduler_StartFunc() 在處理喚醒掛起的線程的方法是逐個喚醒的方法,當有某個執行線程獲取任務失敗后,它先喚醒一個被掛起的線程,然后這個被喚醒的線程執行后,它也會執行 NestTaskScheduler_StartFunc() 函數,當然它獲取任務會失敗,接著它也會喚醒一個被掛起的線程,這樣一直下去,所有被掛起線程都會被喚醒并被退出。

多核中的動態任務調度


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦?。。?/p>

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 欧美成人免费 | 久久精品操 | 精品免费国产一区二区女 | 四虎成人欧美精品在永久在线 | 日日干日日干 | 久久精品在线免费观看 | 一级毛片人与动免费观看 | 中文字幕免费视频 | 91精品久久国产青草 | 亚洲高清一区二区三区四区 | 国产色婷婷 | 瑟瑟综合| 97在线观免费视频观看 | 国产精品视频一区麻豆 | 亚洲国产精品网 | 国产亚洲精品国产 | 人成午夜视频 | 久久这里只有精品免费看青草 | 亚洲视频精品在线观看 | 日韩欧美国产偷亚洲清高 | 欧美在线成人午夜网站 | 日韩在线中文字幕 | 91精品国产福利尤物免费 | 五月婷亚洲 | 日本工番囗番全彩本子大全 | 亚洲第一人黄所 | www.黄色片网站 | 午夜老司机福利 | 99秒拍福利大尺度视频 | 免费看搡女人的视频 | 亚洲国产中文字幕在线观看 | 欧美中文字幕在线视频 | 男人看的网址 | 国产精品久久香蕉免费播放 | 青草视频网址 | 999久久狠狠免费精品 | 久久免费久久 | 久久国产免费观看精品 | 国产伦一区二区三区四区久久 | 福利色姬网站视频入口 | 国产一级精品毛片 |