欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

spring boot整合spring-kafka實(shí)現(xiàn)發(fā)送接收消息實(shí)例代碼

 更新時(shí)間:2017年06月29日 11:34:27   作者:honway  
這篇文章主要給大家介紹了關(guān)于spring-boot整合spring-kafka實(shí)現(xiàn)發(fā)送接收消息的相關(guān)資料,文中介紹的非常詳細(xì),對(duì)大家具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面跟著小編一起來看看吧。

前言

由于我們的新項(xiàng)目使用的是spring-boot,而又要同步新項(xiàng)目中建的數(shù)據(jù)到老的系統(tǒng)當(dāng)中.原來已經(jīng)有一部分的同步代碼,使用的是kafka. 其實(shí)只是做數(shù)據(jù)的同步,我覺得選MQ沒必要使用kafka.首先數(shù)據(jù)量不大,其實(shí)搞kafka又要搞集群,ZK.只是用做一些簡單數(shù)據(jù)同步的話,有點(diǎn)大材小用.

沒辦法,咱只是個(gè)打工的,領(lǐng)導(dǎo)讓搞就搞吧.剛開始的時(shí)候發(fā)現(xiàn)有一個(gè)spring-integration-kafka,描述中說是基于spring-kafka做了一次重寫.但是我看了官方文檔.實(shí)在是搞的有點(diǎn)頭大.功能一直沒實(shí)現(xiàn).文檔寫的也不是很漂亮,也可能是剛起步,有很多的問題.我這里只能放棄了,使用了spring-kafka.

實(shí)現(xiàn)方法

pom.xml文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 
 <groupId>org.linuxsogood.sync</groupId>
 <artifactId>linuxsogood-sync</artifactId>
 <version>1.0.0-SNAPSHOT</version>
 
 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>1.4.0.RELEASE</version>
 </parent>
 
 <properties>
  <java.version>1.8</java.version>
  <!-- 依賴版本 -->
  <mybatis.version>3.3.1</mybatis.version>
  <mybatis.spring.version>1.2.4</mybatis.spring.version>
  <mapper.version>3.3.6</mapper.version>
  <pagehelper.version>4.1.1</pagehelper.version>
 </properties>
 
 <dependencies>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-jdbc</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-aop</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-freemarker</artifactId>
  </dependency>
  <!--<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-integration</artifactId>
   <scope>compile</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-kafka</artifactId>
   <version>2.0.1.RELEASE</version>
   <scope>compile</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-core</artifactId>
   <version>4.3.1.RELEASE</version>
   <scope>compile</scope>
  </dependency>-->
  <dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.1.0.RELEASE</version>
  </dependency>
  <!--<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka-test</artifactId>
   <version>1.1.0.RELEASE</version>
  </dependency>-->
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.12</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.assertj</groupId>
   <artifactId>assertj-core</artifactId>
   <version>3.5.2</version>
  </dependency>
  <dependency>
   <groupId>org.hamcrest</groupId>
   <artifactId>hamcrest-all</artifactId>
   <version>1.3</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.mockito</groupId>
   <artifactId>mockito-all</artifactId>
   <version>1.9.5</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-test</artifactId>
   <version>4.2.3.RELEASE</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
  </dependency>
  <dependency>
   <groupId>com.microsoft.sqlserver</groupId>
   <artifactId>sqljdbc4</artifactId>
   <version>4.0.0</version>
  </dependency>
  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>druid</artifactId>
   <version>1.0.11</version>
  </dependency>
 
  <!--Mybatis-->
  <dependency>
   <groupId>org.mybatis</groupId>
   <artifactId>mybatis</artifactId>
   <version>${mybatis.version}</version>
  </dependency>
  <dependency>
   <groupId>org.mybatis</groupId>
   <artifactId>mybatis-spring</artifactId>
   <version>${mybatis.spring.version}</version>
  </dependency>
  <!--<dependency>
   <groupId>org.mybatis.spring.boot</groupId>
   <artifactId>mybatis-spring-boot-starter</artifactId>
   <version>1.1.1</version>
  </dependency>-->
  <!-- Mybatis Generator -->
  <dependency>
   <groupId>org.mybatis.generator</groupId>
   <artifactId>mybatis-generator-core</artifactId>
   <version>1.3.2</version>
   <scope>compile</scope>
   <optional>true</optional>
  </dependency>
  <!--分頁插件-->
  <dependency>
   <groupId>com.github.pagehelper</groupId>
   <artifactId>pagehelper</artifactId>
   <version>${pagehelper.version}</version>
  </dependency>
  <!--通用Mapper-->
  <dependency>
   <groupId>tk.mybatis</groupId>
   <artifactId>mapper</artifactId>
   <version>${mapper.version}</version>
  </dependency>
  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>fastjson</artifactId>
   <version>1.2.17</version>
  </dependency>
 </dependencies>
 <repositories>
  <repository>
   <id>repo.spring.io.milestone</id>
   <name>Spring Framework Maven Milestone Repository</name>
   <url>https://repo.spring.io/libs-milestone</url>
  </repository>
 </repositories>
 <build>
  <finalName>mybatis_generator</finalName>
  <plugins>
   <plugin>
    <groupId>org.mybatis.generator</groupId>
    <artifactId>mybatis-generator-maven-plugin</artifactId>
    <version>1.3.2</version>
    <configuration>
     <verbose>true</verbose>
     <overwrite>true</overwrite>
    </configuration>
   </plugin>
   <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <configuration>
     <mainClass>org.linuxsogood.sync.Starter</mainClass>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>

