Spring boot項目redisTemplate實現(xiàn)輕量級消息隊列的方法
背景
公司項目有個需求, 前端上傳excel文件, 后端讀取數(shù)據(jù)、處理數(shù)據(jù)、返回錯誤數(shù)據(jù), 最簡單的方式同步處理, 客戶端上傳文件后一直阻塞等待響應(yīng), 但用戶體驗無疑很差, 處理數(shù)據(jù)可能十分耗時, 沒人愿意傻等, 由于項目暫未使用ActiveMQ等消息隊列中間件, 而redis的lpush和rpop很適合作為一種輕量級的消息隊列實現(xiàn), 所以用它完成此次功能開發(fā)
一、本文涉及知識點
- excel文件讀寫--阿里easyexcel sdk
- 文件上傳、下載--騰訊云對象存儲
- 遠程服務(wù)調(diào)用--restTemplate
- 生產(chǎn)者、消費者--redisTemplate leftPush和rightPop操作
- 異步處理數(shù)據(jù)--Executors線程池
- 讀取網(wǎng)絡(luò)文件流--HttpClient
- 自定義注解實現(xiàn)用戶身份認證--JWT token認證, 攔截器攔截標注有@LoginRequired注解的請求入口
當然, Java實現(xiàn)咯
涉及的知識點比較多, 每一個知識點都可以作為專題進行學(xué)習(xí)分析, 本文將完整實現(xiàn)呈現(xiàn)出來, 后期拆分與小伙伴分享學(xué)習(xí)
二、項目目錄結(jié)構(gòu)
說明: 數(shù)據(jù)庫DAO層放到另一個模塊了, 不是本文重點
三、主要maven依賴
1、easyexcel
<easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion> <dependency> <groupId>com.alibaba</groupId> <artifactId>easyexcel</artifactId> <version>${easyexcel-latestVersion}</version> </dependency>
JWT
<dependency> <groupId>io.jsonwebtoken</groupId> <artifactId>jjwt</artifactId> <version>0.7.0</version> </dependency>
redis
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-redis</artifactId> <version>1.3.5.RELEASE</version> </dependency>
騰訊cos
<dependency> <groupId>com.qcloud</groupId> <artifactId>cos_api</artifactId> <version>5.4.5</version> </dependency>
四、流程
- 用戶上傳文件
- 將文件存儲到騰訊cos
- 將上傳后的文件id及上傳記錄保存到數(shù)據(jù)庫
- redis生產(chǎn)一條導(dǎo)入消息, 即保存文件id到redis
- 請求結(jié)束, 返回"處理中"狀態(tài)
- redis消費消息
- 讀取cos文件, 異步處理數(shù)據(jù)
- 將錯誤數(shù)據(jù)以excel形式上傳至cos, 以供用戶下載, 并更新處理狀態(tài)為"處理完成"
- 客戶端輪詢查詢處理狀態(tài), 并可以下載錯誤文件
- 結(jié)束
五、實現(xiàn)效果
上傳文件
數(shù)據(jù)庫導(dǎo)入記錄
導(dǎo)入的數(shù)據(jù)
下載錯誤文件
錯誤數(shù)據(jù)提示
查詢導(dǎo)入記錄
六、代碼實現(xiàn)
1、導(dǎo)入excel控制層
@LoginRequired @RequestMapping(value = "doImport", method = RequestMethod.POST) public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) { PLUser user = getUser(request); return orderImportService.doImport(file, user.getId()); }
2、service層
@Override public JsonResponse doImport(MultipartFile file, Integer userId) { if (null == file || file.isEmpty()) { throw new ServiceException("文件不能為空"); } String filename = file.getOriginalFilename(); if (!checkFileSuffix(filename)) { throw new ServiceException("當前僅支持xlsx格式的excel"); } // 存儲文件 String fileId = saveToOss(file); if (StringUtils.isBlank(fileId)) { throw new ServiceException("文件上傳失敗, 請稍后重試"); } // 保存記錄到數(shù)據(jù)庫 saveRecordToDB(userId, fileId, filename); // 生產(chǎn)一條訂單導(dǎo)入消息 redisProducer.produce(RedisKey.orderImportKey, fileId); return JsonResponse.ok("導(dǎo)入成功, 處理中..."); } /** * 校驗文件格式 * @param fileName * @return */ private static boolean checkFileSuffix(String fileName) { if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) { return false; } int pointIndex = fileName.lastIndexOf("."); String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase(); if (".xlsx".equals(suffix)) { return true; } return false; } /** * 將文件存儲到騰訊OSS * @param file * @return */ private String saveToOss(MultipartFile file) { InputStream ins = null; try { ins = file.getInputStream(); } catch (IOException e) { e.printStackTrace(); } String fileId; try { String originalFilename = file.getOriginalFilename(); File f = new File(originalFilename); inputStreamToFile(ins, f); FileSystemResource resource = new FileSystemResource(f); MultiValueMap<String, Object> param = new LinkedMultiValueMap<>(); param.add("file", resource); ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class); fileId = (String) responseResult.getData(); } catch (Exception e) { fileId = null; } return fileId; }
3、redis生產(chǎn)者
@Service public class RedisProducerImpl implements RedisProducer { @Autowired private RedisTemplate redisTemplate; @Override public JsonResponse produce(String key, String msg) { Map<String, String> map = Maps.newHashMap(); map.put("fileId", msg); redisTemplate.opsForList().leftPush(key, map); return JsonResponse.ok(); } }
4、redis消費者
@Service public class RedisConsumer { @Autowired public RedisTemplate redisTemplate; @Value("${txOssFileUrl}") private String txOssFileUrl; @Value("${txOssUploadUrl}") private String txOssUploadUrl; @PostConstruct public void init() { processOrderImport(); } /** * 處理訂單導(dǎo)入 */ private void processOrderImport() { ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> { while (true) { Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS); if (null == object) { continue; } String msg = JSON.toJSONString(object); executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl)); } }); } }
5、處理任務(wù)線程類
public class OrderImportTask implements Runnable { public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) { this.msg = msg; this.txOssFileUrl = txOssFileUrl; this.txOssUploadUrl = txOssUploadUrl; } } /** * 注入bean */ private void autowireBean() { this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class); this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class); this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class); } @Override public void run() { // 注入bean autowireBean(); JSONObject jsonObject = JSON.parseObject(msg); String fileId = jsonObject.getString("fileId"); MultiValueMap<String, Object> param = new LinkedMultiValueMap<>(); param.add("id", fileId); ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class); String fileUrl = (String) responseResult.getData(); if (StringUtils.isBlank(fileUrl)) { return; } InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl); List<Object> list = ExcelUtil.read(inputStream); process(list, fileId); } /** * 將文件上傳至oss * @param file * @return */ private String saveToOss(File file) { String fileId; try { FileSystemResource resource = new FileSystemResource(file); MultiValueMap<String, Object> param = new LinkedMultiValueMap<>(); param.add("file", resource); ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class); fileId = (String) responseResult.getData(); } catch (Exception e) { fileId = null; } return fileId; }
說明: 處理數(shù)據(jù)的業(yè)務(wù)邏輯代碼就不用貼了
6、上傳文件到cos
@RequestMapping("/txOssUpload") @ResponseBody public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException { if (null == file || file.isEmpty()) { return ResponseResult.fail("文件不能為空"); } String originalFilename = file.getOriginalFilename(); originalFilename = MimeUtility.decodeText(originalFilename);// 解決中文亂碼問題 String contentType = getContentType(originalFilename); String key; InputStream ins = null; File f = null; try { ins = file.getInputStream(); f = new File(originalFilename); inputStreamToFile(ins, f); key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType); } catch (Exception e) { return ResponseResult.fail(e.getMessage()); } finally { if (null != ins) { try { ins.close(); } catch (IOException e) { e.printStackTrace(); } } if (f.exists()) {// 刪除臨時文件 f.delete(); } } return ResponseResult.ok(key); } public static void inputStreamToFile(InputStream ins,File file) { try { OutputStream os = new FileOutputStream(file); int bytesRead = 0; byte[] buffer = new byte[8192]; while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) { os.write(buffer, 0, bytesRead); } os.close(); ins.close(); } catch (Exception e) { e.printStackTrace(); } } public String txOssUpload(FileInputStream inputStream, String key, String contentType) { key = Uuid.getUuid() + "-" + key; OSSUtil.txOssUpload(inputStream, key, contentType); try { if (null != inputStream) { inputStream.close(); } } catch (IOException e) { e.printStackTrace(); } return key; } public static void txOssUpload(FileInputStream inputStream, String key, String contentType) { ObjectMetadata objectMetadata = new ObjectMetadata(); try{ int length = inputStream.available(); objectMetadata.setContentLength(length); }catch (Exception e){ logger.info(e.getMessage()); } objectMetadata.setContentType(contentType); cosclient.putObject(txbucketName, key, inputStream, objectMetadata); }
7、下載文件
/** * 騰訊云文件下載 * @param response * @param id * @return */ @RequestMapping("/txOssDownload") public Object txOssDownload(HttpServletResponse response, String id) { COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response); String contentType = getContentType(id); FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id); return null; } public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) { FileOutputStream fos = null; response.reset(); OutputStream os = null; try { response.setContentType(contentType + "; charset=utf-8"); if(!contentType.equals(PlConstans.FileContentType.image)){ try { response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1")); } catch (UnsupportedEncodingException e) { response.setHeader("Content-Disposition", "attachment; filename=" + fileName); logger.error("encoding file name failed", e); } } os = response.getOutputStream(); byte[] b = new byte[1024 * 1024]; int len; while ((len = fileStream.read(b)) > 0) { os.write(b, 0, len); os.flush(); try { if(fos != null) { fos.write(b, 0, len); fos.flush(); } } catch (Exception e) { logger.error(e.getMessage()); } } } catch (IOException e) { IOUtils.closeQuietly(fos); fos = null; } finally { IOUtils.closeQuietly(os); IOUtils.closeQuietly(fileStream); if(fos != null) { IOUtils.closeQuietly(fos); } } }
8、讀取網(wǎng)絡(luò)文件流
/** * 讀取網(wǎng)絡(luò)文件流 * @param url * @return */ public static InputStream readFileFromURL(String url) { if (StringUtils.isBlank(url)) { return null; } HttpClient httpClient = new DefaultHttpClient(); HttpGet methodGet = new HttpGet(url); try { HttpResponse response = httpClient.execute(methodGet); if (response.getStatusLine().getStatusCode() == 200) { HttpEntity entity = response.getEntity(); return entity.getContent(); } } catch (Exception e) { e.printStackTrace(); } return null; }
9、ExcelUtil
/** * 讀excel * @param inputStream 文件輸入流 * @return list集合 */ public static List<Object> read(InputStream inputStream) { return EasyExcelFactory.read(inputStream, new Sheet(1, 1)); } /** * 寫excel * @param data list數(shù)據(jù) * @param clazz * @param saveFilePath 文件保存路徑 * @throws IOException */ public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException { File tempFile = new File(saveFilePath); OutputStream out = new FileOutputStream(tempFile); ExcelWriter writer = EasyExcelFactory.getWriter(out); Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null); writer.write(data, sheet); writer.finish(); out.close(); }
說明: 至此, 整個流程算是完整了, 下面將其他知識點代碼也貼出來參考
七、其他
1、@LoginRequired注解
/** * 在需要登錄驗證的Controller的方法上使用此注解 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface LoginRequired { }
2、MyControllerAdvice
@ControllerAdvice public class MyControllerAdvice { @ResponseBody @ExceptionHandler(TokenValidationException.class) public JsonResponse tokenValidationExceptionHandler() { return JsonResponse.loginInvalid(); } @ResponseBody @ExceptionHandler(ServiceException.class) public JsonResponse serviceExceptionHandler(ServiceException se) { return JsonResponse.fail(se.getMsg()); } @ResponseBody @ExceptionHandler(Exception.class) public JsonResponse exceptionHandler(Exception e) { e.printStackTrace(); return JsonResponse.fail(e.getMessage()); } }
3、AuthenticationInterceptor
public class AuthenticationInterceptor implements HandlerInterceptor { private static final String CURRENT_USER = "user"; @Autowired private UserService userService; @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) { // 如果不是映射到方法直接通過 if (!(handler instanceof HandlerMethod)) { return true; } HandlerMethod handlerMethod = (HandlerMethod) handler; Method method = handlerMethod.getMethod(); // 判斷接口是否有@LoginRequired注解, 有則需要登錄 LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class); if (methodAnnotation != null) { // 驗證token Integer userId = JwtUtil.verifyToken(request); PLUser plUser = userService.selectByPrimaryKey(userId); if (null == plUser) { throw new RuntimeException("用戶不存在,請重新登錄"); } request.setAttribute(CURRENT_USER, plUser); return true; } return true; } @Override public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception { } @Override public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception { } }
4、JwtUtil
public static final long EXPIRATION_TIME = 2592_000_000L; // 有效期30天 public static final String SECRET = "pl_token_secret"; public static final String HEADER = "token"; public static final String USER_ID = "userId"; /** * 根據(jù)userId生成token * @param userId * @return */ public static String generateToken(String userId) { HashMap<String, Object> map = new HashMap<>(); map.put(USER_ID, userId); String jwt = Jwts.builder() .setClaims(map) .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME)) .signWith(SignatureAlgorithm.HS512, SECRET) .compact(); return jwt; } /** * 驗證token * @param request * @return 驗證通過返回userId */ public static Integer verifyToken(HttpServletRequest request) { String token = request.getHeader(HEADER); if (token != null) { try { Map<String, Object> body = Jwts.parser() .setSigningKey(SECRET) .parseClaimsJws(token) .getBody(); for (Map.Entry entry : body.entrySet()) { Object key = entry.getKey(); Object value = entry.getValue(); if (key.toString().equals(USER_ID)) { return Integer.valueOf(value.toString());// userId } } return null; } catch (Exception e) { logger.error(e.getMessage()); throw new TokenValidationException("unauthorized"); } } else { throw new TokenValidationException("missing token"); } }
結(jié)語: OK, 搞定,睡了, 好困
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,謝謝大家對腳本之家的支持。
- SpringBoot利用redis集成消息隊列的方法
- Spring boot集成Kafka消息中間件代碼實例
- Spring Boot集群管理工具KafkaAdminClient使用方法解析
- spring boot整合kafka過程解析
- SpringBoot Kafka 整合使用及安裝教程
- 在Spring Boot應(yīng)用程序中使用Apache Kafka的方法步驟詳解
- Springboot集成Kafka實現(xiàn)producer和consumer的示例代碼
- spring boot 與kafka集成的示例代碼
- Spring Boot集成Kafka的示例代碼
- Spring boot 整合KAFKA消息隊列的示例
相關(guān)文章
SpringBoot?實現(xiàn)全局異常處理的示例代碼
本文主要介紹了SpringBoot實現(xiàn)全局異常處理,全局異常處理器的使用可以顯著提高Spring Boot項目的代碼質(zhì)量和可維護性,減少冗余代碼,具有一定的參考價值,感興趣的可以了解一下2024-06-06java 實現(xiàn)隨機數(shù)組輸出及求和實例詳解
這篇文章主要介紹了java 實現(xiàn)隨機數(shù)組輸出及求和實例詳解的相關(guān)資料,需要的朋友可以參考下2016-11-11SpringBoot @Autowired注解注入規(guī)則介紹
這篇文章主要介紹了SpringBoot @Autowired注解注入規(guī)則介紹,具有很好的參考價值,希望對大家有所幫助。2021-11-11關(guān)于Java使用Http輕量級請求庫Unirest的方法
這篇文章主要介紹了關(guān)于Java使用Http輕量級請求庫Unirest的方法,Unirest 是一個輕量級的 HTTP 請求庫,可發(fā)起 GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS 請求,支持 Node、Ruby、Java、PHP、Python、Objective-C、.NET 等多種語言,需要的朋友可以參考下2023-08-08java 數(shù)據(jù)結(jié)構(gòu)之棧與隊列
這篇文章主要介紹了java 數(shù)據(jù)結(jié)構(gòu)之棧與隊列的相關(guān)資料,這里對java中的棧和隊列都做出實現(xiàn)實例來幫助大家理解學(xué)習(xí)數(shù)據(jù)結(jié)構(gòu),需要的朋友可以參考下2017-07-07