欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java分布式學(xué)習(xí)之Kafka消息隊(duì)列

 更新時(shí)間:2022年07月28日 11:33:13   作者:kaico2018  
Kafka是由Apache軟件基金會(huì)開發(fā)的一個(gè)開源流處理平臺(tái),由Scala和Java編寫。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者在網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)

介紹

Apache Kafka 是分布式發(fā)布-訂閱消息系統(tǒng),在 kafka官網(wǎng)上對(duì) kafka 的定義:一個(gè)分布式發(fā)布-訂閱消息傳遞系統(tǒng)。 它最初由LinkedIn公司開發(fā),Linkedin于2010年貢獻(xiàn)給了Apache基金會(huì)并成為頂級(jí)開源項(xiàng)目。Kafka是一種快速、可擴(kuò)展的、設(shè)計(jì)內(nèi)在就是分布式的,分區(qū)的和可復(fù)制的提交日志服務(wù)。

注意:Kafka并沒有遵循JMS規(guī)范(),它只提供了發(fā)布和訂閱通訊方式。

kafka中文官網(wǎng):http://kafka.apachecn.org/quickstart.html

Kafka核心相關(guān)名稱

  1. Broker:Kafka節(jié)點(diǎn),一個(gè)Kafka節(jié)點(diǎn)就是一個(gè)broker,多個(gè)broker可以組成一個(gè)Kafka集群
  2. Topic:一類消息,消息存放的目錄即主題,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能夠同時(shí)負(fù)責(zé)多個(gè)topic的分發(fā)
  3. massage: Kafka中最基本的傳遞對(duì)象。
  4. Partition:topic物理上的分組,一個(gè)topic可以分為多個(gè)partition,每個(gè)partition是一個(gè)有序的隊(duì)列。Kafka里面實(shí)現(xiàn)分區(qū),一個(gè)broker就是表示一個(gè)區(qū)域。
  5. Segment:partition物理上由多個(gè)segment組成,每個(gè)Segment存著message信息
  6. Producer : 生產(chǎn)者,生產(chǎn)message發(fā)送到topic
  7. Consumer : 消費(fèi)者,訂閱topic并消費(fèi)message, consumer作為一個(gè)線程來消費(fèi)
  8. Consumer Group:消費(fèi)者組,一個(gè)Consumer Group包含多個(gè)consumer
  9. Offset:偏移量,理解為消息 partition 中消息的索引位置

主題和隊(duì)列的區(qū)別:

隊(duì)列是一個(gè)數(shù)據(jù)結(jié)構(gòu),遵循先進(jìn)先出原則

kafka集群安裝

參考官方文檔:https://kafka.apachecn.org/quickstart.html

  • 每臺(tái)服務(wù)器上安裝jdk1.8環(huán)境
  • 安裝Zookeeper集群環(huán)境
  • 安裝kafka集群環(huán)境
  • 運(yùn)行環(huán)境測試

安裝jdk環(huán)境和zookeeper這里不詳述了。

kafka為什么依賴于zookeeper:kafka會(huì)將mq信息存放到zookeeper上,為了使整個(gè)集群能夠方便擴(kuò)展,采用zookeeper的事件通知相互感知。

kafka集群安裝步驟:

1、下載kafka的壓縮包,下載地址:https://kafka.apachecn.org/downloads.html

2、解壓安裝包

tar -zxvf kafka_2.11-1.0.0.tgz

3、修改kafka的配置文件 config/server.properties

配置文件修改內(nèi)容:

  • zookeeper連接地址:zookeeper.connect=192.168.1.19:2181
  • 監(jiān)聽的ip,修改為本機(jī)的iplisteners=PLAINTEXT://192.168.1.19:9092
  • kafka的brokerid,每臺(tái)broker的id都不一樣broker.id=0

4、依次啟動(dòng)kafka

./kafka-server-start.sh -daemon config/server.properties

kafka使用

kafka文件存儲(chǔ)

topic是邏輯上的概念,而partition是物理上的概念,每個(gè)partition對(duì)應(yīng)于一個(gè)log文件,該log文件中存儲(chǔ)的就是Producer生成的數(shù)據(jù)。Producer生成的數(shù)據(jù)會(huì)被不斷追加到該log文件末端,為防止log文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka采取了分片和索引機(jī)制,將每個(gè)partition分為多個(gè)segment,每個(gè)segment包括:“.index”文件、“.log”文件和.timeindex等文件。這些文件位于一個(gè)文件夾下,該文件夾的命名規(guī)則為:topic名稱+分區(qū)序號(hào)。

例如:執(zhí)行命令新建一個(gè)主題,分三個(gè)區(qū)存放放在三個(gè)broker中:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kaico

  • 一個(gè)partition分為多個(gè)segment
  • .log 日志文件
  • .index 偏移量索引文件
  • .timeindex 時(shí)間戳索引文件
  • 其他文件(partition.metadata,leader-epoch-checkpoint)

Springboot整合kafka

maven依賴

 <dependencies>
        <!-- springBoot集成kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- SpringBoot整合Web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

yml配置

# kafka
spring:
  kafka:
    # kafka服務(wù)器地址(可以多個(gè))
