Storm是一個分布式的、容錯的實時計算系統,遵循Eclipse Public License 1.0,Storm可以方便地在一個計算機集群中編寫與擴展復雜的實時計算,Storm之于實時處理,就好比Hadoop之于批處理。Storm保證每個消息都會得到處理,而且它很快——在一個小集群中,每秒可以處理數以百萬計的消息。可以使用任意編程語言來做開發。
主要商業應用及案例:Twitter
Storm的優點
1. 簡單的編程模型。類似于MapReduce降低了并行批處理復雜性,Storm降低了進行實時處理的復雜性。
2. 服務化,一個服務框架,支持熱部署,即時上線或下線App.
3. 可以使用各種編程語言。你可以在Storm之上使用各種編程語言。默認支持Clojure、Java、Ruby和Python。要增加對其他語言的支持,只需實現一個簡單的Storm通信協議即可。
4. 容錯性。Storm會管理工作進程和節點的故障。
5. 水平擴展。計算是在多個線程、進程和服務器之間并行進行的。
6. 可靠的消息處理。Storm保證每個消息至少能得到一次完整處理。任務失敗時,它會負責從消息源重試消息。
7. 快速。系統的設計保證了消息能得到快速的處理,使用ZeroMQ作為其底層消息隊列。
8. 本地模式。Storm有一個“本地模式”,可以在處理過程中完全模擬Storm集群。這讓你可以快速進行開發和單元測試。
Storm目前存在的問題
1.
?
目前的開源版本中只是單節點
Nimbus
,掛掉只能自動重啟,可以考慮實現一個雙
nimbus
的布局。
2. Clojure
是一個在
JVM
平臺運行的動態函數式編程語言
,
優勢在于流程計算,
?
Storm
的部分核心內容由
Clojure
編寫,雖然性能上提高不少但同時也提升了維護成本。
Storm 集群由一個主節點和多個工作節點組成。主節點運行了一個名為 “Nimbus” 的守護進程,用于分配代碼、布置任務及故障檢測。每個工作節點都運行了一個名為 “Supervisor” 的守護進程,用于監聽工作,開始并終止工作進程。 Nimbus 和 Supervisor 都能快速失敗,而且是無狀態的,這樣一來它們就變得十分健壯,兩者的協調工作是由 Zookeeper 來完成的。 ZooKeeper 用于管理集群中的不同組件, ZeroMQ 是內部消息系統, JZMQ 是 ZeroMQMQ 的 Java Binding 。有個名為 storm-deploy 的子項目,可以在 AWS 上一鍵部署 Storm 集群 .
Storm 的術語包括 Stream 、 Spout 、 Bolt 、 Task 、 Worker 、 Stream Grouping 和 Topology 。 Stream 是被處理的數據。 Sprout 是數據源。 Bolt 處理數據。 Task 是運行于 Spout 或 Bolt 中的 ? 線程。 Worker 是運行這些線程的進程。 Stream Grouping 規定了 Bolt 接收什么東西作為輸入數據。數據可以隨機分配(術語為 Shuffle ),或者根據字段值分配(術語為 Fields ),或者 ? 廣播(術語為 All ),或者總是發給一個 Task (術語為 Global ),也可以不關心該數據(術語為 None ),或者由自定義邏輯來決定(術語為 Direct )。 Topology 是由 Stream Grouping 連接起來的 Spout 和 Bolt 節點網絡 . 下面進行詳細介紹:
- Topologies ? 用于封裝一個實時計算應用程序的邏輯,類似于 Hadoop 的 MapReduce Job
- Stream ? 消息流,是一個沒有邊界的 tuple 序列,這些 tuples 會被以一種分布式的方式并行地創建和處理
- Spouts ? 消息源,是消息生產者,他會從一個外部源讀取數據并向 topology 里面面發出消息: tuple
- Bolts ? 消息處理者,所有的消息處理邏輯被封裝在 bolts 里面,處理輸入的數據流并產生輸出的新數據流 , 可執行過濾,聚合,查詢數據庫等操作
- Task ? 每一個 Spout 和 Bolt 會被當作很多 task 在整個集群里面執行 , 每一個 task 對應到一個線程 .
- Stream groupings ? 消息分發策略 , 定義一個 Topology 的其中一步是定義每個 tuple 接受什么樣的流作為輸入 ,stream grouping 就是用來定義一個 stream 應該如果分配給 Bolts 們 .
stream grouping 分類
1. Shuffle Grouping:
?
隨機分組,
?
隨機派發
stream
里面的
tuple
,
?
保證每個
bolt
接收到的
tuple
數目相同
.
2. Fields Grouping
:按字段分組,
?
比如按
userid
來分組,
?
具有同樣
userid
的
tuple
會被分到相同的
Bolts
,
?
而不同的
userid
則會被分配到不同的
Bolts.
3. All Grouping
:
?
廣播發送,
?
對于每一個
tuple
,
?
所有的
Bolts
都會收到
.
4. Global Grouping:
?
全局分組,這個
tuple
被分配到
storm
中的一個
bolt
的其中一個
task.
再具體一點就是分配給
id
值最低的那個
task.
5. Non Grouping:
?
不分組,意思是說
stream
不關心到底誰會收到它的
tuple.
目前他和
Shuffle grouping
是一樣的效果
,
有點不同的是
storm
會把這個
bolt
放到這個
bolt
的訂閱者同一個線程去執行
.
6. Direct Grouping:
?
直接分組
,
這是一種比較特別的分組方法,用這種分組意味著消息的發送者舉鼎由消息接收者的哪個
task
處理這個消息
.
只有被聲明為
Direct Stream
的消息流可以聲明這種分組方法
.
而且這種消息
tuple
必須使用
emitDirect
方法來發射
.
消息處理者可以通過
TopologyContext
來或者處理它的消息的
taskid (OutputCollector.emit
方法也會返回
taskid)
Storm 如何保證消息被處理
storm 保證每個 tuple 會被 topology 完整的執行。 storm 會追蹤由每個 spout tuple 所產生的 tuple 樹 ( 一個 bolt 處理一個 tuple 之后可能會發射別的 tuple 從而可以形成樹狀結構 ), ? 并且跟蹤這棵 tuple 樹什么時候成功處理完。每個 topology 都有一個消息超時的設置, ? 如果 storm 在這個超時的時間內檢測不到某個 tuple 樹到底有沒有執行成功, ? 那么 topology 會把這個 tuple 標記為執行失敗,并且過一會會重新發射這個 tuple 。
一個 tuple 能根據新獲取到的 spout 而觸發創建基于此的上千個 tuple
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1, new KestrelSpout("kestrel.backtype.com",
?????????????????????????????????????22133,
?????????????????????????????????????"sentence_queue",
?????????????????????????????????????new StringScheme()));
builder.setBolt(2, new SplitSentence(), 10)
????????.shuffleGrouping(1);
builder.setBolt(3, new WordCount(), 20)
????????.fieldsGrouping(2, new Fields("word"));
這個 topology 從 kestrel queue 讀取句子 , 并把句子劃分成單詞 , 然后匯總每個單詞出現的次數 , 一個 tuple 負責讀取句子 , 每一個 tuple 分別對應計算每一個單詞出現的次數 , 大概樣子如下所示 :
一個 tuple 的生命周期 :
public interface ISpout extends Serializable {
????void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
????void close();
????void nextTuple();
????void ack(Object msgId);
????void fail(Object msgId);
}
首先 storm 通過調用 spout 的 nextTuple 方法來獲取下一個 tuple, Spout 通過 open 方法參數里面提供的 SpoutOutputCollector 來發射新 tuple 到它的其中一個輸出消息流 , ? 發射 tuple 的時候 spout 會提供一個 message-id, ? 后面我們通過這個 tuple-id 來追蹤這個 tuple 。舉例來說, ? KestrelSpout 從 kestrel 隊列里面讀取一個消息,并且把 kestrel 提供的消息 id 作為 message-id, ? 看例子:
collector.emit(new Values("field1", "field2", 3) , msgId);
?
接下來,
?
這個發射的
tuple
被傳送到消息處理者
bolt
那里,
?
storm
會跟蹤這個消息的樹形結構是否創建
,
根據
messageid
調用
Spout
里面的
ack
函數以確認
tuple
是否被完全處理。如果
tuple
超時就會調用
spout
的
fail
方法。由此看出同一個
tuple
不管是
acked
還是
fail
都是由創建他的那個
spout
發出的
,
所以即使
spout
在集群環境中執行了很多的
task,
這個
tule
也不會被不同的
task
去
acked
或
failed.
當
kestrelspout
從
kestrel
隊列中得到一個消息后會打開這個他
,
這意味著他并不會把此消息拿走
,
消息的狀態會顯示為
pending,
直到等待確認此消息已經處理完成
,
處于
pending
狀態直到
ack
或者
fail
被調用
,
處于
"Pending"
的消息不會再被其他隊列消費者使用
.
如果在這過程中
spout
中處理此消息的
task
斷開連接或失去響應則此
pending
的消息會回到
"
等待處理
"
狀態
.
1.
流聚合
流聚合把兩個或者多個數據流聚合成一個數據流
?
—
?
基于一些共同的
tuple
字段。
builder.setBolt(5, new MyJoiner(), parallelism)
??.fieldsGrouping(1, new Fields("joinfield1", "joinfield2"))
??.fieldsGrouping(2, new Fields("joinfield1", "joinfield2"))
??.fieldsGrouping(3, new Fields("joinfield1", "joinfield2"))
?
2.
批處理
有時候為了性能或者一些別的原因,
?
你可能想把一組
tuple
一起處理,
?
而不是一個個單獨處理。
3.BasicBolt
1.
?
讀一個輸入
tuple
2.
?
根據這個輸入
tuple
發射一個或者多個
tuple
3.
?
在
execute
的方法的最后
ack
那個輸入
tuple
遵循這類模式的
bolt
一般是函數或者是過濾器
,
?
這種模式太常見,
storm
為這類模式單獨封裝了一個接口
: IbasicBolt
4.
內存內緩存
+Fields grouping
組合
在
bolt
的內存里面緩存一些東西非常常見。緩存在和
fields grouping
結合起來之后就更有用了。比如,你有一個
bolt
把短鏈接變成長鏈接
(bit.ly, t.co
之類的
)
。你可以把短鏈接到長鏈接的對應關系利用
LRU
算法緩存在內存里面以避免重復計算。比如組件一發射短鏈接,組件二把短鏈接轉化成長鏈接并緩存在內存里面。看一下下面兩段代碼有什么不一樣:
builder.setBolt(2, new ExpandUrl(), parallelism)
??.shuffleGrouping(1);
builder.setBolt(2, new ExpandUrl(), parallelism)
??.fieldsGrouping(1, new Fields("url"));
5.
計算
top N
比如你有一個
bolt
發射這樣的
tuple: "value", "count"
并且你想一個
bolt
基于這些信息算出
top N
的
tuple
。最簡單的辦法是有一個
bolt
可以做一個全局的
grouping
的動作并且在內存里面保持這
top N
的值。
這個方式對于大數據量的流顯然是沒有擴展性的,
?
因為所有的數據會被發到同一臺機器。一個更好的方法是在多臺機器上面并行的計算這個流每一部分的
top N,
?
然后再有一個
bolt
合并這些機器上面所算出來的
top N
以算出最后的
top N,
?
代碼大概是這樣的
:
builder.setBolt(2, new RankObjects(), parallellism)
??.fieldsGrouping(1, new Fields("value"));
builder.setBolt(3, new MergeObjects())
??.globalGrouping(2);
這個模式之所以可以成功是因為第一個
bolt
的
fields grouping
使得這種并行算法在語義上是正確的。
用
TimeCacheMap
來高效地保存一個最近被更新的對象的緩存
6.
用
TimeCacheMap
來高效地保存一個最近被更新的對象的緩存
有時候你想在內存里面保存一些最近活躍的對象,以及那些不再活躍的對象。
?
TimeCacheMap
?
是一個非常高效的數據結構,它提供了一些
callback
函數使得我們在對象不再活躍的時候我們可以做一些事情
.
7.
分布式
RPC:CoordinatedBolt
和
KeyedFairBolt
用
storm
做分布式
RPC
應用的時候有兩種比較常見的模式
:
它們被封裝在
CoordinatedBolt
和
KeyedFairBolt
里面
. CoordinatedBolt
包裝你的
bolt,
并且確定什么時候你的
bolt
已經接收到所有的
tuple,
它主要使用
Direct Stream
來做這個
.
KeyedFairBolt
同樣包裝你的
bolt
并且保證你的
topology
同時處理多個
DRPC
調用,而不是串行地一次只執行一個。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
