Kafka在客戶端實現(xiàn)消息的發(fā)送與讀取
1.創(chuàng)建Maven工程
引入kafka相關依賴,POM文件如下:
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.7</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build>
2.生產(chǎn)者API(Producer API)
2.1 生成者處理流程
- Producer創(chuàng)建時,會創(chuàng)建一個Sender線程并設置為守護線程。
- 生產(chǎn)消息時,內部其實是異步流程;生產(chǎn)的消息先經(jīng)過攔截器->序列化器->**分區(qū)器,**然后將消 息緩存在緩沖區(qū)(該緩沖區(qū)也是在Producer創(chuàng)建時創(chuàng)建)。
- 批次發(fā)送的條件為:緩沖區(qū)數(shù)據(jù)大小達到batch.size或者linger.ms達到上限,哪個先達到就算 哪個。
- 批次發(fā)送后,發(fā)往指定分區(qū),然后落盤到broker;如果生產(chǎn)者配置了retrires參數(shù)大于0并且失 敗原因允許重試,那么客戶端內部會對該消息進行重試。
- 落盤到broker成功,返回生產(chǎn)元數(shù)據(jù)給生產(chǎn)者。
- 元數(shù)據(jù)返回有兩種方式:一種是通過阻塞直接返回,另一種是通過回調返回。
2.2 常用參數(shù)介紹
生產(chǎn)者主要的對象有: KafkaProducer , ProducerRecord 。
- KafkaProducer 是用于發(fā)送消息的類;
- ProducerRecord 類用于封裝Kafka的消息。
KafkaProducer 的實例化需要指定的參數(shù),Producer的參數(shù)定義在 org.apache.kafka.clients.producer.ProducerConfig類中。
常用參數(shù)說明如下:
- bootstrap.servers: 配置生產(chǎn)者如何與broker建立連接。該參數(shù)設置的是初始化參數(shù)。如果生 產(chǎn)者需要連接的是Kafka集群,則這里配置集群中幾個broker的地址,而不 是全部,當生產(chǎn)者連接上此處指定的broker之后,在通過該連接發(fā)現(xiàn)集群 中的其他節(jié)點。
- key.serializer: 要發(fā)送信息的key數(shù)據(jù)的序列化類。kafka-clients提供了常用類型的序列化類,序列化類都實現(xiàn)了org.apache.kafka.common.serialization.Serializer 接口。
- **value.serializer:**要發(fā)送消息的alue數(shù)據(jù)的序列化類。kafka-clients提供了常用類型的序列化類,序列化類都實現(xiàn)了org.apache.kafka.common.serialization.Serializer 接口。
acks: 默認值:all。
acks=0: 生產(chǎn)者不等待broker對消息的確認,只要將消息放到緩沖區(qū),就認為消息 已經(jīng)發(fā)送完成。 該情形不能保證broker是否真的收到了消息,retries配置也不會生效。發(fā) 送的消息的返回的消息偏移量永遠是-1
acks=1 表示消息只需要寫到主分區(qū)即可,然后就響應客戶端,而不等待副本分區(qū)的 確認。 在該情形下,如果主分區(qū)收到消息確認之后就宕機了,而副本分區(qū)還沒來得 及同步該消息,則該消息丟失。
acks=all 首領分區(qū)會等待所有的ISR副本分區(qū)確認記錄。 該處理保證了只要有一個ISR副本分區(qū)存活,消息就不會丟失。 這是Kafka最強的可靠性保證,等效于 acks=-1
- retries: 設置該屬性為一個大于1的值,將在消息發(fā)送失敗的時候重新發(fā)送消 息。該重試與客戶端收到異常重新發(fā)送并無二至。允許重試但是不設 置參數(shù)max.in.flight.requests.per.connection為1,存在消息亂序 的可能,因為如果兩個批次發(fā)送到同一個分區(qū),第一個失敗了重試, 第二個成功了,則第一個消息批在第二個消息批后。int類型的值,默 認:0,可選值:[0,…,2147483647
- compression.type: 生產(chǎn)者生成數(shù)據(jù)的壓縮格式。默認是none(沒有壓縮)。允許的 值:none,gzip,snappy和lz4。壓縮是對整個消息批次來講 的。消息批的效率也影響壓縮的比例。消息批越大,壓縮效率越好。默認是none。
2.3 生產(chǎn)者代碼
package com.warybee; import org.apache.kafka.clients.producer.*; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; /** * @author joy */ public class KafkaProducerDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { Map<String, Object> configs = new HashMap<>(); // 設置連接Kafka的初始連接用到的服務器地址 configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092"); // 設置key的序列化類 configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer"); // 設置value的序列化類 configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configs.put(ProducerConfig.ACKS_CONFIG,"all"); KafkaProducer<Integer,String> kafkaProducer=new KafkaProducer<Integer, String>(configs); //發(fā)送100條消息 for (int i = 0; i < 100; i++) { ProducerRecord<Integer,String> producerRecord=new ProducerRecord<> ( "test_topic_1", 0, i, "test topic msg "+i); //消息的異步確認 kafkaProducer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) { if (exception==null){ System.out.println("消息的主題:"+recordMetadata.topic()); System.out.println("消息的分區(qū):"+recordMetadata.partition()); System.out.println("消息的偏移量:"+recordMetadata.offset()); }else { System.out.println("發(fā)送消息異常"); } } }); } // 關閉生產(chǎn)者 kafkaProducer.close(); } }
3.消費者API(Consumer API)
3.1 常用參數(shù)介紹
KafkaConsumer 的實例化需要指定的參數(shù),Consumer的參數(shù)定義在 org.apache.kafka.clients.consumer.ConsumerConfig類中。
常用參數(shù)說明如下:
- bootstrap.servers: 配置生產(chǎn)者如何與broker建立連接。該參數(shù)設置的是初始化參數(shù)。如果生 產(chǎn)者需要連接的是Kafka集群,則這里配置集群中幾個broker的地址,而不 是全部,當生產(chǎn)者連接上此處指定的broker之后,在通過該連接發(fā)現(xiàn)集群 中的其他節(jié)點。
- key.deserializer: key數(shù)據(jù)的反序列化類。kafka-clients提供了常用類型的反序列化類,反序列化類都實現(xiàn)了org.apache.kafka.common.serialization.Deserializer 接口。
- value.deserializer: Value數(shù)據(jù)的反序列化類。kafka-clients提供了常用類型的反序列化類,反序列化類都實現(xiàn)了org.apache.kafka.common.serialization.Deserializer 接口。
- group.id: 消費組ID,用于指定當前消費者屬于哪個消費組。
- auto.offset.reset: 當kafka中沒有偏移量或者當前偏移量在服務器中不存在時,kafka該如何處理?參數(shù)值如下:
- earliest: automatically reset the offset to the earliest offset(自動重置偏移量到最早的偏移量)
- latest: automatically reset the offset to the latest offset(自動重置偏移量為最新的)
- none: throw exception to the consumer if no previous offset is found for the consumer’s group(如果消費組上一個偏移量不存在,向consumer 拋出異常)
- anything: throw exception to the consumer.(向consumer 拋出異常)
- client.id : 消費消息的時候向服務器發(fā)送的id字符串。在ip/port基礎上 提供應用的邏輯名稱,記錄在服務端的請求日志中,用于追蹤請求的源。
- enable.auto.commit : 如果設置為true,消費者會自動周期性地向服務器提交偏移量。
3.2 消費者與消費組概念介紹
每一個Consumer屬于一個特定的Consumer Group,消費者可以通過指定group.id,來確定其所在消費組。
group_id一般設置為應用的邏輯名稱。比如多個訂單處理程序組成一個消費組,可以設置group_id 為"order_process"。
消費組均衡地給消費者分配分區(qū),每個分區(qū)只由消費組中一個消費者消費
一個擁有四個分區(qū)的主題,包含一個消費者的消費組。此時,消費組中的消費者消費主題中的所有分區(qū)。
如果在消費組中添加一個消費者2,則每個消費者分別從兩個分區(qū)接收消息。
如果消費組有四個消費者,則每個消費者可以分配到一個分區(qū)
如果向消費組中添加更多的消費者,超過主題分區(qū)數(shù)量,則有一部分消費者就會閑置,不會接收任 何消息
向消費組添加消費者是橫向擴展消費能力的主要方式。
3.3 消費者代碼
package com.warybee; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.*; /** * @author joy */ public class KafkaConsumerDemo { public static void main(String[] args) { Map<String, Object> configs = new HashMap<>(); // 設置連接Kafka的初始連接用到的服務器地址 // 如果是集群,則可以通過此初始連接發(fā)現(xiàn)集群中的其他broker configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092"); //KEY反序列化類 configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); //value反序列化類 configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo"); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //創(chuàng)建消費者對象 KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs); List<String> topics = new ArrayList<>(); topics.add("test_topic_1"); //消費者訂閱主題 consumer.subscribe(topics); while (true){ //批量拉取主題消息,每3秒拉取一次 ConsumerRecords<Integer, String> records = consumer.poll(3000); //變量消息 for (ConsumerRecord<Integer, String> record : records) { System.out.println("主題:"+record.topic() + "\t" +"分區(qū):" + record.partition() + "\t" +"偏移量:" + + record.offset() + "\t" +"Key:"+ record.key() + "\t" +"Value:"+ record.value()); } } } }
依次運行生產(chǎn)者和消費者??刂婆_可以看到消費者接收到的消息
4. 客戶端鏈接異常信息處理
如果運行代碼過程中,java客戶端連接出現(xiàn)Connection refused: no further information錯誤:
java.net.ConnectException: Connection refused: no further information ............................省略其他錯誤信息...................
修改 ${KAFKA_HOME}/config/server.properties
# The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 listeners = PLAINTEXT://localhost:9092
將localhost修改為kafka所在服務器的IP地址即可。如果java程序和kafka在同一個服務器上,則不需要修改。
到此這篇關于Kafka在客戶端實現(xiàn)消息的發(fā)送與讀取的文章就介紹到這了,更多相關Kafka消息發(fā)送與讀取內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
java線程池ThreadPoolExecutor類使用小結
這篇文章主要介紹了java線程池ThreadPoolExecutor類使用,本文主要對ThreadPoolExecutor的使用方法進行一個詳細的概述,示例代碼介紹了ThreadPoolExecutor的構造函數(shù)的相關知識,感興趣的朋友一起看看吧2022-03-03使用SpringBoot簡單實現(xiàn)無感知的刷新 Token功能
實現(xiàn)無感知的刷新 Token 是一種提升用戶體驗的常用技術,可以在用戶使用應用時自動更新 Token,無需用戶手動干預,這種技術在需要長時間保持用戶登錄狀態(tài)的應用中非常有用,以下是使用Spring Boot實現(xiàn)無感知刷新Token的一個場景案例和相應的示例代碼2024-09-09springboot通過spel結合aop實現(xiàn)動態(tài)傳參的案例
SpEl 是Spring框架中的一個利器,Spring通過SpEl能在運行時構建復雜表達式、存取對象屬性、對象方法調用等,今天通過本文給大家介紹springboot?spel結合aop實現(xiàn)動態(tài)傳參,需要的朋友可以參考下2022-07-07