#    bootstrap-servers: 192.168.212.164:9092,192.168.212.167:9092,192.168.212.168:9092
    bootstrap-servers: www.kaicostudy.com:9092,www.kaicostudy.com:9093,www.kaicostudy.com:9094
    consumer:
      # 指定一個(gè)默認(rèn)的組名
      group-id: kafkaGroup1
      # earliest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無提交的offset時(shí),從頭開始消費(fèi)
      # latest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù)
      # none:topic各分區(qū)都存在已提交的offset時(shí),從offset后開始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常
      auto-offset-reset: earliest
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 緩存容量
      buffer-memory: 524288
      # 服務(wù)器地址
      bootstrap-servers: www.kaicostudy.com:9092,www.kaicostudy.com:9093,www.kaicostudy.com:9094

生產(chǎn)者

@RestController
public class KafkaController {
	/**
	 * 注入kafkaTemplate
	 */
	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;
	/**
	 * 發(fā)送消息的方法
	 *
	 * @param key
	 *            推送數(shù)據(jù)的key
	 * @param data
	 *            推送數(shù)據(jù)的data
	 */
	private void send(String key, String data) {
		// topic 名稱 key   data 消息數(shù)據(jù)
		kafkaTemplate.send("kaico", key, data);
	}
	// test 主題 1 my_test 3
	@RequestMapping("/kafka")
	public String testKafka() {
		int iMax = 6;
		for (int i = 1; i < iMax; i++) {
			send("key" + i, "data" + i);
		}
		return "success";
	}
}

消費(fèi)者

@Component
public class TopicKaicoConsumer {
    /**
     * 消費(fèi)者使用日志打印消息
     */
    @KafkaListener(topics = "kaico") //監(jiān)聽的主題
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("topic名稱:" + consumer.topic() + ",key:" +
                consumer.key() + "," +
                "分區(qū)位置:" + consumer.partition()
                + ", 下標(biāo)" + consumer.offset());
        //輸出key對(duì)應(yīng)的value的值
        System.out.println(consumer.value());
    }
}

到此這篇關(guān)于Java分布式學(xué)習(xí)之Kafka消息隊(duì)列的文章就介紹到這了,更多相關(guān)Java Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java中LinkedList詳解和使用示例_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    Java中LinkedList詳解和使用示例_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    LinkedList 是一個(gè)繼承于AbstractSequentialList的雙向鏈表。它也可以被當(dāng)作堆棧、隊(duì)列或雙端隊(duì)列進(jìn)行操作。接下來通過示例代碼給大家詳細(xì)介紹java中l(wèi)inkedlist的使用,需要的朋友參考下吧
    2017-05-05
  • 帶大家深入了解Spring事務(wù)

    帶大家深入了解Spring事務(wù)

    Spring框架提供統(tǒng)一的事務(wù)抽象,通過統(tǒng)一的編程模型使得應(yīng)用程序可以很容易地在不同的事務(wù)框架之間進(jìn)行切換. 在學(xué)習(xí)Spring事務(wù)前,我們先對(duì)數(shù)據(jù)庫事務(wù)進(jìn)行簡單的介紹。,需要的朋友可以參考下
    2021-05-05
  • Mybatis延遲加載原理和延遲加載配置詳解

    Mybatis延遲加載原理和延遲加載配置詳解

    這篇文章主要介紹了Mybatis延遲加載原理和延遲加載配置詳解,MyBatis中的延遲加載,也稱為懶加載,是指在進(jìn)行表的關(guān)聯(lián)查詢時(shí),按照設(shè)置延遲規(guī)則推遲對(duì)關(guān)聯(lián)對(duì)象的select查詢,需要的朋友可以參考下
    2023-10-10
  • httpclient的CPool定義方法詳解

    httpclient的CPool定義方法詳解

    這篇文章主要為大家介紹了httpclient的CPool定義方法詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-10-10
  • 使用String類型小數(shù)值轉(zhuǎn)換為Long類型

    使用String類型小數(shù)值轉(zhuǎn)換為Long類型

    這篇文章主要介紹了使用String類型小數(shù)值轉(zhuǎn)換為Long類型操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • Java中文件的讀寫方法之IO流詳解

    Java中文件的讀寫方法之IO流詳解

    這篇文章主要介紹了Java中文件的讀寫方法之IO流詳解,本文中的代碼所涉及到的路徑或者文件都是本人的,大家得換成自己的,并且大家可以在網(wǎng)上自行找一些材料作為讀入或者寫入的材料,不過路徑最好是英文的,不要包含中文,防止JVM讀取失敗
    2022-04-04
  • JAVA十大排序算法之堆排序詳解

    JAVA十大排序算法之堆排序詳解

    這篇文章主要介紹了java中的冒泡排序,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考
    2021-08-08
  • JAVA設(shè)計(jì)模式之解釋器模式詳解

    JAVA設(shè)計(jì)模式之解釋器模式詳解

    這篇文章主要介紹了JAVA設(shè)計(jì)模式之解釋器模式詳解,解釋器模式是類的行為模式,給定一個(gè)語言之后,解釋器模式可以定義出其文法的一種表示,并同時(shí)提供一個(gè)解釋器,需要的朋友可以參考下
    2015-04-04
  • springboot如何集成Swagger2

    springboot如何集成Swagger2

    這篇文章主要介紹了springboot集成Swagger2的方法,幫助大家更好的理解和使用springboot框架,感興趣的朋友可以了解下
    2020-12-12
  • java不通過配置文件初始化logger示例

    java不通過配置文件初始化logger示例

    這篇文章主要介紹了java不通過配置文件初始化logger示例,需要的朋友可以參考下
    2014-05-05

最新評(píng)論