BlockingQueue隊列處理高并發(fā)下的日志

前言
當系統(tǒng)流量負載比較高時,業(yè)務日志的寫入操作也要納入系統(tǒng)性能考量之內(nèi),如若處理不當,將影響系統(tǒng)的正常業(yè)務操作,之前寫過一篇《spring boot通過MQ消費log4j2的日志》的博文,采用了RabbitMQ消息中間件來存儲抗高并發(fā)下的日志,因為引入了中間件,操作使用起來可能沒那么簡便,今天分享使用多線程消費阻塞隊列的方式來處理我們的海量日志
what阻塞隊列?
阻塞隊列(BlockingQueue)是區(qū)別于普通隊列多了兩個附加操作的線程安全的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强?。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用于生產(chǎn)者和消費者的場景,生產(chǎn)者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產(chǎn)者存放元素的容器,而消費者也只從容器里拿元素。
1.聲明存儲固定消息的隊列
/**
* Created by kl on 2017/3/20.
* Content :銷售操作日志隊列
*/
public class SalesLogQueue{
//隊列大小
public static final int QUEUE_MAX_SIZE = 1000;
private static SalesLogQueue alarmMessageQueue = new SalesLogQueue();
//阻塞隊列
private BlockingQueueblockingQueue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
private SalesLogQueue(){}
public static SalesLogQueue getInstance() {
return alarmMessageQueue;
}
/**
* 消息入隊
* @param salesLog
* @return
*/
public boolean push(SalesLog salesLog) {
return this.blockingQueue.add(salesLog);//隊列滿了就拋出異常,不阻塞
}
/**
* 消息出隊
* @return
*/
public SalesLog poll() {
SalesLog result = null;
try {
result = this.blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
/**
* 獲取隊列大小
* @return
*/
public int size() {
return this.blockingQueue.size();
}
}ps:因為業(yè)務原因,采用add的方式入隊,隊列滿了就拋異常,不阻塞
2.消息入隊
消息入隊可以在任何需要保存日志的地方操作,如aop統(tǒng)一攔截日志處理,filter過濾請求日志處理,或者耦合的業(yè)務日志,記住,不阻塞入隊操作,不然將影響正常的業(yè)務操作,如下為filter統(tǒng)一處理請求日志:
/**
* Created by kl on 2017/3/20.
* Content :訪問請求攔截,保存操作日志
*/
public class SalesLogFilter implements Filter {
private RoleResourceService resourceService;
@Override
public void init(FilterConfig filterConfig) throws ServletException {
ServletContext context = filterConfig.getServletContext();
ApplicationContext ctx = WebApplicationContextUtils.getWebApplicationContext(context);
resourceService = ctx.getBean(RoleResourceService.class);
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
try {
HttpServletRequest request = (HttpServletRequest) servletRequest;
String requestUrl = request.getRequestURI();
String requestType=request.getMethod();
String ipAddress = HttpClientUtil.getIpAddr(request);
Map resource=resourceService.getResource();
String context=resource.get(requestUrl);
//動態(tài)url正則匹配
if(StringUtil.isNull(context)){
for(Map.Entry entry:resource.entrySet()){
String resourceUrl= entry.getKey();
if(requestUrl.matches(resourceUrl)){
context=entry.getValue();
break;
}
}
}
SalesLog log=new SalesLog();
log.setCreateDate(new Timestamp(System.currentTimeMillis()));
log.setContext(context);
log.setOperateUser(UserTokenUtil.currentUser.get().get("realname"));
log.setRequestIp(ipAddress);
log.setRequestUrl(requestUrl);
log.setRequestType(requestType);
SalesLogQueue.getInstance().push(log);
}catch (Exception e){
e.printStackTrace();
}
filterChain.doFilter(servletRequest, servletResponse);
}
@Override
public void destroy() {
}
}3.消息出隊被消費
BlockingQueue是線程安全的,所以可以放心的在多個線程中去處理隊列中的消息,如下代碼聲明了一個兩個大小的固定線程池,并添加了兩個線程去處理隊列中的消息
/**
* Created by kl on 2017/3/20.
* Content :啟動消費操作日志隊列的線程
*/
@Component
public class ConsumeSalesLogQueue {
@Autowired
SalesLogService salesLogService;
@PostConstruct
public void startrtThread() {
ExecutorService e = Executors.newFixedThreadPool(2);//兩個大小的固定線程池
e.submit(new PollSalesLog(salesLogService));
e.submit(new PollSalesLog(salesLogService));
}
class PollSalesLog implements Runnable {
SalesLogService salesLogService;
public PollSalesLog(SalesLogService salesLogService) {
this.salesLogService = salesLogService;
}
@Override
public void run() {
while (true) {
try {
SalesLog salesLog = SalesLogQueue.getInstance().poll();
if(salesLog!=null){
salesLogService.saveSalesLog(salesLog);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}參考博文如下,對BlockingQueue隊列更多了解,可讀一讀如下的博文:
詳細分析Java并發(fā)集合ArrayBlockingQueue的用法
詳解Java阻塞隊列(BlockingQueue)的實現(xiàn)原理
以上就是BlockingQueue隊列處理高并發(fā)下的日志的詳細內(nèi)容,更多關于BlockingQueue隊列處理高并發(fā)日志的資料請關注腳本之家其它相關文章!
相關文章
java實現(xiàn)多線程的兩種方式繼承Thread類和實現(xiàn)Runnable接口的方法
下面小編就為大家?guī)硪黄猨ava實現(xiàn)多線程的兩種方式繼承Thread類和實現(xiàn)Runnable接口的方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-09-09