orm層使用了MyBatis,又使用了通用Mapper和分頁插件.

kafka消費(fèi)端配置

import org.linuxsogood.sync.listener.Listener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
 
 @Value("${kafka.broker.address}")
 private String brokerAddress;
 
 @Bean
 KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
 ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
 factory.setConsumerFactory(consumerFactory());
 factory.setConcurrency(3);
 factory.getContainerProperties().setPollTimeout(3000);
 return factory;
 }
 
 @Bean
 public ConsumerFactory<String, String> consumerFactory() {
 return new DefaultKafkaConsumerFactory<>(consumerConfigs());
 }
 
 @Bean
 public Map<String, Object> consumerConfigs() {
 Map<String, Object> propsMap = new HashMap<>();
 propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
 propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
 propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
 propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group");
 propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 return propsMap;
 }
 
 @Bean
 public Listener listener() {
 return new Listener();
 }
}

生產(chǎn)者的配置.

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
@EnableKafka
public class KafkaProducerConfig {
 
 @Value("${kafka.broker.address}")
 private String brokerAddress;
 
 @Bean
 public ProducerFactory<String, String> producerFactory() {
 return new DefaultKafkaProducerFactory<>(producerConfigs());
 }
 
 @Bean
 public Map<String, Object> producerConfigs() {
 Map<String, Object> props = new HashMap<>();
 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
 props.put(ProducerConfig.RETRIES_CONFIG, 0);
 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
 props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 return props;
 }
 
 @Bean
 public KafkaTemplate<String, String> kafkaTemplate() {
 return new KafkaTemplate<String, String>(producerFactory());
 }
}

監(jiān)聽,監(jiān)聽里面,寫的就是業(yè)務(wù)邏輯了,從kafka里面得到數(shù)據(jù)后,具體怎么去處理. 如果需要開啟kafka處理消息的廣播模式,多個(gè)監(jiān)聽要監(jiān)聽不同的group,即方法上的注解@KafkaListener里的group一定要不一樣.如果多個(gè)監(jiān)聽里的group寫的一樣,就會(huì)造成只有一個(gè)監(jiān)聽能處理其中的消息,另外監(jiān)聽就不能處理消息了.也即是kafka的分布式消息處理方式.

在同一個(gè)group里的監(jiān)聽,共同處理接收到的消息,會(huì)根據(jù)一定的算法來處理.如果不在一個(gè)組,但是監(jiān)聽的是同一個(gè)topic的話,就會(huì)形成廣播模式

import com.alibaba.fastjson.JSON;
import org.linuxsogood.qilian.enums.CupMessageType;
import org.linuxsogood.qilian.kafka.MessageWrapper;
import org.linuxsogood.qilian.model.store.Store;
import org.linuxsogood.sync.mapper.StoreMapper;
import org.linuxsogood.sync.model.StoreExample;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.List;
import java.util.Optional;

public class Listener {

 private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);

 @Autowired
 private StoreMapper storeMapper;

 /**
  * 監(jiān)聽kafka消息,如果有消息則消費(fèi),同步數(shù)據(jù)到新烽火的庫
  * @param record 消息實(shí)體bean
  */
 @KafkaListener(topics = "linuxsogood-topic", group = "sync-group")
 public void listen(ConsumerRecord<?, ?> record) {
  Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  if (kafkaMessage.isPresent()) {
   Object message = kafkaMessage.get();
   try {
    MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class);
    CupMessageType type = messageWrapper.getType();
    //判斷消息的數(shù)據(jù)類型,不同的數(shù)據(jù)入不同的表
    if (CupMessageType.STORE == type) {
     proceedStore(messageWrapper);
    }
   } catch (Exception e) {
    LOGGER.error("將接收到的消息保存到數(shù)據(jù)庫時(shí)異常, 消息:{}, 異常:{}",message.toString(),e);
   }
  }
 }

 /**
  * 消息是店鋪類型,店鋪消息處理入庫
  * @param messageWrapper 從kafka中得到的消息
  */
 private void proceedStore(MessageWrapper messageWrapper) {
  Object data = messageWrapper.getData();
  Store cupStore = JSON.parseObject(data.toString(), Store.class);
  StoreExample storeExample = new StoreExample();
  String storeName = StringUtils.isBlank(cupStore.getStoreOldName()) ? cupStore.getStoreName() : cupStore.getStoreOldName();
  storeExample.createCriteria().andStoreNameEqualTo(storeName);
  List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample);
  org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store();
  org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore);
  //如果查詢不到記錄則新增
  if (stores.size() == 0) {
   storeMapper.insert(store);
  } else {
   store.setStoreId(stores.get(0).getStoreId());
   storeMapper.updateByPrimaryKey(store);
  }
 }

}

