Integrating spring-kafka with Spring Boot: example code for sending and receiving messages
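Preface
Our new project uses spring-boot, and the data created in the new project has to be synchronized to the old system. Part of the synchronization code already existed, and it used Kafka. Since this is only data synchronization, I don't think Kafka is a necessary choice of MQ. First, the data volume is small. Second, running Kafka also means running a cluster and ZooKeeper. For simple data synchronization it is overkill.
There was no way around it: I'm just an employee, so if the boss says do it, I do it. At first I found spring-integration-kafka, whose description says it has been rewritten on top of spring-kafka. But the official documentation gave me a headache and I never got the feature working. The documentation isn't well written either, perhaps because the project was just getting started and still had many problems. I had to give up on it and use spring-kafka instead.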
Implementation
The pom.xml file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.linuxsogood.sync</groupId>
    <artifactId>linuxsogood-sync</artifactId>
    <version>1.0.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <!-- Dependency versions -->
        <mybatis.version>3.3.1</mybatis.version>
        <mybatis.spring.version>1.2.4</mybatis.spring.version>
        <mapper.version>3.3.6</mapper.version>
        <pagehelper.version>4.1.1</pagehelper.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>
        <!--<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.0.1.RELEASE</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>4.3.1.RELEASE</version>
            <scope>compile</scope>
        </dependency>-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>
        <!--<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.2.3.RELEASE</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>sqljdbc4</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.11</version>
        </dependency>
        <!-- MyBatis -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>${mybatis.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>${mybatis.spring.version}</version>
        </dependency>
        <!--<dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>-->
        <!-- MyBatis Generator -->
        <dependency>
            <groupId>org.mybatis.generator</groupId>
            <artifactId>mybatis-generator-core</artifactId>
            <version>1.3.2</version>
            <scope>compile</scope>
            <optional>true</optional>
        </dependency>
        <!-- Pagination plugin -->
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper</artifactId>
            <version>${pagehelper.version}</version>
        </dependency>
        <!-- Generic Mapper -->
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper</artifactId>
            <version>${mapper.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>repo.spring.io.milestone</id>
            <name>Spring Framework Maven Milestone Repository</name>
            <url>https://repo.spring.io/libs-milestone</url>
        </repository>
    </repositories>

    <build>
        <finalName>mybatis_generator</finalName>
        <plugins>
            <plugin>
                <groupId>org.mybatis.generator</groupId>
                <artifactId>mybatis-generator-maven-plugin</artifactId>
                <version>1.3.2</version>
                <configuration>
                    <verbose>true</verbose>
                    <overwrite>true</overwrite>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>org.linuxsogood.sync.Starter</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
The ORM layer uses MyBatis, together with the generic Mapper and the pagination plugin.
Kafka consumer configuration
import org.linuxsogood.sync.listener.Listener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    // Broker list, read from the application configuration
    @Value("${kafka.broker.address}")
    private String brokerAddress;

    // Container factory used by @KafkaListener methods
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        // Auto commit is disabled, so the commit interval below has no effect
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Kafka consumer group (group.id) used by consumers created from this factory
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}
Producer configuration
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    // Broker list, read from the application configuration
    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        // No retries: a failed send is not attempted again
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    // Template used by application code to send messages
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
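The configuration above only creates the KafkaTemplate; the sending call itself is not shown in this article. Below is a minimal sketch of what a sender might look like, under several assumptions: the class name StoreMessageSender is hypothetical, the envelope field names "type" and "data" are inferred from the getType() and getData() getters used by the listener, and the publishing step is abstracted as a BiConsumer so that the sketch runs without a broker. In the application, the publisher could be a lambda that calls kafkaTemplate.send(topic, payload).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

class StoreMessageSender {

    // Topic name taken from the listener in this article
    static final String TOPIC = "linuxsogood-topic";

    // Publishing function (topic, payload); hypothetical abstraction so the
    // sketch runs without Kafka. With Spring this could be:
    //   (topic, payload) -> kafkaTemplate.send(topic, payload)
    private final BiConsumer<String, String> publisher;

    StoreMessageSender(BiConsumer<String, String> publisher) {
        this.publisher = publisher;
    }

    // Wraps already-serialized store JSON in an envelope and publishes it.
    // The "type" and "data" field names are assumptions, not confirmed by the source.
    String sendStore(String storeJson) {
        String envelope = "{\"type\":\"STORE\",\"data\":" + storeJson + "}";
        publisher.accept(TOPIC, envelope);
        return envelope;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        StoreMessageSender sender =
                new StoreMessageSender((topic, payload) -> sent.add(topic + " <- " + payload));
        sender.sendStore("{\"storeName\":\"demo\"}");
        // prints: linuxsogood-topic <- {"type":"STORE","data":{"storeName":"demo"}}
        System.out.println(sent.get(0));
    }
}
```

Keeping the publisher behind a small function makes the envelope logic easy to test on its own; the real KafkaTemplate only has to be wired in at the edge.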
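The listener. The listener holds the business logic: what to do with the data once it has been received from Kafka. To have Kafka deliver messages in broadcast mode, the listeners must belong to different consumer groups. If several listeners are in the same group, any given message is handled by only one of them and the others never see it; that is Kafka's distributed message processing model. Note that in spring-kafka 1.1.0 the group attribute of @KafkaListener appears to name a listener-container group rather than the Kafka consumer group. The consumer group is the group.id set in the consumer factory (ConsumerConfig.GROUP_ID_CONFIG in the consumer configuration), so listeners that need different groups need consumer factories with different group.id values. From spring-kafka 1.3 on, @KafkaListener also has a groupId attribute for this purpose.
Listeners in the same group share the messages they receive, which are divided among them by a partition assignment algorithm. Listeners that are in different groups but subscribe to the same topic each receive every message, which gives broadcast mode.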
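The difference between sharing within a group and broadcasting across groups can be illustrated with a toy model in plain Java. This is a sketch only, not Kafka's real assignor: the class name ConsumerGroupModel, the "audit-group" name, and the "partition modulo member count" ownership rule are all assumptions made for illustration. What it preserves is the rule described above: each record reaches exactly one listener per group, and every group gets its own copy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ConsumerGroupModel {

    // group name -> listeners registered in that group, in registration order
    private final Map<String, List<String>> groups = new LinkedHashMap<>();

    void subscribe(String group, String listener) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(listener);
    }

    // Returns the listeners that receive a record written to the given partition:
    // exactly one listener from each group.
    List<String> deliver(int partition) {
        List<String> receivers = new ArrayList<>();
        for (List<String> members : groups.values()) {
            // Toy ownership rule (an assumption): partition p belongs to member p % size
            receivers.add(members.get(partition % members.size()));
        }
        return receivers;
    }

    public static void main(String[] args) {
        ConsumerGroupModel model = new ConsumerGroupModel();
        model.subscribe("sync-group", "A");
        model.subscribe("sync-group", "B");
        model.subscribe("audit-group", "C"); // hypothetical second group

        System.out.println(model.deliver(0)); // prints [A, C]
        System.out.println(model.deliver(1)); // prints [B, C]
    }
}
```

Listeners A and B split the partitions between them because they share a group, while C, alone in its own group, sees every record.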
import com.alibaba.fastjson.JSON;
import org.linuxsogood.qilian.enums.CupMessageType;
import org.linuxsogood.qilian.kafka.MessageWrapper;
import org.linuxsogood.qilian.model.store.Store;
import org.linuxsogood.sync.mapper.StoreMapper;
import org.linuxsogood.sync.model.StoreExample;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.List;
import java.util.Optional;

public class Listener {

    private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);

    @Autowired
    private StoreMapper storeMapper;

    /**
     * Listens for Kafka messages; consumes any that arrive and synchronizes
     * the data into the new 烽火 (Fenghuo) database.
     * @param record the message record
     */
    // Note: in spring-kafka 1.1.0, "group" appears to name a listener-container group;
    // the Kafka consumer group comes from group.id in the consumer factory.
    @KafkaListener(topics = "linuxsogood-topic", group = "sync-group")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            try {
                MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class);
                CupMessageType type = messageWrapper.getType();
                // Check the message type; different data goes into different tables
                if (CupMessageType.STORE == type) {
                    proceedStore(messageWrapper);
                }
            } catch (Exception e) {
                // The exception is passed as the last argument without a placeholder,
                // so SLF4J logs it with its stack trace
                LOGGER.error("Failed to save the received message to the database, message: {}", message, e);
            }
        }
    }

    /**
     * The message is a store message: process it and write it to the database.
     * @param messageWrapper the message received from Kafka
     */
    private void proceedStore(MessageWrapper messageWrapper) {
        Object data = messageWrapper.getData();
        Store cupStore = JSON.parseObject(data.toString(), Store.class);
        StoreExample storeExample = new StoreExample();
        // Look the store up by its old name if it has one, otherwise by its current name
        String storeName = StringUtils.isBlank(cupStore.getStoreOldName())
                ? cupStore.getStoreName()
                : cupStore.getStoreOldName();
        storeExample.createCriteria().andStoreNameEqualTo(storeName);
        List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample);
        org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store();
        org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore);
        // Insert if no matching record is found, otherwise update the existing one
        if (stores.isEmpty()) {
            storeMapper.insert(store);
        } else {
            store.setStoreId(stores.get(0).getStoreId());
            storeMapper.updateByPrimaryKey(store);
        }
    }
}
Summary
That is all for this article. I hope it is of some help in your study or work. If you have questions, leave a comment to discuss. Thank you for supporting Script Home (腳本之家).