Springboot整合kafka的示例代碼
1. 整合kafka
1、引入依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
2、設(shè)置yml文件
spring: application: name: demo kafka: bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904 producer: # producer 生產(chǎn)者 retries: 0 # 重試次數(shù) acks: 1 # 應(yīng)答級別:多少個分區(qū)副本備份完成時向生產(chǎn)者發(fā)送ack確認(可選0、1、all/-1) batch-size: 16384 # 批量大小 buffer-memory: 33554432 # 生產(chǎn)端緩沖區(qū)大小 key-serializer: org.apache.kafka.common.serialization.StringSerializer # value-serializer: com.itheima.demo.config.MySerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: # consumer消費者 group-id: javagroup # 默認的消費組ID enable-auto-commit: true # 是否自動提交offset auto-commit-interval: 100 # 提交offset延時(接收到消息后多久提交offset) # earliest:當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 # latest:當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產(chǎn)生的該分區(qū)下的數(shù)據(jù) # none:topic各分區(qū)都存在已提交的offset時,從offset后開始消費;只要有一個分區(qū)不存在已提交的offset,則拋出異常 auto-offset-reset: latest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # value-deserializer: com.itheima.demo.config.MyDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3、啟動項目
2. 消息發(fā)送
2.1 發(fā)送類型
KafkaTemplate調(diào)用send時默認采用異步發(fā)送,如果需要同步獲取發(fā)送結(jié)果,調(diào)用get方法
異步發(fā)送生產(chǎn)者:
@RestController public class KafkaProducer { @Resource private KafkaTemplate<String, Object> kafkaTemplate; @GetMapping("/kafka/test/{msg}") public void sendMessage(@PathVariable("msg") String msg) { Message message = new Message(); message.setMessage(msg); kafkaTemplate.send("test", JSON.toJSONString(message)); } }
同步發(fā)送生產(chǎn)者:
//測試同步發(fā)送與監(jiān)聽 @RestController public class AsyncProducer { private final static Logger logger = LoggerFactory.getLogger(AsyncProducer.class); @Resource private KafkaTemplate<String, Object> kafkaTemplate; //同步發(fā)送 @GetMapping("/kafka/sync/{msg}") public void sync(@PathVariable("msg") String msg) throws Exception { Message message = new Message(); message.setMessage(msg); ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message)); //注意,可以設(shè)置等待時間,超出后,不再等候結(jié)果 SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS); logger.info("send result:{}",result.getProducerRecord().value()); } }
消費者:
@Component public class KafkaConsumer { private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); //不指定group,默認取yml里配置的 @KafkaListener(topics = {"test"}) public void onMessage1(ConsumerRecord<?, ?> consumerRecord) { Optional<?> optional = Optional.ofNullable(consumerRecord.value()); if (optional.isPresent()) { Object msg = optional.get(); logger.info("message:{}", msg); } } }
那么我們怎么看出來同步發(fā)送和異步發(fā)送的區(qū)別呢?
①首先在服務(wù)器上,將kafka暫停服務(wù)。
②在swagger發(fā)送消息
調(diào)同步發(fā)送:請求被阻斷,一直等待,超時后返回錯誤
而調(diào)異步發(fā)送的(默認發(fā)送接口),請求立刻返回。
那么,異步發(fā)送的消息怎么確認發(fā)送情況呢?
我們使用注冊監(jiān)聽
即新建一個類:KafkaListener.java
@Configuration public class KafkaListener { private final static Logger logger = LoggerFactory.getLogger(KafkaListener.class); @Autowired KafkaTemplate kafkaTemplate; //配置監(jiān)聽 @PostConstruct private void listener() { kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() { @Override public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) { logger.info("ok,message={}", producerRecord.value()); } public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) { logger.error("error!message={}", producerRecord.value()); }); } }
查看控制臺,等待一段時間后,異步發(fā)送失敗的消息會被回調(diào)給注冊過的listener
如果是正常發(fā)送異步消息,則會獲得該消息??梢钥吹剑趦?nèi)部類 KafkaListener$1 中,即注冊的Listener的消息。
2.2 序列化
消費者使用:KafkaConsumer.java
@Component public class KafkaConsumer { private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); //不指定group,默認取yml里配置的 @KafkaListener(topics = {"test"}) public void onMessage1(ConsumerRecord<?, ?> consumerRecord) { Optional<?> optional = Optional.ofNullable(consumerRecord.value()); if (optional.isPresent()) { Object msg = optional.get(); logger.info("message:{}", msg); } } }
1)序列化詳解
- 前面用到的是Kafka自帶的字符串序列化器(
org.apache.kafka.common.serialization.StringSerializer
) - 除此之外還有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等
- 這些序列化器都實現(xiàn)了接口(
org.apache.kafka.common.serialization.Serializer
) - 基本上,可以滿足絕大多數(shù)場景
2)自定義序列化
自己實現(xiàn),實現(xiàn)對應(yīng)的接口即可,有以下方法:
public interface Serializer<T> extends Closeable { default void configure(Map<String, ?> configs, Boolean isKey) { } //理論上,只實現(xiàn)這個即可正常運行 byte[] serialize(String var1, T var2); //默認調(diào)上面的方法 default byte[] serialize(String topic, Headers headers, T data) { return this.serialize(topic, data); } default void close() { } }
我們來自己實現(xiàn)一個序列化器:MySerializer.java
public class MySerializer implements Serializer { @Override public byte[] serialize(String s, Object o) { String json = JSON.toJSONString(o); return json.getBytes(); } }
3)解碼MyDeserializer.java
,實現(xiàn)方式與編碼器幾乎一樣.
public class MyDeserializer implements Deserializer { private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class); @Override public Object deserialize(String s, byte[] bytes) { try { String json = new String(bytes,"utf-8"); return JSON.parse(json); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return null; } }
4)在yaml中配置自己的編碼器、解碼器
再次收發(fā),消息正常
2.3 分區(qū)策略
分區(qū)策略決定了消息根據(jù)key投放到哪個分區(qū),也是順序消費保障的基石。
- 給定了分區(qū)號,直接將數(shù)據(jù)發(fā)送到指定的分區(qū)里面去
- 沒有給定分區(qū)號,給定數(shù)據(jù)的key值,通過key取上hashCode進行分區(qū)
- 既沒有給定分區(qū)號,也沒有給定key值,直接輪循進行分區(qū)(默認)
- 自定義分區(qū),你想怎么做就怎么做
1)驗證默認分區(qū)規(guī)則
發(fā)送者代碼參考:PartitionProducer.java
//測試分區(qū)發(fā)送 @RestController public class PartitionProducer { @Resource private KafkaTemplate<String, Object> kafkaTemplate; // 指定分區(qū)發(fā)送 // 不管你key是什么,到同一個分區(qū) @GetMapping("/kafka/partitionSend/{key}") public void setPartition(@PathVariable("key") String key) { kafkaTemplate.send("test", 0, key, "key=" + key + ",msg=指定0號分區(qū)"); } // 指定key發(fā)送,不指定分區(qū) // 根據(jù)key做hash,相同的key到同一個分區(qū) @GetMapping("/kafka/keysend/{key}") public void setKey(@PathVariable("key") String key) { kafkaTemplate.send("test", key, "key=" + key + ",msg=不指定分區(qū)"); }
消費者代碼使用:PartitionConsumer.java
@Component public class PartitionConsumer { private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class); //分區(qū)消費 @KafkaListener(topics = {"test"},topicPattern = "0") public void onMessage(ConsumerRecord<?, ?> consumerRecord) { Optional<?> optional = Optional.ofNullable(consumerRecord.value()); if (optional.isPresent()) { Object msg = optional.get(); logger.info("partition=0,message:[{}]", msg); } } @KafkaListener(topics = {"test"},topicPattern = "1") public void onMessage1(ConsumerRecord<?, ?> consumerRecord) { Optional<?> optional = Optional.ofNullable(consumerRecord.value()); if (optional.isPresent()) { Object msg = optional.get(); logger.info("partition=1,message:[{}]", msg); } } }
通過swagger訪問setKey(也就是只給了key的方法):
可以看到key相同的被hash到了同一個分區(qū)
再訪問setPartition來設(shè)置分區(qū)號0來發(fā)送:
可以看到無論key是什么,都是分區(qū)0來消費
2)自定義分區(qū)
參考代碼:MyPartitioner.java , MyPartitionTemplate.java。
發(fā)送使用:MyPartitionProducer.java。
public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 定義自己的分區(qū)策略 // 如果key以0開頭,發(fā)到0號分區(qū) // 其他都扔到1號分區(qū) String keyStr = key+""; if (keyStr.startsWith("0")){ return 0; }else { return 1; } } public void close() { public void configure(Map<String, ?> map) { }
@Configuration public class MyPartitionTemplate { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; KafkaTemplate kafkaTemplate; @PostConstruct public void setKafkaTemplate() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //注意分區(qū)器在這里?。?! props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class); this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props)); } public KafkaTemplate getKafkaTemplate(){ return kafkaTemplate; }
//測試自定義分區(qū)發(fā)送 @RestController public class MyPartitionProducer { @Autowired MyPartitionTemplate template; // 使用0開頭和其他任意字母開頭的key發(fā)送消息 // 看控制臺的輸出,在哪個分區(qū)里? @GetMapping("/kafka/myPartitionSend/{key}") public void setPartition(@PathVariable("key") String key) { template.getKafkaTemplate().send("test", key,"key="+key+",msg=自定義分區(qū)策略"); } }
使用swagger,發(fā)送0開頭和非0開頭兩種key
3. 消息消費
3.1 消息組別
發(fā)送者使用:KafkaProducer.java
@RestController public class KafkaProducer { @Resource private KafkaTemplate<String, Object> kafkaTemplate; @GetMapping("/kafka/test/{msg}") public void sendMessage(@PathVariable("msg") String msg) { Message message = new Message(); message.setMessage(msg); kafkaTemplate.send("test", JSON.toJSONString(message)); } }
1)代碼參考:GroupConsumer.java,Listener拷貝3份,分別賦予兩組group,驗證分組消費:
//測試組消費 @Component public class GroupConsumer { private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class); //組1,消費者1 @KafkaListener(topics = {"test"},groupId = "group1") public void onMessage1(ConsumerRecord<?, ?> consumerRecord) { Optional<?> optional = Optional.ofNullable(consumerRecord.value()); if (optional.isPresent()) { Object msg = optional.get(); logger.info("group:group1-1 , message:{}", msg); } } //組1,消費者2 public void onMessage2(ConsumerRecord<?, ?> consumerRecord) { logger.info("group:group1-2 , message:{}", msg); //組2,只有一個消費者 @KafkaListener(topics = {"test"},groupId = "group2") public void onMessage3(ConsumerRecord<?, ?> consumerRecord) { logger.info("group:group2 , message:{}", msg); }
2)啟動
3)通過swagger發(fā)送2條消息
- 同一group下的兩個消費者,在group1均分消息
- group2下只有一個消費者,得到全部消息
4)消費端閑置
注意分區(qū)數(shù)與消費者數(shù)的搭配,如果 ( 消費者數(shù) > 分區(qū)數(shù)量 ),將會出現(xiàn)消費者閑置(因為一個分區(qū)只能分配給一個消費者),浪費資源!
驗證方式:
停掉項目,刪掉test主題,重新建一個 ,這次只給它分配一個分區(qū)。
重新發(fā)送兩條消息,試一試
- group2可以消費到1、2兩條消息
- group1下有兩個消費者,但是只分配給了 1 , 2這個進程被閑置
3.2 位移提交
1)自動提交
前面的案例中,我們設(shè)置了以下兩個選項,則kafka會按延時設(shè)置自動提交
enable-auto-commit: true # 是否自動提交offset auto-commit-interval: 100 # 提交offset延時(接收到消息后多久提交offset,默認單位為ms)
2)手動提交
有些時候,我們需要手動控制偏移量的提交時機,比如確保消息嚴格消費后再提交,以防止丟失或重復(fù)。
下面我們自己定義配置,覆蓋上面的參數(shù)
代碼參考:MyOffsetConfig.java
@Configuration public class MyOffsetConfig { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 注意這里!??!設(shè)置手動提交 configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps)); // ack模式: // AckMode針對ENABLE_AUTO_COMMIT_CONFIG=false時生效,有以下幾種: // // RECORD // 每處理一條commit一次 // BATCH(默認) // 每次poll的時候批量提交一次,頻率取決于每次poll的調(diào)用頻率 // TIME // 每次間隔ackTime的時間去commit(跟auto commit interval有什么區(qū)別呢?) // COUNT // 累積達到ackCount次的ack去commit // COUNT_TIME // ackTime或ackCount哪個條件先滿足,就commit // MANUAL // listener負責ack,但是背后也是批量上去 // MANUAL_IMMEDIATE // listner負責ack,每調(diào)用一次,就立即commit factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } }
然后通過在消費端的Consumer來提交偏移量
MyOffsetConsumer:
@Component public class MyOffsetConsumer { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @KafkaListener(topics = "test", groupId = "myoffset-group-1", containerFactory = "manualKafkaListenerContainerFactory") public void manualCommit(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Consumer consumer, Acknowledgment ack) { logger.info("手動提交偏移量 , partition={}, msg={}", partition, message); // 同步提交 consumer.commitSync(); //異步提交 //consumer.commitAsync(); // ack提交也可以,會按設(shè)置的ack策略走(參考MyOffsetConfig.java里的ack模式) // ack.acknowledge(); } @KafkaListener(topics = "test", groupId = "myoffset-group-2", containerFactory = "manualKafkaListenerContainerFactory") public void noCommit(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Consumer consumer, Acknowledgment ack) { logger.info("忘記提交偏移量, partition={}, msg={}", partition, message); // 不做commit! /** * 現(xiàn)實狀況: * commitSync和commitAsync組合使用 * <p> * 手工提交異步 consumer.commitAsync(); * 手工同步提交 consumer.commitSync() * commitSync()方法提交最后一個偏移量。在成功提交或碰到無怯恢復(fù)的錯誤之前, * commitSync()會一直重試,但是commitAsync()不會。 * 一般情況下,針對偶爾出現(xiàn)的提交失敗,不進行重試不會有太大問題 * 因為如果提交失敗是因為臨時問題導(dǎo)致的,那么后續(xù)的提交總會有成功的。 * 但如果這是發(fā)生在關(guān)閉消費者或再均衡前的最后一次提交,就要確保能夠提交成功。否則就會造成重復(fù)消費 * 因此,在消費者關(guān)閉前一般會組合使用commitAsync()和commitSync()。 */ // @KafkaListener(topics = "test", groupId = "myoffset-group-3",containerFactory = "manualKafkaListenerContainerFactory") public void manualOffset(@Payload String message, try { logger.info("同步異步搭配 , partition={}, msg={}", partition, message); //先異步提交 consumer.commitAsync(); //繼續(xù)做別的事 } catch (Exception e) { System.out.println("commit failed"); } finally { try { consumer.commitSync(); } finally { consumer.close(); } } * 甚至可以手動提交,指定任意位置的偏移量 * 不推薦日常使用?。?! // @KafkaListener(topics = "test", groupId = "myoffset-group-4",containerFactory = "manualKafkaListenerContainerFactory") public void offset(ConsumerRecord record, Consumer consumer) { logger.info("手動指定任意偏移量, partition={}, msg={}", record.partition(), record); Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>(); currentOffset.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)); consumer.commitSync(currentOffset); }
3)重復(fù)消費問題
如果手動提交模式被打開,一定不要忘記提交偏移量。否則會造成重復(fù)消費!
用km將test主題刪除,新建一個test空主題。方便觀察消息偏移 注釋掉其他Consumer的Component注解,只保留當前MyOffsetConsumer.java 啟動項目,使用swagger的KafkaProducer發(fā)送連續(xù)幾條消息 留心控制臺,都能消費,沒問題:
但是!重啟項目:
無論重啟多少次,不提交偏移量的消費組,會重復(fù)消費一遍?。。?/p>
再通過命令行查詢偏移量
4)經(jīng)驗與總結(jié)
commitSync()方法,即同步提交,會提交最后一個偏移量。在成功提交或碰到無怯恢復(fù)的錯誤之前,commitSync()會一直重試,但是commitAsync()不會。
這就造成一個陷阱:
如果異步提交,針對偶爾出現(xiàn)的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導(dǎo)致的,那么后續(xù)的提交總會有成功的。只要成功一次,偏移量就會提交上去。
但是!如果這是發(fā)生在關(guān)閉消費者時的最后一次提交,就要確保能夠提交成功,如果還沒提交完就停掉了進程。就會造成重復(fù)消費!
因此,在消費者關(guān)閉前一般會組合使用commitAsync()和commitSync()。
詳細代碼參考:MyOffsetConsumer.manualOffset()
到此這篇關(guān)于Springboot整合kafka的文章就介紹到這了,更多相關(guān)Springboot整合kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
關(guān)于Mybatis與JPA的優(yōu)缺點說明
這篇文章主要介紹了關(guān)于Mybatis與JPA的優(yōu)缺點說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-06-06SpringCloud用Zookeeper搭建配置中心的方法
本篇文章主要介紹了SpringCloud用Zookeeper搭建配置中心的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-04-04詳談java中File類getPath()、getAbsolutePath()、getCanonical的區(qū)別
下面小編就為大家?guī)硪黄斦刯ava中File類getPath()、getAbsolutePath()、getCanonical的區(qū)別。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-07-07