Springboot微服務(wù)項(xiàng)目整合Kafka實(shí)現(xiàn)文章上下架功能
前言:
1.前面基于Springboot的單體項(xiàng)目介紹已經(jīng)完結(jié)了,至于項(xiàng)目中的其他功能實(shí)現(xiàn)我這里就不打算介紹了,因?yàn)樯婕暗闹R點(diǎn)不難,而且都是簡單的CRUD操作,假如有興趣的話可以私信我我再看看要不要寫幾篇文章做個介紹。
2.完成上一階段的學(xué)習(xí),我就投入到了微服務(wù)的學(xué)習(xí)當(dāng)中,所用教程為B站上面黑馬的微服務(wù)教程。由于我的記性不是很好,所以對于新事物的學(xué)習(xí)我比較喜歡做筆記以加強(qiáng)理解,在這里我會將筆記的重點(diǎn)內(nèi)容做個總結(jié)發(fā)布到“微服務(wù)學(xué)習(xí)”筆記欄目中。我是趙四,一名有追求的程序員,希望大家能多多支持,能給我點(diǎn)個關(guān)注就更好了。
一:Kafka消息發(fā)送快速入門
1.傳遞字符串消息
(1)發(fā)送消息
創(chuàng)建一個Controller包并編寫一個測試類用于發(fā)送消息
package com.my.kafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @GetMapping("hello") public String helloProducer(){ kafkaTemplate.send("my-topic","Hello~"); return "ok"; } }
(2)監(jiān)聽消息
編寫測試類用于接收消息:
package com.my.kafka.listener; import org.junit.platform.commons.util.StringUtils; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class HelloListener { @KafkaListener(topics = "my-topic") public void helloListener(String message) { if(StringUtils.isNotBlank(message)) { System.out.println(message); } } }
(3)測試結(jié)果
打開瀏覽器輸入localhost:9991/hello,然后到控制臺查看消息,可以看到成功消息監(jiān)聽到并且進(jìn)行了消費(fèi)。
2.傳遞對象消息
目前springboot整合后的kafka,因?yàn)樾蛄谢魇荢tringSerializer,這個時候如果需要傳遞對象可以有兩種方式:
方式一:可以自定義序列化器,對象類型眾多,這種方式通用性不強(qiáng),這里不做介紹。
方式二:可以把要傳遞的對象進(jìn)行轉(zhuǎn)json字符串,接收消息后再轉(zhuǎn)為對象即可,本項(xiàng)目采用這種方式。
(1)修改生產(chǎn)者代碼
@GetMapping("hello") public String helloProducer(){ User user = new User(); user.setName("趙四"); user.setAge(20); kafkaTemplate.send("my-topic", JSON.toJSONString(user)); return "ok"; }
(2)結(jié)果測試
可以看到成功接收都對象參數(shù),后期要使用該對象只需要將其轉(zhuǎn)換成User對象即可。
二:功能引入
1.需求分析
發(fā)布文章之后,可能會由于文章出現(xiàn)某些錯誤或者其他原因,我們會在文章管理端實(shí)現(xiàn)文章的上下架功能(見下圖),也即當(dāng)管理端實(shí)現(xiàn)對文章下架之后移動端將不會再展示該文章,只有該文章重新被上架之后才能在移動端看到該文章信息。
2.邏輯分析
后端接收到前端傳過來的參數(shù)之后要先做一個校驗(yàn),參數(shù)不為空才能繼續(xù)往下執(zhí)行,首先應(yīng)該根據(jù)前端傳過來的文章id(自媒體端文章id)查詢自媒體數(shù)據(jù)庫的文章信息并判斷該文章是否已是發(fā)布狀態(tài),因?yàn)橹挥袑徍顺晒Σ⒊晒Πl(fā)布了的文章才能進(jìn)行上下架操作。自媒體端微服務(wù)對文章上下架狀態(tài)進(jìn)行修改之后便可以向Kafka發(fā)送一條消息,該消息為Map對象,里面存儲的數(shù)據(jù)為移動端的文章id以及前端傳過來的上下架參數(shù)enable,當(dāng)然要將該Map對象轉(zhuǎn)換成JSON字符串才能進(jìn)行發(fā)送。
文章微服務(wù)監(jiān)聽到Kafka發(fā)送過來的消息之后將JSON字符串轉(zhuǎn)換成Map對象之后再獲取相關(guān)參數(shù)對移動端文章的上下架狀態(tài)進(jìn)行修改。
三:前期準(zhǔn)備
1.引入依賴
<!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
2.定義常量
package com.my.common.constans; public class WmNewsMessageConstants { public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic"; }
3.Kafka配置信息
由于我是用Nacos來作為注冊中心,所以配置信息放置在Nacos上面即可。
(1)自媒體端配置
spring: kafka: bootstrap-servers: 4.234.52.122:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
(2)移動端配置
spring: kafka: bootstrap-servers: 4.234.52.122:9092 consumer: group-id: ${spring.application.name}-test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
四:代碼實(shí)現(xiàn)
1.自媒體端
@Autowired private KafkaTemplate<String,String> kafkaTemplate; /** * 文章下架或上架 * @param id * @param enable * @return */ @Override public ResponseResult downOrUp(Integer id,Integer enable) { log.info("執(zhí)行文章上下架操作..."); if(id == null || enable == null) { return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //根據(jù)id獲取文章 WmNews news = getById(id); if(news == null) { return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在"); } //獲取當(dāng)前文章狀態(tài) Short status = news.getStatus(); if(!status.equals(WmNews.Status.PUBLISHED.getCode())) { return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非發(fā)布狀態(tài),不能上下架"); } //更改文章狀態(tài) news.setEnable(enable.shortValue()); updateById(news); log.info("更改文章上架狀態(tài){}-->{}",status,news.getEnable()); //發(fā)送消息到Kafka Map<String, Object> map = new HashMap<>(); map.put("articleId",news.getArticleId()); map.put("enable",enable.shortValue()); kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map)); log.info("發(fā)送消息到Kafka..."); return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
2.移動端
(1)設(shè)置監(jiān)聽器
package com.my.article.listener; import com.baomidou.mybatisplus.core.toolkit.StringUtils; import com.my.article.service.ApArticleService; import com.my.common.constans.WmNewsMessageConstants; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.kafka.annotation.KafkaListener; @Slf4j @Component public class EnableListener { @Autowired private ApArticleService apArticleService; @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC) public void downOrUp(String message) { if(StringUtils.isNotBlank(message)) { log.info("監(jiān)聽到消息{}",message); apArticleService.downOrUp(message); } } }
(2)獲取消息并修改文章狀態(tài)
/** * 文章上下架 * @param message * @return */ @Override public ResponseResult downOrUp(String message) { Map map = JSON.parseObject(message, Map.class); //獲取文章id Long articleId = (Long) map.get("articleId"); //獲取文章待修改狀態(tài) Integer enable = (Integer) map.get("enable"); //查詢文章配置 ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne (Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId)); if(apArticleConfig != null) { //上架 if(enable == 1) { log.info("文章重新上架"); apArticleConfig.setIsDown(false); apArticleConfigMapper.updateById(apArticleConfig); } //下架 if(enable == 0) { log.info("文章下架"); apArticleConfig.setIsDown(true); apArticleConfigMapper.updateById(apArticleConfig); } } else { throw new RuntimeException("文章信息不存在"); } return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
到此這篇關(guān)于Springboot微服務(wù)項(xiàng)目整合Kafka實(shí)現(xiàn)文章上下架功能的文章就介紹到這了,更多相關(guān)Springboot整合Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java之JFrame輸出Helloworld實(shí)例
這篇文章主要介紹了Java之JFrame輸出Helloworld的方法,以輸出Helloworld的實(shí)例分析了JFrame的簡單入門技巧,需要的朋友可以參考下2015-02-02springboot?vue測試平臺接口定義前后端新增功能實(shí)現(xiàn)
這篇文章主要介紹了springboot?vue測試平臺接口定義前后端新增功能實(shí)現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05Java中Pattern.compile函數(shù)的使用詳解
這篇文章主要介紹了Java中Pattern.compile函數(shù)的使用詳解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08IDEA中項(xiàng)目集成git提交代碼的詳細(xì)步驟
這篇文章主要介紹了IDEA中項(xiàng)目集成git提交代碼的詳細(xì)步驟,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-10-10Spring注解開發(fā)@Bean和@ComponentScan使用案例
這篇文章主要介紹了Spring注解開發(fā)@Bean和@ComponentScan使用案例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-09-09Springboot如何使用@Async實(shí)現(xiàn)異步任務(wù)
這篇文章主要介紹了Springboot如何使用@Async實(shí)現(xiàn)異步任務(wù)問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-09-09Java并發(fā)編程深入理解之Synchronized的使用及底層原理詳解 上
在并發(fā)編程中存在線程安全問題,主要原因有:1.存在共享數(shù)據(jù) 2.多線程共同操作共享數(shù)據(jù)。關(guān)鍵字synchronized可以保證在同一時刻,只有一個線程可以執(zhí)行某個方法或某個代碼塊,同時synchronized可以保證一個線程的變化可見(可見性),即可以代替volatile2021-09-09mybatis-plus動態(tài)表名實(shí)現(xiàn)方法
本文主要介紹了mybatis-plus動態(tài)表名實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02詳解Java中NullPointerException異常的原因和解決辦法
本文主要介紹了詳解Java中NullPointerException異常的原因和解決辦法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07