Util.concurrent工具包概述
Doug Lea
State University of New York at Oswego
dl@cs.oswego.edu
http://gee.cs.oswego.edu
翻譯:
Cocia Lin( cocia@163.com )
Huihoo.org
原文
http://gee.cs.oswego.edu/dl/cpjslides/util.pdf
要點
-- 目標和結構
-- 主要的接口和實現
Sync : 獲得 / 釋放 (acquire/release) 協議
Channel : 放置 / 取走 (put/take) 協議
Executor : 執行 Runnable 任務
-- 每一個部分都有一些關聯的接口和支持類
-- 簡單的涉及其他的類和特性
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
目標
-- 一些簡單的接口
- 但是覆蓋大部分程序員需要小心處理代碼的問題
-- 高質量實現
- 正確的,保守的,有效率的,可移植的
-- 可能作為將來標準的基礎
- 獲取經驗和收集反饋信息

Sync
-- acquire/release 協議的主要接口
- 用來定制鎖,資源管理,其他的同步用途
- 高層抽象接口
- 沒有區分不同的加鎖用法
-- 實現
- Mutex, ReentrantLock, Latch, CountDown,Semaphore, WaiterPreferenceSemaphore, FIFOSemaphore, PrioritySemaphore
<!-- [if !supportLists]--> n ?? <!-- [endif]--> 還有,有幾個簡單的實現,例如 ObservableSync, LayeredSync
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
獨占鎖
try {
lock.acquire();
try {
action();
}
finally {
lock.release();
}
}
catch (InterruptedException ie) { ... }
-- Java 同步塊不適用的時候使用它
- 超時,回退 (back-off)
- 確保可中斷
- 大量迅速鎖定
- 創建 Posix 風格應用 (condvar)
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
獨占例子
class ParticleUsingMutex {
int x; int y;
final Random rng = new Random();
final Mutex mutex = new Mutex();
public void move() {
try {
mutex.acquire();
try { x += rng.nextInt(2)-1; y += rng.nextInt(2)-1; }
finally { mutex.release(); }
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt(); }
}
public void draw(Graphics g) {
int lx, ly;
try {
mutex.acquire();
try { lx = x; ly = y; }
finally { mutex.release(); }
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt(); return; }
g.drawRect(lx, ly, 10, 10);
}
}
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
回退 (Backoff) 例子
class CellUsingBackoff {
private long val;
private final Mutex mutex = new Mutex();
void swapVal(CellUsingBackoff other)
throws InterruptedException {
if (this == other) return; // alias check
for (;;) {
mutex.acquire();
try {
I f ( other.mutex.attempt(0) ) {
try {
long t = val;
val = other.val;
other.val = t;
return;
}
finally { other.mutex.release(); }
}
}
finally { mutex.release(); };
Thread.sleep(100); // heuristic retry interval
}
}
}
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
讀寫鎖
interface ReadWriteLock {
Sync readLock();
Sync writeLock();
}
-- 管理一對鎖
- 和普通的鎖一樣的使用習慣
-- 對集合類很有用
- 半自動的方式實現 SyncSet, SyncMap, ...
-- 實現者使用不同的鎖策略
- WriterPreference, ReentrantWriterPreference,
ReaderPreference, FIFO
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
ReadWriteLock 例子
-- 示范在讀寫鎖中執行任何 Runnable 的包裝類
class WithRWLock {
final ReadWriteLock rw;
public WithRWLock(ReadWriteLock l) { rw = l; }
public void performRead(Runnable readCommand)
throws InterruptedException {
rw.readLock().acquire();
try { readCommand.run(); }
finally { rw.readlock().release(); }
}
public void performWrite(...) // similar
}
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
閉鎖 (Latch)
-- 閉鎖是開始時設置為 false, 但一旦被設置為 true ,他將永遠保持 true 狀態
- 初始化標志
- 流結束定位
- 線程中斷
- 事件出發指示器
-- CountDown 和他有點類似,不同的是, CountDown 需要一定數量的觸發設置,而不是一次
-- 非常簡單,但是廣泛使用的類
- 替換容易犯錯的開發代碼
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
Latch Example 閉鎖例子
class Worker implements Runnable {
Latch startSignal;
Worker(Latch l) { startSignal = l; }
public void run() {
startSignal.acquire();
// ... doWork();
}
}
class Driver { // ...
void main() {
Latch ss = new Latch();
for (int i = 0; i < N; ++i) // make threads
new Thread(new Worker( ss )).start();
doSomethingElse(); // don’t let run yet
ss.release(); // now let all threads proceed
}
}
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
信號 (Semaphores)
-- 服務于數量有限的占有者
- 使用許可數量構造對象 ( 通常是 0)
- 如果需要一個許可才能獲取,等待,然后取走一個許可
- 釋放的時候將許可添加回來
-- 但是真正的許可并沒有轉移 (But no actual permits change hands.)
- 信號量僅僅保留當前的計數值
-- 應用程序
- 鎖:一個信號量可以被用作互斥體 (mutex)
- 一個獨立的等待緩存或者資源控制的操作
- 設計系統是想忽略底層的系統信號
-- (phores ‘remember’ past signals) 記住已經消失的信號量
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
信號量例子
class Pool {
ArrayList items = new ArrayList();
HashSet busy = new HashSet();
final Semaphore available;
public Pool(int n) {
available = new Semaphore(n);
// ... somehow initialize n items ...;
}
public Object getItem() throws InterruptedException {
available.acquire();
return doGet();
}
public void returnItem(Object x) {
if (doReturn(x)) available.release();
}
synchronized Object doGet() {
Object x = items.remove(items.size()-1);
busy.add(x); // put in set to check returns
return x;
}
synchronized boolean doReturn(Object x) {
return busy.remove(x); // true if was present
}
}
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
屏障 (Barrier)
-- 多部分同步接口
- 每一部分都必須等待其他的分不撞倒屏障
-- CyclicBarrier 類
- CountDown 的一個可以重新設置的版本
- 對于反復劃分算法很有用 (iterative partitioning algorithms)
-- Rendezvous 類
- 一個每部分都能夠和其他部分交換信息的屏障
- 行為類似同時的在一個同步通道上 put 和 take
- 對于資源交換協議很有用 (resource-exchange protocols)
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
通道 (Channel)
-- 為緩沖,隊列等服務的主接口

-- 具體實現
- LinkedQueue, BoundedLinkedQueue,BoundedBuffer, BoundedPriorityQueue,SynchronousChannel, Slot
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
通道屬性
-- 被定義為 Puttable 和 Takable 的子接口
- 允許安裝生產者 / 消費者模式執行
-- 支持可超時的操作 offer 和 poll
- 當超時值是 0 時,可能會被阻塞
- 所有的方法能夠拋出 InterruptedException 異常
-- 沒有接口需要 size 方法
- 但是一些實現定義了這個方法
- BoundedChannel 有 capacity 方法
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
通道例子
class Service { // ...
final Channel msgQ = new LinkedQueue();
public void serve() throws InterruptedException {
String status = doService();
msgQ.put(status);
}
public Service() { // start background thread
Runnable logger = new Runnable() {
public void run() {
try {
for(;;)
System.out.println( msqQ.take() );
}
catch(InterruptedException ie) {} }
};
new Thread(logger).start();
}
}
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
<!-- [if !supportEmptyParas]-->?<!-- [endif]-->
運行器 (Executor)
-- 類似線程的類的主接口 font-size: 18pt; fon
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
