1.
??????
ExecutorService
?
Java
從
1.5
開始正式提供了并發包
,
而這個并發包里面除了原子變量
,synchronizer,
并發容器
,
另外一個非常重要的特性就是線程池
.
對于線程池的意義
,
我們這邊不再多說
.
上圖是線程池的主體類圖 ,ThreadPoolExecutor 是應用最為廣泛的一個線程池實現 ( 我也將在接下來的文字中詳細描述我對這個類的理解和執行機制 ),ScheduledThreadPoolExecutor 則在 ThreadPoolExecutor 上提供了定時執行的等附加功能 , 這個可以從 ScheduledExecutorService 接口的定義中看出來 .Executors 則類似工廠方法 , 提供了幾個非常常用的線程池初始化方法 .
ThreadPoolExecutor
這個類繼承了 AbstractExecutorService 抽象類 , AbstractExecutorService 主要的職責有 2 部分 , 一部分定義和實現提交任務的方法 (3 個 submit 方法的實現 ) , 實例化 FutureTask 并且交給子類執行 , 另外一部分實現 invokeAny,invokeAll 方法 . 留給子類的方法為 execute 方法 , 也就是 Executor 接口定義的方法 .











關于
FutureTask
這個類的實現
,
我在前面的
JAVA LOCK
代碼淺析有講過其實現原理
,
主要的思想就是關注任務完成與未完成的狀態
,
任務提交線程
get()
結果時被
park
住
,
等待任務執行完成被喚醒
,
任務執行線程在任務執行完畢后設置結果
,
并且
unpark
對應線程并且讓其得到執行結果
.
回到
ThreadPoolExecutor
類
.ThreadPoolExecutor
需要實現除了我們剛才說的
execute(Runnable command)
方法外
,
還得實現
ExecutorService
接口定義的部分方法
.
但
ThreadPoolExecutor
所提供的不光是這些
,
以下根據我的理解來列一下它所具有的特性
1.
??????
execute
流程
2.
??????
池
3.
??????
工作隊列
4.
??????
飽和拒絕策略
5.
??????
線程工廠
6.
??????
beforeExecute
和
afterExecute
擴展
execute
方法的實現有個機制非常重要
,
當當前線程池線程數量小于
corePoolSize,
那么生成一個新的
worker
并把提交的任務置為這個工作線程的頭一個執行任務
,
如果大于
corePoolSize,
那么會試著將提交的任務塞到
workQueue
里面供線程池里面的worker稍后執行
,
并不是直接再起一個
worker,
但是當
workQueue
也滿
,
并且當前線程池小于
maxPoolSize,
那么起一個新的
worker
并將該任務設為該
worker
執行的第一個任務執行
,
大于
maxPoolSize,workQueue
也滿負荷
,
那么調用飽和策略里面的行為
.
worker
線程在執行完一個任務之后并不會立刻關閉
,
而是嘗試著去
workQueue
里面取任務
,
如果取不到
,
根據策略關閉或者保持空閑狀態
.
所以
submit
任務的時候
,
提交的順序為
核心線程池
------
工作隊列
------
擴展線程池
.
池包括核心池
,
擴展池
(2
者的線程在同一個
hashset
中,這里只是為了方便才這么稱呼,并不是分離的
),
核心池在池內
worker
沒有用完的情況下
,
只要有任務提交都會創建新的線程
,
其代表線程池正常處理任務的能力
.
擴展池
,
是在核心線程池用完
,
并且工作隊列也已排滿任務的情況下才會開始初始化線程
,
其代表的是線程池超出正常負載時的解決方案
,
一旦任務完成
,
并且試圖從
workQueue
取不到任務
,
那么會比較當前線程池與核心線程池的大小
,
大于核心線程池數的
worker
將被銷毀
.































































當提交任務是
,
線程池都已滿
,
并且工作隊列也無空閑位置的情況下
,ThreadPoolExecutor
會執行
reject
操作
,JDK
提供了四種
reject
策略
,
包括
AbortPolicy(
直接拋
RejectedException Exception),CallerRunsPolicy(
提交任務線程自己執行
,
當然這時剩余任務也將無法提交
),DiscardOldestPolicy(
將線程池的
workQueue
任務隊列里面最老的任務剔除
,
將新任務丟入
),DiscardPolicy(
無視
,
忽略此任務
,
并且立即返回
).
實例化
ThreadPoolExecutor
時
,
如果不指定任何飽和策略
,
默認將使用
AbortPolicy.
個人認為這些飽和策略并不十分理想
,
特別是在應用既要保證快速
,
又要高可用的情況下
,
我的想法是能夠加入超時等待策略
,
也就是提交線程時線程池滿
,
能夠
park
住提交任務的線程
,
一旦有空閑
,
能在第一時間通知到等待線程
.
這個實際上和主線程執行相似
,
但是主線程執行期間即使線程池有大量空閑也不會立即可以提交任務
,
效率上后者可能會比較低
,
特別是執行慢速任務
.
實例化
Worker
的時候會調用
ThreadFactory
的
addThread(Runnable r)
方法返回一個
Thread,
這個線程工廠是可以在
ThreadPoolExecutor
實例化的時候指定的
,
如果不指定
,
那么將會使用
DefaultThreadFactory,
這個也就是提供給使用者命名線程
,
線程歸組
,
是否是
demon
等線程相關屬性設置的機會
.
beforeExecute 和 afterExecute 是提供給使用者擴展的 , 這兩個方法會在 worker runTask 之前和 run 完畢之后分別調用 .JDK 注釋里 Doug Lea(concurrent 包作者 ) 展示了 beforeExecute 一個很有趣的示例 . 代碼如下 .









































使用這個線程池
,
用戶可以隨時調用
pause
中止剩余任務執行
,
當然也可以使用
resume
重新開始執行剩余任務
.
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
是一個很實用的類
,
它的實現核心是基于
DelayedWorkQueue.
從
ScheduledThreadPoolExecutor
的繼承結構上來看
,
各位應該能夠看出些端倪來
,
就是
ScheduledThreadPoolExecutor
將
ThreadPoolExecutor
中的任務隊列設置成了
DelayedWorkQueue,
這也就是說
,
線程池
Worker
從任務隊列中取的一個任務
,
需要等待這個隊列中最短超時任務的超時
,
也就是實現定時的效果
.
所以
ScheduledThreadPoolExecutor
所做的工作其實是比較少的
.
主要就是實現任務的實例化并加入工作隊列
,
以及支持
scheduleAtFixedRate
和
scheduleAtFixedDelay
這種周期性任務執行
.























2. ?????? CompletionService

ExecutorCompletionService
CompletionService 定義了線程池執行任務集 , 可以依次拿到任務執行完畢的 Future,ExecutorCompletionService 是其實現類 , 先舉個例子 , 如下代碼 , 這個例子中 , 需要注意 ThreadPoolExecutor 核心池一定保證能夠讓任務提交并且馬上執行 , 而不是放到等待隊列中去 , 那樣次序將會無法控制 ,CompletionService 也將失去效果 ( 其實核心池中的任務完成順序還是準確的 ).






發表評論
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

評論