亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

JAVA線程池代碼淺析

系統 1746 0

1. ?????? ExecutorService
JAVA線程池代碼淺析
?
Java 1.5 開始正式提供了并發包 , 而這個并發包里面除了原子變量 ,synchronizer, 并發容器 , 另外一個非常重要的特性就是線程池 . 對于線程池的意義 , 我們這邊不再多說 .

上圖是線程池的主體類圖 ,ThreadPoolExecutor 是應用最為廣泛的一個線程池實現 ( 我也將在接下來的文字中詳細描述我對這個類的理解和執行機制 ),ScheduledThreadPoolExecutor 則在 ThreadPoolExecutor 上提供了定時執行的等附加功能 , 這個可以從 ScheduledExecutorService 接口的定義中看出來 .Executors 則類似工廠方法 , 提供了幾個非常常用的線程池初始化方法 .

ThreadPoolExecutor

這個類繼承了 AbstractExecutorService 抽象類 , AbstractExecutorService 主要的職責有 2 部分 , 一部分定義和實現提交任務的方法 (3 submit 方法的實現 ) , 實例化 FutureTask 并且交給子類執行 , 另外一部分實現 invokeAny,invokeAll 方法 . 留給子類的方法為 execute 方法 , 也就是 Executor 接口定義的方法 .

// 實例化一個FutureTask,交給子類的execute方法執行.這種設計能夠保證callable和runnable的執行接口方法的一致性(FutureTask包裝了這個差別)
public ? < T > ?Future < T > ?submit(Runnable?task,?T?result)? {
????
if ?(task? == ? null )? throw ? new ?NullPointerException();
????RunnableFuture
< T > ?ftask? = ?newTaskFor(task,?result);
????execute(ftask);
????
return ?ftask;
}


protected ? < T > ?RunnableFuture < T > ?newTaskFor(Runnable?runnable,?T?value)? {
????
return ? new ?FutureTask < T > (runnable,?value);
}

關于 FutureTask 這個類的實現 , 我在前面的 JAVA LOCK 代碼淺析有講過其實現原理 , 主要的思想就是關注任務完成與未完成的狀態 , 任務提交線程 get() 結果時被 park , 等待任務執行完成被喚醒 , 任務執行線程在任務執行完畢后設置結果 , 并且 unpark 對應線程并且讓其得到執行結果 .

回到 ThreadPoolExecutor .ThreadPoolExecutor 需要實現除了我們剛才說的 execute(Runnable command) 方法外 , 還得實現 ExecutorService 接口定義的部分方法 . ThreadPoolExecutor 所提供的不光是這些 , 以下根據我的理解來列一下它所具有的特性
1. ?????? execute 流程
2. ??????
3. ?????? 工作隊列
4. ?????? 飽和拒絕策略
5. ?????? 線程工廠
6. ?????? beforeExecute afterExecute 擴展

execute 方法的實現有個機制非常重要 , 當當前線程池線程數量小于 corePoolSize, 那么生成一個新的 worker 并把提交的任務置為這個工作線程的頭一個執行任務 , 如果大于 corePoolSize, 那么會試著將提交的任務塞到 workQueue 里面供線程池里面的worker稍后執行 , 并不是直接再起一個 worker, 但是當 workQueue 也滿 , 并且當前線程池小于 maxPoolSize, 那么起一個新的 worker 并將該任務設為該 worker 執行的第一個任務執行 , 大于 maxPoolSize,workQueue 也滿負荷 , 那么調用飽和策略里面的行為 .

worker 線程在執行完一個任務之后并不會立刻關閉 , 而是嘗試著去 workQueue 里面取任務 , 如果取不到 , 根據策略關閉或者保持空閑狀態 . 所以 submit 任務的時候 , 提交的順序為 核心線程池 ------ 工作隊列 ------ 擴展線程池 .

