欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring boot集成Kafka+Storm的示例代碼

 更新時(shí)間:2017年12月31日 10:51:33   作者:LeeZer  
這篇文章主要介紹了Spring boot集成Kafka+Storm的示例代碼,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧

前言

由于業(yè)務(wù)需求需要把Strom與kafka整合到spring boot項(xiàng)目里,實(shí)現(xiàn)其他服務(wù)輸出日志至kafka訂閱話題,storm實(shí)時(shí)處理該話題完成數(shù)據(jù)監(jiān)控及其他數(shù)據(jù)統(tǒng)計(jì),但是網(wǎng)上教程較少,今天想寫(xiě)的就是如何整合storm+kafka 到spring boot,順帶說(shuō)一說(shuō)我遇到的坑。

使用工具及環(huán)境配置

​ 1. java 版本jdk-1.8

​ 2. 編譯工具使用IDEA-2017

​ 3. maven作為項(xiàng)目管理

​ 4.spring boot-1.5.8.RELEASE

需求體現(xiàn)

1.為什么需要整合到spring boot

為了使用spring boot 統(tǒng)一管理各種微服務(wù),及同時(shí)避免多個(gè)分散配置

2.具體思路及整合原因

​ 使用spring boot統(tǒng)一管理kafka、storm、redis等所需要的bean,通過(guò)其他服務(wù)日志收集至Kafka,KafKa實(shí)時(shí)發(fā)送日志至storm,在strom bolt時(shí)進(jìn)行相應(yīng)的處理操作

遇到的問(wèn)題

​ 1.使用spring boot并沒(méi)有相關(guān)整合storm

​ 2.以spring boot啟動(dòng)方式不知道如何觸發(fā)提交Topolgy

​ 3.提交Topology時(shí)遇到numbis not client localhost 問(wèn)題

​ 4.Storm bolt中無(wú)法通過(guò)注解獲得實(shí)例化bean進(jìn)行相應(yīng)的操作

解決思路

在整合之前我們需要知道相應(yīng)的spring boot 的啟動(dòng)方式及配置(如果你在閱讀本文時(shí),默認(rèn)你已經(jīng)對(duì)storm,kafka及spring boot有相關(guān)了解及使用)

spring boot 對(duì)storm進(jìn)行整合的例子在網(wǎng)上很少,但是因?yàn)橛邢鄳?yīng)的需求,因此我們還是需要整合.

首先導(dǎo)入所需要jar包:

<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.10.1.1</version>
 </dependency>

 <dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-starter-stream-kafka</artifactId>
 <exclusions>
 <exclusion>
  <artifactId>zookeeper</artifactId>
  <groupId>org.apache.zookeeper</groupId>
 </exclusion>
 <exclusion>
  <artifactId>spring-boot-actuator</artifactId>
  <groupId>org.springframework.boot</groupId>
 </exclusion>
 <exclusion>
  <artifactId>kafka-clients</artifactId>
  <groupId>org.apache.kafka</groupId>
 </exclusion>
 </exclusions>
 </dependency>

 <dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka</artifactId>
 <exclusions>
 <exclusion>
  <artifactId>kafka-clients</artifactId>
  <groupId>org.apache.kafka</groupId>
 </exclusion>
 </exclusions>
 </dependency>

 <dependency>
 <groupId>org.springframework.data</groupId>
 <artifactId>spring-data-hadoop</artifactId>
 <version>2.5.0.RELEASE</version>
 <exclusions>
 <exclusion>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-log4j12</artifactId>
 </exclusion>
 <exclusion>
  <artifactId>commons-logging</artifactId>
  <groupId>commons-logging</groupId>
 </exclusion>
 <exclusion>
  <artifactId>netty</artifactId>
  <groupId>io.netty</groupId>
 </exclusion>
 <exclusion>
  <artifactId>jackson-core-asl</artifactId>
  <groupId>org.codehaus.jackson</groupId>
 </exclusion>
 <exclusion>
  <artifactId>curator-client</artifactId>
  <groupId>org.apache.curator</groupId>
 </exclusion>
 <exclusion>
  <artifactId>jettison</artifactId>
  <groupId>org.codehaus.jettison</groupId>
 </exclusion>
 <exclusion>
  <artifactId>jackson-mapper-asl</artifactId>
  <groupId>org.codehaus.jackson</groupId>
 </exclusion>
 <exclusion>
  <artifactId>jackson-jaxrs</artifactId>
  <groupId>org.codehaus.jackson</groupId>
 </exclusion>
 <exclusion>
  <artifactId>snappy-java</artifactId>
  <groupId>org.xerial.snappy</groupId>
 </exclusion>
 <exclusion>
  <artifactId>jackson-xc</artifactId>
  <groupId>org.codehaus.jackson</groupId>
 </exclusion>
 <exclusion>
  <artifactId>guava</artifactId>
  <groupId>com.google.guava</groupId>
 </exclusion>
 <exclusion>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <groupId>org.apache.hadoop</groupId>
 </exclusion>
 <exclusion>
  <artifactId>zookeeper</artifactId>
  <groupId>org.apache.zookeeper</groupId>
 </exclusion>
 <exclusion>
  <artifactId>servlet-api</artifactId>
  <groupId>javax.servlet</groupId>
 </exclusion>

 </exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.zookeeper</groupId>
 <artifactId>zookeeper</artifactId>
 <version>3.4.10</version>
 <exclusions>
 <exclusion>
  <artifactId>slf4j-log4j12</artifactId>
  <groupId>org.slf4j</groupId>
 </exclusion>
 </exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.hbase</groupId>
 <artifactId>hbase-client</artifactId>
 <version>1.2.4</version>
 <exclusions>
 <exclusion>
  <artifactId>log4j</artifactId>
  <groupId>log4j</groupId>
 </exclusion>
 <exclusion>
  <artifactId>zookeeper</artifactId>
  <groupId>org.apache.zookeeper</groupId>
 </exclusion>
 <exclusion>
  <artifactId>netty</artifactId>
  <groupId>io.netty</groupId>
 </exclusion>
 <exclusion>
  <artifactId>hadoop-common</artifactId>
  <groupId>org.apache.hadoop</groupId>
 </exclusion>
 <exclusion>
  <artifactId>guava</artifactId>
  <groupId>com.google.guava</groupId>
 </exclusion>
 <exclusion>
  <artifactId>hadoop-annotations</artifactId>
  <groupId>org.apache.hadoop</groupId>
 </exclusion>
 <exclusion>
  <artifactId>hadoop-yarn-common</artifactId>
  <groupId>org.apache.hadoop</groupId>
 </exclusion>
 <exclusion>
  <artifactId>slf4j-log4j12</artifactId>
  <groupId>org.slf4j</groupId>
 </exclusion>
 </exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-common</artifactId>
 <version>2.7.3</version>
 <exclusions>
 <exclusion>
  <artifactId>commons-logging</artifactId>
  <groupId>commons-logging</groupId>
 </exclusion>
 <exclusion>
  <artifactId>curator-client</artifactId>
  <groupId>org.apache.curator</groupId>
 </exclusion>
 <exclusion>
  <artifactId>jackson-mapper-asl</artifactId>
  <groupId>org.codehaus.jackson</groupId>
 </exclusion>
 <exclusion>
  <artifactId>jackson-core-asl</artifactId>
  <groupId>org.codehaus.jackson</groupId>
 </exclusion>
 <exclusion>
  <artifactId>log4j</artifactId>
  <groupId>log4j</groupId>
 </exclusion>
 <exclusion>
  <artifactId>snappy-java</artifactId>
  <groupId>org.xerial.snappy</groupId>
 </exclusion>
 <exclusion>
  <artifactId>zookeeper</artifactId>
  <groupId>org.apache.zookeeper</groupId>
 </exclusion>
 <exclusion>
  <artifactId>guava</artifactId>
  <groupId>com.google.guava</groupId>
 </exclusion>
 <exclusion>
  <artifactId>hadoop-auth</artifactId>
  <groupId>org.apache.hadoop</groupId>
 </exclusion>
 <exclusion>
  <artifactId>commons-lang</artifactId>
  <groupId>commons-lang</groupId>
 </exclusion>
 <exclusion>
  <artifactId>slf4j-log4j12</artifactId>
  <groupId>org.slf4j</groupId>
 </exclusion>
 <exclusion>
  <artifactId>servlet-api</artifactId>
  <groupId>javax.servlet</groupId>
 </exclusion>
 </exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-mapreduce-examples</artifactId>
 <version>2.7.3</version>
 <exclusions>
 <exclusion>
  <artifactId>commons-logging</artifactId>
  <groupId>commons-logging</groupId>
 </exclusion>
 <exclusion>
  <artifactId>netty</artifactId>
  <groupId>io.netty</groupId>
 </exclusion>
 <exclusion>
  <artifactId>guava</artifactId>
  <groupId>com.google.guava</groupId>
 </exclusion>
 <exclusion>
  <artifactId>log4j</artifactId>
  <groupId>log4j</groupId>
 </exclusion>
 <exclusion>
  <artifactId>servlet-api</artifactId>
  <groupId>javax.servlet</groupId>
 </exclusion>
 </exclusions>
 </dependency>

 <!--storm-->
 <dependency>
 <groupId>org.apache.storm</groupId>
 <artifactId>storm-core</artifactId>
 <version>${storm.version}</version>
 <scope>${provided.scope}</scope>
 <exclusions>
 <exclusion>
  <groupId>org.apache.logging.log4j</groupId>
  <artifactId>log4j-slf4j-impl</artifactId>
 </exclusion>
 <exclusion>
  <artifactId>servlet-api</artifactId>
  <groupId>javax.servlet</groupId>
 </exclusion>
 </exclusions>
 </dependency>

 <dependency>
 <groupId>org.apache.storm</groupId>
 <artifactId>storm-kafka</artifactId>
 <version>1.1.1</version>
 <exclusions>
 <exclusion>
  <artifactId>kafka-clients</artifactId>
  <groupId>org.apache.kafka</groupId>
 </exclusion>
 </exclusions>
 </dependency>

