亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

測試Storm的多源頭錨定

系統 1572 0

過程,

Spout 發送msgid 1-10

一級Bolt, msgid1的tuple做為基本組合tuple, 其他8個和一組合, 然后發送給二級Bolt, 同時單個msgid對應的tuple都ack一次,msgid1對象tuple, acker將會跟蹤8個二級bolt處理情況.

二級Bolt,發送ack fail(模擬處理失?。?

結果:在spout fail下出現msg1-9都失敗的情況 .

拓撲代碼

      package storm.starter;



import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.LocalDRPC;

import backtype.storm.StormSubmitter;

import backtype.storm.drpc.DRPCSpout;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.ShellBolt;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.IRichBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

import storm.starter.spout.RandomSentenceSpout;



import java.lang.management.ManagementFactory;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;



import org.apache.log4j.Logger;

import org.apache.log4j.PropertyConfigurator;



/**

 * This topology demonstrates Storm's stream groupings and multilang

 * capabilities.

 */

public class WordCountTopology {

	public static String GetThreadName() {

		Thread thread = Thread.currentThread();

		return thread.getName();

	}



	public static final Logger logger = Logger

			.getLogger(WordCountTopology.class);



	// 切分單詞 一級bolt

	/*

	 * public static class SplitSentence extends ShellBolt implements IRichBolt

	 * { public SplitSentence() { super("python", "splitsentence.py");

	 * logger.error(GetThreadName() + "SplitSentence create"); }

	 * 

	 * // 定義字段發送

	 * 

	 * @Override public void declareOutputFields(OutputFieldsDeclarer declarer)

	 * { declarer.declare(new Fields("word")); logger.error(GetThreadName() +

	 * "declarer.declare(new Fields(word))"); }

	 * 

	 * @Override public Map<String, Object> getComponentConfiguration() {

	 * logger.error("getComponentConfiguration"); return null; } }

	 */

	public static class SplitSentence implements IRichBolt {

		private OutputCollector _collector;

		

		int num = 0;

		@Override

		public void prepare(Map stormConf, TopologyContext context,

				OutputCollector collector) {

			_collector = collector;

		}

		

		private Tuple tuple1;

		@Override

		public void execute(Tuple tuple) {

			String sentence = tuple.getString(0);

		    if(sentence.equals("a")) {

		    	tuple1 = tuple;

		    }

		    else{

		    	List<Tuple> anchors = new ArrayList<Tuple>();

		    	anchors.add(tuple1);

		    	anchors.add(tuple);

		    	_collector.emit(anchors, new Values(sentence + "a"));

		    	_collector.ack(tuple);

		    	_collector.ack(tuple1);

		    }

			

//			for (String word : sentence.split(" ")){

//				_collector.emit(tuple, new Values(word));

//			}

//			num++;

			

			System.out.println("Bolt Thread " + Thread.currentThread().getName() + "recve : " + sentence);	

			System.out.println( num + " bolt recev:" + tuple.getMessageId().getAnchorsToIds());			

		}



		@Override

		public void cleanup() {

		}



		@Override

		public void declareOutputFields(OutputFieldsDeclarer declarer) {

			declarer.declare(new Fields("word"));

		}



		@Override

		public Map<String, Object> getComponentConfiguration() {

			// TODO Auto-generated method stub

			return null;

		}

	}

	

	public static class CountCount1 implements IRichBolt {

		Map<String, Integer> counts = new HashMap<String, Integer>();

		private OutputCollector _collector;

		int num = 0;



		@Override

		public void execute(Tuple tuple) {

			String word = tuple.getString(0);

			//logger.error(this.toString() + "word = " + word);

			Integer count = counts.get(word);

			if (count == null)

				count = 0;

			

			count++;

			counts.put(word, count); 

			num++;

			

			_collector.fail(tuple);

			//_collector.ack(tuple);

			

		   //_collector.emit(tuple, new Values(word, count));

		}



		@Override

		public void declareOutputFields(OutputFieldsDeclarer declarer) {

			// logger.error("declareOutputFields :");

			declarer.declare(new Fields("result", "count"));

		}



		@Override

		public void prepare(Map stormConf, TopologyContext context,

				OutputCollector collector) {

			// TODO Auto-generated method stub

			_collector = collector;

		}



		@Override

		public void cleanup() {

			// TODO Auto-generated method stub

			

		}



		@Override

		public Map<String, Object> getComponentConfiguration() {

			// TODO Auto-generated method stub

			return null;

		}

	}

	

	

	public static class WordCount extends BaseBasicBolt {

		private OutputCollector _collector;

		Map<String, Integer> counts = new HashMap<String, Integer>();



		@Override

		public void execute(Tuple tuple, BasicOutputCollector collector) {

			String word = tuple.getString(0);

			//logger.error(this.toString() + "word = " + word);

			Integer count = counts.get(word);

			if (count == null)

				count = 0;

			count++;

			counts.put(word, count); // <key, list<value, count> >

			//logger.error(this.toString() + "count = " + count);

			collector.emit(new Values(word, count));

		}



		@Override

		public void declareOutputFields(OutputFieldsDeclarer declarer) {

			// logger.error("declareOutputFields :");

			declarer.declare(new Fields("result", "count"));

		}

	}



	public static class WordCount1 extends BaseBasicBolt {

		Map<String, Integer> counts = new HashMap<String, Integer>();



		@Override

		public void execute(Tuple tuple, BasicOutputCollector collector) {

			// logger.error("WordCount1");

			// tuple.getFields()[0];

			if (tuple.getFields().contains("result")) {

				String count = (String) tuple.getValueByField("result");

				// tuple.getValueByField(field)

				long countl = -0;// = Long.valueOf(count);

				// logger.error(this.toString() + " key  = resultkey " + count);

			}



			if (tuple.getFields().contains("count")) {

				Integer count = (Integer) tuple.getValueByField("count");

				// tuple.getValueByField(field)

				long countl = -0;// = Long.valueOf(count);

				//logger.error(this.toString() + " key  = count " + count);

			}



			// String word = tuple.getString(0);

			// logger.error(this.toString() +"word = " + word);

			// Integer count = counts.get(word);

			// if (count == null)

			// count = 0;

			// count++;

			// counts.put(word, count);

			// logger.error(this.toString() + "count = " + count);

			// collector.emit(new Values(word, count));

		}



		@Override

		public void declareOutputFields(OutputFieldsDeclarer declarer) {

			// logger.error("declareOutputFields :");

			declarer.declare(new Fields("word1", "count1"));

		}

	}



	public static void main(String[] args) throws Exception {

		TopologyBuilder builder = new TopologyBuilder();



		PropertyConfigurator

				.configure("/home/hadoop/code1/Kafka/src/Log4j.properties");

		

	    // parallelism_hint 代表是executor數量, setNumTasks 代表Tasks數量

		builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(2);



        builder.setBolt("split", new SplitSentence(), 8).setNumTasks(1).shuffleGrouping("spout");

		builder.setBolt("count", new CountCount1(), 12).fieldsGrouping("split",

				new Fields("word"));

//		builder.setBolt("WordCount1", new WordCount1(), 1).fieldsGrouping(

//				"count", new Fields("result", "count"));



		Config conf = new Config();

		conf.setDebug(true);
      
? ? ? ? ? ? ? ??//? 這個設置一個spout task上面最多有多少個沒有處理(ack/fail)的tuple,防止tuple隊列過大, 只對可靠任務起作用 conf.setMaxSpoutPending(2); conf.setMessageTimeoutSecs(5); // 消息處理延時 conf.setNumAckers(2); // 消息處理acker System.out.println("args.length = " + args.length); if (args != null && args.length > 0) { conf.setNumWorkers(5); // 設置工作進程 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { // 每個組件的最大executor數 conf.setMaxTaskParallelism(1); conf.setDebug(true); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); String str = "testdrpc"; // drpc.execute("testdrpc", str); Thread.sleep(1088000); cluster.shutdown(); } } }

?spout代碼

      package storm.starter.spout;



import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;



import java.util.Map;

import java.util.Random;



import org.apache.log4j.Logger;



import storm.starter.WordCountTopology;





// IRichSpout 

public class RandomSentenceSpout extends BaseRichSpout { 

	SpoutOutputCollector _collector;

	Random _rand;



	public static final Logger logger = Logger

			.getLogger(RandomSentenceSpout.class);



	@Override

	public void open(Map conf, TopologyContext context,

			SpoutOutputCollector collector) {

		_collector = collector;

		_rand = new Random();



		WordCountTopology.logger.error(this.toString()

				+ "RandomSentenceSpout is create");

	}



	private int num = 0;



	private String gettmstr() {

		StringBuilder tmp = new StringBuilder();

		for (int i = 0; i <= num; i++)

			tmp.append("a");

		num++;

		return tmp.toString();

	}



	@Override

	public void nextTuple() {

		Utils.sleep(200);

		// String[] sentences = new String[]{ "the cow jumped over the moon",

		// "an apple a day keeps the doctor away",

		// "four score and seven years ago", "snow white and the seven dwarfs",

		// "i am at two with nature" };

		String[] sentences = new String[] { "A" };



		String sentence = gettmstr(); // sentences[_rand.nextInt(sentences.length)];

		if (num < 10) {

			_collector.emit(new Values(sentence), new Integer(num));

			// logger.error(this.toString() + "send sentence = " + sentence);

		   // System.out.println(Thread.currentThread().getName() + " Spout ");

		}

	}



	@Override

	public void ack(Object id) {

		logger.error(this.toString() + "spout ack =" + (Integer)id);

	}



	@Override

	public void fail(Object id) {

		logger.error("spout fail =" + (Integer)id);

	}



	@Override

	public void declareOutputFields(OutputFieldsDeclarer declarer) {

		declarer.declare(new Fields("word"));

	}



}


    

?運行結果

      2014-10-03 21:17:31,149 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =1

2014-10-03 21:17:31,351 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =2

Bolt Thread Thread-22recve : aaa

0 bolt recev:{-3139141336114052337=7131499433188364504}

2014-10-03 21:17:31,552 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =3

Bolt Thread Thread-22recve : aaaa

0 bolt recev:{-4497680640148241887=-615828348570847097}

2014-10-03 21:17:31,754 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =4

Bolt Thread Thread-22recve : aaaaa

0 bolt recev:{-8878772617767839827=-7708082520013359311}

2014-10-03 21:17:31,957 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =5

Bolt Thread Thread-22recve : aaaaaa

0 bolt recev:{-3995020874692495577=-5070846720162762196}

2014-10-03 21:17:32,160 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =6

Bolt Thread Thread-22recve : aaaaaaa

0 bolt recev:{-5994700617723404155=-3738685841476816404}

2014-10-03 21:17:32,362 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =7

Bolt Thread Thread-22recve : aaaaaaaa

0 bolt recev:{-2308734827213127682=-5719708045753233056}

2014-10-03 21:17:32,563 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =8

Bolt Thread Thread-22recve : aaaaaaaaa

0 bolt recev:{-3718844156917119468=-6359724009048981605}

2014-10-03 21:17:32,766 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =9


    

?

測試Storm的多源頭錨定


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦?。。?/p>

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 美女视频很黄很暴黄是免费的 | 曰本毛片 | 精品欧美高清一区二区免费 | 毛片毛片毛片毛片 | 九色精品视频在线观看 | 伊人影院中文字幕 | 国内精品久久久久影院蜜芽 | 国产精品视频分类一区 | 特级一级毛片 | 公主恋人在线观看 | 日韩在线不卡 | 国产高清美女一级a毛片久久 | 伊人色婷婷综在合线亚洲 | 中文字幕最新中文字幕中文字幕 | 亚洲国产99 | 奶交性视频欧美 | 日本国产成人精品视频 | 干干操操 | 日日摸夜夜摸人人嗷嗷叫 | 国内精品中文字幕 | 久久97精品久久久久久久看片 | 四虎影院2019 | 操白嫩美女 | jizz中国人| 毛片一级免费 | 4hu四虎免费影院www | 日本一区二区三区精品 | 猫咪视频成人永久免费观看 | 欧美大尺码毛片 | 日本在线一卡二卡毛片 | 97久久精品人人做人人爽 | 国内拍拍自拍视频在线观看 | 综合欧美一区二区三区 | 精品一区二区三区免费爱 | 97精品视频 | 一级肉体毛片视频免费看看 | 神马影院午夜我不卡 | 日本视频播放免费线上观看 | 亚洲欧美天堂网 | 久久精品国产99久久久 | 久久久窝窝午夜精品 |