池包括核心池
, 擴展池 (2 者的線程在同一個 hashset 中,這里只是為了方便才這么稱呼,并不是分離的 ), 核心池在池內 worker 沒有用完的情況下 , 只要有任務提交都會創建新的線程 , 其代表線程池正常處理任務的能力 . 擴展池 , 是在核心線程池用完 , 并且工作隊列也已排滿任務的情況下才會開始初始化線程 , 其代表的是線程池超出正常負載時的解決方案 , 一旦任務完成 , 并且試圖從 workQueue 取不到任務 , 那么會比較當前線程池與核心線程池的大小 , 大于核心線程池數的 worker 將被銷毀 .

Runnable?getTask()? {
????
for ?(;;)? {
????????
try ? {
????????????
int ?state? = ?runState;
????????????
// >SHUTDOWN就是STOP或者TERMINATED
????????????
// 直接返回
???????????? if ?(state? > ?SHUTDOWN)
????????????????
return ? null ;
????????????Runnable?r;
????????????
// 如果是SHUTDOWN狀態,那么取任務,如果有
??????????????
// 將剩余任務執行完畢,否則就結束了
???????????? if ?(state? == ?SHUTDOWN)?? // ?Help?drain?queue
????????????????r? = ?workQueue.poll();
????????????
// 如果不是以上狀態的(也就是RUNNING狀態的),那么如果當前池大于核心池數量,
????????????
// 或者允許核心線程池取任務超時就可以關閉,那么從任務隊列取任務,
????????????
// 如果超出keepAliveTime,那么就返回null了,也就意味著這個worker結束了
???????????? else ? if ?(poolSize? > ?corePoolSize? || ?allowCoreThreadTimeOut)
????????????????r?
= ?workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS);
????????????
// 如果當前池小于核心池,并且不允許核心線程池取任務超時就關閉,那么take(),直到拿到任務或者被interrupt
???????????? else
????????????????r?
= ?workQueue.take();
????????????
// 如果經過以上判定,任務不為空,那么返回任務
???????????? if ?(r? != ? null )
????????????????
return ?r;
????????????
// 如果取到任務為空,那么判定是否可以退出
???????????? if ?(workerCanExit())? {
????????????????
// 如果整個線程池狀態變為SHUTDOWN或者TERMINATED,那么將所有worker?interrupt?(如果正在執行,那繼續讓其執行)
???????????????? if ?(runState? >= ?SHUTDOWN)? // ?Wake?up?others
????????????????????interruptIdleWorkers();
????????????????
return ? null ;
????????????}

????????????
// ?Else?retry
????????}
? catch ?(InterruptedException?ie)? {
????????????
// ?On?interruption,?re-check?runState
????????}

}

????}


// worker從workQueue中取不到數據的時候調用此方法,以決定自己是否跳出取任務的無限循環,從而結束此worker的運行
private ? boolean ?workerCanExit()? {
????
final ?ReentrantLock?mainLock? = ? this .mainLock;
????mainLock.lock();
????
boolean ?canExit;
????
try ? {
????????
/**/ /*
????????*線程池狀態為stop或者terminated,
????????*或者任務隊列里面任務已經為空,
????????*或者允許線程池線程空閑超時(實現方式是從工作隊列拿最多keepAliveTime的任務,超過這個時間就返回null了)并且
?????????*當前線程池大于corePoolSize(>1)
????????*那么允許線程結束
????????*static?final?int?RUNNING????=?0;
????????*static?final?int?SHUTDOWN???=?1;
????????*static?final?int?STOP???????=?2;
????????*static?final?int?TERMINATED?=?3;
????????
*/

????????canExit?
= ?runState? >= ?STOP? ||
????????workQueue.isEmpty()?
||
???????(allowCoreThreadTimeOut?
&&
????????poolSize?
> ?Math.max( 1 ,corePoolSize));
????}
? finally ? {
????????mainLock.unlock();
????}

????
return ?canExit;
}


