Springboot微服務(wù)項(xiàng)目整合Kafka實(shí)現(xiàn)文章上下架功能
前言:
1.前面基于Springboot的單體項(xiàng)目介紹已經(jīng)完結(jié)了,至于項(xiàng)目中的其他功能實(shí)現(xiàn)我這里就不打算介紹了,因?yàn)樯婕暗闹R(shí)點(diǎn)不難,而且都是簡(jiǎn)單的CRUD操作,假如有興趣的話(huà)可以私信我我再看看要不要寫(xiě)幾篇文章做個(gè)介紹。
2.完成上一階段的學(xué)習(xí),我就投入到了微服務(wù)的學(xué)習(xí)當(dāng)中,所用教程為B站上面黑馬的微服務(wù)教程。由于我的記性不是很好,所以對(duì)于新事物的學(xué)習(xí)我比較喜歡做筆記以加強(qiáng)理解,在這里我會(huì)將筆記的重點(diǎn)內(nèi)容做個(gè)總結(jié)發(fā)布到“微服務(wù)學(xué)習(xí)”筆記欄目中。我是趙四,一名有追求的程序員,希望大家能多多支持,能給我點(diǎn)個(gè)關(guān)注就更好了。
一:Kafka消息發(fā)送快速入門(mén)
1.傳遞字符串消息
(1)發(fā)送消息
創(chuàng)建一個(gè)Controller包并編寫(xiě)一個(gè)測(cè)試類(lèi)用于發(fā)送消息
package com.my.kafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @GetMapping("hello") public String helloProducer(){ kafkaTemplate.send("my-topic","Hello~"); return "ok"; } }
(2)監(jiān)聽(tīng)消息
編寫(xiě)測(cè)試類(lèi)用于接收消息:
package com.my.kafka.listener; import org.junit.platform.commons.util.StringUtils; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class HelloListener { @KafkaListener(topics = "my-topic") public void helloListener(String message) { if(StringUtils.isNotBlank(message)) { System.out.println(message); } } }
(3)測(cè)試結(jié)果
打開(kāi)瀏覽器輸入localhost:9991/hello,然后到控制臺(tái)查看消息,可以看到成功消息監(jiān)聽(tīng)到并且進(jìn)行了消費(fèi)。
2.傳遞對(duì)象消息
目前springboot整合后的kafka,因?yàn)樾蛄谢魇荢tringSerializer,這個(gè)時(shí)候如果需要傳遞對(duì)象可以有兩種方式:
方式一:可以自定義序列化器,對(duì)象類(lèi)型眾多,這種方式通用性不強(qiáng),這里不做介紹。
方式二:可以把要傳遞的對(duì)象進(jìn)行轉(zhuǎn)json字符串,接收消息后再轉(zhuǎn)為對(duì)象即可,本項(xiàng)目采用這種方式。
(1)修改生產(chǎn)者代碼
@GetMapping("hello") public String helloProducer(){ User user = new User(); user.setName("趙四"); user.setAge(20); kafkaTemplate.send("my-topic", JSON.toJSONString(user)); return "ok"; }
(2)結(jié)果測(cè)試
可以看到成功接收都對(duì)象參數(shù),后期要使用該對(duì)象只需要將其轉(zhuǎn)換成User對(duì)象即可。
二:功能引入
1.需求分析
發(fā)布文章之后,可能會(huì)由于文章出現(xiàn)某些錯(cuò)誤或者其他原因,我們會(huì)在文章管理端實(shí)現(xiàn)文章的上下架功能(見(jiàn)下圖),也即當(dāng)管理端實(shí)現(xiàn)對(duì)文章下架之后移動(dòng)端將不會(huì)再展示該文章,只有該文章重新被上架之后才能在移動(dòng)端看到該文章信息。
2.邏輯分析
后端接收到前端傳過(guò)來(lái)的參數(shù)之后要先做一個(gè)校驗(yàn),參數(shù)不為空才能繼續(xù)往下執(zhí)行,首先應(yīng)該根據(jù)前端傳過(guò)來(lái)的文章id(自媒體端文章id)查詢(xún)自媒體數(shù)據(jù)庫(kù)的文章信息并判斷該文章是否已是發(fā)布狀態(tài),因?yàn)橹挥袑徍顺晒Σ⒊晒Πl(fā)布了的文章才能進(jìn)行上下架操作。自媒體端微服務(wù)對(duì)文章上下架狀態(tài)進(jìn)行修改之后便可以向Kafka發(fā)送一條消息,該消息為Map對(duì)象,里面存儲(chǔ)的數(shù)據(jù)為移動(dòng)端的文章id以及前端傳過(guò)來(lái)的上下架參數(shù)enable,當(dāng)然要將該Map對(duì)象轉(zhuǎn)換成JSON字符串才能進(jìn)行發(fā)送。
文章微服務(wù)監(jiān)聽(tīng)到Kafka發(fā)送過(guò)來(lái)的消息之后將JSON字符串轉(zhuǎn)換成Map對(duì)象之后再獲取相關(guān)參數(shù)對(duì)移動(dòng)端文章的上下架狀態(tài)進(jìn)行修改。
三:前期準(zhǔn)備
1.引入依賴(lài)
<!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
2.定義常量
package com.my.common.constans; public class WmNewsMessageConstants { public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic"; }
3.Kafka配置信息
由于我是用Nacos來(lái)作為注冊(cè)中心,所以配置信息放置在Nacos上面即可。
(1)自媒體端配置
spring: kafka: bootstrap-servers: 4.234.52.122:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
(2)移動(dòng)端配置
spring: kafka: bootstrap-servers: 4.234.52.122:9092 consumer: group-id: ${spring.application.name}-test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
四:代碼實(shí)現(xiàn)
1.自媒體端
@Autowired private KafkaTemplate<String,String> kafkaTemplate; /** * 文章下架或上架 * @param id * @param enable * @return */ @Override public ResponseResult downOrUp(Integer id,Integer enable) { log.info("執(zhí)行文章上下架操作..."); if(id == null || enable == null) { return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //根據(jù)id獲取文章 WmNews news = getById(id); if(news == null) { return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在"); } //獲取當(dāng)前文章?tīng)顟B(tài) Short status = news.getStatus(); if(!status.equals(WmNews.Status.PUBLISHED.getCode())) { return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非發(fā)布狀態(tài),不能上下架"); } //更改文章?tīng)顟B(tài) news.setEnable(enable.shortValue()); updateById(news); log.info("更改文章上架狀態(tài){}-->{}",status,news.getEnable()); //發(fā)送消息到Kafka Map<String, Object> map = new HashMap<>(); map.put("articleId",news.getArticleId()); map.put("enable",enable.shortValue()); kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map)); log.info("發(fā)送消息到Kafka..."); return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
2.移動(dòng)端
(1)設(shè)置監(jiān)聽(tīng)器
package com.my.article.listener; import com.baomidou.mybatisplus.core.toolkit.StringUtils; import com.my.article.service.ApArticleService; import com.my.common.constans.WmNewsMessageConstants; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.kafka.annotation.KafkaListener; @Slf4j @Component public class EnableListener { @Autowired private ApArticleService apArticleService; @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC) public void downOrUp(String message) { if(StringUtils.isNotBlank(message)) { log.info("監(jiān)聽(tīng)到消息{}",message); apArticleService.downOrUp(message); } } }
(2)獲取消息并修改文章?tīng)顟B(tài)
/** * 文章上下架 * @param message * @return */ @Override public ResponseResult downOrUp(String message) { Map map = JSON.parseObject(message, Map.class); //獲取文章id Long articleId = (Long) map.get("articleId"); //獲取文章待修改狀態(tài) Integer enable = (Integer) map.get("enable"); //查詢(xún)文章配置 ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne (Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId)); if(apArticleConfig != null) { //上架 if(enable == 1) { log.info("文章重新上架"); apArticleConfig.setIsDown(false); apArticleConfigMapper.updateById(apArticleConfig); } //下架 if(enable == 0) { log.info("文章下架"); apArticleConfig.setIsDown(true); apArticleConfigMapper.updateById(apArticleConfig); } } else { throw new RuntimeException("文章信息不存在"); } return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
到此這篇關(guān)于Springboot微服務(wù)項(xiàng)目整合Kafka實(shí)現(xiàn)文章上下架功能的文章就介紹到這了,更多相關(guān)Springboot整合Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java之JFrame輸出Helloworld實(shí)例
這篇文章主要介紹了Java之JFrame輸出Helloworld的方法,以輸出Helloworld的實(shí)例分析了JFrame的簡(jiǎn)單入門(mén)技巧,需要的朋友可以參考下2015-02-02springboot?vue測(cè)試平臺(tái)接口定義前后端新增功能實(shí)現(xiàn)
這篇文章主要介紹了springboot?vue測(cè)試平臺(tái)接口定義前后端新增功能實(shí)現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05Java中Pattern.compile函數(shù)的使用詳解
這篇文章主要介紹了Java中Pattern.compile函數(shù)的使用詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08IDEA中項(xiàng)目集成git提交代碼的詳細(xì)步驟
這篇文章主要介紹了IDEA中項(xiàng)目集成git提交代碼的詳細(xì)步驟,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10Spring注解開(kāi)發(fā)@Bean和@ComponentScan使用案例
這篇文章主要介紹了Spring注解開(kāi)發(fā)@Bean和@ComponentScan使用案例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09Springboot如何使用@Async實(shí)現(xiàn)異步任務(wù)
這篇文章主要介紹了Springboot如何使用@Async實(shí)現(xiàn)異步任務(wù)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09Java并發(fā)編程深入理解之Synchronized的使用及底層原理詳解 上
在并發(fā)編程中存在線(xiàn)程安全問(wèn)題,主要原因有:1.存在共享數(shù)據(jù) 2.多線(xiàn)程共同操作共享數(shù)據(jù)。關(guān)鍵字synchronized可以保證在同一時(shí)刻,只有一個(gè)線(xiàn)程可以執(zhí)行某個(gè)方法或某個(gè)代碼塊,同時(shí)synchronized可以保證一個(gè)線(xiàn)程的變化可見(jiàn)(可見(jiàn)性),即可以代替volatile2021-09-09mybatis-plus動(dòng)態(tài)表名實(shí)現(xiàn)方法
本文主要介紹了mybatis-plus動(dòng)態(tài)表名實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-02-02詳解Java中NullPointerException異常的原因和解決辦法
本文主要介紹了詳解Java中NullPointerException異常的原因和解決辦法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07