http://www.cnblogs.com/panfeng412/category/367117.html
本文首先介紹了Storm的基本概念和數據流模型,然后結合一個典型應用場景來說明Storm支持Topology之間數據流訂閱的必要性,最后對比了Storm與另一個流處理系統在數據流模型上的區別之處。
Storm基本概念
Storm是一個開源的實時計算系統,它提供了一系列的基本元素用于進行計算:Topology、Stream、Spout、Bolt等等。
在Storm中,一個實時應用的計算任務被打包作為Topology發布,這同Hadoop的MapReduce任務相似。但是有一點不同的是:在Hadoop中,MapReduce任務最終會執行完成后結束;而在Storm中,Topology任務一旦提交后永遠不會結束,除非你顯示去停止任務。
計算任務Topology是由不同的Spouts和Bolts,通過數據流(Stream)連接起來的圖。下面是一個Topology的結構示意圖:
其中包含有:
Spout:Storm中的消息源,用于為Topology生產消息(數據),一般是從外部數據源(如Message Queue、RDBMS、NoSQL、Realtime Log)不間斷地讀取數據并發送給Topology消息(tuple元組)。
Bolt:Storm中的消息處理者,用于為Topology進行消息的處理,Bolt可以執行過濾, 聚合, 查詢數據庫等操作,而且可以一級一級的進行處理。
最終,Topology會被提交到storm集群中運行;也可以通過命令停止Topology的運行,將Topology占用的計算資源歸還給Storm集群。
Storm數據流模型
數據流(Stream)是Storm中對數據進行的抽象,它是時間上無界的tuple元組序列。在Topology中,Spout是Stream的源頭,負責為Topology從特定數據源發射Stream;Bolt可以接收任意多個Stream作為輸入,然后進行數據的加工處理過程,如果需要,Bolt還可以發射出新的Stream給下級Bolt進行處理。
下面是一個Topology內部Spout和Bolt之間的數據流關系:
Topology中每一個計算組件(Spout和Bolt)都有一個并行執行度,在創建Topology時可以進行指定,Storm會在集群內分配對應并行度個數的線程來同時執行這一組件。
那么,有一個問題:既然對于一個Spout或Bolt,都會有多個task線程來運行,那么如何在兩個組件(Spout和Bolt)之間發送tuple元組呢?
Storm提供了若干種數據流分發(Stream Grouping)策略用來解決這一問題。在Topology定義時,需要為每個Bolt指定接收什么樣的Stream作為其輸入(注:Spout并不需要接收Stream,只會發射Stream)。
目前Storm中提供了以下7種Stream Grouping策略:Shuffle Grouping、Fields Grouping、All Grouping、Global Grouping、Non Grouping、Direct Grouping、Local or shuffle grouping,具體策略可以參考 這里 。
一種Storm不能支持的場景
以上介紹了一些Storm中的基本概念,可以看出, Storm中Stream的概念是Topology內唯一的,只能在Topology內按照“發布-訂閱”方式在不同的計算組件(Spout和Bolt)之間進行數據的流動,而Stream在Topology之間是無法流動的 。
這一點限制了Storm在一些場景下的應用,下面通過一個簡單的實例來說明。
假設現在有一個Topology1的結構如下:通過Spout產生數據流后,依次需要經過Filter Bolt,Join Bolt,Business1 Bolt。其中,Filter Bolt用于對數據進行過濾,Join Bolt用于數據流的聚合,Business1 Bolt用于進行一個實際業務的計算邏輯。
目前這個Topology1已經被提交到Storm集群運行,而現在我們又有了新的需求,需要計算一個新的業務邏輯,而這個Topology的特點是和Topology1公用同樣的數據源,而且前期的預處理過程完全一樣(依次經歷Filter Bolt和Join Bolt),那么這時候Storm怎么來滿足這一需求?據個人了解,有以下幾種“曲折”的實現方式:
1) 第一種方式: 首先kill掉已經在集群中運行的Topology1計算任務,然后實現Business2 Bolt的計算邏輯,并重新打包形成一個新的Topology計算任務jar包后,提交到Storm集群中重新運行,這時候Storm內的整體Topology結構如下:
這種方式的缺點在于:由于要重啟Topology,所以如果Spout或Bolt有狀態則會丟失掉;同時由于Topology結構發生了變化,因此重新運行Topology前需要對程序的穩定性、正確性進行驗證;另外Topology結構的變化也會帶來額外的運維開銷。
2 ) 第二種方式: 完全開發部署一套新的Topology,其中前面的公共部分的Spout和Bolt可以直接復用,只需要重新開發新的計算邏輯Business2 Bolt來替換原有的Business1 Bolt即可。然后重新提交新的Topology運行。這時候Storm內的整體Topology結構如下:
這種方式的缺點在于:由于兩個Topology都會從External Data Source讀取同一份數據,無疑增加了External Data Source的負載壓力;而且會導致同樣的數據在Storm集群內被傳輸相同的兩份,被同樣的計算單元Bolt進行處理,浪費了Storm的計算資源和網絡傳輸帶寬。假設現在不止有兩個這樣的Topology計算任務,而是有N個,那么對Storm的計算Slot的浪費很嚴重。
注意:上述兩種方式還有一個公共的缺點——系統可擴展性不好,這意味著不管哪種方式,只要以后有這種新增業務邏輯的需求,都需要進行復雜的人工操作或線性的資源浪費現象。
3) 第三種方式: OK,看了以上兩種方式后,也許你會提出下面的解決方案:通過Kafka這樣的消息中間件,實現不同Topology的Spout共享數據源,而且這樣可以做到消息可靠傳輸、消息rewind回傳等,好處是對于Storm來說,已經有了 storm-kafka 插件的支持。這時候Storm內的整體Topology結構如下:
這種實現方式可以通過引入一層消息中間件減少對External Data Source的重復訪問的壓力,而且可以通過消息中間件層,屏蔽掉External Data Source的細節,如果需要擴展新的業務邏輯,只需要重新部署運行新的Topology,應該說是現有 Storm版本 下很好的實現方式了。不過消息中間件的引入,無疑將給系統帶來了一定的復雜性,這對于Storm上的應用開發來說提高了門檻。
值得注意的是,方案三中仍遺留有一點問題沒有解決:對于Storm集群來說,這種方式還是沒有能夠從根本上避免數據在Storm不同Topology內的重復發送與處理。這是由于Storm的數據流模型上的限制所導致的,如果Storm實現了不同Topology之間Stream的共享,那么這一問題也就迎刃而解了。
一個流處理系統的數據流模型
個人工作中有幸參與過一個流處理框架的開發與應用。下面我們來簡單看看其中所采用的數據流模型:
其中:
1) 數據流(data stream) :時間分布和數量上無限的一系列數據記錄的集合體;
2) 數據記錄(data record) :數據流的最小組成單元,每條數據記錄包括 3 類數據:所屬數據流名稱(stream name)、用于路由的數據(keys)和具體數據處理邏輯所需的數據(value);
3) 數據處理任務定義(task definition) :定義一個數據處理任務的基本屬性,無法直接被執行,必須特化為具體的任務實例。其基本屬性包括:
- (可選)輸入流(input stream):描述該任務依賴哪些數據流作為輸入,是一個數據流名稱列表;數據流產生源不會依賴其他數據流,可忽略該配置;
- 數據處理邏輯(process logic):描述該任務具體的處理邏輯,例如由獨立進程進行的外部處理邏輯;
- (可選)輸出流(output stream):描述該任務產生哪個數據流,是一個數據流名稱;數據流處理鏈末級任務不會產生新的數據流,可忽略該配置;
4) 數據處理任務實例(task instance) :對一個數據處理任務定義進行具體約束后,可推送到某個處理結點上運行的邏輯實體。附加下列屬性:
- 數據處理任務定義:指向該任務實例對應的數據處理任務定義實體;
- 輸入流過濾條件(input filting condition):一個 boolean 表達式列表,描述每個輸入流中符合什么條件的數據記錄可以作為有效數據交給處理邏輯;若某個輸入流中所有數據記錄都是有效數據,則可直接用 true 表示;
- (可選)強制輸出周期(output interval):描述以什么頻率強制該任務實例產生輸出流記錄,可以用輸入流記錄個數或間隔時間作為周期;忽略該配置時,輸出流記錄產生周期完全由處理邏輯自身決定,不受框架約束;
5) 數據處理結點(node) :可容納多個數據處理任務實例運行的實體機器,每個數據處理結點的IPv4地址必須保證唯一。
該分布式流處理系統由多個數據處理結點(node)組成;每個數據處理結點(node)上運行有多個數據任務實例(task instance);每個數據任務實例(task instance)屬于一個數據任務定義(task definition),任務實例是在任務定義的基礎上,添加了輸入流過濾條件和強制輸出周期屬性后,可實際推送到數據處理結點(node)上運行的邏輯實體;數據任務定義(task definition)包含輸入數據流、數據處理邏輯以及輸出數據流屬性。
該系統中,通過分布式應用程序協調服務ZooKeeper集群存儲以上數據流模型中的所有配置信息;不同的數據處理節點統一通過ZooKeeper集群獲取數據流的配置信息后進行任務實例的運行與停止、數據流的流入和流出。
同時,每個數據處理任務可以接受流系統中已存在的任意數據流(data stream)作為輸入,并產出新的任意名稱的數據流(data stream),被其他結點上運行的任務實例訂閱。不同結點之間對于各個數據流(data stream)的訂閱關系,通過ZooKeeper集群來動態感知并負責通知流系統做出變化。
二者在數據流模型上的不同之處
至于兩個系統的實現細節,我們先不去做具體比較,下面僅列出二者在數據流模型上的一些不同之處(這里并不是為了全面對比二者的不同之處,只是列出其中的關鍵部分):
1) 在Storm中,數據流Stream是在Topology內進行定義,并在Topology內進行傳輸的;而在上面提到的流處理系統中,數據流Stream是在整個系統內全局唯一的,可以在整個集群內被訂閱。
2) 在Storm中,數據流Stream的發布和訂閱都是靜態的,所謂靜態是指數據流的發布與訂閱關系在向Storm集群提交Topology計算任務時,被一次性生成的,這一關系在Topology的運行過程中是不能被改變的;而在上面提到的流處理系統中,數據流Stream的發布和訂閱都是動態的,即數據處理任務task可以動態的發布Stream,也可以動態的訂閱系統內已經生成的任意Stream,數據流的訂閱關于通過分布式應用程序協調服務ZooKeeper集群的動態節點來維護管理。
有了以上的對比,我們不難發現,對于本文所舉的應用場景實例,Storm的數據流模式尚不能很方便的支持,而在這里提到的這個流處理系統的全局數據流模型下,這一應用場景的需求可以很方便的滿足。
總結的話
個人覺得,Storm有必要實現不同Topology之間Stream的共享,這個至少可以在不損失Storm現有功能的前提下,使得Storm在處理實際生產環境下的一些應用場景時更加從容應對。
至于如何在現有Storm的基礎上實現這一需求,可能的方式很多。一種簡單的方式是通過Zookeeper來集中存儲、動態感知Topology之間Stream的“發布-訂閱”關系,同時在Storm的消息分發過程中對這種情況加以處理。
以上觀點,如果不對之處,歡迎大家指出。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
