Spring boot項(xiàng)目redisTemplate實(shí)現(xiàn)輕量級(jí)消息隊(duì)列的方法
背景
公司項(xiàng)目有個(gè)需求, 前端上傳excel文件, 后端讀取數(shù)據(jù)、處理數(shù)據(jù)、返回錯(cuò)誤數(shù)據(jù), 最簡(jiǎn)單的方式同步處理, 客戶端上傳文件后一直阻塞等待響應(yīng), 但用戶體驗(yàn)無(wú)疑很差, 處理數(shù)據(jù)可能十分耗時(shí), 沒(méi)人愿意傻等, 由于項(xiàng)目暫未使用ActiveMQ等消息隊(duì)列中間件, 而redis的lpush和rpop很適合作為一種輕量級(jí)的消息隊(duì)列實(shí)現(xiàn), 所以用它完成此次功能開(kāi)發(fā)
一、本文涉及知識(shí)點(diǎn)
- excel文件讀寫(xiě)--阿里easyexcel sdk
- 文件上傳、下載--騰訊云對(duì)象存儲(chǔ)
- 遠(yuǎn)程服務(wù)調(diào)用--restTemplate
- 生產(chǎn)者、消費(fèi)者--redisTemplate leftPush和rightPop操作
- 異步處理數(shù)據(jù)--Executors線程池
- 讀取網(wǎng)絡(luò)文件流--HttpClient
- 自定義注解實(shí)現(xiàn)用戶身份認(rèn)證--JWT token認(rèn)證, 攔截器攔截標(biāo)注有@LoginRequired注解的請(qǐng)求入口
當(dāng)然, Java實(shí)現(xiàn)咯
涉及的知識(shí)點(diǎn)比較多, 每一個(gè)知識(shí)點(diǎn)都可以作為專題進(jìn)行學(xué)習(xí)分析, 本文將完整實(shí)現(xiàn)呈現(xiàn)出來(lái), 后期拆分與小伙伴分享學(xué)習(xí)
二、項(xiàng)目目錄結(jié)構(gòu)

說(shuō)明: 數(shù)據(jù)庫(kù)DAO層放到另一個(gè)模塊了, 不是本文重點(diǎn)
三、主要maven依賴
1、easyexcel
<easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>${easyexcel-latestVersion}</version>
</dependency>
JWT
<dependency> <groupId>io.jsonwebtoken</groupId> <artifactId>jjwt</artifactId> <version>0.7.0</version> </dependency>
redis
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-redis</artifactId> <version>1.3.5.RELEASE</version> </dependency>
騰訊cos
<dependency> <groupId>com.qcloud</groupId> <artifactId>cos_api</artifactId> <version>5.4.5</version> </dependency>
四、流程
- 用戶上傳文件
- 將文件存儲(chǔ)到騰訊cos
- 將上傳后的文件id及上傳記錄保存到數(shù)據(jù)庫(kù)
- redis生產(chǎn)一條導(dǎo)入消息, 即保存文件id到redis
- 請(qǐng)求結(jié)束, 返回"處理中"狀態(tài)
- redis消費(fèi)消息
- 讀取cos文件, 異步處理數(shù)據(jù)
- 將錯(cuò)誤數(shù)據(jù)以excel形式上傳至cos, 以供用戶下載, 并更新處理狀態(tài)為"處理完成"
- 客戶端輪詢查詢處理狀態(tài), 并可以下載錯(cuò)誤文件
- 結(jié)束
五、實(shí)現(xiàn)效果
上傳文件

數(shù)據(jù)庫(kù)導(dǎo)入記錄

導(dǎo)入的數(shù)據(jù)

下載錯(cuò)誤文件

錯(cuò)誤數(shù)據(jù)提示

查詢導(dǎo)入記錄

