提交示例代碼:
1 public static void main(String[] args) throws Exception {
2 TopologyBuilder builder = new TopologyBuilder();
3 builder.setSpout("random", new RandomWordSpout(), 2);
4 builder.setBolt("transfer", new TransferBolt(), 4).shuffleGrouping("random");
5 builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("transfer", new Fields("word"));
6 Config conf = new Config();
7 conf.setNumWorkers(4);// 設置啟動4個Worker
8 conf.setNumAckers(1); // 設置一個ack線程
9 conf.setDebug(true); // 設置打印所有發送的消息及系統消息
10 StormSubmitter.submitTopology("test", conf, builder.createTopology());
11 }
1、構建 TopologyBuilder 對象 builder,主要用于對各個組件(bolt、spout)進行配置;
TopologyBuilder主要屬性字段定義如下:
public class TopologyBuilder {
// 所提交Topolog中所有的bolt將放入到_bolts中
private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
// 所提交Topolog中所有的spout將放入到_spouts中
private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
// 所提交Topolog中所有的spout和bolt都將放入_commons中
private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
....................................
}
2、以上提交代碼中第三行,配置了一個id值為random,IRichSpout對象為RandomWordSpout,而并行度為2(兩個線程里面跑兩個任務)的spout;
// setSpout函數實現源碼
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {
validateUnusedId(id);
initCommon(id, spout, parallelism_hint);
_spouts.put(id, spout);
return new SpoutGetter(id);
}
validateUnusedId:檢測輸入的id是不是唯一,若已經存在將拋出異常;
initCommon:構建ComponentCommon對象并進行相應的初始化,最后放入到_commons(以上TopologyBuilder中定義的Map);
initCommon函數實現源碼:
private void initCommon(String id, IComponent component, Number parallelism) {
ComponentCommon common = new ComponentCommon();
// 設置消息流的來源及分組方式
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
if(parallelism!=null)
// 設置并行度
common.set_parallelism_hint(parallelism.intValue());
Map conf = component.getComponentConfiguration();
if(conf!=null)
// 設置組件的配置參數
common.set_json_conf(JSONValue.toJSONString(conf));
_commons.put(id, common);
}
在ComponentCommon中主要對以下四個屬性字段進行設置:
// GlobalStreamId:確定消息來源,其中componentId表示所屬組件,streamId為消息流的標識符;
// Grouping:確定消息分組方式;
private Map<GlobalStreamId,Grouping> inputs;
// StreamInfo表示輸出的字段列表及是否為直接流
private Map<String,StreamInfo> streams;
private int parallelism_hint; // 設置并行度
private String json_conf; // 其它配置參數設置(必須為JSON格式)
3、SpoutGetter
實現源碼:
protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements SpoutDeclarer {
public SpoutGetter(String id) {
super(id);
}
}
ConfigGetter、SpoutGetter的實現都是在TopologyBuilder中, ConfigGetter作用:設置程序中的配置項,覆蓋默認的配置項,且配置項的格式為為JSON(本質上是改變對應ComponentCommon對象中json_conf的值);
4、提交示例代碼中的第四行定義了一個id為transfer,IRichSpout對象為TransferBolt,并行度為4的bolt
setBolt實現源碼:
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {
validateUnusedId(id);
initCommon(id, bolt, parallelism_hint);
_bolts.put(id, bolt);
return new BoltGetter(id);
}
設置Bolt的函數與設置Spout函數的實現唯一的區別在返回結果;
BoltGetter實現部分源碼:
protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {
private String _boltId;
public BoltGetter(String boltId) {
super(boltId);
_boltId = boltId;
}
public BoltDeclarer shuffleGrouping(String componentId) {
return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);
}
public BoltDeclarer fieldsGrouping(String componentId, Fields fields) {
return fieldsGrouping(componentId, Utils.DEFAULT_STREAM_ID, fields);
}
public BoltDeclarer fieldsGrouping(String componentId, String streamId, Fields fields) {
return grouping(componentId, streamId, Grouping.fields(fields.toList()));
}
public BoltDeclarer shuffleGrouping(String componentId, String streamId) {
return grouping(componentId, streamId, Grouping.shuffle(new NullStruct()));
}
private BoltDeclarer grouping(String componentId, String streamId, Grouping grouping) {
_commons.get(_boltId).put_to_inputs(new GlobalStreamId(componentId, streamId), grouping);
return this;
}
.........................................
}
BoltGetter繼承至ConfigGetter并實現了BoltDeclarer接口,并重載了BoltDeclarer(InputDeclarer)中各種分組方式(如:fieldsGrouping、shuffleGrouping),分組方式的實現本質上是在_commons中通過對用的boltId找到對應的ComponentCommon對象,對inputs屬性進行設置;
5、通過以上幾步完成了bolt與spout的配置(對應提交示例代碼中的2~5行),6~9行是對運行環境的配置,10行用于向集群提交執行任務,builder.createTopology用于構建StormTopology對象.
createTopology實現源碼:
public StormTopology createTopology() {
Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
for(String boltId: _bolts.keySet()) {
IRichBolt bolt = _bolts.get(boltId);
ComponentCommon common = getComponentCommon(boltId, bolt);
boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.serialize(bolt)), common));
}
for(String spoutId: _spouts.keySet()) {
IRichSpout spout = _spouts.get(spoutId);
ComponentCommon common = getComponentCommon(spoutId, spout);
spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.serialize(spout)), common));
}
return new StormTopology(spoutSpecs,
boltSpecs,
new HashMap<String, StateSpoutSpec>());
}
以上源碼實現中主要做了兩件事:
- 通過boltId從_bolts中獲取到對應的bolt對象,再通過getComponentCommon方法設置對應ComponentCommon對象的streams(輸出的字段列表及是否為直接流)屬性值,最后將bolt和common一起 放入到boltSpecs集合中。
- 通過spoutId從_spouts中獲取到對應的spout對象,再通過getComponentCommon方法設置對應ComponentCommon對象的streams(輸出的字段列表及是否為直接流)屬性值,最后將spout和common一起 放入到boltSpecs集合中。
- 通過以上兩步使所設置的所有組件都封裝到StormTopology對象中,最后提交的到集群中運行。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
