亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

流式計算之Storm簡介

系統 1830 0

Storm是一個分布式的、容錯的實時計算系統,遵循Eclipse Public License 1.0,Storm可以方便地在一個計算機集群中編寫與擴展復雜的實時計算,Storm之于實時處理,就好比Hadoop之于批處理。Storm保證每個消息都會得到處理,而且它很快——在一個小集群中,每秒可以處理數以百萬計的消息。可以使用任意編程語言來做開發。
主要商業應用及案例:Twitter
Storm的優點
1. 簡單的編程模型。類似于MapReduce降低了并行批處理復雜性,Storm降低了進行實時處理的復雜性。
2. 服務化,一個服務框架,支持熱部署,即時上線或下線App.
3. 可以使用各種編程語言。你可以在Storm之上使用各種編程語言。默認支持Clojure、Java、Ruby和Python。要增加對其他語言的支持,只需實現一個簡單的Storm通信協議即可。
4. 容錯性。Storm會管理工作進程和節點的故障。
5. 水平擴展。計算是在多個線程、進程和服務器之間并行進行的。
6. 可靠的消息處理。Storm保證每個消息至少能得到一次完整處理。任務失敗時,它會負責從消息源重試消息。
7. 快速。系統的設計保證了消息能得到快速的處理,使用ZeroMQ作為其底層消息隊列。
8. 本地模式。Storm有一個“本地模式”,可以在處理過程中完全模擬Storm集群。這讓你可以快速進行開發和單元測試。


Storm目前存在的問題

1. ? 目前的開源版本中只是單節點 Nimbus ,掛掉只能自動重啟,可以考慮實現一個雙 nimbus 的布局。
2. Clojure
是一個在 JVM 平臺運行的動態函數式編程語言 , 優勢在于流程計算, ? Storm 的部分核心內容由 Clojure 編寫,雖然性能上提高不少但同時也提升了維護成本。

Storm 架構

Storm 集群由一個主節點和多個工作節點組成。主節點運行了一個名為 “Nimbus” 的守護進程,用于分配代碼、布置任務及故障檢測。每個工作節點都運行了一個名為 “Supervisor” 的守護進程,用于監聽工作,開始并終止工作進程。 Nimbus Supervisor 都能快速失敗,而且是無狀態的,這樣一來它們就變得十分健壯,兩者的協調工作是由 Zookeeper 來完成的。 ZooKeeper 用于管理集群中的不同組件, ZeroMQ 是內部消息系統, JZMQ ZeroMQMQ Java Binding 。有個名為 storm-deploy 的子項目,可以在 AWS 上一鍵部署 Storm 集群 .

流式計算之Storm簡介

Storm 術語解釋

Storm 的術語包括 Stream Spout Bolt Task Worker Stream Grouping Topology Stream 是被處理的數據。 Sprout 是數據源。 Bolt 處理數據。 Task 是運行于 Spout Bolt 中的 ? 線程。 Worker 是運行這些線程的進程。 Stream Grouping 規定了 Bolt 接收什么東西作為輸入數據。數據可以隨機分配(術語為 Shuffle ),或者根據字段值分配(術語為 Fields ),或者 ? 廣播(術語為 All ),或者總是發給一個 Task (術語為 Global ),也可以不關心該數據(術語為 None ),或者由自定義邏輯來決定(術語為 Direct )。 Topology 是由 Stream Grouping 連接起來的 Spout Bolt 節點網絡 . 下面進行詳細介紹:

  • Topologies ? 用于封裝一個實時計算應用程序的邏輯,類似于 Hadoop MapReduce Job

流式計算之Storm簡介

  • Stream ? 消息流,是一個沒有邊界的 tuple 序列,這些 tuples 會被以一種分布式的方式并行地創建和處理
  • Spouts ? 消息源,是消息生產者,他會從一個外部源讀取數據并向 topology 里面面發出消息: tuple
  • Bolts ? 消息處理者,所有的消息處理邏輯被封裝在 bolts 里面,處理輸入的數據流并產生輸出的新數據流 , 可執行過濾,聚合,查詢數據庫等操作

流式計算之Storm簡介

  • Task ? 每一個 Spout Bolt 會被當作很多 task 在整個集群里面執行 , 每一個 task 對應到一個線程 .

流式計算之Storm簡介

  • Stream groupings ? 消息分發策略 , 定義一個 Topology 的其中一步是定義每個 tuple 接受什么樣的流作為輸入 ,stream grouping 就是用來定義一個 stream 應該如果分配給 Bolts .

stream grouping 分類

