java結(jié)合WebSphere MQ實現(xiàn)接收隊列文件功能
首先我們先來簡單介紹下websphere mq以及安裝使用簡介
websphere mq : 用于傳輸信息 具有跨平臺的功能。
1 安裝websphere mq 并啟動
2 websphere mq 建立 queue Manager (如:MQSI_SAMPLE_QM)
3 建立queue 類型選擇 Local類型 的 (如lq )
4 建立channels 類型選擇Server Connection (如BridgeChannel)
接下來,我們來看實例代碼:
MQFileReceiver.java package com.mq.dpca.file; import java.io.File; import java.io.FileOutputStream; import com.ibm.mq.MQEnvironment; import com.ibm.mq.MQException; import com.ibm.mq.MQGetMessageOptions; import com.ibm.mq.MQMessage; import com.ibm.mq.MQQueue; import com.ibm.mq.MQQueueManager; import com.ibm.mq.constants.MQConstants; import com.mq.dpca.msg.MQConfig; import com.mq.dpca.util.ReadCmdLine; import com.mq.dpca.util.RenameUtil; /** * * MQ分組接收文件功能 * 主動輪詢 */ public class MQFileReceiver { private MQQueueManager qmgr; // 連接到隊列管理器 private MQQueue inQueue; // 傳輸隊列 private String queueName = ""; // 隊列名稱 private String host = ""; // private int port = 1414; // 偵聽器的端口號 private String channel = ""; // 通道名稱 private String qmgrName = ""; // 隊列管理器 private MQMessage inMsg; // 創(chuàng)建消息緩沖 private MQGetMessageOptions gmo; // 設(shè)置獲取消息選項 private static String fileName = null; // 接收隊列上的消息并存入文件 private int ccsid = 0; private static String file_dir = null; /** * 程序的入口 * * @param args */ public static void main(String args[]) { MQFileReceiver mfs = new MQFileReceiver(); //初始化連接 mfs.initproperty(); //接收文件 mfs.runGoupReceiver(); //獲取shell腳本名 // String shellname = MQConfig.getValueByKey(fileName); // if(shellname!=null&&!"".equals(shellname)){ // //調(diào)用shell // ReadCmdLine.callShell(shellname); // }else{ // System.out.println("have no shell name,Only receive files."); // } } public void runGoupReceiver() { try { init(); getGroupMessages(); qmgr.commit(); System.out.println("\n Messages successfully Receive "); } catch (MQException mqe) { mqe.printStackTrace(); try { System.out.println("\n Backing out Transaction "); qmgr.backout(); System.exit(2); } catch (Exception e) { e.printStackTrace(); System.exit(2); } } catch (Exception e) { e.printStackTrace(); System.exit(2); } } /** * 初始化服務(wù)器連接信息 * * @throws Exception */ private void init() throws Exception { /* 為客戶機連接設(shè)置MQEnvironment屬性 */ MQEnvironment.hostname = host; MQEnvironment.channel = channel; MQEnvironment.port = port; /* 連接到隊列管理器 */ qmgr = new MQQueueManager(qmgrName); /* 設(shè)置隊列打開選項以輸 */ int opnOptn = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_FAIL_IF_QUIESCING; /* 打開隊列以輸 */ inQueue = qmgr.accessQueue(queueName, opnOptn, null, null, null); } /** * 接受文件的主函數(shù) * * @throws Exception */ public void getGroupMessages() { /* 設(shè)置獲取消息選項 */ gmo = new MQGetMessageOptions(); gmo.options = MQConstants.MQGMO_FAIL_IF_QUIESCING; gmo.options = gmo.options + MQConstants.MQGMO_SYNCPOINT; /* 等待消息 */ gmo.options = gmo.options + MQConstants.MQGMO_WAIT; /* 設(shè)置等待時間限制 */ gmo.waitInterval = 5000; /* 只獲取消息 */ gmo.options = gmo.options + MQConstants.MQGMO_ALL_MSGS_AVAILABLE; /* 以輯順序獲取消息 */ gmo.options = gmo.options + MQConstants.MQGMO_LOGICAL_ORDER; gmo.matchOptions = MQConstants.MQMO_MATCH_GROUP_ID; /* 創(chuàng)建消息緩沖 */ inMsg = new MQMessage(); try { FileOutputStream fos = null; /* 處理組消息 */ while (true) { try { inQueue.get(inMsg, gmo); if (fos == null) { try { fileName = inMsg.getStringProperty("fileName"); String fileName_full = null; fileName_full = file_dir + RenameUtil.rename(fileName); fos = new FileOutputStream(new File(fileName_full)); int msgLength = inMsg.getMessageLength(); byte[] buffer = new byte[msgLength]; inMsg.readFully(buffer); fos.write(buffer, 0, msgLength); /* 查看是否是最后消息標(biāo)識 */ char x = gmo.groupStatus; if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) { System.out.println("Last Msg in Group"); break; } inMsg.clearMessage(); } catch (Exception e) { System.out .println("Receiver the message without property,do nothing!"); inMsg.clearMessage(); } } else { int msgLength = inMsg.getMessageLength(); byte[] buffer = new byte[msgLength]; inMsg.readFully(buffer); fos.write(buffer, 0, msgLength); /* 查看是否是最后消息標(biāo)識 */ char x = gmo.groupStatus; if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) { System.out.println("Last Msg in Group"); break; } inMsg.clearMessage(); } } catch (Exception e) { char x = gmo.groupStatus; if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) { System.out.println("Last Msg in Group"); } break; } } if (fos != null) fos.close(); } catch (Exception e) { System.out.println(e.getMessage()); } } public void initproperty() { MQConfig config = new MQConfig().getInstance(); if (config.getMQ_MANAGER() != null) { qmgrName = config.getMQ_MANAGER(); queueName = config.getMQ_QUEUE_NAME(); channel = config.getMQ_CHANNEL(); host = config.getMQ_HOST_NAME(); port = Integer.valueOf(config.getMQ_PROT()); ccsid = Integer.valueOf(config.getMQ_CCSID()); file_dir = config.getFILE_DIR(); } } }
相關(guān)文章
Spring Boot Async異步執(zhí)行任務(wù)過程詳解
這篇文章主要介紹了Spring Boot Async異步執(zhí)行任務(wù)過程詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-08-08淺析SpringBoot微服務(wù)中異步調(diào)用數(shù)據(jù)提交數(shù)據(jù)庫的問題
這篇文章主要介紹了SpringBoot微服務(wù)中異步調(diào)用數(shù)據(jù)提交數(shù)據(jù)庫的問題,今天本文涉及到的知識點不難,都是很簡單的crud操作,本文結(jié)合實例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-07-07Java OpenSSL生成的RSA公私鑰進行數(shù)據(jù)加解密詳細(xì)介紹
這篇文章主要介紹了Java OpenSSL生成的RSA公私鑰進行數(shù)據(jù)加解密詳細(xì)介紹的相關(guān)資料,這里提供實例代碼及說明具體如何實現(xiàn),需要的朋友可以參考下2016-12-12MyBatis學(xué)習(xí)教程(三)-MyBatis配置優(yōu)化
這篇文章主要介紹了MyBatis學(xué)習(xí)教程(三)-MyBatis配置優(yōu)化的相關(guān)資料,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2016-05-05