亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

Storm 中drpc調(diào)用

系統(tǒng) 1758 0
      package storm.starter;



import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.LocalDRPC;

import backtype.storm.StormSubmitter;

import backtype.storm.drpc.DRPCSpout;

import backtype.storm.task.ShellBolt;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.IRichBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

import storm.starter.spout.RandomSentenceSpout;



import java.lang.management.ManagementFactory;

import java.util.HashMap;

import java.util.Map;



import org.apache.log4j.Logger;

import org.apache.log4j.PropertyConfigurator;



/**

 * This topology demonstrates Storm's stream groupings and multilang

 * capabilities.

 */

public class Drpctest {

	public static final Logger logger = Logger.getLogger(Drpctest.class);

	public static class WordCount extends BaseBasicBolt {

		Map<String, Integer> counts = new HashMap<String, Integer>();



		@Override

		public void execute(Tuple tuple, BasicOutputCollector collector) {

			String word = tuple.getString(0);

			logger.error(this.toString() + "word = " + word);

			Integer count = counts.get(word);

			if (count == null)

				count = 0;

			count++;

			counts.put(word, count);

			logger.error(this.toString() + "count = " + count);

			collector.emit(new Values(word, count));

		}



		String str = Thread.currentThread().getName();



		@Override

		public void declareOutputFields(OutputFieldsDeclarer declarer) {

			logger.error("declareOutputFields :");

			declarer.declare(new Fields("result", "count"));

		}

	}



	public static class DrpcBolt extends BaseBasicBolt {

		Map<String, Integer> counts = new HashMap<String, Integer>();



		@Override

		public void execute(Tuple tuple, BasicOutputCollector collector) {

			String logString = tuple.getString(0);

			logger.error("DrpcBolt recve :" + logString);

		}



		@Override

		public void declareOutputFields(OutputFieldsDeclarer declarer) {

			// 暫時(shí)沒用

			declarer.declare(new Fields("word1", "count1"));

		}

	}



	public static void main(String[] args) throws Exception {

		TopologyBuilder builder = new TopologyBuilder();



		// drpc

		LocalDRPC drpc = new LocalDRPC();

		DRPCSpout drpc_spout = new DRPCSpout("testdrpc", drpc);

		builder.setSpout("drpcspout", drpc_spout, 3);



		PropertyConfigurator

				.configure("/home/hadoop/code1/Kafka/src/Log4j.properties");



		// 接入drpc

		builder.setBolt("DrpcBolt", new DrpcBolt(), 1).shuffleGrouping(

				"drpcspout");



		Config conf = new Config();

		conf.setDebug(true);



		if (args != null && args.length > 0) {

			conf.setNumWorkers(3);



			StormSubmitter.submitTopology(args[0], conf,

					builder.createTopology());

		} else {

			conf.setMaxTaskParallelism(3);

			conf.setDebug(true);



			LocalCluster cluster = new LocalCluster();

			cluster.submitTopology("word-count", conf, builder.createTopology());



			String str = "send test drpc"; // 和 DRPCSpout 名字對(duì)應(yīng)

			drpc.execute("testdrpc", str);



			Thread.sleep(10000);



			cluster.shutdown();

		}

	}

}


    

?

Storm 中drpc調(diào)用


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 国产精品亚洲欧美一区麻豆 | 国产成人精品亚洲日本在线观看 | 色干综合 | 亚洲国产婷婷香蕉久久久久久 | 国产成人精品日本 | 午夜窝窝 | 一级一级女人18毛片 | 五月婷婷国产 | 国产精品久久久久无毒 | 亚洲精品美女久久久aaa | 97色老99久久九九爱精品 | 一区二区三区视频网站 | 欧美综合图区亚洲综合图区 | 中国在线播放精品区 | 亚洲高清视频在线 | 精品日产 | 欧美777精品久久久久网 | 浮力影院欧美三级日本三级 | 国产精品主播在线 | 日本网站在线 | 国产乱人伦偷精品视频不卡 | a男人天堂 | 欧美精品中文 | 99爱在线观看精品视频 | 国产真实一区二区三区 | 午夜三级影院 | 久久在线一区 | 九九爱www高清免费人成 | 天天插天天射 | 在线观看中文字幕国产 | 情趣色视频网站 | 天天操天天舔天天干 | 高清在线一区二区三区亚洲综合 | 奇米影视色 | 日本一级毛片 | 国产日韩欧美亚洲精品95 | 色天天色综合 | 中文字幕久热精品视频免费 | 国产伦精品一区二区三区无广告 | www.四虎影院在线观看 | 免费国产a|