SpringBoot3+Kafka實(shí)戰(zhàn)指南
1. 項(xiàng)目層級(jí)
像火鍋店的分工:點(diǎn)單員、傳菜員、食客清清楚楚。
kafka/ ├── pom.xml # 根 POM(BOM對(duì)齊) ├── provider/ # 點(diǎn)單:生產(chǎn)者 │ ├── pom.xml # 子模塊 POM │ └── src/main/java/org/example/provider/ │ ├── ProviderApplication.java │ ├── conf/KafkaTopicsConfig.java │ ├── controller/ProviderController.java │ └── service/KafkaProducerService.java │ └── src/main/resources/application.yaml └── consumer/ # 上桌:消費(fèi)者 ├── pom.xml # 子模塊 POM └── src/main/java/org/example/consumer/ ├── ConsumerApplication.java └── listener/KafkaConsumerListener.java └── src/main/resources/application.yaml
2. 根 POM(大廚的調(diào)料表)
<modules> <module>provider</module> <module>consumer</module> </modules> <properties> <java.version>17</java.version> <spring.boot.version>3.4.3</spring.boot.version> <spring.cloud.version>2024.0.2</spring.cloud.version> </properties> <!-- 關(guān)鍵:用 BOM 管理依賴版本(不用 parent 也行) --> <dependencyManagement> <dependencies> <!-- Spring Boot 依賴版本對(duì)齊(含 starter、lombok 等) --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> <!-- Spring Cloud 依賴版本對(duì)齊 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring.cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- hutool工具類 --> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-ai</artifactId> <version>5.8.38</version> </dependency> </dependencies>
?? 全局版本對(duì)齊,避免“鍋底和食材不搭”。
3. 子模塊 POM
3.1 provider/pom.xml
<parent> <groupId>org.example</groupId> <artifactId>kafka</artifactId> <version>0.0.1-SNAPSHOT</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>org.example</groupId> <artifactId>provider</artifactId> <version>0.0.1-SNAPSHOT</version> <name>provider</name> <description>provider</description> <packaging>jar</packaging> <properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies>
3.2 consumer/pom.xml
<parent> <groupId>org.example</groupId> <artifactId>kafka</artifactId> <version>0.0.1-SNAPSHOT</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>org.example</groupId> <artifactId>consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>consumer</name> <description>consumer</description> <packaging>jar</packaging> <properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.58</version> </dependency> </dependencies>
4. 配置(菜單寫清楚)
4.1 Provider(application.yaml - 生產(chǎn)者)
server: port: 1003 # 本模塊 HTTP 端口 app: kafka: topic: demo.topic.v1 # 要發(fā)送/創(chuàng)建的主題名 auto-create-topic: true # 開(kāi)啟后,會(huì)注冊(cè) NewTopic bean 從而在啟動(dòng)時(shí)創(chuàng)建主題(見(jiàn) KafkaTopicsConfig) spring: kafka: bootstrap-servers: yiqiquhuxi.cn:9092 # 數(shù)據(jù)網(wǎng)絡(luò)IO 序列化方式 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # 發(fā)送字符串 # 可靠性 acks: all retries: 3
4.2 Consumer(application.yaml - 消費(fèi)者)
server: port: 1004 # 本模塊端口(通常只看日志) app: kafka: topic: demo.topic.v1 # 要訂閱的主題名(與 provider 保持一致) spring: kafka: bootstrap-servers: yiqiquhuxi.cn:9092 consumer: group-id: demo-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 接收字符串 # 說(shuō)明: # - JsonDeserializer 的默認(rèn)類型鍵為 spring.json.value.default.type(源碼常量 VALUE_DEFAULT_TYPE)。:contentReference[oaicite:7]{index=7} # - @KafkaListener 支持使用 ${...} 占位符讀取上述配置。:contentReference[oaicite:8]{index=8}
5. 核心代碼(廚師上陣)
5.1 入口
@SpringBootApplication public class ProviderApplication { public static void main(String[] args) { SpringApplication.run(ProviderApplication.class, args); } } @SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } }
5.2 消息模型
public record MessagePayload(String id, String content, long ts) {}
5.3 Provider(點(diǎn)菜 + 上菜)
@Service public class KafkaProducerService { @Autowired private KafkaTemplate<String, String> kafka; @Value("${app.kafka.topic}") private String topic; public void send(String content) { MessagePayload payload = new MessagePayload( UUID.randomUUID().toString(), content, System.currentTimeMillis() ); //序列化 String jsonStr = JSONUtil.toJsonStr(payload); kafka.send(topic, jsonStr); } } @RestController @RequestMapping("/provider") public class ProviderController { @Autowired private KafkaProducerService producer; @GetMapping("/done") public String done() { producer.send("done"); return "done"; } }
5.4 Provider(創(chuàng)建 Topic)
@Configuration public class KafkaTopicsConfig { @Value("${app.kafka.topic}") private String topic; // 只有當(dāng) app.kafka.auto-create-topic=true(或缺省并 matchIfMissing=true)才注冊(cè) NewTopic @Bean @ConditionalOnProperty(name = "app.kafka.auto-create-topic", havingValue = "true", matchIfMissing = true) public NewTopic demoTopic() { // 分區(qū)/副本按你的集群實(shí)際調(diào)整;單 Broker 可用 (3,1) return new NewTopic(topic, 3, (short) 1); } }
?? 有了它,就不用手動(dòng) kafka-topics.sh --create
,Spring Boot 啟動(dòng)時(shí)就能幫你“先起鍋燒水”。
5.5 Consumer(開(kāi)吃)
@Slf4j @Component public class KafkaConsumerListener { @KafkaListener( topics = "${app.kafka.topic}", groupId = "${spring.kafka.consumer.group-id}" ) public void onMessage(String msg) { try { // json反序列化成對(duì)象 MessagePayload payload = JSON.parseObject(msg, MessagePayload.class); log.info("? received: id={}, content={}, ts={}", payload.id(), payload.content(), payload.ts()); } catch (Exception e) { log.error("? JSON解析失敗,原始消息: {}", msg, e); } } }
6. 運(yùn)行流程
- 點(diǎn)火:Kafka Broker 先啟動(dòng)
- 開(kāi)店:先跑 consumer,再跑 provider
- 點(diǎn)單:
GET http://localhost:1003/provider/done
- 吃菜:consumer 日志里出現(xiàn) ?? → 成功!
7. 常見(jiàn)坑
- 鍋點(diǎn)不著:
bootstrap-servers
不通,先查網(wǎng)絡(luò) - 沒(méi)菜:topic 不存在?開(kāi)
auto-create-topic
- 吃不到:改
group-id
或加auto-offset-reset=earliest
- 串味了:序列化不匹配 Producer 默認(rèn)用
StringSerializer
,Consumer 卻用JsonDeserializer
,兩邊火候不對(duì),消息就“夾生”了。- 建議:
- 如果傳字符串,就都用
StringSerializer
/StringDeserializer
。 - 如果傳對(duì)象,就統(tǒng)一用
JsonSerializer
/JsonDeserializer
,并在application.yaml
里顯式聲明spring.json.value.default.type
。
- 如果傳字符串,就都用
- 建議:
8. 總結(jié)
Spring Boot + Kafka 的套路:
?? Provider 點(diǎn)單,Kafka 傳菜,Consumer 開(kāi)吃。
到此這篇關(guān)于SpringBoot3+Kafka實(shí)戰(zhàn)指南的文章就介紹到這了,更多相關(guān)SpringBoot3 Kafka實(shí)戰(zhàn)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解SpringBoot項(xiàng)目整合Vue做一個(gè)完整的用戶注冊(cè)功能
本文主要介紹了SpringBoot項(xiàng)目整合Vue做一個(gè)完整的用戶注冊(cè)功能,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07SSH框架網(wǎng)上商城項(xiàng)目第14戰(zhàn)之商城首頁(yè)UI的設(shè)計(jì)
這篇文章主要為大家詳細(xì)介紹了SSH框架網(wǎng)上商城項(xiàng)目第14戰(zhàn)之商城首頁(yè)UI的設(shè)計(jì),感興趣的小伙伴們可以參考一下2016-06-06MyBatis-Plus QueryWrapper及LambdaQueryWrapper的使用詳解
這篇文章主要介紹了MyBatis-Plus QueryWrapper及LambdaQueryWrapper的使用詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03java微信企業(yè)號(hào)開(kāi)發(fā)之開(kāi)發(fā)模式的開(kāi)啟
這篇文章主要為大家詳細(xì)介紹了java微信企業(yè)號(hào)開(kāi)發(fā)之開(kāi)發(fā)模式的開(kāi)啟方法,感興趣的小伙伴們可以參考一下2016-06-06Spring?AI?+?ollama?本地搭建聊天?AI?功能
這篇文章主要介紹了Spring?AI?+?ollama?本地搭建聊天?AI?,本文通過(guò)實(shí)例圖文相結(jié)合給大家講解的非常詳細(xì),需要的朋友可以參考下2024-11-11Spring Boot2配置Swagger2生成API接口文檔詳情
這篇文章主要介紹了Spring Boot2配置Swagger2生成API接口文檔詳情,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-09-09Java流程控制語(yǔ)句之If選擇結(jié)構(gòu)
今天繼續(xù)帶大家復(fù)習(xí)Java流程控制語(yǔ)句的相關(guān)知識(shí),本文對(duì)If選擇結(jié)構(gòu)作了非常詳細(xì)的介紹及代碼示例,對(duì)正在學(xué)習(xí)的小伙伴們很有幫助,需要的朋友可以參考下2021-06-06