BlockingQueue隊(duì)列處理高并發(fā)下的日志
前言
當(dāng)系統(tǒng)流量負(fù)載比較高時(shí),業(yè)務(wù)日志的寫入操作也要納入系統(tǒng)性能考量之內(nèi),如若處理不當(dāng),將影響系統(tǒng)的正常業(yè)務(wù)操作,之前寫過一篇《spring boot通過MQ消費(fèi)log4j2的日志》的博文,采用了RabbitMQ消息中間件來存儲(chǔ)抗高并發(fā)下的日志,因?yàn)橐肓酥虚g件,操作使用起來可能沒那么簡便,今天分享使用多線程消費(fèi)阻塞隊(duì)列的方式來處理我們的海量日志
what阻塞隊(duì)列?
阻塞隊(duì)列(BlockingQueue)是區(qū)別于普通隊(duì)列多了兩個(gè)附加操作的線程安全的隊(duì)列。這兩個(gè)附加的操作是:在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强?。?dāng)隊(duì)列滿時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。
1.聲明存儲(chǔ)固定消息的隊(duì)列
/** * Created by kl on 2017/3/20. * Content :銷售操作日志隊(duì)列 */ public class SalesLogQueue{ //隊(duì)列大小 public static final int QUEUE_MAX_SIZE = 1000; private static SalesLogQueue alarmMessageQueue = new SalesLogQueue(); //阻塞隊(duì)列 private BlockingQueueblockingQueue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE); private SalesLogQueue(){} public static SalesLogQueue getInstance() { return alarmMessageQueue; } /** * 消息入隊(duì) * @param salesLog * @return */ public boolean push(SalesLog salesLog) { return this.blockingQueue.add(salesLog);//隊(duì)列滿了就拋出異常,不阻塞 } /** * 消息出隊(duì) * @return */ public SalesLog poll() { SalesLog result = null; try { result = this.blockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } return result; } /** * 獲取隊(duì)列大小 * @return */ public int size() { return this.blockingQueue.size(); } }
ps:因?yàn)闃I(yè)務(wù)原因,采用add的方式入隊(duì),隊(duì)列滿了就拋異常,不阻塞
2.消息入隊(duì)
消息入隊(duì)可以在任何需要保存日志的地方操作,如aop統(tǒng)一攔截日志處理,filter過濾請(qǐng)求日志處理,或者耦合的業(yè)務(wù)日志,記住,不阻塞入隊(duì)操作,不然將影響正常的業(yè)務(wù)操作,如下為filter統(tǒng)一處理請(qǐng)求日志:
/** * Created by kl on 2017/3/20. * Content :訪問請(qǐng)求攔截,保存操作日志 */ public class SalesLogFilter implements Filter { private RoleResourceService resourceService; @Override public void init(FilterConfig filterConfig) throws ServletException { ServletContext context = filterConfig.getServletContext(); ApplicationContext ctx = WebApplicationContextUtils.getWebApplicationContext(context); resourceService = ctx.getBean(RoleResourceService.class); } @Override public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { try { HttpServletRequest request = (HttpServletRequest) servletRequest; String requestUrl = request.getRequestURI(); String requestType=request.getMethod(); String ipAddress = HttpClientUtil.getIpAddr(request); Map resource=resourceService.getResource(); String context=resource.get(requestUrl); //動(dòng)態(tài)url正則匹配 if(StringUtil.isNull(context)){ for(Map.Entry entry:resource.entrySet()){ String resourceUrl= entry.getKey(); if(requestUrl.matches(resourceUrl)){ context=entry.getValue(); break; } } } SalesLog log=new SalesLog(); log.setCreateDate(new Timestamp(System.currentTimeMillis())); log.setContext(context); log.setOperateUser(UserTokenUtil.currentUser.get().get("realname")); log.setRequestIp(ipAddress); log.setRequestUrl(requestUrl); log.setRequestType(requestType); SalesLogQueue.getInstance().push(log); }catch (Exception e){ e.printStackTrace(); } filterChain.doFilter(servletRequest, servletResponse); } @Override public void destroy() { } }
3.消息出隊(duì)被消費(fèi)
BlockingQueue是線程安全的,所以可以放心的在多個(gè)線程中去處理隊(duì)列中的消息,如下代碼聲明了一個(gè)兩個(gè)大小的固定線程池,并添加了兩個(gè)線程去處理隊(duì)列中的消息
/** * Created by kl on 2017/3/20. * Content :啟動(dòng)消費(fèi)操作日志隊(duì)列的線程 */ @Component public class ConsumeSalesLogQueue { @Autowired SalesLogService salesLogService; @PostConstruct public void startrtThread() { ExecutorService e = Executors.newFixedThreadPool(2);//兩個(gè)大小的固定線程池 e.submit(new PollSalesLog(salesLogService)); e.submit(new PollSalesLog(salesLogService)); } class PollSalesLog implements Runnable { SalesLogService salesLogService; public PollSalesLog(SalesLogService salesLogService) { this.salesLogService = salesLogService; } @Override public void run() { while (true) { try { SalesLog salesLog = SalesLogQueue.getInstance().poll(); if(salesLog!=null){ salesLogService.saveSalesLog(salesLog); } } catch (Exception e) { e.printStackTrace(); } } } } }
參考博文如下,對(duì)BlockingQueue隊(duì)列更多了解,可讀一讀如下的博文:
詳細(xì)分析Java并發(fā)集合ArrayBlockingQueue的用法
詳解Java阻塞隊(duì)列(BlockingQueue)的實(shí)現(xiàn)原理
以上就是BlockingQueue隊(duì)列處理高并發(fā)下的日志的詳細(xì)內(nèi)容,更多關(guān)于BlockingQueue隊(duì)列處理高并發(fā)日志的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java實(shí)現(xiàn)多線程的兩種方式繼承Thread類和實(shí)現(xiàn)Runnable接口的方法
下面小編就為大家?guī)硪黄猨ava實(shí)現(xiàn)多線程的兩種方式繼承Thread類和實(shí)現(xiàn)Runnable接口的方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-09-09java 實(shí)現(xiàn)增量同步和自定義同步的操作
這篇文章主要介紹了java 實(shí)現(xiàn)增量同步和自定義同步的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-01-01深入淺析Spring 的aop實(shí)現(xiàn)原理
AOP(Aspect-OrientedProgramming,面向方面編程),可以說是OOP(Object-Oriented Programing,面向?qū)ο缶幊蹋┑难a(bǔ)充和完善。本文給大家介紹Spring 的aop實(shí)現(xiàn)原理,感興趣的朋友一起學(xué)習(xí)吧2016-03-03Java基礎(chǔ)學(xué)習(xí)之實(shí)參和形參
這篇文章主要介紹了Java基礎(chǔ)學(xué)習(xí)之實(shí)參形參,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java基礎(chǔ)的小伙伴們有一定的幫助,需要的朋友可以參考下2021-05-05Java分布式學(xué)習(xí)之Kafka消息隊(duì)列
Kafka是由Apache軟件基金會(huì)開發(fā)的一個(gè)開源流處理平臺(tái),由Scala和Java編寫。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者在網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)2022-07-07Java線程讓步_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
yield()的作用是讓步。它能讓當(dāng)前線程由“運(yùn)行狀態(tài)”進(jìn)入到“就緒狀態(tài)”,從而讓其它具有相同優(yōu)先級(jí)的等待線程獲取執(zhí)行權(quán)。下面通過本文給大家介紹Java線程讓步的相關(guān)知識(shí),需要的朋友參考下吧2017-05-05Java之使用POI教你玩轉(zhuǎn)Excel導(dǎo)入與導(dǎo)出
這篇文章主要介紹了Java之使用POI教你玩轉(zhuǎn)Excel導(dǎo)入與導(dǎo)出,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10