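Storm's main functions for working with ZooKeeper are defined in the namespace backtype.storm.cluster (the file cluster.clj). backtype.storm.cluster defines two important protocols: ClusterState and StormClusterState. A Clojure protocol can be thought of as a Java interface: it bundles a set of methods. The ClusterState protocol bundles the basic functions for talking to ZooKeeper, such as getting a node's children and getting a node's data. ClusterState is defined as follows: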
```clojure
(defprotocol ClusterState
  (set-ephemeral-node [this path data])
  (delete-node [this path])
  (create-sequential [this path data])
  ;; if node does not exist, create persistent with this data
  (set-data [this path data])
  (get-data [this path watch?])
  (get-version [this path watch?])
  (get-data-with-version [this path watch?])
  (get-children [this path watch?])
  (mkdirs [this path])
  (close [this])
  (register [this callback])
  (unregister [this id]))
```
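To make the protocol/interface analogy concrete, here is a minimal sketch using a trimmed-down, hypothetical protocol `MiniClusterState` (not part of Storm) and a hypothetical constructor `mk-memory-state` that implements it with `reify` over an in-memory atom instead of ZooKeeper. `reify` plays the role of Java's `implements`, and `satisfies?` the role of `instanceof`; Storm's own code below uses both in the same way.

```clojure
(require '[clojure.string :as str])

;; Hypothetical, trimmed-down cousin of ClusterState; not part of Storm.
(defprotocol MiniClusterState
  (set-data [this path data])
  (get-data [this path])
  (get-children [this path])
  (delete-node [this path]))

(defn mk-memory-state
  "Returns an object implementing MiniClusterState, backed by an atom
  holding a map of path -> data instead of a ZooKeeper ensemble."
  []
  (let [store (atom {})]
    ;; reify ~ Java `implements`: an anonymous object satisfying the protocol
    (reify MiniClusterState
      (set-data [_ path data]
        (swap! store assoc path data)
        nil)
      (get-data [_ path]
        (get @store path))
      (get-children [_ path]
        ;; direct children of path among the stored keys, sorted by name
        (let [prefix (str path "/")]
          (->> (keys @store)
               (filter #(str/starts-with? % prefix))
               (map #(subs % (count prefix)))
               (remove #(str/includes? % "/"))
               sort)))
      (delete-node [_ path]
        ;; drop the node and everything below it, as zk/delete-recursive would
        (let [doomed? (fn [k] (or (= k path) (str/starts-with? k (str path "/"))))]
          (swap! store #(into {} (remove (fn [[k _]] (doomed? k)) %))))
        nil))))
```

Callers only see the protocol methods; the atom stays hidden in the closure. mk-distributed-cluster-state has the same shape, closing over `callbacks`, `active` and `zk`.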
The StormClusterState protocol bundles the functions Storm uses to interact with ZooKeeper. Each of them can be seen as a "combination" of ClusterState functions. StormClusterState is defined as follows:
```clojure
(defprotocol StormClusterState
  (assignments [this callback])
  (assignment-info [this storm-id callback])
  (assignment-info-with-version [this storm-id callback])
  (assignment-version [this storm-id callback])
  (active-storms [this])
  (storm-base [this storm-id callback])
  (get-worker-heartbeat [this storm-id node port])
  (executor-beats [this storm-id executor->node+port])
  (supervisors [this callback])
  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
  (setup-heartbeats! [this storm-id])
  (teardown-heartbeats! [this storm-id])
  (teardown-topology-errors! [this storm-id])
  (heartbeat-storms [this])
  (error-topologies [this])
  (worker-heartbeat! [this storm-id node port info])
  (remove-worker-heartbeat! [this storm-id node port])
  (supervisor-heartbeat! [this supervisor-id info])
  (activate-storm! [this storm-id storm-base])
  (update-storm! [this storm-id new-elems])
  (remove-storm-base! [this storm-id])
  (set-assignment! [this storm-id info])
  (remove-storm! [this storm-id])
  (report-error [this storm-id task-id node port error])
  (errors [this storm-id task-id])
  (disconnect [this]))
```
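Most StormClusterState methods pair a path helper with one ClusterState call plus (de)serialization. The helpers (`assignment-path`, `storm-path`, `supervisor-path`, `workerbeat-path`, `error-path`, `tokenize-path`) and the `*-ROOT`/`*-SUBTREE` constants are used in mk-storm-cluster-state but not shown in this article. The definitions below are a reconstruction for illustration, assuming the layout given in the code comments (/assignments/{storm-id}, /workerbeats/{storm-id}/{node-port}, and so on). Check cluster.clj for the exact versions; for example, Storm may escape the component id in `error-path`, which this sketch does not. All paths are relative to STORM-ZOOKEEPER-ROOT (/storm by default), because the client is created with `:root`.

```clojure
(require '[clojure.string :as str])

;; Reconstructed for illustration: names mirror those used in mk-storm-cluster-state,
;; values follow the paths described in the article's comments.
(def ASSIGNMENTS-ROOT "assignments")
(def STORMS-ROOT "storms")
(def SUPERVISORS-ROOT "supervisors")
(def WORKERBEATS-ROOT "workerbeats")
(def ERRORS-ROOT "errors")

(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
(def STORMS-SUBTREE (str "/" STORMS-ROOT))
(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
(def ERRORS-SUBTREE (str "/" ERRORS-ROOT))

(defn assignment-path [storm-id] (str ASSIGNMENTS-SUBTREE "/" storm-id))
(defn storm-path [storm-id] (str STORMS-SUBTREE "/" storm-id))
(defn supervisor-path [supervisor-id] (str SUPERVISORS-SUBTREE "/" supervisor-id))
(defn workerbeat-storm-root [storm-id] (str WORKERBEATS-SUBTREE "/" storm-id))
(defn workerbeat-path [storm-id node port]
  (str (workerbeat-storm-root storm-id) "/" node "-" port))
(defn error-storm-root [storm-id] (str ERRORS-SUBTREE "/" storm-id))
;; Assumption: component-id is used verbatim (Storm may escape it).
(defn error-path [storm-id component-id]
  (str (error-storm-root storm-id) "/" component-id))

(defn tokenize-path
  "Splits \"/assignments/t1\" into [\"assignments\" \"t1\"], dropping empty segments."
  [path]
  (vec (remove empty? (str/split path #"/"))))
```

With these helpers, `supervisor-info` is simply "read `(supervisor-path id)` and deserialize", and `tokenize-path` produces the `[subtree & args]` that the registered callback dispatches on.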
Besides the ClusterState and StormClusterState protocols, the namespace backtype.storm.cluster also defines two important functions: mk-distributed-cluster-state and mk-storm-cluster-state.
mk-distributed-cluster-state returns an object that implements the ClusterState protocol, and that object is what actually talks to ZooKeeper. Its definition is as follows:
```clojure
(defn mk-distributed-cluster-state
  ;; conf holds the settings from storm.yaml as a map
  [conf]
  ;; zk is a temporary ZooKeeper client; Storm talks to ZooKeeper through CuratorFramework
  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
    ;; create the cluster's root znode (STORM-ZOOKEEPER-ROOT, /storm by default), then close the client
    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
    (.close zk))
  ;; callbacks holds the registered callbacks as a map of id -> fn
  (let [callbacks (atom {})
        ;; active is true until close is called; the watcher ignores events once it is false
        active (atom true)
        ;; zk is rebound to the long-lived client, rooted at STORM-ZOOKEEPER-ROOT and given a watcher:
        ;; when something watched changes, the ZooKeeper server sends the client an event and the
        ;; watcher hands it to the functions in callbacks.
        ;; mk-storm-cluster-state, called when both nimbus and supervisor start, registers one routing
        ;; function here. That function only runs callbacks that were passed to StormClusterState
        ;; methods: the supervisor passes them, so its events trigger them; if nimbus passes none,
        ;; the events it receives trigger nothing.
        ;; mk-client is defined in zookeeper.clj; see the end of this article
        zk (zk/mk-client conf
                         (conf STORM-ZOOKEEPER-SERVERS)
                         (conf STORM-ZOOKEEPER-PORT)
                         :auth-conf conf
                         :root (conf STORM-ZOOKEEPER-ROOT)
                         ;; :watcher is the client's default watcher: state is the connection state,
                         ;; type the event type, path the znode that produced the event.
                         ;; It runs every function in callbacks; those are added through register,
                         ;; which mk-storm-cluster-state calls
                         :watcher (fn [state type path]
                                    (when @active
                                      (when-not (= :connected state)
                                        (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                      (when-not (= :none type)
                                        (doseq [callback (vals @callbacks)]
                                          (callback type path))))))]
    ;; reify is roughly Java's "implements": it returns an anonymous object implementing a protocol
    (reify
      ClusterState
      ;; add a callback to callbacks under a fresh key (a random UUID string) and return the key
      (register
        [this callback]
        (let [id (uuid)]
          (swap! callbacks assoc id callback)
          id))
      ;; remove the callback stored under id
      (unregister
        [this id]
        (swap! callbacks dissoc id))
      ;; create an ephemeral node at path (creating missing parents), or overwrite its data if it exists
      (set-ephemeral-node
        [this path data]
        (zk/mkdirs zk (parent-path path))
        (if (zk/exists zk path false)
          (try-cause
            (zk/set-data zk path data) ; should verify that it's ephemeral
            (catch KeeperException$NoNodeException e
              (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
              (zk/create-node zk path data :ephemeral)))
          (zk/create-node zk path data :ephemeral)))
      ;; create a sequential node: ZooKeeper appends an increasing counter to path
      (create-sequential
        [this path data]
        (zk/create-node zk path data :sequential))
      ;; set a node's data; if the node does not exist, create it (and its parents) as a persistent node
      (set-data
        [this path data]
        ;; note: this does not turn off any existing watches
        (if (zk/exists zk path false)
          (zk/set-data zk path data)
          (do
            (zk/mkdirs zk (parent-path path))
            (zk/create-node zk path data :persistent))))
      ;; delete the node and all of its descendants
      (delete-node
        [this path]
        (zk/delete-recursive zk path))
      ;; read a node's data. watch? says whether to set a watch on the node: if true, the next change
      ;; to its data (e.g. through set-data) makes the server send the client an event, and the client
      ;; runs its default watcher (the fn bound to :watcher above). ZooKeeper watches are one-shot,
      ;; so the watch has to be set again to see later changes
      (get-data
        [this path watch?]
        (zk/get-data zk path watch?))
      ;; like get-data, but also returns the data version, i.e. how many times the data has changed
      (get-data-with-version
        [this path watch?]
        (zk/get-data-with-version zk path watch?))
      ;; only the node's data version; watch? as in get-data
      (get-version
        [this path watch?]
        (zk/get-version zk path watch?))
      ;; the node's children; watch? as in get-data, except the watch fires when the children change
      (get-children
        [this path watch?]
        (zk/get-children zk path watch?))
      ;; create the node at path, including any missing parents
      (mkdirs
        [this path]
        (zk/mkdirs zk path))
      ;; close the zk client; active is cleared first so late events are ignored
      (close
        [this]
        (reset! active false)
        (.close zk)))))
```
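At its core, mk-distributed-cluster-state is a small observer pattern: an atom of id to callback, register/unregister to edit it, and one default watcher that forwards every real node event to all registered callbacks. The sketch below isolates that pattern without ZooKeeper. `mk-callback-registry` and its map-of-functions return value are hypothetical, and `(str (java.util.UUID/randomUUID))` stands in for Storm's `uuid` helper.

```clojure
(defn mk-callback-registry
  "Hypothetical helper: returns a map of functions that share one callbacks
  atom, mirroring register / unregister / :watcher / close above."
  []
  (let [callbacks (atom {})
        active (atom true)]
    {:register (fn [callback]
                 ;; stand-in for Storm's (uuid)
                 (let [id (str (java.util.UUID/randomUUID))]
                   (swap! callbacks assoc id callback)
                   id))
     :unregister (fn [id]
                   (swap! callbacks dissoc id))
     ;; same logic as the :watcher fn passed to zk/mk-client
     :watcher (fn [state type path]
                (when @active
                  (when-not (= :connected state)
                    (println "Received event" state type path "with disconnected ZooKeeper"))
                  ;; :none events only report connection-state changes, not node changes
                  (when-not (= :none type)
                    (doseq [callback (vals @callbacks)]
                      (callback type path)))))
     :close (fn []
              (reset! active false))}))
```

The watcher itself knows nothing about Storm; everything Storm-specific lives in whatever gets registered, which is exactly what mk-storm-cluster-state supplies.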
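mk-storm-cluster-state is a very important function. It returns an instance implementing the StormClusterState protocol, through which Storm can interact with ZooKeeper more conveniently. Both the nimbus and the supervisor startup functions call mk-storm-cluster-state; how nimbus and supervisor start will be covered in later articles. Its definition is as follows: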
```clojure
(defn mk-storm-cluster-state
  [cluster-state-spec]
  ;; satisfies? works like Java's instanceof: is cluster-state-spec already a ClusterState?
  ;; If not, it is treated as conf and a distributed ClusterState is created here (solo? = true)
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec)])
        ;; map of topology id -> callback: when the data of /assignments/{topology id} changes,
        ;; the callback stored under that topology id is run
        assignment-info-callback (atom {})
        ;; same idea as assignment-info-callback
        assignment-info-with-version-callback (atom {})
        ;; same idea as assignment-info-callback (keyed by topology id)
        assignment-version-callback (atom {})
        ;; run when the children of /supervisors change
        supervisors-callback (atom nil)
        ;; run when the children of /assignments change
        assignments-callback (atom nil)
        ;; map of topology id -> callback: run when the data of /storms/{topology id} changes
        storm-base-callback (atom {})
        ;; register adds the (fn [type path] ...) below to cluster-state's callbacks
        ;; and returns the uuid that identifies it
        state-id (register
                  cluster-state
                  ;; type is the event type, path the znode that triggered it
                  (fn [type path]
                    ;; subtree is the first path segment ("assignments", "storms", "supervisors", ...);
                    ;; args holds the rest, e.g. the topology id
                    (let [[subtree & args] (tokenize-path path)]
                      ;; condp works like a Java switch
                      (condp = subtree
                        ;; no args: the children of /assignments changed, so run assignments-callback;
                        ;; otherwise the data of /assignments/{topology id} changed, so run that id's
                        ;; entry in assignment-info-callback
                        ASSIGNMENTS-ROOT (if (empty? args)
                                           (issue-callback! assignments-callback)
                                           (issue-map-callback! assignment-info-callback (first args)))
                        ;; the children of /supervisors changed
                        SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                        ;; the data of /storms/{topology id} changed
                        STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                        ;; this should never happen
                        (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
    ;; create the znodes Storm needs in order to run topologies
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
      (mkdirs cluster-state p))
    ;; return an instance implementing StormClusterState
    (reify
      StormClusterState
      ;; children of /assignments; if callback is non-nil, store it in assignments-callback
      ;; and set a child watch on /assignments
      (assignments
        [this callback]
        (when callback
          (reset! assignments-callback callback))
        (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
      ;; data of /assignments/{storm-id}, i.e. the topology's assignment; if callback is non-nil,
      ;; store it in assignment-info-callback and set a data watch on the node
      (assignment-info
        [this storm-id callback]
        (when callback
          (swap! assignment-info-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
      ;; like assignment-info, but also returns the data version;
      ;; callbacks go into assignment-info-with-version-callback
      (assignment-info-with-version
        [this storm-id callback]
        (when callback
          (swap! assignment-info-with-version-callback assoc storm-id callback))
        (let [{data :data version :version}
              (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
          {:data (maybe-deserialize data)
           :version version}))
      ;; only the data version of /assignments/{storm-id}; callbacks go into assignment-version-callback
      (assignment-version
        [this storm-id callback]
        (when callback
          (swap! assignment-version-callback assoc storm-id callback))
        (get-version cluster-state (assignment-path storm-id) (not-nil? callback)))
      ;; ids of the topologies running in the cluster, i.e. the children of /storms
      (active-storms
        [this]
        (get-children cluster-state STORMS-SUBTREE false))
      ;; ids of all topologies that have heartbeats, i.e. the children of /workerbeats
      (heartbeat-storms
        [this]
        (get-children cluster-state WORKERBEATS-SUBTREE false))
      ;; ids of all topologies that have errors, i.e. the children of /errors
      (error-topologies
        [this]
        (get-children cluster-state ERRORS-SUBTREE false))
      ;; heartbeat of one worker of a topology, i.e. the data of /workerbeats/{storm-id}/{node-port}
      (get-worker-heartbeat
        [this storm-id node port]
        (-> cluster-state
            (get-data (workerbeat-path storm-id node port) false)
            maybe-deserialize))
      ;; heartbeats of the executors listed in executor->node+port
      (executor-beats
        [this storm-id executor->node+port]
        ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
        ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
        ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
        ;; we avoid situations like that
        (let [node+port->executors (reverse-map executor->node+port)
              all-heartbeats (for [[[node port] executors] node+port->executors]
                               (->> (get-worker-heartbeat this storm-id node port)
                                    (convert-executor-beats executors)))]
          (apply merge all-heartbeats)))
      ;; children of /supervisors; if callback is non-nil, store it in supervisors-callback
      ;; and set a child watch on /supervisors
      (supervisors
        [this callback]
        (when callback
          (reset! supervisors-callback callback))
        (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
      ;; data of /supervisors/{supervisor-id}, i.e. the supervisor's heartbeat
      (supervisor-info
        [this supervisor-id]
        (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
      ;; write a worker heartbeat
      (worker-heartbeat!
        [this storm-id node port info]
        (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
      ;; delete a worker heartbeat
      (remove-worker-heartbeat!
        [this storm-id node port]
        (delete-node cluster-state (workerbeat-path storm-id node port)))
      ;; create the node that holds the heartbeats of topology storm-id
      (setup-heartbeats!
        [this storm-id]
        (mkdirs cluster-state (workerbeat-storm-root storm-id)))
      ;; delete the heartbeat node of topology storm-id
      (teardown-heartbeats!
        [this storm-id]
        (try-cause
          (delete-node cluster-state (workerbeat-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown heartbeats for " storm-id))))
      ;; delete the error node of topology storm-id
      (teardown-topology-errors!
        [this storm-id]
        (try-cause
          (delete-node cluster-state (error-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown errors for " storm-id))))
      ;; store the supervisor heartbeat in an ephemeral node
      (supervisor-heartbeat!
        [this supervisor-id info]
        (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
      ;; create the /storms/{storm-id} node
      (activate-storm!
        [this storm-id storm-base]
        (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
      ;; update the topology's StormBase object, i.e. the /storms/{storm-id} node
      (update-storm!
        [this storm-id new-elems]
        ;; base is the StormBase currently stored for storm-id
        (let [base (storm-base this storm-id nil)
              ;; executors is the map of component name -> parallelism
              executors (:component->executors base)
              ;; merge the new per-component parallelism into the old map (new values win)
              new-elems (update new-elems :component->executors (partial merge executors))]
          ;; merge into the StormBase and write it back to /storms/{storm-id}
          (set-data cluster-state (storm-path storm-id)
                    (-> base
                        (merge new-elems)
                        Utils/serialize))))
      ;; StormBase of storm-id, i.e. the data of /storms/{storm-id}; if callback is non-nil,
      ;; store it in storm-base-callback under storm-id and set a data watch on the node
      (storm-base
        [this storm-id callback]
        (when callback
          (swap! storm-base-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
      ;; delete the StormBase of storm-id, i.e. the /storms/{storm-id} node
      (remove-storm-base!
        [this storm-id]
        (delete-node cluster-state (storm-path storm-id)))
      ;; write the assignment of storm-id, i.e. the data of /assignments/{storm-id}
      (set-assignment!
        [this storm-id info]
        (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
      ;; delete both the assignment and the StormBase of storm-id,
      ;; i.e. /assignments/{storm-id} and /storms/{storm-id}
      (remove-storm!
        [this storm-id]
        (delete-node cluster-state (assignment-path storm-id))
        (remove-storm-base! this storm-id))
      ;; write a component error to ZooKeeper
      (report-error
        [this storm-id component-id node port error]
        ;; path is "/errors/{storm-id}/{component-id}"
        (let [path (error-path storm-id component-id)
              ;; the error record: time, stack trace, host and port
              data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
              ;; create /errors/{storm-id}/{component-id}
              _ (mkdirs cluster-state path)
              ;; create a sequential child of it holding the error
              _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
              ;; every child except the 10 with the highest sequence numbers
              to-kill (->> (get-children cluster-state path false)
                           (sort-by parse-error-path)
                           reverse
                           (drop 10))]
          ;; delete the nodes in to-kill
          (doseq [k to-kill]
            (delete-node cluster-state (str path "/" k)))))
      ;; errors recorded for the given storm-id and component-id, newest first
      (errors
        [this storm-id component-id]
        (let [path (error-path storm-id component-id)
              _ (mkdirs cluster-state path)
              children (get-children cluster-state path false)
              errors (dofor [c children]
                       (let [data (-> (get-data cluster-state (str path "/" c) false)
                                      maybe-deserialize)]
                         (when data
                           (struct TaskError (:error data) (:time-secs data) (:host data) (:port data)))))]
          (->> (filter not-nil? errors)
               (sort-by (comp - :time-secs)))))
      ;; close the connection: first remove our callback from cluster-state's callbacks,
      ;; and close cluster-state itself only if it was created here (solo?)
      (disconnect
        [this]
        (unregister cluster-state state-id)
        (when solo?
          (close cluster-state))))))
```
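update-storm! merges rather than overwrites. New per-component parallelism is merged into the existing `:component->executors` map, and the rest of `new-elems` is then merged into the StormBase. The sketch below replays that data flow on plain maps. `update-base` is a hypothetical name, the StormBase is modeled as a map with example keys, and `clojure.core/update` (Clojure 1.7+) stands in for the `update` helper the Storm code calls, on the assumption that both behave as `(assoc m k (f (get m k)))`.

```clojure
(defn update-base
  "Hypothetical pure version of update-storm!: returns the StormBase map
  that would be written back to /storms/{storm-id}."
  [base new-elems]
  (let [executors (:component->executors base)
        ;; old map first, so counts in new-elems override it and unmentioned components keep theirs
        new-elems (update new-elems :component->executors (partial merge executors))]
    (merge base new-elems)))
```

For example, passing `{:component->executors {"split" 8}}` changes only the "split" entry. Passing something unrelated, such as `{:num-workers 3}`, leaves the component map untouched, because `(merge executors nil)` returns `executors`.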
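report-error keeps only the ten most recent errors per component. Each error is stored as a sequential child of /errors/{storm-id}/{component-id}. ZooKeeper appends a 10-digit, zero-padded counter to the `e` prefix, producing names like e0000000012. After each write, every child except the ten with the highest counters is deleted. The sketch below reproduces just the selection step. `parse-error-path` is not shown in this article, so this version (drop the leading "e" and parse the number) is an assumption based on the `(str path "/e")` prefix. `errors-to-delete` is a hypothetical name.

```clojure
(defn parse-error-path
  "Assumed helper: \"e0000000012\" -> 12."
  [child]
  (Long/parseLong (subs child 1)))

(defn errors-to-delete
  "Children that report-error would delete: everything except the
  keep-n children with the highest sequence numbers."
  [children keep-n]
  (->> children
       ;; sort numerically; get-children makes no ordering guarantee
       (sort-by parse-error-path)
       reverse
       (drop keep-n)))
```

With 12 stored errors, the two oldest (e0000000001 and e0000000002) are selected for deletion. With 10 or fewer, nothing is deleted.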
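mk-client, defined in zookeeper.clj, creates a CuratorFramework instance and registers a CuratorListener on it. Curator calls the listener's eventReceived() whenever a background operation completes or a watch fires. eventReceived forwards only WATCHED events, and the watcher function it calls is the one bound to :watcher in mk-distributed-cluster-state. mk-client is defined as follows: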
```clojure
;; defnk (from Storm's util namespace) defines a function with optional keyword
;; arguments; here :root, :watcher and :auth-conf, with the defaults shown
(defnk mk-client
  [conf servers port
   :root ""
   :watcher default-watcher
   :auth-conf nil]
  (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
    ;; register a CuratorListener on the client
    (.. fk
        (getCuratorListenable)
        (addListener
          (reify CuratorListener
            (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
              ;; forward only watch notifications, translating ZooKeeper's state and
              ;; event-type enums into keywords before calling the watcher
              (when (= (.getType e) CuratorEventType/WATCHED)
                (let [^WatchedEvent event (.getWatchedEvent e)]
                  (watcher (zk-keeper-states (.getState event))
                           (zk-event-types (.getType event))
                           (.getPath event))))))))
    (.start fk)
    fk))
```
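The listener in mk-client is an adapter. It drops every Curator event except WATCHED ones and translates ZooKeeper's KeeperState and EventType enums into keywords before calling the watcher. Running it for real needs Curator and ZooKeeper, so the sketch below models events as plain maps and the enums as their names in strings. `keeper-states`, `event-types` and `mk-listener` are hypothetical, simplified stand-ins for Storm's `zk-keeper-states`, `zk-event-types` and the reified CuratorListener. Of the keyword values, `:connected` and `:none` are the ones the watcher above actually tests; the others are assumed.

```clojure
;; Simplified stand-ins for zk-keeper-states / zk-event-types: keys are the
;; ZooKeeper enum names as strings (Storm's tables are keyed by the enum values).
(def keeper-states
  {"SyncConnected" :connected
   "Disconnected" :disconnected
   "Expired" :expired})

(def event-types
  {"None" :none
   "NodeCreated" :node-created
   "NodeDeleted" :node-deleted
   "NodeDataChanged" :node-data-changed
   "NodeChildrenChanged" :node-children-changed})

(defn mk-listener
  "Returns a fn playing the role of CuratorListener.eventReceived.
  curator-event is a map {:type ... :watched-event {:state :type :path}}."
  [watcher]
  (fn [curator-event]
    ;; only watch notifications are forwarded; results of background operations are ignored
    (when (= :watched (:type curator-event))
      (let [{:keys [state type path]} (:watched-event curator-event)]
        (watcher (keeper-states state) (event-types type) path)))))
```

Because of this translation, the :watcher fn in mk-distributed-cluster-state can compare against plain keywords such as `:connected` and `:none` instead of Java enums.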
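That concludes the source analysis of how Storm interacts with ZooKeeper. In my view the most important part is how a "watcher" is attached to the zk client: many Storm features, such as the supervisor picking up its assignments, are built on ZooKeeper's watcher mechanism. Attaching a watcher takes roughly these steps:
- mk-distributed-cluster-state creates a zk client and gives it a "watcher" function through :watcher. That function simply calls the functions in the ClusterState's callbacks map, so which functions the watcher actually runs is decided by the ClusterState instance.
- The ClusterState instance provides register for updating callbacks. The ClusterState instance is passed to mk-storm-cluster-state, which calls register to add a function (fn [type path] ...); this function contains all of the watcher's real logic.
- What the function registered in mk-storm-cluster-state runs is decided by the StormClusterState instance, and watches on ZooKeeper nodes are also set through it. So through the StormClusterState instance we can attach a "watch" and a "callback" to any node we care about: when the node or its data changes, the ZooKeeper server sends the zk client a notification, the client's "watcher" function is called, and our registered callback is executed in turn.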
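The steps above can be condensed into a runnable sketch. A getter stores a callback (as assignment-info does), the registered function routes an event by its first path segment (as the condp in mk-storm-cluster-state does), and the stored callback fires. `issue-callback!` and `issue-map-callback!` are not shown in this article; this sketch assumes they fire a stored callback once and then clear it, which matches ZooKeeper's one-shot watches. `route-event` is a hypothetical name, the storms branch is omitted, and an exception replaces `exit-process!`.

```clojure
(require '[clojure.string :as str])

(defn tokenize-path [path]
  (vec (remove empty? (str/split path #"/"))))

;; Assumed one-shot semantics: take the callback out, then call it.
(defn issue-callback! [cb-atom]
  (let [cb @cb-atom]
    (reset! cb-atom nil)
    (when cb (cb))))

(defn issue-map-callback! [cb-atom id]
  (let [cb (get @cb-atom id)]
    (swap! cb-atom dissoc id)
    (when cb (cb id))))

(def assignments-callback (atom nil))
(def assignment-info-callback (atom {}))
(def supervisors-callback (atom nil))

(defn route-event
  "Plays the role of the (fn [type path] ...) registered in mk-storm-cluster-state."
  [_type path]
  (let [[subtree & args] (tokenize-path path)]
    (condp = subtree
      "assignments" (if (empty? args)
                      (issue-callback! assignments-callback)
                      (issue-map-callback! assignment-info-callback (first args)))
      "supervisors" (issue-callback! supervisors-callback)
      ;; the storms branch is omitted in this sketch
      (throw (ex-info "Unknown callback for subtree" {:subtree subtree :args args})))))
```

Under this assumption, a caller that wants to keep being notified must call the getter again with a callback, which re-arms both the ZooKeeper watch and the stored callback.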
This part of the source is tightly tied to ZooKeeper and touches many ZooKeeper concepts and features, such as data watches and child watches. For ZooKeeper's watcher mechanism, see:
http://www.cnblogs.com/ggjucheng/p/3369946.html
http://www.cnblogs.com/zhangchaoyang/articles/3813217.html
Storm does not use the ZooKeeper API directly; it uses the Curator framework, which simplifies access to ZooKeeper. For the Curator framework, see:
http://f.dataguru.cn/thread-120125-1-1.html