org.apache.stormstorm-core0.9.3org.apache.kafka

亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

kafka+storm連接

系統 1654 0

本項目為maven項目,需要添加必要的storm庫,以及kafka依賴,使用storm自帶的storm-kafka進行連接,根據自己集群環境

      		<dependency>

			<groupId>org.apache.storm</groupId>

			<artifactId>storm-core</artifactId>

			<version>0.9.3</version>

		</dependency>



		<dependency>

			<groupId>org.apache.kafka</groupId>

			<artifactId>kafka_2.10</artifactId>

			<version>0.8.2.1</version>

			<exclusions>

				<exclusion>

					<groupId>org.apache.zookeeper</groupId>

					<artifactId>zookeeper</artifactId>

				</exclusion>

				<exclusion>

					<groupId>log4j</groupId>

					<artifactId>log4j</artifactId>

				</exclusion>

			</exclusions>

		</dependency>



		<dependency>

			<groupId>org.apache.storm</groupId>

			<artifactId>storm-kafka</artifactId>

			<version>0.9.3</version>

		</dependency>


    

  實例topology:

      package com.xh.kafka.test;



import storm.kafka.BrokerHosts;

import storm.kafka.KafkaSpout;

import storm.kafka.SpoutConfig;

import storm.kafka.StringScheme;

import storm.kafka.ZkHosts;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.spout.SchemeAsMultiScheme;

import backtype.storm.topology.TopologyBuilder;



public class KafkaSpoutTest {



	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

		

		BrokerHosts brokerHosters = new ZkHosts("zookeeperip1:2181,zookeeperip2:2181/kafka/65_250-252");

		

		String topic = "log_test";

		

		//offsetZkRoot 和 offsetZkId 自定義即可

		String offsetZkRoot = "/storm_test";

		String offsetZkId = "kafka-storm";

		

		SpoutConfig spoutConfig = new SpoutConfig(brokerHosters, topic, offsetZkRoot, offsetZkId);

		

		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());		

		

				

		Config conf = new Config();

			

		TopologyBuilder builder = new TopologyBuilder();



		builder.setSpout("spout", new KafkaSpout(spoutConfig));

		builder.setBolt("bolt", new SequenceBolt()).shuffleGrouping("spout");

	

		if(args != null && args.length > 0){

			conf.setNumWorkers(3);

			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

		}else{

			LocalCluster cluster = new LocalCluster();

			cluster.submitTopology("my-topology", conf, builder.createTopology());

		}

	}



}


    

  此外,不管是本地運行還是集群運行,都需要 修改host文件,添加,kafka集群的機器名 ,例如:

      192.168.*.* kafka-01

192.168.**.** kafka-02

192.168.***.*** kafka-03


    

  否則會報錯如下:

      23810 [Thread-10-spout] INFO  kafka.consumer.SimpleConsumer - Reconnect due to socket error: java.nio.channels.ClosedChannelException



23815 [Thread-10-spout] ERROR backtype.storm.util - Async loop died!

java.lang.RuntimeException: java.nio.channels.ClosedChannelException

at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.3.jar:0.9.3]

at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]

at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]

at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]

at java.lang.Thread.run(Unknown Source) [na:1.7.0_65]

Caused by: java.nio.channels.ClosedChannelException: null

at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2.1.jar:na]

at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.3.jar:0.9.3]

... 6 common frames omitted


    

kafka+storm連接


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 日本不卡二区 | 老司机精品久久最新免费 | 精品香蕉99久久久久网站 | 色四月 | 淫视频在线观看 | a毛片视频免费观看影院 | 黄片123 | 四虎4hu永久免费 | 梦想成为魔法少女在线观看 | 91国内精品久久久久免费影院 | 四虎成人免费网址在线 | 国产麻豆高清视频在线第一页 | 国产亚洲精品国看不卡 | 四虎影视免费观看免费观看 | 婷婷五月色综合香五月 | 精品福利视频第一 | 国产欧美一区二区三区在线看 | 99爱在线视频这里只有精品 | 四虎影视库永久在线地址 | 亚洲国产成人久久一区www | 成人国产视频在线观看 | 女人隐私秘视频黄www免费 | 波多野给衣一区二区三区 | 理论片我不卡在线观看 | 日韩毛片免费 | 国产精品久久久久久搜索 | 激情一区二区三区 | 免费大片黄在线观看yw | 天天干天天弄 | 91亚洲精品国产自在现线 | 老司机深夜福利网站 | 国产精品久久亚洲不卡4k岛国 | 久久 精品| 台湾佬中文娱乐2222vvv | 国产一区视频在线播放 | 国产99久久亚洲综合精品 | 九九视频热 | 成人免费观看一区二区 | 免费特黄一级欧美大片 | 国产午夜精品久久久久免费视 | 久久成人国产精品二三区 |