其中去除jar包是因?yàn)樾枰嗯c項(xiàng)目構(gòu)建依賴有多重依賴問(wèn)題,storm版本為1.1.0  spring boot相關(guān)依賴為

```java

<!-- spring boot -->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter</artifactId>
   <exclusions>
    <exclusion>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-logging</artifactId>
    </exclusion>
   </exclusions>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-aop</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-log4j2</artifactId>
  </dependency>
  <dependency>
   <groupId>org.mybatis.spring.boot</groupId>
   <artifactId>mybatis-spring-boot-starter</artifactId>
   <version>${mybatis-spring.version}</version>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-configuration-processor</artifactId>
   <optional>true</optional>
  </dependency>

ps:maven的jar包僅因?yàn)轫?xiàng)目使用需求,不是最精簡(jiǎn),僅供大家參考.

項(xiàng)目結(jié)構(gòu):

config-存儲(chǔ)不同環(huán)境配置文件


存儲(chǔ)構(gòu)建spring boot 相關(guān)實(shí)現(xiàn)類 其他如構(gòu)建名

啟動(dòng)spring boot的時(shí)候我們會(huì)發(fā)現(xiàn)

其實(shí)開(kāi)始整合前,對(duì)storm了解的較少,屬于剛開(kāi)始沒(méi)有接觸過(guò),后面參考發(fā)現(xiàn)整合到spring boot里面啟動(dòng)spring boot之后并沒(méi)有相應(yīng)的方式去觸發(fā)提交Topolgy的函數(shù),所以也造成了以為啟動(dòng)spring boot之后就完事了結(jié)果等了半個(gè)小時(shí)什么事情都沒(méi)發(fā)生才發(fā)現(xiàn)沒(méi)有實(shí)現(xiàn)觸發(fā)提交函數(shù).

為了解決這個(gè)問(wèn)題我的想法是: 啟動(dòng)spring boot->創(chuàng)建kafka監(jiān)聽(tīng)Topic然后啟動(dòng)Topolgy完成啟動(dòng),可是這樣的問(wèn)題kafka監(jiān)聽(tīng)這個(gè)主題會(huì)重復(fù)觸發(fā)Topolgy,這明顯不是我們想要的.看了一會(huì)后發(fā)現(xiàn)spring 有相關(guān)啟動(dòng)完成之后執(zhí)行某個(gè)時(shí)間方法,這個(gè)對(duì)我來(lái)說(shuō)簡(jiǎn)直是救星啊.所以現(xiàn)在觸發(fā)Topolgy的思路變?yōu)?

啟動(dòng)spring boot ->執(zhí)行觸發(fā)方法->完成相應(yīng)的觸發(fā)條件

構(gòu)建方法為:

/**
 * @author Leezer
 * @date 2017/12/28
 * spring加載完后自動(dòng)自動(dòng)提交Topology
 **/
@Configuration
@Component
public class AutoLoad implements ApplicationListener<ContextRefreshedEvent> {

 private static String BROKERZKSTR;
 private static String TOPIC;
 private static String HOST;
 private static String PORT;
 public AutoLoad(@Value("${storm.brokerZkstr}") String brokerZkstr,
     @Value("${zookeeper.host}") String host,
     @Value("${zookeeper.port}") String port,
     @Value("${kafka.default-topic}") String topic
 ){
  BROKERZKSTR = brokerZkstr;
  HOST= host;
  TOPIC= topic;
  PORT= port;
 }

 @Override
 public void onApplicationEvent(ContextRefreshedEvent event) {
  try {
   //實(shí)例化topologyBuilder類。
   TopologyBuilder topologyBuilder = new TopologyBuilder();
   //設(shè)置噴發(fā)節(jié)點(diǎn)并分配并發(fā)數(shù),該并發(fā)數(shù)將會(huì)控制該對(duì)象在集群中的線程數(shù)。
   BrokerHosts brokerHosts = new ZkHosts(BROKERZKSTR);
   // 配置Kafka訂閱的Topic,以及zookeeper中數(shù)據(jù)節(jié)點(diǎn)目錄和名字
   SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, TOPIC, "/storm", "s32");
   spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
   spoutConfig.zkServers = Collections.singletonList(HOST);
   spoutConfig.zkPort = Integer.parseInt(PORT);
   //從Kafka最新輸出日志讀取
   spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
   KafkaSpout receiver = new KafkaSpout(spoutConfig);
   topologyBuilder.setSpout("kafka-spout", receiver, 1).setNumTasks(2);
   topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2).shuffleGrouping("kafka-spout");
   Config config = new Config();
   config.setDebug(false);
   /*設(shè)置該topology在storm集群中要搶占的資源slot數(shù),一個(gè)slot對(duì)應(yīng)這supervisor節(jié)點(diǎn)上的以個(gè)worker進(jìn)程,如果你分配的spot數(shù)超過(guò)了你的物理節(jié)點(diǎn)所擁有的worker數(shù)目的話,有可能提交不成功,加入你的集群上面已經(jīng)有了一些topology而現(xiàn)在還剩下2個(gè)worker資源,如果你在代碼里分配4個(gè)給你的topology的話,那么這個(gè)topology可以提交但是提交以后你會(huì)發(fā)現(xiàn)并沒(méi)有運(yùn)行。 而當(dāng)你kill掉一些topology后釋放了一些slot后你的這個(gè)topology就會(huì)恢復(fù)正常運(yùn)行。
   */
   config.setNumWorkers(1);
   LocalCluster cluster = new LocalCluster();
   cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

​ 注:

啟動(dòng)項(xiàng)目時(shí)因?yàn)槭褂玫氖莾?nèi)嵌tomcat進(jìn)行啟動(dòng),可能會(huì)報(bào)如下錯(cuò)誤

[Tomcat-startStop-1] ERROR o.a.c.c.ContainerBase - A child container failed during start
java.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Tomcat].StandardHost[localhost].TomcatEmbeddedContext[]]
 at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_144]
 at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_144]
 at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:939) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at org.apache.catalina.core.StandardHost.startInternal(StandardHost.java:872) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1419) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1409) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_144]
 at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_144]
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]

這是因?yàn)橛邢鄳?yīng)導(dǎo)入的jar包引入了servlet-api版本低于內(nèi)嵌版本,我們需要做的就是打開(kāi)maven依賴把其去除

<exclusion>
 <artifactId>servlet-api</artifactId>
 <groupId>javax.servlet</groupId>
</exclusion>

然后重新啟動(dòng)就可以了.

啟動(dòng)過(guò)程中還有可能報(bào):

復(fù)制代碼 代碼如下:

org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90

這個(gè)問(wèn)題我思考了很久,發(fā)現(xiàn)網(wǎng)上的解釋都是因?yàn)閟torm配置問(wèn)題導(dǎo)致不對(duì),可是我的storm是部署在服務(wù)器上的.并沒(méi)有相關(guān)的配置,按理也應(yīng)該去服務(wù)器上讀取相關(guān)配置,可是結(jié)果并不是這樣的。最后嘗試了幾個(gè)做法發(fā)現(xiàn)都不對(duì),這里才發(fā)現(xiàn),在構(gòu)建集群的時(shí)候storm提供了相應(yīng)的本地集群

LocalCluster cluster = new LocalCluster();

進(jìn)行本地測(cè)試,如果在本地測(cè)試就使用其進(jìn)行部署測(cè)試,如果部署到服務(wù)器上需要把:

cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
//修正為:
StormSubmitter.submitTopology("kafka-spout", config, topologyBuilder.createTopology());

進(jìn)行任務(wù)提交;

以上解決了上面所述的問(wèn)題1-3

問(wèn)題4:是在bolt中使用相關(guān)bean實(shí)例,我發(fā)現(xiàn)我把其使用@Component加入spring中也無(wú)法獲取到實(shí)例:我的猜想是在我們構(gòu)建提交Topolgy的時(shí)候,它會(huì)在:

復(fù)制代碼 代碼如下:

topologyBuilder.setBolt("alarm-bolt",new AlarmBolt(),1).setNumTasks(2).shuffleGrouping("kafka-spout");

執(zhí)行bolt相關(guān):

@Override
 public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
  this.collector = collector;
  StormLauncher stormLauncher = StormLauncher.getStormLauncher();
  dataRepositorys =(AlarmDataRepositorys)   stormLauncher.getBean("alarmdataRepositorys");
 }

而不會(huì)實(shí)例化bolt,導(dǎo)致線程不一而spring 獲取不到.(這里我也不是太明白,如果有大佬知道可以分享一波)

而我們使用spring boot的意義就在于這些獲取這些繁雜的對(duì)象,這個(gè)問(wèn)題困擾了我很久.最終想到,我們可以通過(guò)上下文getbean獲取實(shí)例不知道能不能行,然后我就開(kāi)始了定義:

例如我需要在bolt中使用一個(gè)服務(wù):

/**
 * @author Leezer
 * @date 2017/12/27
 * 存儲(chǔ)操作失敗時(shí)間
 **/
@Service("alarmdataRepositorys")
public class AlarmDataRepositorys extends RedisBase implements IAlarmDataRepositorys {
 private static final String ERRO = "erro";
 /**
  * @param type 類型
  * @param key key值
  * @return 錯(cuò)誤次數(shù)
  **/
 @Override
 public String getErrNumFromRedis(String type,String key) {
  if(type==null || key == null){
   return null;
  }else {
   ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();
   return valueOper.get(String.format("%s:%s:%s",ERRO,type,key));
  }
 }

 /**
  * @param type 錯(cuò)誤類型
  * @param key key值
  * @param value 存儲(chǔ)值
  **/
 @Override
 public void setErrNumToRedis(String type, String key,String value) {
  try {
   ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();
   valueOper.set(String.format("%s:%s:%s", ERRO,type, key), value, Dictionaries.ApiKeyDayOfLifeCycle, TimeUnit.SECONDS);
  }catch (Exception e){
   logger.info(Dictionaries.REDIS_ERROR_PREFIX+String.format("key為%s存入redis失敗",key));
  }
 }

這里我指定了該bean的名稱,則在bolt執(zhí)行prepare時(shí):使用getbean方法獲取了相關(guān)bean就能完成相應(yīng)的操作.

然后kafka訂閱主題發(fā)送至我bolt進(jìn)行相關(guān)的處理.而這里getbean的方法是在啟動(dòng)bootmain函數(shù)定義:

@SpringBootApplication
@EnableTransactionManagement
@ComponentScan({"service","storm"})
@EnableMongoRepositories(basePackages = {"storm"})
@PropertySource(value = {"classpath:service.properties", "classpath:application.properties","classpath:storm.properties"})
@ImportResource(locations = {
 "classpath:/configs/spring-hadoop.xml",
 "classpath:/configs/spring-hbase.xml"})
public class StormLauncher extends SpringBootServletInitializer {
 //設(shè)置 安全線程launcher實(shí)例
 private volatile static StormLauncher stormLauncher;
 //設(shè)置上下文
 private ApplicationContext context;
 public static void main(String[] args) {
  SpringApplicationBuilder application = new SpringApplicationBuilder(StormLauncher.class);
  // application.web(false).run(args);該方式是spring boot不以web形式啟動(dòng)
 application.run(args);
 StormLauncher s = new StormLauncher();
 s.setApplicationContext(application.context());
 setStormLauncher(s);
 }

 private static void setStormLauncher(StormLauncher stormLauncher) {
 StormLauncher.stormLauncher = stormLauncher;
 }
 public static StormLauncher getStormLauncher() {
 return stormLauncher;
 }

 @Override
 protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
 return application.sources(StormLauncher.class);
 }

 /**
 * 獲取上下文
 *
 * @return the application context
 */
 public ApplicationContext getApplicationContext() {
 return context;
 }

 /**
 * 設(shè)置上下文.
 *
 * @param appContext 上下文
 */
 private void setApplicationContext(ApplicationContext appContext) {
 this.context = appContext;
 }

 /**
 * 通過(guò)自定義name獲取 實(shí)例 Bean.
 *
 * @param name the name
 * @return the bean
 */
 public Object getBean(String name) {
 return context.getBean(name);
 }

 /**
 * 通過(guò)class獲取Bean.
 *
 * @param <T> the type parameter
 * @param clazz the clazz
 * @return the bean
 */
 public <T> T getBean(Class<T> clazz) {
 return context.getBean(clazz);
 }

 /**
 * 通過(guò)name,以及Clazz返回指定的Bean
 *
 * @param <T> the type parameter
 * @param name the name
 * @param clazz the clazz
 * @return the bean
 */
 public <T> T getBean(String name, Class<T> clazz) {
 return context.getBean(name, clazz);
 }

到此集成storm 和kafka至spring boot已經(jīng)結(jié)束了,相關(guān)kafka及其他配置我會(huì)放入github上面

對(duì)了這里還有一個(gè)kafkaclient的坑:

Async loop died! java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.

項(xiàng)目會(huì)報(bào)kafka client 問(wèn)題,這是因?yàn)閟torm-kafka中,kafka使用的是0.8版本,而NetworkSend是0.9以上的版本,這里集成需要與你集成的kafka相關(guān)版本一致.

雖然集成比較簡(jiǎn)單,但是參考都比較少,加之剛開(kāi)始接觸storm所以思考比較多,也在這記錄一下.

項(xiàng)目地址 - github

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

  • spring?boot使用@Async注解解決異步多線程入庫(kù)的問(wèn)題

    spring?boot使用@Async注解解決異步多線程入庫(kù)的問(wèn)題

    最近在寫(xiě)項(xiàng)目是需要添加異步操作來(lái)提高效率,所以下面這篇文章主要給大家介紹了關(guān)于spring?boot使用@Async注解解決異步多線程入庫(kù)問(wèn)題的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-05-05
  • Java大文件上傳詳解及實(shí)例代碼

    Java大文件上傳詳解及實(shí)例代碼

    這篇文章主要介紹了Java大文件上傳詳解及實(shí)例代碼的相關(guān)資料,需要的朋友可以參考下
    2017-02-02
  • 解讀yml文件中配置時(shí)間類型的轉(zhuǎn)換方式

    解讀yml文件中配置時(shí)間類型的轉(zhuǎn)換方式

    這篇文章主要介紹了yml文件中配置時(shí)間類型的轉(zhuǎn)換方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • java編寫(xiě)一個(gè)花名隨機(jī)抽取器的實(shí)現(xiàn)示例

    java編寫(xiě)一個(gè)花名隨機(jī)抽取器的實(shí)現(xiàn)示例

    這篇文章主要介紹了java編寫(xiě)一個(gè)花名隨機(jī)抽取器的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-03-03
  • maven項(xiàng)目打jar包并包含所有依賴詳細(xì)教程

    maven項(xiàng)目打jar包并包含所有依賴詳細(xì)教程

    maven打包生成的普通jar包,只包含該工程下源碼編譯結(jié)果,不包含依賴內(nèi)容,下面這篇文章主要給大家介紹了關(guān)于maven項(xiàng)目打jar包并包含所有依賴的相關(guān)資料,需要的朋友可以參考下
    2023-05-05
  • java正則表達(dá)式應(yīng)用的實(shí)例代碼

    java正則表達(dá)式應(yīng)用的實(shí)例代碼

    java正則的實(shí)例應(yīng)用分析,大家從下面的代碼中,就能知道java正則的應(yīng)用與寫(xiě)法
    2008-10-10
  • 史上最佳springboot Locale 國(guó)際化方案

    史上最佳springboot Locale 國(guó)際化方案

    今天給大家分享史上最佳springboot Locale 國(guó)際化方案,本文通過(guò)實(shí)例圖文相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧
    2021-08-08
  • 一文搞懂Java正則表達(dá)式的使用

    一文搞懂Java正則表達(dá)式的使用

    正則表達(dá)式,又稱規(guī)則表達(dá)式,是一種文本模式。正則表達(dá)式使用單個(gè)字符串來(lái)描述、匹配一系列匹配某個(gè)句法規(guī)則的字符串,通常被用來(lái)檢索、替換那些符合某個(gè)模式(規(guī)則)的文本。本文將通過(guò)示例為大家詳細(xì)說(shuō)說(shuō)Java正則表達(dá)式的使用,感興趣的可以了解一下
    2022-08-08
  • 詳解Spring事務(wù)回滾的兩種方法

    詳解Spring事務(wù)回滾的兩種方法

    Spring事務(wù)回滾的前提是你當(dāng)前使用的數(shù)據(jù)庫(kù)必須支持事務(wù),比如MySQL的Innodb是支持的,但Mysaim則是不支持事務(wù)的,本文就給大家介紹兩種Spring事務(wù)回滾的方法,需要的朋友可以參考下
    2023-07-07
  • mybatis?example如何自動(dòng)生成代碼?排序語(yǔ)句

    mybatis?example如何自動(dòng)生成代碼?排序語(yǔ)句

    這篇文章主要介紹了mybatis?example如何自動(dòng)生成代碼?排序語(yǔ)句,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12

最新評(píng)論