Springboot微服務項目整合Kafka實現(xiàn)文章上下架功能
前言:
1.前面基于Springboot的單體項目介紹已經(jīng)完結了,至于項目中的其他功能實現(xiàn)我這里就不打算介紹了,因為涉及的知識點不難,而且都是簡單的CRUD操作,假如有興趣的話可以私信我我再看看要不要寫幾篇文章做個介紹。
2.完成上一階段的學習,我就投入到了微服務的學習當中,所用教程為B站上面黑馬的微服務教程。由于我的記性不是很好,所以對于新事物的學習我比較喜歡做筆記以加強理解,在這里我會將筆記的重點內(nèi)容做個總結發(fā)布到“微服務學習”筆記欄目中。我是趙四,一名有追求的程序員,希望大家能多多支持,能給我點個關注就更好了。
一:Kafka消息發(fā)送快速入門
1.傳遞字符串消息
(1)發(fā)送消息
創(chuàng)建一個Controller包并編寫一個測試類用于發(fā)送消息
package com.my.kafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("hello")
public String helloProducer(){
kafkaTemplate.send("my-topic","Hello~");
return "ok";
}
}(2)監(jiān)聽消息
編寫測試類用于接收消息:
package com.my.kafka.listener;
import org.junit.platform.commons.util.StringUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class HelloListener {
@KafkaListener(topics = "my-topic")
public void helloListener(String message) {
if(StringUtils.isNotBlank(message)) {
System.out.println(message);
}
}
}(3)測試結果
打開瀏覽器輸入localhost:9991/hello,然后到控制臺查看消息,可以看到成功消息監(jiān)聽到并且進行了消費。

2.傳遞對象消息
目前springboot整合后的kafka,因為序列化器是StringSerializer,這個時候如果需要傳遞對象可以有兩種方式:
方式一:可以自定義序列化器,對象類型眾多,這種方式通用性不強,這里不做介紹。
方式二:可以把要傳遞的對象進行轉json字符串,接收消息后再轉為對象即可,本項目采用這種方式。
(1)修改生產(chǎn)者代碼
@GetMapping("hello")
public String helloProducer(){
User user = new User();
user.setName("趙四");
user.setAge(20);
kafkaTemplate.send("my-topic", JSON.toJSONString(user));
return "ok";
}(2)結果測試

可以看到成功接收都對象參數(shù),后期要使用該對象只需要將其轉換成User對象即可。
二:功能引入
1.需求分析
發(fā)布文章之后,可能會由于文章出現(xiàn)某些錯誤或者其他原因,我們會在文章管理端實現(xiàn)文章的上下架功能(見下圖),也即當管理端實現(xiàn)對文章下架之后移動端將不會再展示該文章,只有該文章重新被上架之后才能在移動端看到該文章信息。

2.邏輯分析