當提交任務是 , 線程池都已滿 , 并且工作隊列也無空閑位置的情況下 ,ThreadPoolExecutor 會執行 reject 操作 ,JDK 提供了四種 reject 策略 , 包括 AbortPolicy( 直接拋 RejectedException Exception),CallerRunsPolicy( 提交任務線程自己執行 , 當然這時剩余任務也將無法提交 ),DiscardOldestPolicy( 將線程池的 workQueue 任務隊列里面最老的任務剔除 , 將新任務丟入 ),DiscardPolicy( 無視 , 忽略此任務 , 并且立即返回 ). 實例化 ThreadPoolExecutor , 如果不指定任何飽和策略 , 默認將使用 AbortPolicy.

個人認為這些飽和策略并不十分理想
, 特別是在應用既要保證快速 , 又要高可用的情況下 , 我的想法是能夠加入超時等待策略 , 也就是提交線程時線程池滿 , 能夠 park 住提交任務的線程 , 一旦有空閑 , 能在第一時間通知到等待線程 . 這個實際上和主線程執行相似 , 但是主線程執行期間即使線程池有大量空閑也不會立即可以提交任務 , 效率上后者可能會比較低 , 特別是執行慢速任務 .

實例化 Worker 的時候會調用 ThreadFactory addThread(Runnable r) 方法返回一個 Thread, 這個線程工廠是可以在 ThreadPoolExecutor 實例化的時候指定的 , 如果不指定 , 那么將會使用 DefaultThreadFactory, 這個也就是提供給使用者命名線程 , 線程歸組 , 是否是 demon 等線程相關屬性設置的機會 .

beforeExecute afterExecute 是提供給使用者擴展的 , 這兩個方法會在 worker runTask 之前和 run 完畢之后分別調用 .JDK 注釋里 Doug Lea(concurrent 包作者 ) 展示了 beforeExecute 一個很有趣的示例 . 代碼如下 .

class ?PausableThreadPoolExecutor? extends ?ThreadPoolExecutor? {
????
private ? boolean ?isPaused;
????
private ?ReentrantLock?pauseLock? = ? new ?ReentrantLock();
????
private ?Condition?unpaused? = ?pauseLock.newCondition();
?
public ?PausableThreadPoolExecutor( )? {? super ( );?}

protected ? void ?beforeExecute(Thread?t,?Runnable?r)? {
????
super .beforeExecute(t,?r);
????pauseLock.lock();
????
try ? {
????????
while ?(isPaused)?unpaused.await();
????}
? catch ?(InterruptedException?ie)? {
????????t.interrupt();
????}
? finally ? {
????????pauseLock.unlock();
????}

}

?
public ? void ?pause()? {
????pauseLock.lock();
????
try ? {
????????isPaused?
= ? true ;
????}
? finally ? {
????????pauseLock.unlock();
????}

}


public ? void ?resume()? {
????pauseLock.lock();
????
try ? {
????????isPaused?
= ? false ;
????????unpaused.signalAll();
????}
? finally ? {
????????pauseLock.unlock();
????}

}

??}

使用這個線程池 , 用戶可以隨時調用 pause 中止剩余任務執行 , 當然也可以使用 resume 重新開始執行剩余任務 .

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor
是一個很實用的類 , 它的實現核心是基于 DelayedWorkQueue. ScheduledThreadPoolExecutor 的繼承結構上來看 , 各位應該能夠看出些端倪來 , 就是 ScheduledThreadPoolExecutor ThreadPoolExecutor 中的任務隊列設置成了 DelayedWorkQueue, 這也就是說 , 線程池 Worker 從任務隊列中取的一個任務 , 需要等待這個隊列中最短超時任務的超時 , 也就是實現定時的效果 . 所以 ScheduledThreadPoolExecutor 所做的工作其實是比較少的 . 主要就是實現任務的實例化并加入工作隊列 , 以及支持 scheduleAtFixedRate scheduleAtFixedDelay 這種周期性任務執行 .

public ?ScheduledThreadPoolExecutor( int ?corePoolSize,ThreadFactory?threadFactory)? {
???????????
super (corePoolSize,?Integer.MAX_VALUE,? 0 ,?TimeUnit.NANOSECONDS, new ?DelayedWorkQueue(),?threadFactory);
}

