Kafka的安裝及接入SpringBoot的詳細過程
環(huán)境:windows、jdk1.8、springboot2
Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
https://kafka.apache.org/
1.概述
Kafka 是一種高性能、分布式的消息隊列系統(tǒng),最初由 LinkedIn 公司開發(fā),并于2011年成為 Apache 頂級項目。它設(shè)計用于處理大規(guī)模的實時數(shù)據(jù)流,具有高吞吐量、低延遲、持久性等特點,被廣泛應(yīng)用于構(gòu)建實時數(shù)據(jù)管道、日志收集、事件驅(qū)動架構(gòu)等場景。
詳細概述見Kafka概述:
1.1 Kafka的作用
- 發(fā)布和訂閱記錄流
- 持久存儲記錄流,Kafka中的數(shù)據(jù)即使消費后也不會消失
- 在系統(tǒng)或應(yīng)用之間構(gòu)建可靠獲取數(shù)據(jù)的實時流數(shù)據(jù)管道
- 構(gòu)建轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流的實時流應(yīng)用程序
- Kafka可以處理源源不斷產(chǎn)生的數(shù)據(jù)
1.2 Kafka的一些概念
- topic:Kafka將消息分門別類,每一類的消息稱之為一個主題(Topic 就是Rabbitmq中的queue)
- producer:發(fā)布消息的對象稱之為主題生產(chǎn)者(Kafka topic producer)
- consumer:訂閱消息并處理發(fā)布的消息的對象稱之為主題消費者(consumers)
- broker:已發(fā)布的消息保存在一組服務(wù)器中,稱之為Kafka集群。集群中的每一個服務(wù)器都是一個代理(Broker)。 消費者可以訂閱一個或多個主題(topic),并從Broker拉數(shù)據(jù),從而消費這些已發(fā)布的消息。
2.Kafka下載安裝
Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
https://kafka.apache.org/downloads
選擇最新版就可以
2.1 配置kafka
解壓下載的文件,修改 config 文件夾下的 zookeeper.properties
修改 config 文件夾下的server.properties
當(dāng)需要外網(wǎng)訪問時要配置advertised.listeners(比如連云服務(wù)器的kafka)
advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092
2.2 啟動 zookeeper
Zookeeper 在 Kafka 中充當(dāng)了分布式協(xié)調(diào)服務(wù)的角色,幫助 Kafka 實現(xiàn)了集群管理、元數(shù)據(jù)存儲、故障恢復(fù)、領(lǐng)導(dǎo)者選舉等功能,是 Kafka 高可用性、可靠性和分布式特性的重要支撐。
kafka_2.13-3.7.0\bin\windows文件夾中輸入命令:
zookeeper-server-start.bat ../../config/zookeeper.properties
可以本地訪問看一下:http://localhost:2181/
2.3 啟動Kafka
kafka_2.13-3.7.0\bin\windows文件夾中輸入命令:
kafka-server-start.sh ../../config/server.properties
訪問路徑:http://localhost:9092/
2.4 便捷啟動腳本
兩個腳本放到Kafka的目錄(kafka_2.13-3.7.0)中
cd bin\windows
zookeeper-server-start.bat ../../config/zookeeper.properties
cd bin\windows
kafka-server-start.bat ../../config/server.properties
3.springboot集成Kafka
3.1 環(huán)境搭建
(1)添加pom依賴
<!-- 繼承Spring boot工程 --> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.8.RELEASE</version> </parent> <properties> <fastjson.version>1.2.58</fastjson.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>
(2)配置類application.yml
生產(chǎn)者:
spring: kafka: bootstrap-servers: xxx.xxx.xxx.xxx:9092 producer: retries: 0 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
消費者:
spring: kafka: bootstrap-servers: xxx.xxx.xxx.xxx:9092 consumer: group-id: kafka-demo-kafka-group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
(3)啟動類
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class KafkaApp { public static void main(String[] args) { SpringApplication.run(KafkaApp.class, args); } }
3.2 消息生產(chǎn)者
junit測試,新建消息發(fā)送方
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.test.context.junit4.SpringRunner; ? ? @RunWith(SpringRunner.class) @SpringBootTest public class KafkaSendTest { @Autowired private KafkaTemplate<String,String> kafkaTemplate; //如果這里有紅色波浪線,那是假錯誤 ? @Test public void sendMsg(){ String topic = "spring_test"; kafkaTemplate.send(topic,"hello spring boot kafka!"); System.out.println("發(fā)送成功."); while (true){ //保存加載ioc容器 ? } } }
3.3 消息消費者
新建監(jiān)聽類:
? import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; ? @Component public class MyKafkaListener { ? // 以下兩種方法都行 // 指定監(jiān)聽的主題 // @KafkaListener(topics = "spring_test") // public void receiveMsg(String message){ // System.out.println("接收到的消息:"+message); // } ? @KafkaListener(topics = "spring_test") public void handleMessage(ConsumerRecord<String, String> record) { System.out.println("接收到消息,偏移量為: " + record.offset() + " 消息為: " + record.value()); } }
到此這篇關(guān)于Kafka的安裝及接入SpringBoot的文章就介紹到這了,更多相關(guān)Kafka的安裝及接入SpringBoot內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
FP-Growth算法的Java實現(xiàn)+具體實現(xiàn)思路+代碼
FP-Growth算法比Apriori算法快很多(但是卻比不上時間,how time slipped away)。在網(wǎng)上搜索后發(fā)現(xiàn)Java實現(xiàn)的FP-Growth算法很少,且大多數(shù)不太能理解):太菜。所以就自己實現(xiàn)了一下。這篇文章重點介紹一下我的Java實現(xiàn)2021-06-06java發(fā)送form-data請求實現(xiàn)文件上傳的示例代碼
最近做一個需求,需要請求第三方接口上傳文件,該請求類型是form-data請求,本文就來介紹一下java發(fā)送form-data請求實現(xiàn)文件上傳的示例代碼,感興趣的可以了解一下2023-12-12spring aop底層源碼執(zhí)行邏輯剖析(源碼解析)
這篇文章主要介紹了spring aop底層源碼執(zhí)行邏輯剖析,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-08-08使用Spring-Retry解決Spring Boot應(yīng)用程序中的重試問題
重試的使用場景比較多,比如調(diào)用遠程服務(wù)時,由于網(wǎng)絡(luò)或者服務(wù)端響應(yīng)慢導(dǎo)致調(diào)用超時,此時可以多重試幾次。用定時任務(wù)也可以實現(xiàn)重試的效果,但比較麻煩,用Spring Retry的話一個注解搞定所有,感興趣的可以了解一下2023-04-04