1. Shuffle Grouping: ? 隨機分組, ? 隨機派發 stream 里面的 tuple ? 保證每個 bolt 接收到的 tuple 數目相同 .
2. Fields Grouping
:按字段分組, ? 比如按 userid 來分組, ? 具有同樣 userid tuple 會被分到相同的 Bolts ? 而不同的 userid 則會被分配到不同的 Bolts.
3. All Grouping
? 廣播發送, ? 對于每一個 tuple ? 所有的 Bolts 都會收到 .
4. Global Grouping:
? 全局分組,這個 tuple 被分配到 storm 中的一個 bolt 的其中一個 task. 再具體一點就是分配給 id 值最低的那個 task.
5. Non Grouping:
? 不分組,意思是說 stream 不關心到底誰會收到它的 tuple. 目前他和 Shuffle grouping 是一樣的效果 , 有點不同的是 storm 會把這個 bolt 放到這個 bolt 的訂閱者同一個線程去執行 .
6. Direct Grouping:
? 直接分組 , 這是一種比較特別的分組方法,用這種分組意味著消息的發送者舉鼎由消息接收者的哪個 task 處理這個消息 . 只有被聲明為 Direct Stream 的消息流可以聲明這種分組方法 . 而且這種消息 tuple 必須使用 emitDirect 方法來發射 . 消息處理者可以通過 TopologyContext 來或者處理它的消息的 taskid (OutputCollector.emit 方法也會返回 taskid)

Storm 如何保證消息被處理

storm 保證每個 tuple 會被 topology 完整的執行。 storm 會追蹤由每個 spout tuple 所產生的 tuple ( 一個 bolt 處理一個 tuple 之后可能會發射別的 tuple 從而可以形成樹狀結構 ), ? 并且跟蹤這棵 tuple 樹什么時候成功處理完。每個 topology 都有一個消息超時的設置, ? 如果 storm 在這個超時的時間內檢測不到某個 tuple 樹到底有沒有執行成功, ? 那么 topology 會把這個 tuple 標記為執行失敗,并且過一會會重新發射這個 tuple

一個 tuple 能根據新獲取到的 spout 而觸發創建基于此的上千個 tuple

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(1, new KestrelSpout("kestrel.backtype.com",

?????????????????????????????????????22133,

?????????????????????????????????????"sentence_queue",

?????????????????????????????????????new StringScheme()));

builder.setBolt(2, new SplitSentence(), 10)

????????.shuffleGrouping(1);

builder.setBolt(3, new WordCount(), 20)

????????.fieldsGrouping(2, new Fields("word"));

這個 topology kestrel queue 讀取句子 , 并把句子劃分成單詞 , 然后匯總每個單詞出現的次數 , 一個 tuple 負責讀取句子 , 每一個 tuple 分別對應計算每一個單詞出現的次數 , 大概樣子如下所示 :

流式計算之Storm簡介

一個 tuple 的生命周期 :

public interface ISpout extends Serializable {

????void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

????void close();

????void nextTuple();

????void ack(Object msgId);

????void fail(Object msgId);

}

首先 storm 通過調用 spout nextTuple 方法來獲取下一個 tuple, Spout 通過 open 方法參數里面提供的 SpoutOutputCollector 來發射新 tuple 到它的其中一個輸出消息流 , ? 發射 tuple 的時候 spout 會提供一個 message-id, ? 后面我們通過這個 tuple-id 來追蹤這個 tuple 。舉例來說, ? KestrelSpout kestrel 隊列里面讀取一個消息,并且把 kestrel 提供的消息 id 作為 message-id, ? 看例子:

collector.emit(new Values("field1", "field2", 3) , msgId);

?

接下來, ? 這個發射的 tuple 被傳送到消息處理者 bolt 那里, ? storm 會跟蹤這個消息的樹形結構是否創建 , 根據 messageid 調用 Spout 里面的 ack 函數以確認 tuple 是否被完全處理。如果 tuple 超時就會調用 spout fail 方法。由此看出同一個 tuple 不管是 acked 還是 fail 都是由創建他的那個 spout 發出的 , 所以即使 spout 在集群環境中執行了很多的 task, 這個 tule 也不會被不同的 task acked failed.
kestrelspout kestrel 隊列中得到一個消息后會打開這個他 , 這意味著他并不會把此消息拿走 , 消息的狀態會顯示為 pending, 直到等待確認此消息已經處理完成 , 處于 pending 狀態直到 ack 或者 fail 被調用 , 處于 "Pending" 的消息不會再被其他隊列消費者使用 . 如果在這過程中 spout 中處理此消息的 task 斷開連接或失去響應則此 pending 的消息會回到 " 等待處理 " 狀態 .

Storm 的一些常用應用場景

1. 流聚合
流聚合把兩個或者多個數據流聚合成一個數據流 ? ? 基于一些共同的 tuple 字段。

builder.setBolt(5, new MyJoiner(), parallelism)

??.fieldsGrouping(1, new Fields("joinfield1", "joinfield2"))

