注:本文主要內容摘自筆者所著的《多核計算與程序設計》一書,略有修改,后續還會繼續發布系列文章,如有需要,可以考慮將一下地址加入到您的瀏覽器收藏夾中:http://software.intel.com/zh-cn/blogs/category/multicore/。
1、基本思想
動態任務調度可以將一系列分解好的任務進行并行運行,并取得一定程度的負載均衡。動態任務調度的最大作用就是用它來做并行計算。動態任務調度有多種方法,一般可以使用分布式隊列【1】來實現,下面講解一種最簡單的嵌套型任務調度的實現方法。
對于嵌套型任務,通常都有一個或多個開始任務,其他任務的產生都源于這些開始任務。
調度的方法為,每個線程由一個本地隊列,另外由一個所有線程共享的隊列。當每個線程產生n個新任務后,先檢查本地隊列是否為空,如果為空,則放入一個任務到本地隊列中。然后檢查共享隊列是否滿,如果未滿則將其他任務放入共享隊列中,否則放入到本地隊列中。
上面這個調度方法實際上和CDistributeQueue【1】中的進隊操作方法是一樣的,因此可以使用CDistributeQueue來實現嵌套型動態任務的調度。
一般來說,嵌套型動態任務調度會遇到以下一些問題 :
- 1、 初始時可能只有一個任務運行,此種情況下只能有一個線程運行,其他線程必須掛起。當動態任務產生后,需要喚醒掛起的線程進行執行。
- 2、 由于每個任務中會產生新的任務,因此每個任務既是消費者,同時也是生產者。在操作本地隊列時,比非嵌套型任務調度更加方便,如何將本地隊列的操作最大化是首要考慮的問題。
根據上面的思想,下面設計一個CNestTaskScheduler類來實現對嵌套型動態任務的調度。
2、CNestTaskScheduler類的設計和實現
CNestTaskScheduler類的定義如下:
class CNestTaskScheduler {
private:
??? CThreadPool???? m_ThreadPool;//(TaskScheduler_StartFunc, NULL, 0);
??? CDistributedQueue<TASK, CLocalQueue<TASK>, CStealQueue<TASK>> m_DQueue;
??? THREADFUNC????? m_StartFunc;? //為線程池使用的線程入口函數指針
??? LONG? volatile? m_lTaskId;??? //Task Id,用于判斷是否喚醒對應的線程
?
public:
??? CNestTaskScheduler();
??? virtual ~CNestTaskScheduler();
?
??? //下面兩個函數為調度器本身直接使用
??? void SetStartFunc(THREADFUNC StartFunc);
int GetTask(TASK &Task);
??? CThreadPool & GetThreadPool();
??? LONG AtomicIncrementTaskId();
?
??? //下面三個函數為調度器的使用者使用
??? void SpawnLocalTask(TASK &Task);
??? void SpawnTask(TASK &Task);
??? void BeginRootThread(TASK &Task);
};
類中的主要三個接口為
??? void SpawnLocalTask(TASK &Task);
??? void SpawnTask(TASK &Task);
void BeginRootThread(TASK &Task);
?
SpawnLocalTask()的主要作用是將動態生成的任務放入線程的本地隊列中;SpawnTask()的作用是將動態產生的任務放入分布式隊列中,當然任務有可能被放入本地隊列,也有可能被放入共享隊列中;BeginRootThread()的作用是啟動初始的任務。
1) BeginRootTask() 的處理流程
BeginRootTask() 的處理流程較簡單,它先創建線程池,接著將一個原始任務放入到第 0 個線程的本地隊列中,然后執行第 0 個線程,最后等待所有線程執行完。處理流程如下圖所示:
?
?
? 圖 1 嵌套型任務 BeginRootTask() 處理流程圖
BeginRootTask() 的代碼如下:
/** ?? 嵌套任務調度的開始根線程函數
?
???????? @param ? TASK &Task - 要執行的最初任務
???????? @return ? void - 無 ?
*/
void CNestTaskScheduler::BeginRootThread(TASK &Task)
{
??? m_lTaskId = 0;
?
??? m_ThreadPool . CreateThreadPool ( m_StartFunc , this, 0);
??? m_DQueue . PushToLocalQueue ( Task , 0);
?? ? m_ThreadPool . ExecThread ( 0 ); ?
??? m_ThreadPool . WaitAllThread ();
}
?
BeginRootTask() 執行后,只有第 0 個線程被執行了,線程池中的其他線程都是處于掛起狀態。實際上在第 0 個線程的處理過程中,它會繼續調用 SpawnTask() , SpawnTask() 中需要判斷是否有線程被掛起,如果有則需要喚醒掛起的線程,下面就來看看 SpawnTask() 的詳細處理過程。
2) SpawnTask() 的處理流程
SpawnTask() 的功能主要是將任務放入到分布式隊列中。由于在 BeginRootThread() 中只執行了第 0 個線程,其他線程都處于掛起狀態,因此這個函數中還需要喚醒其他被掛起的線程,整個處理流程如下圖所示:
? 圖 2 嵌套型任務 SpawnLocalTask() 處理流程圖
根據上面的處理流程, SpawnLocalTask() 的代碼實現如下:
/** ?? 嵌套任務調度的生成任務函數
??? 生成的任務被放入到分布式隊列中
?
???????? @param ? TASK &Task - 待執行的任務
???????? @return ? void - 無 ?
*/
void CNestTaskScheduler::SpawnTask(TASK &Task)
{
??? if ( m_lTaskId < m_ThreadPool . GetThreadCount () )
??? {
??????? // 依次喚醒各個掛起的線程
??????? LONG Id = AtomicIncrement (& m_lTaskId );
??????? if ( Id < m_ThreadPool . GetThreadCount () )
??????? {
??????????? // 下面之所以可以對其他線程的本地隊列進行無同步的操作,是因為
??????????? // 訪問這些隊列的線程在進隊操作之后才開始運行
??????????? m_DQueue . PushToLocalQueue ( Task , Id );
??????????? m_ThreadPool . ExecThread ( Id );
??????? }
??????? else
??????? {
??????????? m_DQueue . EnQueue ( Task );
??????? }
??? }
??? else
??? {
??????? // 先判斷偷取隊列是否滿,如果未滿則放入偷取隊列中
??????? // 如果滿了則放入本地隊列中
??????? m_DQueue . EnQueue ( Task );
??? }
};
在處理喚醒其他線程的過程中,采用了原子操作來實現,當變量 m_lTaskId 的值小于給定線程數量時,表明還有線程被掛起,因此將任務放入對應被掛起線程的本地隊列中,然后再喚醒并執行對應被掛起的線程。
當任務被放入分布式隊列后,線程池中的各個線程是如何處理分布式隊列中的任務的呢?下面就來看看線程池的入口函數的處理過程。
?
?
?
?
3、 CNestTaskScheduler 使用方法
注:完整的 CNestTaskScheduler 的源代碼,請到 CAPI 開源項目進行下載,下載地址為: http://gforge.osdn.net.cn/projects/capi
?
下面以一個區間遞歸分拆為例講解如何使用 CNestTaskScheduler 。首先需要寫一個任務處理入口函數,代碼如下:
struct RANGE {
??? int begin ;
??? int end ;
};
?
CNestTaskScheduler ? * pTaskSched = NULL ;
?
/** ?? 任務處理入口函數
???????? 將一個大的區間均分成兩個更小的區間
?
???????? @param ? void *args - 參數,實際為 RANGE 類型 ??????
???????? @return ? unsigned int WINAPI - 總是返回 CAPI_SUCCESS ???
*/
unsigned int WINAPI RootTask (void * args )
{
??? RANGE ? * p = ( RANGE *) args ;
??? if ( p != NULL )
??? {
????? ? ? printf ( "Range: %ld - %ld\n" , p -> begin , p -> end );
??????? if ( p -> end - p -> begin < 128 )
??????? {
??????????? // 當區間大小小于時,不再進行分拆
??????????? delete p ;
??????????? return 0;
??????? }
??????? int mid = ( p -> begin + p -> end + 1) / 2;
??????? RANGE * range1 , * range2 ;
?
??????? range1 = new RANGE ;
??????? range2 = new RANGE ;
?
????? ?? range1 -> begin = p -> begin ;
??????? range1 -> end = mid - 1;
??????? range2 -> begin = mid ;
??????? range2 -> end = p -> end ;
?
??????? TASK t1 , t2 ;
??????? t1 . pArg = range1 ;
??????? t2 . pArg = range2 ;
??????? t1 . func = RootTask ;
??????? t2 . func = RootTask ;
?
?????? ? pTaskSched -> SpawnLocalTask ( t1 );
??????? pTaskSched -> SpawnTask ( t2 );
?
??????? delete p ;
??? }
??? return 1;
}
?
任務處理函數 RootTask() 中,先將一個大區間拆分成兩個更小的區間,然后將每個區間看成一個新的任務,得到兩個新的任務 t1 、 t2 ,然后調用 SpawnLocalTask() 將任務 t1 放進任務調度器的分布式隊列的本地隊列中。如果拆分后的區間小于給定的大小,就不再分拆。
下面的代碼演示了如何調用 CNestTaskScheduler 類來對一個 0 ~ 1023 的區間進行并行拆分。
void main ( void )
{
?
??? TASK ??? task ;
??? RANGE ?? * pRange = new RANGE ;
?
??? pRange -> begin = 0;
??? pRange -> end = 1023;
?
??? task.func = RootTask ;
??? task . pArg = pRange ;
?
??? pTaskSched = new CNestTaskScheduler ;
???
??? pTaskSched -> BeginRootThread ( task );
?
??? delete pTaskSched ;
?
}
上面程序執行后,打印的結果如下,從打印結果可以看出整個程序執行中進行的分拆過程。
Range: 0 - 1023
Range: 0 - 511
Range: 512 - 1023
Range: 0 - 255
Range: 512 - 767
Range: 0 - 127
Range: 512 - 639
Range: 256 - 511
Range: 768 - 1023
Range: 256 - 383
Range: 768 - 895
Range: 128 - 255
Range: 640 - 767
Range: 384 - 511
Range: 896 – 1023
?
當然,我們需要用任務調度來實現并行計算,下面就來講一個具體的用任務調度進行并行快速排序的實例。
?
?
3) 線程池入口函數處理流程
線程池入口函數的處理在一個循環中進行,每次循環中,從分布式隊列中獲取任務,然后執行任務的啟動入口函數,如果從分布式隊列中獲取任務失敗,則認為所有任務被處理完,此時需要判斷是否還有掛起的線程,有則需要將掛起線程執行起來讓其退出,然后退出循環并結束當前線程。
? 圖 3 ? 線程池入口函數處理流程圖
?
/** ?? 嵌套任務調度的獲取任務函數
?
???????? @param ? TASK &Task - 接收從分布式隊列中獲取的任務
???????? @return ? int - 成功返回 CAPI_SUCCESS, 失敗返回 CAPI_FAILED. ??????
*/
int CNestTaskScheduler::GetTask(TASK &Task)
{
??? // 先從本地隊列獲取任務
??? // 本地獲取任務失敗后從偷取隊列獲取任務
??? return m_DQueue . DeQueue ( Task );
};
?
/** ?? 嵌套任務調度的線程池入口函數
?
???????? @param ? void *pArgs - CNestTaskScheduler 類型的參數 ?????
???????? @return ? unsigned int WINAPI - 返回 ???
*/
unsigned int WINAPI NestTaskScheduler_StartFunc(void *pArgs)
{
??? CNestTaskScheduler ? * pSched = ( CNestTaskScheduler *) pArgs ;
?
??? TASK ??? Task ;
??? int ???? nRet ;
?
??? for ( ;; )
??? {
??????? nRet = pSched -> GetTask ( Task );
??????? if ( nRet == CAPI_FAILED )
??????? {
??????????? CThreadPool &ThreadPool = pSched->GetThreadPool();
???????????
??????????? // 喚醒一個掛起的線程 , 防止任務數量小于 CPU 核數時,
??????????? // 仍然有任務處于掛起狀態 , 從而導致 WaitAllThread() 處于死等狀態
??????????? // 這個喚醒過程是一個串行的過程,被喚醒的任務會繼續喚醒一個掛起線程
??????????? LONG Id = pSched->AtomicIncrementTaskId();
??????????? if ( Id < ThreadPool.GetThreadCount() )
??????????? {
??????????????? ThreadPool.ExecThread(Id);
??????????? }
??????? ???? break;
??????? }
??????? (*( Task . func ))( Task . pArg );
??? }
??? return 0;
}
?
在上面的線程入口處理函數 NestTaskScheduler_StartFunc() 中,當獲取任務失敗時,表明所有任務都處理完畢。此時需要考慮一種特殊情況,即任務總數量小于線程數量的情況。由于線程池 CThreadPool 采用預創建線程的方法,所有預創建的線程初始處于掛起狀態,獲取任務失敗后,可能還有若干線程沒有被分配到任務,仍然處于掛起狀態。必須將這些掛起的任務恢復執行讓其退出,否則 WaitAllThread() 函數將處于死等狀態。
NestTaskScheduler_StartFunc() 在處理喚醒掛起的線程的方法是逐個喚醒的方法,當有某個執行線程獲取任務失敗后,它先喚醒一個被掛起的線程,然后這個被喚醒的線程執行后,它也會執行 NestTaskScheduler_StartFunc() 函數,當然它獲取任務會失敗,接著它也會喚醒一個被掛起的線程,這樣一直下去,所有被掛起線程都會被喚醒并被退出。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
