Kafka中Producer和Consumer的作用詳解
一、Producer
Kafka是一個(gè)分布式的流處理平臺(tái),它的核心是消息系統(tǒng)。Producer是Kafka中用來將消息發(fā)送到Broker的組件之一。它將消息發(fā)布到主題(topic),并且負(fù)責(zé)按照指定的分區(qū)策略將消息分配到對(duì)應(yīng)的分區(qū)中。
下面是使用Java語言編寫的Kafka Producer示例代碼:
import org.apache.kafka.clients.producer.*; import java.util.Properties; public class MyKafkaProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址 props.put("acks", "all"); // 所有副本都響應(yīng)了才認(rèn)為發(fā)送成功 props.put("retries", 0); // 發(fā)送失敗時(shí)重試次數(shù) props.put("batch.size", 16384); // 緩沖區(qū)大小 props.put("linger.ms", 1); // 延遲1ms發(fā)送以便等待更多的消息 props.put("buffer.memory", 33554432); // 緩存總量 // key和value序列化方式,這里使用默認(rèn)的StringSerializer props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<>("test_topic", Integer.toString(i), "hello world" + i)); } producer.close(); } }
上述代碼中,我們先設(shè)置了Kafka集群地址、消息確認(rèn)方式等參數(shù)。
然后使用這些參數(shù)創(chuàng)建一個(gè)KafkaProducer實(shí)例,并通過send方法發(fā)送消息到指定的主題。
在這個(gè)例子中,我們將10條帶有字符串"hello world"的消息發(fā)送到名為"test_topic"的主題中。最后別忘了關(guān)閉producer連接。
二、Consumer
Kafka是一個(gè)分布式流媒體平臺(tái),其中Consumer是Kafka中消費(fèi)數(shù)據(jù)的組件之一。
Kafka Consumer可以訂閱一個(gè)或多個(gè)Topic,并從這些Topic中消費(fèi)消息。
Kafka Consumer可以以不同的方式處理消息,例如將其寫入到數(shù)據(jù)庫、打印出來或進(jìn)行其他自定義處理。
Kafka Consumer使用一組API來與Kafka Broker通信,并接收Broker返回的數(shù)據(jù)。
在接收到數(shù)據(jù)后,Consumer會(huì)將其提交給應(yīng)用程序,由應(yīng)用程序進(jìn)一步處理。
以下是一個(gè)使用Java編寫的Kafka Consumer樣例代碼:
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("Received message: key=%s value=%s%n", record.key(), record.value()); } } } }
在這個(gè)樣例代碼中,我們首先創(chuàng)建了一個(gè)Properties對(duì)象,其中包含連接Kafka Broker所需的配置信息。
然后,我們創(chuàng)建了一個(gè)Kafka Consumer實(shí)例,并訂閱了名為“test-topic”的Topic。
最后,在while循環(huán)中,我們使用poll()方法從Broker獲取消息,并在控制臺(tái)上打印出每條消息的鍵和值。
三、Producer和Consumer有什么作用?
Kafka是一個(gè)分布式的消息隊(duì)列系統(tǒng),Producer和Consumer都是Kafka中的核心組件之一。
Producer負(fù)責(zé)向Kafka集群發(fā)送消息,將消息發(fā)布到一個(gè)或多個(gè)主題(topic)中。Producer可以選擇在消息發(fā)送成功后等待確認(rèn)(ack)或不等待,在等待確認(rèn)時(shí)會(huì)阻塞,直到收到Broker返回的確認(rèn)信息。
而Consumer則是從Kafka集群消費(fèi)消息,并且訂閱一個(gè)或多個(gè)主題。每個(gè)Consumer在消費(fèi)消息時(shí)都有自己獨(dú)立的offset(偏移量),用來標(biāo)識(shí)該Consumer已經(jīng)消費(fèi)到哪個(gè)位置。消費(fèi)者可以隨時(shí)停止消費(fèi)或重新開始消費(fèi),而不影響其他Consumer的消費(fèi)進(jìn)度。
總體來說,Producer和Consumer的作用是實(shí)現(xiàn)了消息的生產(chǎn)和消費(fèi),幫助用戶構(gòu)建高可靠、高性能的消息處理系統(tǒng)。
到此這篇關(guān)于Kafka中Producer和Consumer的作用詳解的文章就介紹到這了,更多相關(guān)Producer和Consumer的作用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實(shí)現(xiàn)簡(jiǎn)單樹結(jié)構(gòu)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)簡(jiǎn)單樹結(jié)構(gòu)的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-01-01實(shí)現(xiàn)一個(gè)規(guī)則引擎的可視化具體方案
項(xiàng)目原因需要用到規(guī)則引擎,但是發(fā)現(xiàn)大部分不可以自由的進(jìn)行規(guī)則定義,通過不斷嘗試變換關(guān)鍵字在搜索引擎搜索,最終在stackoverflow找到了一個(gè)探討這個(gè)問題的帖子,特此將帖子中提到的方案分享一下,如果你跟我一樣在研究同樣的問題,也許對(duì)你有用2021-04-04java?freemarker實(shí)現(xiàn)動(dòng)態(tài)生成excel文件
這篇文章主要為大家詳細(xì)介紹了java如何通過freemarker實(shí)現(xiàn)動(dòng)態(tài)生成excel文件,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-12-12JAVA利用HttpClient進(jìn)行POST請(qǐng)求(HTTPS)實(shí)例
下面小編就為大家?guī)硪黄狫AVA利用HttpClient進(jìn)行POST請(qǐng)求(HTTPS)實(shí)例。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起 小編過來看看吧2016-11-11Mybatis Plus使用條件構(gòu)造器增刪改查功能的實(shí)現(xiàn)方法
這篇文章主要介紹了Mybatis-Plus使用條件構(gòu)造器增刪改查,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-05-05Java初學(xué)者之五子棋游戲?qū)崿F(xiàn)教程
這篇文章主要為大家詳細(xì)介紹了Java初學(xué)者之五子棋游戲?qū)崿F(xiàn)教程,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-10-10Spring MVC InitBinder驗(yàn)證方法
這篇文章主要介紹了Spring MVC InitBinder驗(yàn)證方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-03-03