??.fieldsGrouping(2, new Fields("joinfield1", "joinfield2"))

??.fieldsGrouping(3, new Fields("joinfield1", "joinfield2"))

?

2. 批處理
有時候為了性能或者一些別的原因, ? 你可能想把一組 tuple 一起處理, ? 而不是一個個單獨處理。

3.BasicBolt
1.
? 讀一個輸入 tuple
2.
? 根據這個輸入 tuple 發射一個或者多個 tuple
3.
? execute 的方法的最后 ack 那個輸入 tuple
遵循這類模式的 bolt 一般是函數或者是過濾器 , ? 這種模式太常見, storm 為這類模式單獨封裝了一個接口 : IbasicBolt

4. 內存內緩存 +Fields grouping 組合
bolt 的內存里面緩存一些東西非常常見。緩存在和 fields grouping 結合起來之后就更有用了。比如,你有一個 bolt 把短鏈接變成長鏈接 (bit.ly, t.co 之類的 ) 。你可以把短鏈接到長鏈接的對應關系利用 LRU 算法緩存在內存里面以避免重復計算。比如組件一發射短鏈接,組件二把短鏈接轉化成長鏈接并緩存在內存里面。看一下下面兩段代碼有什么不一樣:

builder.setBolt(2, new ExpandUrl(), parallelism)

??.shuffleGrouping(1);

builder.setBolt(2, new ExpandUrl(), parallelism)

??.fieldsGrouping(1, new Fields("url"));

5. 計算 top N
比如你有一個 bolt 發射這樣的 tuple: "value", "count" 并且你想一個 bolt 基于這些信息算出 top N tuple 。最簡單的辦法是有一個 bolt 可以做一個全局的 grouping 的動作并且在內存里面保持這 top N 的值。
這個方式對于大數據量的流顯然是沒有擴展性的, ? 因為所有的數據會被發到同一臺機器。一個更好的方法是在多臺機器上面并行的計算這個流每一部分的 top N, ? 然后再有一個 bolt 合并這些機器上面所算出來的 top N 以算出最后的 top N, ? 代碼大概是這樣的 :

builder.setBolt(2, new RankObjects(), parallellism)

??.fieldsGrouping(1, new Fields("value"));

builder.setBolt(3, new MergeObjects())

??.globalGrouping(2);

這個模式之所以可以成功是因為第一個 bolt fields grouping 使得這種并行算法在語義上是正確的。
TimeCacheMap 來高效地保存一個最近被更新的對象的緩存

6. TimeCacheMap 來高效地保存一個最近被更新的對象的緩存
有時候你想在內存里面保存一些最近活躍的對象,以及那些不再活躍的對象。 ? TimeCacheMap ? 是一個非常高效的數據結構,它提供了一些 callback 函數使得我們在對象不再活躍的時候我們可以做一些事情 .

7. 分布式 RPC:CoordinatedBolt KeyedFairBolt
storm 做分布式 RPC 應用的時候有兩種比較常見的模式 : 它們被封裝在 CoordinatedBolt KeyedFairBolt 里面 . CoordinatedBolt 包裝你的 bolt, 并且確定什么時候你的 bolt 已經接收到所有的 tuple, 它主要使用 Direct Stream 來做這個 .
KeyedFairBolt
同樣包裝你的 bolt 并且保證你的 topology 同時處理多個 DRPC 調用,而不是串行地一次只執行一個。

流式計算之Storm簡介


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 最新国产精品精品视频 | 99国产精品久久久久久久... | 免费在线欧美 | 婷婷热| 成人亚洲欧美综合 | 亚洲综合日韩欧美一区二区三 | 久久久久久免费视频 | 亚洲精品国产第一区二区多人 | 免费看久久 | 天天艹夜夜艹 | 中文字幕在线影院 | 中文字幕亚洲一区 | 日韩欧美高清在线 | 日本午夜影院 | 欧美成人精品一级高清片 | 国产精品国产亚洲精品不卡 | 在线观看精品国产 | 色中涩 | 神马色片 | 97在线免费观看视频 | 日本成人一区二区 | 九九热线精品视频6一 | 2046影院视频大全在线观看 | 农村寡妇一级毛片免费看视频 | 91视频网页版 | 中文字幕亚洲欧美日韩高清 | 成人三级视频在线观看 | 欧美一级毛片免费看高清 | 热99re久久国超精品首页 | 国内精品久久久久久久影视麻豆 | 亚洲精品国产第一区第二区国 | 97影院午夜午夜伦不卡 | 欧美日韩国产在线 | 四虎影院视频 | 亚洲日本人成网站在线观看 | 欧美中文字幕在线看 | 97干干| 九九热线| 日日干狠狠干 | 久在线视频| 午夜一区|