SpringBoot如何集成Kafka低版本和高版本
說(shuō)明
這里之所以集成低版本和新版本,是因?yàn)樵谄髽I(yè)開(kāi)發(fā)中,有的SpringBoot項(xiàng)目版本很低,像我這個(gè)項(xiàng)目版本就很低,是1.4.2.RELEASE版本,而新版本即高版本就是用來(lái)自己學(xué)習(xí)的。
這里主要告訴大家,版本一定要根據(jù)自己的項(xiàng)目版本選擇對(duì)應(yīng)的kafka版本。
地址
官網(wǎng)地址:https://spring.io/projects/spring-kafka#overview
maven倉(cāng)庫(kù)spring-kafka地址:https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka
官網(wǎng)對(duì)應(yīng)版本圖:
低版本SpringBoot集成Kafka代碼
linux本地服務(wù)器zookeeper和kafka使用版本:
springboot版本和使用的spring版本:
使用的spring-kafka版本:
這里我SpringBoot版本是1.4.2.RELEASE版本,版本很低,官網(wǎng)顯示的SpringBoot版本最低是1.5.x,可以使用1.3.x的版本,很明顯我的這個(gè)不在官網(wǎng)給的范圍內(nèi),然后我的spring版本是4.3.9.RELEASE,這里我在上面這個(gè)maven倉(cāng)庫(kù)spring-kafka地址里面看了一個(gè)1.3.0版本,如下:
直到我往下繼續(xù)找,終于發(fā)現(xiàn)1.2.2.RELEASE這個(gè)版本是與我項(xiàng)目對(duì)應(yīng)的。
剛好這個(gè)版本對(duì)應(yīng)的spring版本是4.3.9.RELEASE與我項(xiàng)目的spring版本一致,于是我就使用了這個(gè)spring-kafka版本。
好了,這里怎么選擇版本就說(shuō)到這里,下面是代碼。
代碼
這里之所以是在Java類里面寫生產(chǎn)者和消費(fèi)者配置,是因?yàn)閟pringboot和kafka集成版本太低,不支持直接在application.yml里面配置,好像springboot高版本至少2.幾的版本可以直接在application.yml里面配置,至于2.幾的版本才支持我給忘記了,有知道的小伙伴麻煩告訴下我,謝謝了。
kafka生產(chǎn)者配置
這里是帶用戶名密碼協(xié)議配置,最下面三個(gè)就是,協(xié)議類型為:SASL/SCRAM-SHA-256,如果你們那里的kafka配置沒(méi)有設(shè)置這個(gè),可以不需要配置最下面三個(gè)。
企業(yè)開(kāi)發(fā)一般需要進(jìn)行認(rèn)證才能發(fā)送消息。
package com.gmcc.project.controllers.kafka; import lombok.Data; import org.springframework.context.annotation.Configuration; //kafka生產(chǎn)者參數(shù)配置 @Data @Configuration public class KafkaProducerProperties { //指定kafka 代理地址,多個(gè)地址用英文逗號(hào)隔開(kāi) private String bootstrapServers="192.168.11.111:9092,192.168.11.112:9093";//本地測(cè)試kafka使用 //消息重發(fā)次數(shù),如果配置了事務(wù),則不能為0,改為1 private int retries=0; //每次批量發(fā)送消息的數(shù)量 private String batchSize="16384"; //默認(rèn)值為0,意思就是說(shuō)消息必須立即被發(fā)送,但這樣會(huì)影響性能 //一般設(shè)置10毫秒左右,這個(gè)消息發(fā)送完后會(huì)進(jìn)入本地的一個(gè)batch,如果10毫秒內(nèi)這個(gè)batch滿了16kb就會(huì)隨batch一起發(fā)送出去 private String lingerMs="10"; //生產(chǎn)者最大可發(fā)送的消息大小,內(nèi)有多個(gè)batch,一旦滿了,只有發(fā)送到kafka后才能空出位置,否則阻塞接收新消息 private String bufferMemory="33554432"; //指定消息key和消息體的編解碼方式 private String keySerializer="org.apache.kafka.common.serialization.StringSerializer"; private String valueSerializer="org.apache.kafka.common.serialization.StringSerializer"; //確認(rèn)等級(jí)ack,kafka生產(chǎn)端最重要的選項(xiàng),如果配置了事務(wù),那必須是-1或者all //acks=0,生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng) //acks=1,只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到一個(gè)來(lái)自服務(wù)器成功響應(yīng) //acks=-1,表示分區(qū)leader必須等待消息被成功寫入到所有的ISR副本(同步副本)中才認(rèn)為product請(qǐng)求成功。這種方案提供最高的消息持久性保證,但是理論上吞吐率也是最差的 private String acks="1"; //協(xié)議類型,為SASL類型 private String securityProtocol="SASL_PLAINTEXT"; //協(xié)議 private String saslMechanism="SCRAM-SHA-256"; //用戶名密碼配置 private String saslJaas="org.apache.kafka.common.security.scram.ScramLoginModule required username=root password=123456;"; }
然后再創(chuàng)建一個(gè)config使kafka生產(chǎn)者配置生效。
如果kafka配置文件沒(méi)有設(shè)置用戶名密碼協(xié)議,注釋掉最下面三個(gè)即可。
package com.gmcc.project.controllers.config; import com.gmcc.project.controllers.kafka.KafkaProducerProperties; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaProductConfig { @Autowired private KafkaProducerProperties producerProperties; @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerProperties.getBootstrapServers()); props.put(ProducerConfig.RETRIES_CONFIG, producerProperties.getRetries()); props.put(ProducerConfig.BATCH_SIZE_CONFIG, producerProperties.getBatchSize()); props.put(ProducerConfig.LINGER_MS_CONFIG, producerProperties.getLingerMs()); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerProperties.getBufferMemory()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producerProperties.getKeySerializer()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producerProperties.getValueSerializer()); props.put(ProducerConfig.ACKS_CONFIG, producerProperties.getAcks()); //props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, producerProperties.getSecurityProtocol()); //props.put(SaslConfigs.SASL_MECHANISM, producerProperties.getSaslMechanism()); //props.put(SaslConfigs.SASL_JAAS_CONFIG, producerProperties.getSaslJaas()); return props; } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
kafka消費(fèi)者配置
如果kafka配置文件沒(méi)有配置用戶名密碼協(xié)議,認(rèn)證后才能消費(fèi)消息,可以將最下面的三個(gè)注釋掉不使用。
package com.gmcc.project.controllers.kafka; import lombok.Data; import org.springframework.context.annotation.Configuration; //kafka消費(fèi)者配置 @Data @Configuration public class KafkaConsumerProperties { //指定kafka 代理地址,多個(gè)地址用英文逗號(hào)隔開(kāi) private String bootstrapServers="192.168.11.111:9092,192.168.11.112:9093";//本地測(cè)試kafka使用 //指定默認(rèn)消費(fèi)者group id,消費(fèi)者監(jiān)聽(tīng)到的也是這個(gè) private String groupId="test-consumer-group";//本地測(cè)試使用 //消費(fèi)者在讀取一個(gè)沒(méi)有offset的分區(qū)或者offset無(wú)效時(shí)的策略,默認(rèn)earliest是從頭讀,latest不是從頭讀 private String autoOffsetReset="earliest"; //是否自動(dòng)提交偏移量offset,默認(rèn)為true,一般是false,如果為false,則auto-commit-interval屬性就會(huì)無(wú)效 private boolean enableAutoCommit=true; //自動(dòng)提交間隔時(shí)間,接收到消息后多久會(huì)提交offset,前提需要開(kāi)啟自動(dòng)提交,也就是enable-auto-commit設(shè)置為true,默認(rèn)單位是毫秒(ms),如果寫10s,最后加載的顯示值為10000ms,需要符合特定時(shí)間格式:1000ms,1S,1M,1H,1D(毫秒,秒,分,小時(shí),天) private String autoCommitInterval="1000"; //指定消息key和消息體的編解碼方式 private String keyDeserializerClass="org.apache.kafka.common.serialization.StringDeserializer"; private String valueDeserializerClass ="org.apache.kafka.common.serialization.StringDeserializer"; //批量消費(fèi)每次最多消費(fèi)多少條信息 private String maxPollRecords="50"; //協(xié)議類型,為SASL類型 private String securityProtocol="SASL_PLAINTEXT"; //協(xié)議 private String saslMechanism="SCRAM-SHA-256"; //用戶名密碼配置 private String saslJaas="org.apache.kafka.common.security.scram.ScramLoginModule required username=root password=123456;"; }
然后再創(chuàng)建一個(gè)config使kafka消費(fèi)者配置生效。如果kafka沒(méi)有設(shè)置用戶名密碼協(xié)議,注釋掉最下面三個(gè)即可。
package com.gmcc.project.controllers.config; import com.gmcc.project.controllers.kafka.KafkaConsumerProperties; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaConsumerConfig { @Autowired private KafkaConsumerProperties consumerProperties; @Bean ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); //設(shè)置為批量消費(fèi),每個(gè)批次數(shù)量在Kafka配置參數(shù)中設(shè)置ConsumerConfig.MAX_POLL_RECORDS_CONFIG factory.setBatchListener(false);//這里為true的時(shí)候,KafkaConsumer那里需要使用批量消費(fèi)方法,不然報(bào)錯(cuò) return factory; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerProperties.getBootstrapServers()); props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerProperties.getGroupId()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerProperties.getAutoOffsetReset()); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerProperties.isEnableAutoCommit()); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerProperties.getAutoCommitInterval()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumerProperties.getKeyDeserializerClass()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumerProperties.getValueDeserializerClass()); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerProperties.getMaxPollRecords()); //props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, consumerProperties.getSecurityProtocol()); //props.put(SaslConfigs.SASL_MECHANISM, consumerProperties.getSaslMechanism()); //props.put(SaslConfigs.SASL_JAAS_CONFIG, consumerProperties.getSaslJaas()); return props; } }
發(fā)送消息給kafka的Controller代碼
這里使用addCallback這個(gè)方法,是可以在生產(chǎn)者發(fā)送消息給kafka時(shí),如果生產(chǎn)者配置有問(wèn)題或者服務(wù)有問(wèn)題,我可以直接看到接口返回結(jié)果,所以沒(méi)有直接這樣kafkaTemplate.send(“first”,data);寫。
package com.gmcc.project.controllers.kafka; import com.gmcc.project.core.utils.StringUtils; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; //kafka生產(chǎn)者 @RestController @RequestMapping("kafkaProducer") public class KafkaProducerController { @Resource private KafkaTemplate<String,String> kafkaTemplate; //向kafka發(fā)送消息 @RequestMapping(value = "/sendFileMd5", method = RequestMethod.POST) public Map<String, Object> sendFileMd5(@RequestParam(value = "fileMd5", required = false) String fileMd5, @RequestParam(value = "uuid", required = false) String uuid){ Map<String, Object> returnMap = new HashMap<>(); //寫在success里面只會(huì)返回一次,第二次就給你返回一個(gè)空map對(duì)象 returnMap.put("message", "發(fā)送消息成功!"); returnMap.put("result", null); returnMap.put("status", "200"); //非空判斷 if(StringUtils.isBlank(fileMd5)) { returnMap.put("message", "fileMd5不能為空!"); returnMap.put("result", ""); returnMap.put("status", "999"); return returnMap; } if(StringUtils.isBlank(uuid)) { returnMap.put("message", "uuid不能為空!"); returnMap.put("result", ""); returnMap.put("status", "999"); return returnMap; } try{ //需要發(fā)送的消息 String data="{\"file_md5\":\""+fileMd5+"\",\"uuid\":\""+uuid+"\",\"vendor\":\"etone\",\"model\":\"5g信令回放\"}"; //pro環(huán)境使用topic為test_sample_get //本地測(cè)試使用,向topic為first發(fā)送消息 kafkaTemplate.send("first",data).addCallback(success -> { // 消息發(fā)送到的topic String topic = success.getRecordMetadata().topic(); // 消息發(fā)送到的分區(qū) int partition = success.getRecordMetadata().partition(); // 消息在分區(qū)內(nèi)的offset long offset = success.getRecordMetadata().offset(); System.out.println("發(fā)送消息成功:"+data+",主題:"+topic+",分區(qū):"+partition+",偏移量:"+offset); }, failure -> { returnMap.put("message", "發(fā)送消息失敗:" + failure.getMessage()); returnMap.put("result", null); returnMap.put("status", "500"); }); }catch (Exception e){ returnMap.put("message", e.getMessage()); returnMap.put("result", null); returnMap.put("status", "500"); } return returnMap; } }
消費(fèi)者消費(fèi)代碼
package com.gmcc.project.controllers.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaConsumer { //逐條消費(fèi) @KafkaListener(topics = "first") //@KafkaListener(topics = "test_sample_return") public void onMessage(ConsumerRecord<?,?> record){ try{ //消費(fèi)的哪個(gè)topic、partition的消息,打印出消息內(nèi)容 System.out.println("消費(fèi):"+record.topic()+"-"+record.partition()+"-"+record.value()); }catch (Exception e){ e.printStackTrace(); } } //批量消費(fèi)方法 /*@KafkaListener(topics = "first") public void onMessage(List<ConsumerRecord<?,?>> records){ System.out.println("消費(fèi)數(shù)量="+records.size()); for(ConsumerRecord<?,?> record:records){ //消費(fèi)的哪個(gè)topic、partition的消息,打印出消息內(nèi)容 System.out.println("消費(fèi):"+record.topic()+"-"+record.partition()+"-"+record.value()); } }*/ }
消費(fèi)到的消息:
這里面的uuid是集成了websocket需要用到,這里怎么集成websocket將消費(fèi)到的消息返回給客戶端等以后有時(shí)間了在另寫一個(gè)博客說(shuō)明。
高版本SpringBoot集成Kafka代碼
這里高版本可以供自己學(xué)習(xí)。高版本集成很簡(jiǎn)單,沒(méi)有低版本那么麻煩。
代碼結(jié)構(gòu):
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.2</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.hjl</groupId> <artifactId>kafka-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafka-demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.6.2</version> </plugin> </plugins> </build> </project>
這里我的SpringBoot版本是2.6.2版本,spring-kafka版本是2.8.1版本。符合官網(wǎng)給的版本推薦。
如下:
application.yml文件
這里之所以可以在application.yml直接配置kafka,是因?yàn)閟pringboot和spring-kafka版本很高。
這里生產(chǎn)者配置和消費(fèi)者配置都在里面。
server: port: 8080 spring: kafka: # 指定kafka 代理地址,多個(gè)地址用英文逗號(hào)隔開(kāi) bootstrap-servers: 192.168.11.111:9092 #初始化生產(chǎn)者配置 producer: #消息重發(fā)次數(shù),如果配置了事務(wù),則不能為0,改為1 retries: 0 # 每次批量發(fā)送消息的數(shù)量 batch-size: 16384 #生產(chǎn)者最大可發(fā)送的消息大小,內(nèi)有多個(gè)batch,一旦滿了,只有發(fā)送到kafka后才能空出位置,否則阻塞接收新消息 buffer-memory: 33554432 # 指定消息key和消息體的編解碼方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer #確認(rèn)等級(jí)ack,kafka生產(chǎn)端最重要的選項(xiàng),如果配置了事務(wù),那必須是-1或者all #acks=0,生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng) #acks=1,只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到一個(gè)來(lái)自服務(wù)器成功響應(yīng) #acks=-1,表示分區(qū)leader必須等待消息被成功寫入到所有的ISR副本(同步副本)中才認(rèn)為product請(qǐng)求成功。這種方案提供最高的消息持久性保證,但是理論上吞吐率也是最差的 acks: all #配置事務(wù),名字隨便起 #transaction-id-prefix: hbz-transaction- #初始化消費(fèi)者配置 consumer: # 指定默認(rèn)消費(fèi)者group id,消費(fèi)者監(jiān)聽(tīng)到的也是這個(gè) group-id: test-consumer-group #消費(fèi)者在讀取一個(gè)沒(méi)有offset的分區(qū)或者offset無(wú)效時(shí)的策略,默認(rèn)earliest是從頭讀,latest不是從頭讀 auto-offset-reset: earliest #是否自動(dòng)提交偏移量offset,默認(rèn)為true,一般是false,如果為false,則auto-commit-interval屬性就會(huì)無(wú)效 enable-auto-commit: true #自動(dòng)提交間隔時(shí)間,接收到消息后多久會(huì)提交offset,前提需要開(kāi)啟自動(dòng)提交,也就是enable-auto-commit設(shè)置為true,默認(rèn)單位是毫秒(ms),如果寫10s,最后加載的顯示值為10000ms,需要符合特定時(shí)間格式:1000ms,1S,1M,1H,1D(毫秒,秒,分,小時(shí),天) auto-commit-interval: 1000 # 指定消息key和消息體的編解碼方式 key-serializer: org.apache.kafka.common.serialization.StringDeserializer value-serializer: org.apache.kafka.common.serialization.StringDeserializer #批量消費(fèi)每次最多消費(fèi)多少條信息 max-poll-records: 50 #監(jiān)聽(tīng)器設(shè)置 listener: #消費(fèi)端監(jiān)聽(tīng)的topic不存在時(shí),項(xiàng)目啟動(dòng)會(huì)報(bào)錯(cuò)(關(guān)掉) missing-topics-fatal: false #設(shè)置消費(fèi)類型 批量消費(fèi)batch,單條消費(fèi)single type: batch #指定容器的線程數(shù),提高并發(fā)量,默認(rèn)為1 #concurrency: 3 #手動(dòng)提交偏移量,當(dāng)enable-auto-commit為true自動(dòng)提交時(shí),不需要設(shè)置改屬性 #ack-mode: manual
生產(chǎn)者發(fā)送消息代碼
package com.project.kafkademo.kafkaproduct; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; //kafka生產(chǎn)者 @RestController @RequestMapping("kafka") public class KafkaProducer { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @RequestMapping(value = "/send", method = RequestMethod.GET) public String send(@RequestParam(value = "message", required = false) String message){ kafkaTemplate.send("first",message); return "success"; } }
消費(fèi)者消費(fèi)消息代碼
package com.project.kafkademo.kafkaconsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.List; @Component public class KafkaConsumer { //消費(fèi)監(jiān)聽(tīng),topics=監(jiān)聽(tīng)的主題名,groupId=分組,consumer.properties里面的group.id配置 //如果在這里直接寫groupId="test-consumer-group"會(huì)導(dǎo)致application.yml里面設(shè)置的group-id不起效 //最終會(huì)被這里的設(shè)置直接覆蓋掉,所以這里不應(yīng)該加groupId="test-consumer-group"這個(gè)屬性 //@KafkaListener(topics = "first",groupId="test-consumer-group") //這樣寫的話,application.yml里面設(shè)置的group-id就會(huì)生效,監(jiān)控的就是application.yml里面的了 //逐條消費(fèi) /*@KafkaListener(topics = "first") public void onMessage(ConsumerRecord<?,?> record){ //消費(fèi)的哪個(gè)topic、partition的消息,打印出消息內(nèi)容 System.out.println("消費(fèi):"+record.topic()+"-"+record.partition()+"-"+record.value()); }*/ //批量消費(fèi),用List批量接收消息,ConsumerRecord<?,?>只能單條消費(fèi)消息 /*@KafkaListener(topics = "first") public void onMessage(List<ConsumerRecord<?,?>> records){ System.out.println("消費(fèi)數(shù)量="+records.size()); for(ConsumerRecord<?,?> record:records){ //消費(fèi)的哪個(gè)topic、partition的消息,打印出消息內(nèi)容 System.out.println("消費(fèi):"+record.topic()+"-"+record.partition()+"-"+record.value()); } }*/ //批量消費(fèi),ConsumerRecords<?,?>用于批量消費(fèi)消息 @KafkaListener(topics = "first") public void onMessage(ConsumerRecords<?,?> records){ System.out.println("消費(fèi)數(shù)量="+records.count()); for(ConsumerRecord<?,?> record:records){ //消費(fèi)的哪個(gè)topic、partition(哪個(gè)分區(qū))的消息,打印出消息內(nèi)容 System.out.println("消費(fèi):"+record.topic()+"-"+record.partition()+"-"+record.key()+"-"+record.value()); } } }
效果
項(xiàng)目啟動(dòng)后,會(huì)打印出你配置的參數(shù)以及默認(rèn)配置的參數(shù)
postman接口測(cè)試:
后臺(tái)結(jié)果打印:
總結(jié)
好了,我的記錄就先到這里。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- springboot使用kafka推送數(shù)據(jù)到服務(wù)端的操作方法帶認(rèn)證
- 如何使用SpringBoot集成Kafka實(shí)現(xiàn)用戶數(shù)據(jù)變更后發(fā)送消息
- springboot使用kafka事務(wù)的示例代碼
- Kafka的安裝及接入SpringBoot的詳細(xì)過(guò)程
- springboot使用@KafkaListener監(jiān)聽(tīng)多個(gè)kafka配置實(shí)現(xiàn)
- springboot如何配置多kafka
- kafka springBoot配置的實(shí)現(xiàn)
- Springboot項(xiàng)目消費(fèi)Kafka數(shù)據(jù)的方法
相關(guān)文章
spring中@autowired、@Qualifier、@Primary注解的使用說(shuō)明
這篇文章主要介紹了spring中@autowired、@Qualifier、@Primary注解的使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11SpringBoot實(shí)戰(zhàn):Spring如何找到對(duì)應(yīng)轉(zhuǎn)換器優(yōu)雅使用枚舉參數(shù)
這篇文章主要介紹了SpringBoot實(shí)戰(zhàn)中Spring是如何找到對(duì)應(yīng)轉(zhuǎn)換器優(yōu)雅的使用枚舉參數(shù),文中附有詳細(xì)的實(shí)例代碼有需要的朋友可以參考下,希望可以有所幫助2021-08-08Java設(shè)置PDF跨頁(yè)表格重復(fù)顯示表頭行的步驟詳解
這篇文章主要給大家介紹了關(guān)于Java設(shè)置PDF跨頁(yè)表格重復(fù)顯示表頭行的相關(guān)資料,這里使用的是Free Spire.PDF for Java的jar包,Spire.PDF for Java 是一款專門對(duì) PDF 文檔進(jìn)行操作的 Java 類庫(kù),需要的朋友可以參考下2021-07-07Java使用OCR技術(shù)識(shí)別驗(yàn)證碼實(shí)現(xiàn)自動(dòng)化登陸方法
在本篇文章里小編給大家分享的是關(guān)于Java 如何使用 OCR 技術(shù)識(shí)別驗(yàn)證碼實(shí)現(xiàn)自動(dòng)化登陸的相關(guān)知識(shí)點(diǎn)內(nèi)容,需要的朋友們學(xué)習(xí)下。2019-08-08SpringBoot+Spring?Data?JPA整合H2數(shù)據(jù)庫(kù)的示例代碼
H2數(shù)據(jù)庫(kù)是一個(gè)開(kāi)源的關(guān)系型數(shù)據(jù)庫(kù),本文重點(diǎn)給大家介紹SpringBoot+Spring?Data?JPA整合H2數(shù)據(jù)庫(kù)的示例代碼,感興趣的朋友跟隨小編一起看看吧2022-02-02分布式系統(tǒng)下調(diào)用鏈追蹤技術(shù)面試題
這篇文章主要為大家介紹了分布式系統(tǒng)下調(diào)用鏈追蹤技術(shù)面試問(wèn)題合集,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-03-03在SpringBoot中,如何使用Netty實(shí)現(xiàn)遠(yuǎn)程調(diào)用方法總結(jié)
我們?cè)谶M(jìn)行網(wǎng)絡(luò)連接的時(shí)候,建立套接字連接是一個(gè)非常消耗性能的事情,特別是在分布式的情況下,用線程池去保持多個(gè)客戶端連接,是一種非常消耗線程的行為.那么我們?cè)撏ㄟ^(guò)什么技術(shù)去解決上述的問(wèn)題呢,那么就不得不提一個(gè)網(wǎng)絡(luò)連接的利器——Netty,需要的朋友可以參考下2021-06-06