六、代碼實(shí)現(xiàn)
1、導(dǎo)入excel控制層
@LoginRequired
@RequestMapping(value = "doImport", method = RequestMethod.POST)
public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
PLUser user = getUser(request);
return orderImportService.doImport(file, user.getId());
}
2、service層
@Override
public JsonResponse doImport(MultipartFile file, Integer userId) {
if (null == file || file.isEmpty()) {
throw new ServiceException("文件不能為空");
}
String filename = file.getOriginalFilename();
if (!checkFileSuffix(filename)) {
throw new ServiceException("當(dāng)前僅支持xlsx格式的excel");
}
// 存儲(chǔ)文件
String fileId = saveToOss(file);
if (StringUtils.isBlank(fileId)) {
throw new ServiceException("文件上傳失敗, 請(qǐng)稍后重試");
}
// 保存記錄到數(shù)據(jù)庫(kù)
saveRecordToDB(userId, fileId, filename);
// 生產(chǎn)一條訂單導(dǎo)入消息
redisProducer.produce(RedisKey.orderImportKey, fileId);
return JsonResponse.ok("導(dǎo)入成功, 處理中...");
}
/**
* 校驗(yàn)文件格式
* @param fileName
* @return
*/
private static boolean checkFileSuffix(String fileName) {
if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
return false;
}
int pointIndex = fileName.lastIndexOf(".");
String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase();
if (".xlsx".equals(suffix)) {
return true;
}
return false;
}
/**
* 將文件存儲(chǔ)到騰訊OSS
* @param file
* @return
*/
private String saveToOss(MultipartFile file) {
InputStream ins = null;
try {
ins = file.getInputStream();
} catch (IOException e) {
e.printStackTrace();
}
String fileId;
try {
String originalFilename = file.getOriginalFilename();
File f = new File(originalFilename);
inputStreamToFile(ins, f);
FileSystemResource resource = new FileSystemResource(f);
MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
param.add("file", resource);
ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
fileId = (String) responseResult.getData();
} catch (Exception e) {
fileId = null;
}
return fileId;
}
3、redis生產(chǎn)者
@Service
public class RedisProducerImpl implements RedisProducer {
@Autowired
private RedisTemplate redisTemplate;
@Override
public JsonResponse produce(String key, String msg) {
Map<String, String> map = Maps.newHashMap();
map.put("fileId", msg);
redisTemplate.opsForList().leftPush(key, map);
return JsonResponse.ok();
}
}
4、redis消費(fèi)者
@Service
public class RedisConsumer {
@Autowired
public RedisTemplate redisTemplate;
@Value("${txOssFileUrl}")
private String txOssFileUrl;
@Value("${txOssUploadUrl}")
private String txOssUploadUrl;
@PostConstruct
public void init() {
processOrderImport();
}
/**
* 處理訂單導(dǎo)入
*/
private void processOrderImport() {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
while (true) {
Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
if (null == object) {
continue;
}
String msg = JSON.toJSONString(object);
executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
}
});
}
}
5、處理任務(wù)線程類
public class OrderImportTask implements Runnable {
public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
this.msg = msg;
this.txOssFileUrl = txOssFileUrl;
this.txOssUploadUrl = txOssUploadUrl;
}
}
/**
* 注入bean
*/
private void autowireBean() {
this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
}
@Override
public void run() {
// 注入bean
autowireBean();
JSONObject jsonObject = JSON.parseObject(msg);
String fileId = jsonObject.getString("fileId");
MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
param.add("id", fileId);
ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
String fileUrl = (String) responseResult.getData();
if (StringUtils.isBlank(fileUrl)) {
return;
}
InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
List<Object> list = ExcelUtil.read(inputStream);
process(list, fileId);
}
/**
* 將文件上傳至oss
* @param file
* @return
*/
private String saveToOss(File file) {
String fileId;
try {
FileSystemResource resource = new FileSystemResource(file);
MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
param.add("file", resource);
ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
fileId = (String) responseResult.getData();
} catch (Exception e) {
fileId = null;
}
return fileId;
}
說(shuō)明: 處理數(shù)據(jù)的業(yè)務(wù)邏輯代碼就不用貼了
6、上傳文件到cos
@RequestMapping("/txOssUpload")
@ResponseBody
public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
if (null == file || file.isEmpty()) {
return ResponseResult.fail("文件不能為空");
}
String originalFilename = file.getOriginalFilename();
originalFilename = MimeUtility.decodeText(originalFilename);// 解決中文亂碼問(wèn)題
String contentType = getContentType(originalFilename);
String key;
InputStream ins = null;
File f = null;
try {
ins = file.getInputStream();
f = new File(originalFilename);
inputStreamToFile(ins, f);
key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
} catch (Exception e) {
return ResponseResult.fail(e.getMessage());
} finally {
if (null != ins) {
try {
ins.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (f.exists()) {// 刪除臨時(shí)文件
f.delete();
}
}
return ResponseResult.ok(key);
}
public static void inputStreamToFile(InputStream ins,File file) {
try {
OutputStream os = new FileOutputStream(file);
int bytesRead = 0;
byte[] buffer = new byte[8192];
while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
os.write(buffer, 0, bytesRead);
}
os.close();
ins.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
key = Uuid.getUuid() + "-" + key;
OSSUtil.txOssUpload(inputStream, key, contentType);
try {
if (null != inputStream) {
inputStream.close();
}
} catch (IOException e) {
e.printStackTrace();
}
return key;
}
public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
ObjectMetadata objectMetadata = new ObjectMetadata();
try{
int length = inputStream.available();
objectMetadata.setContentLength(length);
}catch (Exception e){
logger.info(e.getMessage());
}
objectMetadata.setContentType(contentType);
cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
}
7、下載文件
/**
* 騰訊云文件下載
* @param response
* @param id
* @return
*/
@RequestMapping("/txOssDownload")
public Object txOssDownload(HttpServletResponse response, String id) {
COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
String contentType = getContentType(id);
FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
return null;
}
public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
FileOutputStream fos = null;
response.reset();
OutputStream os = null;
try {
response.setContentType(contentType + "; charset=utf-8");
if(!contentType.equals(PlConstans.FileContentType.image)){
try {
response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
} catch (UnsupportedEncodingException e) {
response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
logger.error("encoding file name failed", e);
}
}
os = response.getOutputStream();
byte[] b = new byte[1024 * 1024];
int len;
while ((len = fileStream.read(b)) > 0) {
os.write(b, 0, len);
os.flush();
try {
if(fos != null) {
fos.write(b, 0, len);
fos.flush();
}
} catch (Exception e) {
logger.error(e.getMessage());
}
}
} catch (IOException e) {
IOUtils.closeQuietly(fos);
fos = null;
} finally {
IOUtils.closeQuietly(os);
IOUtils.closeQuietly(fileStream);
if(fos != null) {
IOUtils.closeQuietly(fos);
}
}
}
8、讀取網(wǎng)絡(luò)文件流
/**
* 讀取網(wǎng)絡(luò)文件流
* @param url
* @return
*/
public static InputStream readFileFromURL(String url) {
if (StringUtils.isBlank(url)) {
return null;
}
HttpClient httpClient = new DefaultHttpClient();
HttpGet methodGet = new HttpGet(url);
try {
HttpResponse response = httpClient.execute(methodGet);
if (response.getStatusLine().getStatusCode() == 200) {
HttpEntity entity = response.getEntity();
return entity.getContent();
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
9、ExcelUtil
/**
* 讀excel
* @param inputStream 文件輸入流
* @return list集合
*/
public static List<Object> read(InputStream inputStream) {
return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
}
/**
* 寫(xiě)excel
* @param data list數(shù)據(jù)
* @param clazz
* @param saveFilePath 文件保存路徑
* @throws IOException
*/
public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
File tempFile = new File(saveFilePath);
OutputStream out = new FileOutputStream(tempFile);
ExcelWriter writer = EasyExcelFactory.getWriter(out);
Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
writer.write(data, sheet);
writer.finish();
out.close();
}
說(shuō)明: 至此, 整個(gè)流程算是完整了, 下面將其他知識(shí)點(diǎn)代碼也貼出來(lái)參考
七、其他
1、@LoginRequired注解
/**
* 在需要登錄驗(yàn)證的Controller的方法上使用此注解
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}
2、MyControllerAdvice
@ControllerAdvice
public class MyControllerAdvice {
@ResponseBody
@ExceptionHandler(TokenValidationException.class)
public JsonResponse tokenValidationExceptionHandler() {
return JsonResponse.loginInvalid();
}
@ResponseBody
@ExceptionHandler(ServiceException.class)
public JsonResponse serviceExceptionHandler(ServiceException se) {
return JsonResponse.fail(se.getMsg());
}
@ResponseBody
@ExceptionHandler(Exception.class)
public JsonResponse exceptionHandler(Exception e) {
e.printStackTrace();
return JsonResponse.fail(e.getMessage());
}
}
3、AuthenticationInterceptor
public class AuthenticationInterceptor implements HandlerInterceptor {
private static final String CURRENT_USER = "user";
@Autowired
private UserService userService;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
// 如果不是映射到方法直接通過(guò)
if (!(handler instanceof HandlerMethod)) {
return true;
}
HandlerMethod handlerMethod = (HandlerMethod) handler;
Method method = handlerMethod.getMethod();
// 判斷接口是否有@LoginRequired注解, 有則需要登錄
LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
if (methodAnnotation != null) {
// 驗(yàn)證token
Integer userId = JwtUtil.verifyToken(request);
PLUser plUser = userService.selectByPrimaryKey(userId);
if (null == plUser) {
throw new RuntimeException("用戶不存在,請(qǐng)重新登錄");
}
request.setAttribute(CURRENT_USER, plUser);
return true;
}
return true;
}
@Override
public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
}
@Override
public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
}
}
4、JwtUtil
public static final long EXPIRATION_TIME = 2592_000_000L; // 有效期30天
public static final String SECRET = "pl_token_secret";
public static final String HEADER = "token";
public static final String USER_ID = "userId";
/**
* 根據(jù)userId生成token
* @param userId
* @return
*/
public static String generateToken(String userId) {
HashMap<String, Object> map = new HashMap<>();
map.put(USER_ID, userId);
String jwt = Jwts.builder()
.setClaims(map)
.setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
.signWith(SignatureAlgorithm.HS512, SECRET)
.compact();
return jwt;
}
/**
* 驗(yàn)證token
* @param request
* @return 驗(yàn)證通過(guò)返回userId
*/
public static Integer verifyToken(HttpServletRequest request) {
String token = request.getHeader(HEADER);
if (token != null) {
try {
Map<String, Object> body = Jwts.parser()
.setSigningKey(SECRET)
.parseClaimsJws(token)
.getBody();
for (Map.Entry entry : body.entrySet()) {
Object key = entry.getKey();
Object value = entry.getValue();
if (key.toString().equals(USER_ID)) {
return Integer.valueOf(value.toString());// userId
}
}
return null;
} catch (Exception e) {
logger.error(e.getMessage());
throw new TokenValidationException("unauthorized");
}
} else {
throw new TokenValidationException("missing token");
}
}
結(jié)語(yǔ): OK, 搞定,睡了, 好困
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)腳本之家的支持。
- SpringBoot利用redis集成消息隊(duì)列的方法
- Spring boot集成Kafka消息中間件代碼實(shí)例
- Spring Boot集群管理工具KafkaAdminClient使用方法解析
- spring boot整合kafka過(guò)程解析
- SpringBoot Kafka 整合使用及安裝教程
- 在Spring Boot應(yīng)用程序中使用Apache Kafka的方法步驟詳解
- Springboot集成Kafka實(shí)現(xiàn)producer和consumer的示例代碼
- spring boot 與kafka集成的示例代碼
- Spring Boot集成Kafka的示例代碼
- Spring boot 整合KAFKA消息隊(duì)列的示例
相關(guān)文章
Java實(shí)現(xiàn)簡(jiǎn)單的酒店管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)酒店管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-07-07
SpringBoot?實(shí)現(xiàn)全局異常處理的示例代碼
本文主要介紹了SpringBoot實(shí)現(xiàn)全局異常處理,全局異常處理器的使用可以顯著提高Spring Boot項(xiàng)目的代碼質(zhì)量和可維護(hù)性,減少冗余代碼,具有一定的參考價(jià)值,感興趣的可以了解一下2024-06-06
java 實(shí)現(xiàn)隨機(jī)數(shù)組輸出及求和實(shí)例詳解
這篇文章主要介紹了java 實(shí)現(xiàn)隨機(jī)數(shù)組輸出及求和實(shí)例詳解的相關(guān)資料,需要的朋友可以參考下2016-11-11
SpringBoot @Autowired注解注入規(guī)則介紹
這篇文章主要介紹了SpringBoot @Autowired注解注入規(guī)則介紹,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。2021-11-11
關(guān)于Java使用Http輕量級(jí)請(qǐng)求庫(kù)Unirest的方法
這篇文章主要介紹了關(guān)于Java使用Http輕量級(jí)請(qǐng)求庫(kù)Unirest的方法,Unirest 是一個(gè)輕量級(jí)的 HTTP 請(qǐng)求庫(kù),可發(fā)起 GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS 請(qǐng)求,支持 Node、Ruby、Java、PHP、Python、Objective-C、.NET 等多種語(yǔ)言,需要的朋友可以參考下2023-08-08
Java實(shí)現(xiàn)Linux下雙守護(hù)進(jìn)程
這篇文章主要介紹了Java實(shí)現(xiàn)Linux下雙守護(hù)進(jìn)程的思路、原理以及具體實(shí)現(xiàn)方式,非常的詳細(xì),希望對(duì)大家有所幫助2014-10-10
java 數(shù)據(jù)結(jié)構(gòu)之棧與隊(duì)列
這篇文章主要介紹了java 數(shù)據(jù)結(jié)構(gòu)之棧與隊(duì)列的相關(guān)資料,這里對(duì)java中的棧和隊(duì)列都做出實(shí)現(xiàn)實(shí)例來(lái)幫助大家理解學(xué)習(xí)數(shù)據(jù)結(jié)構(gòu),需要的朋友可以參考下2017-07-07

