欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kafka中Producer和Consumer的作用詳解

 更新時(shí)間:2023年12月05日 09:45:59   作者:楊熒  
這篇文章主要介紹了Kafka中Producer和Consumer的作用詳解,Kafka是一個(gè)分布式的流處理平臺(tái),它的核心是消息系統(tǒng),Producer是Kafka中用來將消息發(fā)送到Broker的組件之一,它將消息發(fā)布到主題,并且負(fù)責(zé)按照指定的分區(qū)策略將消息分配到對(duì)應(yīng)的分區(qū)中,需要的朋友可以參考下

一、Producer

Kafka是一個(gè)分布式的流處理平臺(tái),它的核心是消息系統(tǒng)。Producer是Kafka中用來將消息發(fā)送到Broker的組件之一。它將消息發(fā)布到主題(topic),并且負(fù)責(zé)按照指定的分區(qū)策略將消息分配到對(duì)應(yīng)的分區(qū)中。

下面是使用Java語言編寫的Kafka Producer示例代碼:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class MyKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
        props.put("acks", "all"); // 所有副本都響應(yīng)了才認(rèn)為發(fā)送成功
        props.put("retries", 0); // 發(fā)送失敗時(shí)重試次數(shù)
        props.put("batch.size", 16384); // 緩沖區(qū)大小
        props.put("linger.ms", 1); // 延遲1ms發(fā)送以便等待更多的消息
        props.put("buffer.memory", 33554432); // 緩存總量
        // key和value序列化方式,這里使用默認(rèn)的StringSerializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("test_topic", Integer.toString(i), "hello world" + i));
        }
        producer.close();
    }
}

上述代碼中,我們先設(shè)置了Kafka集群地址、消息確認(rèn)方式等參數(shù)。

然后使用這些參數(shù)創(chuàng)建一個(gè)KafkaProducer實(shí)例,并通過send方法發(fā)送消息到指定的主題。

在這個(gè)例子中,我們將10條帶有字符串"hello world"的消息發(fā)送到名為"test_topic"的主題中。最后別忘了關(guān)閉producer連接。

二、Consumer

Kafka是一個(gè)分布式流媒體平臺(tái),其中Consumer是Kafka中消費(fèi)數(shù)據(jù)的組件之一。

Kafka Consumer可以訂閱一個(gè)或多個(gè)Topic,并從這些Topic中消費(fèi)消息。

Kafka Consumer可以以不同的方式處理消息,例如將其寫入到數(shù)據(jù)庫、打印出來或進(jìn)行其他自定義處理。

Kafka Consumer使用一組API來與Kafka Broker通信,并接收Broker返回的數(shù)據(jù)。

在接收到數(shù)據(jù)后,Consumer會(huì)將其提交給應(yīng)用程序,由應(yīng)用程序進(jìn)一步處理。

以下是一個(gè)使用Java編寫的Kafka Consumer樣例代碼:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received message: key=%s value=%s%n", record.key(), record.value());
            }
        }
    }
}

在這個(gè)樣例代碼中,我們首先創(chuàng)建了一個(gè)Properties對(duì)象,其中包含連接Kafka Broker所需的配置信息。

然后,我們創(chuàng)建了一個(gè)Kafka Consumer實(shí)例,并訂閱了名為“test-topic”的Topic。

最后,在while循環(huán)中,我們使用poll()方法從Broker獲取消息,并在控制臺(tái)上打印出每條消息的鍵和值。

三、Producer和Consumer有什么作用?

Kafka是一個(gè)分布式的消息隊(duì)列系統(tǒng),Producer和Consumer都是Kafka中的核心組件之一。

Producer負(fù)責(zé)向Kafka集群發(fā)送消息,將消息發(fā)布到一個(gè)或多個(gè)主題(topic)中。Producer可以選擇在消息發(fā)送成功后等待確認(rèn)(ack)或不等待,在等待確認(rèn)時(shí)會(huì)阻塞,直到收到Broker返回的確認(rèn)信息。