對于 scheduleAfFixedRate scheduleAtFiexedDelay 這種周期性任務支持 , 是由 ScheduledThreadPoolExecutor 內部封裝任務的 ScheduledFutureTask 來實現的 . 這個類在執行任務后 , 對于周期性任務 , 它會處理周期時間 , 并將自己再次丟入線程池的工作隊列 , 從而達到周期執行的目的 .
private ? void ?runPeriodic()? {
?????????
boolean ?ok? = ?ScheduledFutureTask. super .runAndReset();
???????? ?
boolean ?down? = ?isShutdown();
?????????
// ?Reschedule?if?not?cancelled?and?not?shutdown?or?policy?allows
????? if ?(ok? && ?( ! down? || (getContinueExistingPeriodicTasksAfterShutdownPolicy()? && ? ! isStopped())))? {
???????????????
long ?p? = ?period;
???????????????
if ?(p? > ? 0 )
??????????? ????????? time?
+= ?p;
???????????????
else
??????????? ????????? time?
= ?triggerTime( - p);
?????
??????????????? ScheduledThreadPoolExecutor.
super .getQueue().add( this );
?????????}

????????
// ?This?might?have?been?the?final?executed?delayed
???????
// ?task.??Wake?up?threads?to?check.
??????? else ? if ?(down)
???????????? ?interruptIdleWorkers();
}

?

2. ?????? CompletionService

JAVA線程池代碼淺析
ExecutorCompletionService

CompletionService 定義了線程池執行任務集 , 可以依次拿到任務執行完畢的 Future,ExecutorCompletionService 是其實現類 , 先舉個例子 , 如下代碼 , 這個例子中 , 需要注意 ThreadPoolExecutor 核心池一定保證能夠讓任務提交并且馬上執行 , 而不是放到等待隊列中去 , 那樣次序將會無法控制 ,CompletionService 也將失去效果 ( 其實核心池中的任務完成順序還是準確的 ).

public ? static ? void ?main(String[]?args)? throws ?InterruptedException,?ExecutionException {
????ThreadPoolExecutor?es
= new ?ThreadPoolExecutor( 10 ,? 15 ,? 2000 ,?TimeUnit.MILLISECONDS,? new ?ArrayBlockingQueue < Runnable > ( 10 ), new ?ThreadPoolExecutor.AbortPolicy());
????CompletionService
< String > ?cs = new ?ExecutorCompletionService < String > (es);????
????cs.submit(
new ?Callable < String > ()? {
?????@Override
?????
public ?String?call()? throws ?Exception? Codehi
分享到:
評論

JAVA線程池代碼淺析


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 老头与老头同性tube可播放 | 久久精品五月天 | xxxxbbbb性猛hd高清| 蜜桃精品免费久久久久影院 | 欧美激情精品久久久久久久久久 | a网在线 | 亚洲高清国产一线久久 | 成人网在线免费观看 | 欧美影院一区二区三区 | 欧美成人爽毛片在线视频 | 性欧美4k高清精品 | 久久综合久久伊人 | 日韩操| 中国一级毛片欧美一级毛片 | 99久久精品久久久 | 深夜啪啪网站 | 久久中文网 | 欧美一级色 | 国产操片| 男人边吃奶边爱边做视频日韩 | 国产精品久久久久激情影院 | 深夜影院深a久久 | 一级毛片免费不卡 | h片在线播放免费高清 | 97综合视频 | 99se亚洲综合色区 | 免费的一级片网站 | 国产精品日韩欧美一区二区三区 | 福利视频欧美一区二区三区 | 青青草国产97免久久费观看 | 国产不卡福利 | 久久久久久久久一级毛片 | 草久视频在线观看 | 四虎免费影院4hu永久免费 | 亚洲国产高清在线精品一区 | 成人a大片高清在线观看 | 国产精品久久久久久久久久久不卡 | 日本a∨在线观看 | 国产欧美精品 | 四虎澳门永久8848在线影院 | 一区二区三区高清 |