后端接收到前端傳過來的參數(shù)之后要先做一個校驗,參數(shù)不為空才能繼續(xù)往下執(zhí)行,首先應該根據(jù)前端傳過來的文章id(自媒體端文章id)查詢自媒體數(shù)據(jù)庫的文章信息并判斷該文章是否已是發(fā)布狀態(tài),因為只有審核成功并成功發(fā)布了的文章才能進行上下架操作。自媒體端微服務對文章上下架狀態(tài)進行修改之后便可以向Kafka發(fā)送一條消息,該消息為Map對象,里面存儲的數(shù)據(jù)為移動端的文章id以及前端傳過來的上下架參數(shù)enable,當然要將該Map對象轉換成JSON字符串才能進行發(fā)送。
文章微服務監(jiān)聽到Kafka發(fā)送過來的消息之后將JSON字符串轉換成Map對象之后再獲取相關參數(shù)對移動端文章的上下架狀態(tài)進行修改。
三:前期準備
1.引入依賴
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>2.定義常量
package com.my.common.constans;
public class WmNewsMessageConstants {
public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}3.Kafka配置信息
由于我是用Nacos來作為注冊中心,所以配置信息放置在Nacos上面即可。
(1)自媒體端配置
spring:
kafka:
bootstrap-servers: 4.234.52.122:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer(2)移動端配置
spring:
kafka:
bootstrap-servers: 4.234.52.122:9092
consumer:
group-id: ${spring.application.name}-test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer四:代碼實現(xiàn)
1.自媒體端
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
* 文章下架或上架
* @param id
* @param enable
* @return
*/
@Override
public ResponseResult downOrUp(Integer id,Integer enable) {
log.info("執(zhí)行文章上下架操作...");
if(id == null || enable == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
//根據(jù)id獲取文章
WmNews news = getById(id);
if(news == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在");
}
//獲取當前文章狀態(tài)
Short status = news.getStatus();
if(!status.equals(WmNews.Status.PUBLISHED.getCode())) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非發(fā)布狀態(tài),不能上下架");
}
//更改文章狀態(tài)
news.setEnable(enable.shortValue());
updateById(news);
log.info("更改文章上架狀態(tài){}-->{}",status,news.getEnable());
//發(fā)送消息到Kafka
Map<String, Object> map = new HashMap<>();
map.put("articleId",news.getArticleId());
map.put("enable",enable.shortValue());
kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
log.info("發(fā)送消息到Kafka...");
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}2.移動端
(1)設置監(jiān)聽器
package com.my.article.listener;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.my.article.service.ApArticleService;
import com.my.common.constans.WmNewsMessageConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;
@Slf4j
@Component
public class EnableListener {
@Autowired
private ApArticleService apArticleService;
@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
public void downOrUp(String message) {
if(StringUtils.isNotBlank(message)) {
log.info("監(jiān)聽到消息{}",message);
apArticleService.downOrUp(message);
}
}
}(2)獲取消息并修改文章狀態(tài)
/**
* 文章上下架
* @param message
* @return
*/
@Override
public ResponseResult downOrUp(String message) {
Map map = JSON.parseObject(message, Map.class);
//獲取文章id
Long articleId = (Long) map.get("articleId");
//獲取文章待修改狀態(tài)
Integer enable = (Integer) map.get("enable");
//查詢文章配置
ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne
(Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId));
if(apArticleConfig != null) {
//上架
if(enable == 1) {
log.info("文章重新上架");
apArticleConfig.setIsDown(false);
apArticleConfigMapper.updateById(apArticleConfig);
}
//下架
if(enable == 0) {
log.info("文章下架");
apArticleConfig.setIsDown(true);
apArticleConfigMapper.updateById(apArticleConfig);
}
}
else {
throw new RuntimeException("文章信息不存在");
}
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}到此這篇關于Springboot微服務項目整合Kafka實現(xiàn)文章上下架功能的文章就介紹到這了,更多相關Springboot整合Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
springboot?vue測試平臺接口定義前后端新增功能實現(xiàn)
這篇文章主要介紹了springboot?vue測試平臺接口定義前后端新增功能實現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05
Java中Pattern.compile函數(shù)的使用詳解
這篇文章主要介紹了Java中Pattern.compile函數(shù)的使用詳解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08
Spring注解開發(fā)@Bean和@ComponentScan使用案例
這篇文章主要介紹了Spring注解開發(fā)@Bean和@ComponentScan使用案例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-09-09
Springboot如何使用@Async實現(xiàn)異步任務
這篇文章主要介紹了Springboot如何使用@Async實現(xiàn)異步任務問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-09-09
Java并發(fā)編程深入理解之Synchronized的使用及底層原理詳解 上
在并發(fā)編程中存在線程安全問題,主要原因有:1.存在共享數(shù)據(jù) 2.多線程共同操作共享數(shù)據(jù)。關鍵字synchronized可以保證在同一時刻,只有一個線程可以執(zhí)行某個方法或某個代碼塊,同時synchronized可以保證一個線程的變化可見(可見性),即可以代替volatile2021-09-09
mybatis-plus動態(tài)表名實現(xiàn)方法
本文主要介紹了mybatis-plus動態(tài)表名實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02
詳解Java中NullPointerException異常的原因和解決辦法
本文主要介紹了詳解Java中NullPointerException異常的原因和解決辦法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-07-07