而Consumer則是從Kafka集群消費(fèi)消息,并且訂閱一個(gè)或多個(gè)主題。每個(gè)Consumer在消費(fèi)消息時(shí)都有自己獨(dú)立的offset(偏移量),用來標(biāo)識(shí)該Consumer已經(jīng)消費(fèi)到哪個(gè)位置。消費(fèi)者可以隨時(shí)停止消費(fèi)或重新開始消費(fèi),而不影響其他Consumer的消費(fèi)進(jìn)度。

總體來說,Producer和Consumer的作用是實(shí)現(xiàn)了消息的生產(chǎn)和消費(fèi),幫助用戶構(gòu)建高可靠、高性能的消息處理系統(tǒng)。

到此這篇關(guān)于Kafka中Producer和Consumer的作用詳解的文章就介紹到這了,更多相關(guān)Producer和Consumer的作用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java實(shí)現(xiàn)簡(jiǎn)單樹結(jié)構(gòu)

    Java實(shí)現(xiàn)簡(jiǎn)單樹結(jié)構(gòu)

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)簡(jiǎn)單樹結(jié)構(gòu)的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-01-01
  • 實(shí)現(xiàn)一個(gè)規(guī)則引擎的可視化具體方案

    實(shí)現(xiàn)一個(gè)規(guī)則引擎的可視化具體方案

    項(xiàng)目原因需要用到規(guī)則引擎,但是發(fā)現(xiàn)大部分不可以自由的進(jìn)行規(guī)則定義,通過不斷嘗試變換關(guān)鍵字在搜索引擎搜索,最終在stackoverflow找到了一個(gè)探討這個(gè)問題的帖子,特此將帖子中提到的方案分享一下,如果你跟我一樣在研究同樣的問題,也許對(duì)你有用
    2021-04-04
  • java?freemarker實(shí)現(xiàn)動(dòng)態(tài)生成excel文件

    java?freemarker實(shí)現(xiàn)動(dòng)態(tài)生成excel文件

    這篇文章主要為大家詳細(xì)介紹了java如何通過freemarker實(shí)現(xiàn)動(dòng)態(tài)生成excel文件,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2023-12-12
  • JAVA利用HttpClient進(jìn)行POST請(qǐng)求(HTTPS)實(shí)例

    JAVA利用HttpClient進(jìn)行POST請(qǐng)求(HTTPS)實(shí)例

    下面小編就為大家?guī)硪黄狫AVA利用HttpClient進(jìn)行POST請(qǐng)求(HTTPS)實(shí)例。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起 小編過來看看吧
    2016-11-11
  • Java中八大包裝類舉例詳解(通俗易懂)

    Java中八大包裝類舉例詳解(通俗易懂)

    這篇文章主要介紹了Java中的包裝類,包括它們的作用、特點(diǎn)、用途以及如何進(jìn)行裝箱和拆箱,包裝類還提供了許多實(shí)用方法,如轉(zhuǎn)換、獲取基本類型值、比較和類型檢測(cè),文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2025-02-02
  • Java核心技術(shù)之反射

    Java核心技術(shù)之反射

    本文非常詳細(xì)的講解了java反射的相關(guān)資料,java反射在現(xiàn)今的使用中很頻繁,希望此文可以幫大家解答疑惑,可以幫助大家理解
    2021-11-11
  • Mybatis Plus使用條件構(gòu)造器增刪改查功能的實(shí)現(xiàn)方法

    Mybatis Plus使用條件構(gòu)造器增刪改查功能的實(shí)現(xiàn)方法

    這篇文章主要介紹了Mybatis-Plus使用條件構(gòu)造器增刪改查,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-05-05
  • Java初學(xué)者之五子棋游戲?qū)崿F(xiàn)教程

    Java初學(xué)者之五子棋游戲?qū)崿F(xiàn)教程

    這篇文章主要為大家詳細(xì)介紹了Java初學(xué)者之五子棋游戲?qū)崿F(xiàn)教程,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-10-10
  • SpringBoot如何配置文件給bean賦值問題

    SpringBoot如何配置文件給bean賦值問題

    這篇文章主要介紹了SpringBoot如何配置文件給bean賦值問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • Spring MVC InitBinder驗(yàn)證方法

    Spring MVC InitBinder驗(yàn)證方法

    這篇文章主要介紹了Spring MVC InitBinder驗(yàn)證方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2018-03-03

最新評(píng)論