總結(jié)

以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對(duì)腳本之家的支持。

相關(guān)文章

  • SpringBoot基于Swagger2構(gòu)建API文檔過程解析

    SpringBoot基于Swagger2構(gòu)建API文檔過程解析

    這篇文章主要介紹了SpringBoot基于Swagger2構(gòu)建API文檔過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-11-11
  • Java自定義協(xié)議報(bào)文封裝 添加Crc32校驗(yàn)的實(shí)例

    Java自定義協(xié)議報(bào)文封裝 添加Crc32校驗(yàn)的實(shí)例

    下面小編就為大家分享一篇Java自定義協(xié)議報(bào)文封裝 添加Crc32校驗(yàn)的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2018-01-01
  • Java建造者設(shè)計(jì)模式詳解

    Java建造者設(shè)計(jì)模式詳解

    這篇文章主要為大家詳細(xì)介紹了Java建造者設(shè)計(jì)模式,對(duì)建造者設(shè)計(jì)模式進(jìn)行分析理解,感興趣的小伙伴們可以參考一下
    2016-02-02
  • Java反轉(zhuǎn)鏈表測(cè)試過程介紹

    Java反轉(zhuǎn)鏈表測(cè)試過程介紹

    這篇文章主要介紹了Java反轉(zhuǎn)鏈表測(cè)試過程,學(xué)習(xí)過數(shù)據(jù)結(jié)構(gòu)的小伙伴們,對(duì)鏈表想來是并不陌生。本篇文章將為大家介紹幾種在Java語言當(dāng)中,實(shí)現(xiàn)鏈表反轉(zhuǎn)的幾種方法,以下是具體內(nèi)容
    2023-04-04
  • java顯示當(dāng)前美國洛杉磯時(shí)間

    java顯示當(dāng)前美國洛杉磯時(shí)間

    這篇文章主要介紹了java顯示當(dāng)前美國洛杉磯時(shí)間的方法,也就是當(dāng)前時(shí)間的切換,需要的朋友可以參考下
    2014-02-02
  • Spring中的@ExceptionHandler異常攔截器

    Spring中的@ExceptionHandler異常攔截器

    這篇文章主要介紹了Spring中的@ExceptionHandler異常攔截器,Spring的@ExceptionHandler可以用來統(tǒng)一處理方法拋出的異常,給方法加上@ExceptionHandler注解,這個(gè)方法就會(huì)處理類中其他方法拋出的異常,需要的朋友可以參考下
    2024-01-01
  • RocketMQ設(shè)計(jì)之異步刷盤

    RocketMQ設(shè)計(jì)之異步刷盤

    本文介紹RocketMQ設(shè)計(jì)之異步刷盤,RocketMQ消息存儲(chǔ)到磁盤上,這樣既保證斷電后恢復(fù),也讓存儲(chǔ)消息量超出內(nèi)存限制,RocketMQ為了提高性能,會(huì)盡可能保證磁盤順序?qū)?消息通過Producer寫入RocketMQ的時(shí)候,有兩種方式,上篇介紹了同步刷盤,本文介紹異步刷盤,需要的朋友可以參考下
    2022-03-03
  • mybatis?plus新增(insert)數(shù)據(jù)獲取主鍵id的問題

    mybatis?plus新增(insert)數(shù)據(jù)獲取主鍵id的問題

    這篇文章主要介紹了mybatis?plus新增(insert)數(shù)據(jù)獲取主鍵id的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • springboot定時(shí)任務(wù)@Scheduled執(zhí)行多次的問題

    springboot定時(shí)任務(wù)@Scheduled執(zhí)行多次的問題

    這篇文章主要介紹了springboot定時(shí)任務(wù)@Scheduled執(zhí)行多次問題的解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • 一文了解為什么Java中只有值傳遞

    一文了解為什么Java中只有值傳遞

    Java?傳參是值傳遞還是引用傳遞?這個(gè)問題很基礎(chǔ),但是許多人都有點(diǎn)懵。本文就來通過一些示例帶大家詳細(xì)了解一下,需要的可以參考一下
    2022-07-07

最新評(píng)論