看過(guò)一些別人寫(xiě)的, 感覺(jué)有些東西沒(méi)太說(shuō)清楚,個(gè)人主要以源代碼跟蹤,參考個(gè)人理解講述,有錯(cuò)誤請(qǐng)指正。
1基本名詞
1.1 Tuple: 消息傳遞的基本單位。很多文章中介紹都是這么說(shuō)的, 個(gè)人覺(jué)得應(yīng)該更詳細(xì)一點(diǎn)。
?在spout發(fā)送的時(shí)候,函數(shù)原型
?public List<Integer> emit(List<Object> tuple, Object messageId) {
??????? return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
??? }
這里的tuple, 實(shí)際上是List<Object> 對(duì)象,返回的是 List<Integer> 是要發(fā)送的tast的IdsList
在bolt接收的時(shí)候, 函數(shù)原型
public void execute(Tuple tuple)
變成了一個(gè)Tuple對(duì)象,? 結(jié)構(gòu)應(yīng)該也是一個(gè)list, List<Field1, value1, Field2, value2..>這樣的一個(gè)結(jié)構(gòu), FieldList ValueList, 我們根據(jù)對(duì)應(yīng)的fieldname就可以取出對(duì)應(yīng)的getIntegerByField方法
回到spout對(duì)象中來(lái), 在spout有一個(gè)定義的輸出字段
??? public void declareOutputFields(OutputFieldsDeclarer declarer) {
?? ??? ?declarer.declare(new Fields("word"));
?? ?}
這里定義的一個(gè)字段,所以我們?cè)趀mit的時(shí)候就只能發(fā)送一個(gè)包含一個(gè)value的tuple(spout部分), storm會(huì)將field, 和 發(fā)送的value下標(biāo)對(duì)應(yīng), 變成一個(gè)Tuple對(duì)象,? 也就是上面說(shuō)的
List<Field1, value1, Field2, value2..>這樣的一個(gè)結(jié)構(gòu),? 在bolt 之間傳遞tuple, 發(fā)送又是List<Object> tuple, 根據(jù)組裝bolt定義的fiels, 再組合成Tuple對(duì)象給下一個(gè)Bolt處理
在發(fā)射的最后 還有一個(gè) void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);? 因?yàn)樯厦鎒mit的時(shí)候已經(jīng)返回List<taskid>, 所以它就知道要發(fā)送給哪些taskid處理,然后將taskid 和 tuple放入隊(duì)列
LinkedBlockingQueue
, 代碼如下
; worker.clj
(
defn
mk-transfer-
fn
[
transfer-queue
]
(
fn
[
task ^Tuple tuple
]
(.put ^LinkedBlockingQueue
transfer-queue
[
task tuple
]
)
))
然后單獨(dú)會(huì)開(kāi)啟一個(gè)叫async-loop的線程,取出每條記錄(taskid, tuple), 然后worker會(huì)從當(dāng)前task建立一個(gè)到目標(biāo)task的zeromq連接, 通過(guò)zeromq將tuple發(fā)送給目標(biāo)task
?
總結(jié): 每次emit都是根據(jù)List<Object>和定義的輸出Fields組合成一個(gè)Tuple對(duì)象,,每個(gè)接受對(duì)象接收的是Tuple對(duì)象,如果處理完再發(fā)送又再組合字段, 在emit的時(shí)候返回LIst<taskids>,所以就知道發(fā)送給哪些Task, 然后拿這些taskid和tuple再組合成一個(gè)任務(wù)隊(duì)列,通過(guò)zeromq發(fā)送到目標(biāo)task,目標(biāo)task接收到tuple進(jìn)程處理至于并發(fā)度控制, 參考
http://www.cnblogs.com/chengxin1982/p/4001275.html
?
TupleID Tuple對(duì)應(yīng)的ID,? 在創(chuàng)建的時(shí)候賦予一個(gè)64位的id,主要用來(lái)跟蹤消息
MsgID? 官方解釋 Emits a new tuple to the default output stream with the given message ID. 如果不指定,acker不會(huì)跟蹤。主要作用 , 在spout收到fail時(shí)候, 能夠定位到是哪條消息出錯(cuò),能夠決定重發(fā). 使用實(shí)例? _collector.emit(new Values(sentence),? new Integer(num));
acker 消息跟蹤者. acker 存儲(chǔ)一個(gè)Map<taskid, ack val> ,? taskid為祖宗tuple創(chuàng)建者的taskid
,?
ack_val 為消息傳遞過(guò)程中的 tupleid的xor值,如果為0則知道是哪個(gè)spout或者bolt已經(jīng)處理完了, 為什么會(huì)有bolt, 因?yàn)閎olt在發(fā)射的時(shí)候,如果非錨定,就是不帶tuple發(fā)射,它會(huì)被認(rèn)為是祖宗tuple, 上一個(gè)tuple會(huì)認(rèn)為已經(jīng)結(jié)束.
至于分配發(fā)射源分配到acker, storm采用一致性hash 祖宗tupleid來(lái)分配,因?yàn)樵谒械膖uple中都能知道祖宗tupleid,所以在子孫tuple處理時(shí), 知道該發(fā)送給哪個(gè)acker跟蹤
?
理解Storm可靠性消息