Java HTTP協(xié)議收發(fā)MQ 消息代碼實(shí)例詳解
1. 準(zhǔn)備環(huán)境
在工程 POM 文件添加 HTTP Java 客戶端的依賴。
<dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-client</artifactId> <version>9.3.4.RC1</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.1.11</version> </dependency>
2. 運(yùn)行代碼配置(user.properties)
您需要設(shè)置配置文件(user.properties)的相關(guān)內(nèi)容,具體請(qǐng)參考申請(qǐng) MQ 資源 。
#您在控制臺(tái)創(chuàng)建的Topic Topic=xxx #公測(cè)url URL=http://publictest-rest.ons.aliyun.com #阿里云身份驗(yàn)證碼 Ak=xxx #阿里云身份驗(yàn)證密鑰 Sk=xxx #MQ控制臺(tái)創(chuàng)建的Producer ID ProducerID=xxx #MQ控制臺(tái)創(chuàng)建的Consumer ID ConsumerID=xxx
說明:URL 中的 Key,Tag以及 POST Content-Type 沒有任何的限制,只要確保Key 和 Tag 相同唯一即可,可以放在 user.properties 里面。
3. HTTP 發(fā)送消息示例代碼
您可以按以下說明設(shè)置相應(yīng)參數(shù)并測(cè)試 HTTP 消息發(fā)送功能。 package com.aliyun.openservice.ons.http.demo; import java.nio.charset.Charset; import java.util.Date; import java.util.Properties; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.util.StringContentProvider; import com.aliyun.openservices.ons.api.impl.authority.AuthUtil; public class HttpProducer { public static String SIGNATURE="Signature"; public static String NUM="num"; public static String CONSUMERID="ConsumerID"; public static String PRODUCERID="ProducerID"; public static String TIMEOUT="timeout"; public static String TOPIC="Topic"; public static String AK="AccessKey"; public static String BODY="body"; public static String MSGHANDLE="msgHandle"; public static String TIME="time"; public static void main(String[] args) throws Exception { HttpClient httpClient=new HttpClient(); httpClient.setMaxConnectionsPerDestination(1); httpClient.start(); Properties properties=new Properties(); properties.load(HttpProducer.class.getClassLoader().getResourceAsStream("user.properties")); String topic=properties.getProperty("Topic"); //請(qǐng)?jiān)趗ser.properties配置您的Topic String url=properties.getProperty("URL");//公測(cè)集群配置為http://publictest-rest.ons.aliyun.com/ String ak=properties.getProperty("Ak");//請(qǐng)?jiān)趗ser.properties配置您的Ak String sk=properties.getProperty("Sk");//請(qǐng)?jiān)趗ser.properties配置您的Sk String pid=properties.getProperty("ProducerID");//請(qǐng)?jiān)趗ser.properties配置您的Producer ID String date=String.valueOf(new Date().getTime()); String sign=null; String body="hello ons http"; String NEWLINE="\n"; String signString; for (int i = 0; i < 10; i++) { date=String.valueOf(new Date().getTime()); Request req=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&tag=http"+"&key=http"); ContentProvider content=new StringContentProvider(body); req.content(content); signString=topic+NEWLINE+pid+NEWLINE+MD5.getInstance().getMD5String(body)+NEWLINE+date; System.out.println(signString); sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk); req.header(SIGNATURE, sign); req.header(AK, ak); req.header(PRODUCERID, pid); ContentResponse response; response=req.send(); System.out.println("send msg:"+response.getStatus()+response.getContentAsString()); } } }
4. HTTP接收消息示例代碼
請(qǐng)按以下說明設(shè)置相應(yīng)參數(shù)并測(cè)試 HTTP 消息接收功能。
package com.aliyun.openservice.ons.http.demo; import java.nio.charset.Charset; import java.util.Date; import java.util.List; import java.util.Properties; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.util.StringContentProvider; import org.eclipse.jetty.http.HttpMethod; import com.alibaba.fastjson.JSON; import com.aliyun.openservice.ons.mqtt.demo.MqttProducer; import com.aliyun.openservices.ons.api.impl.authority.AuthUtil; public class HttpConsumer { public static String SIGNATURE="Signature"; public static String NUM="num"; public static String CONSUMERID="ConsumerID"; public static String PRODUCERID="ProducerID"; public static String TIMEOUT="timeout"; public static String TOPIC="Topic"; public static String AK="AccessKey"; public static String BODY="body"; public static String MSGHANDLE="msgHandle"; public static String TIME="time"; public static void main(String[] args) throws Exception { HttpClient httpClient=new HttpClient(); httpClient.setMaxConnectionsPerDestination(1); httpClient.start(); Properties properties=new Properties(); properties.load(HttpConsumer.class.getClassLoader().getResourceAsStream("user.properties")); String topic=properties.getProperty("Topic"); //請(qǐng)?jiān)趗ser.properties配置您的topic String url=properties.getProperty("URL");//公測(cè)集群配置為http://publictest-rest.ons.aliyun.com/ String ak=properties.getProperty("Ak");//請(qǐng)?jiān)趗ser.properties配置您的Ak String sk=properties.getProperty("Sk");//請(qǐng)?jiān)趗ser.properties配置您的Sk String cid=properties.getProperty("ConsumerID");//請(qǐng)?jiān)趗ser.properties配置您的Consumer ID String date=String.valueOf(new Date().getTime()); String sign=null; String NEWLINE="\n"; String signString; System.out.println(NEWLINE+NEWLINE); while (true) { try { date=String.valueOf(new Date().getTime()); Request req=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&num="+32); req.method(HttpMethod.GET); ContentResponse response; signString=topic+NEWLINE+cid+NEWLINE+date; sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk); req.header(SIGNATURE, sign); req.header(AK, ak); req.header(CONSUMERID, cid); long start=System.currentTimeMillis(); response=req.send(); System.out.println("get cost:"+(System.currentTimeMillis()-start)/1000 +" "+response.getStatus()+" "+response.getContentAsString()); List<SimpleMessage> list = null; if (response.getContentAsString()!=null&&!response.getContentAsString().isEmpty()) { list=JSON.parseArray(response.getContentAsString(), SimpleMessage.class); } if (list==null||list.size()==0) { Thread.sleep(100); continue; } System.out.println("size is :"+list.size()); for (SimpleMessage simpleMessage : list) { date=String.valueOf(new Date().getTime()); System.out.println("receive msg:"+simpleMessage.getBody()+" born time "+simpleMessage.getBornTime()); req=httpClient.POST(url+"message/?msgHandle="+simpleMessage.getMsgHandle()+"&topic="+topic+"&time="+date); req.method(HttpMethod.DELETE); signString=topic+NEWLINE+cid+NEWLINE+simpleMessage.getMsgHandle()+NEWLINE+date; sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk); req.header(SIGNATURE, sign); req.header(AK, ak); req.header(CONSUMERID, cid); response=req.send(); System.out.println("delete msg:"+response.toString()); } Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } } } }
5. HTTP示例程序工具類
(1)消息封裝類: SimpleMessage.java
package com.aliyun.openservice.ons.http.demo; public class SimpleMessage { private String body; private String msgId; private String bornTime; private String msgHandle; private int reconsumeTimes; private String tag; public void setTag(String tag) { this.tag = tag; } public String getTag() { return tag; } public int getReconsumeTimes() { return reconsumeTimes; } public void setReconsumeTimes(int reconsumeTimes) { this.reconsumeTimes = reconsumeTimes; } public void setMsgHandle(String msgHandle) { this.msgHandle = msgHandle; } public String getMsgHandle() { return msgHandle; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } public String getMsgId() { return msgId; } public void setMsgId(String msgId) { this.msgId = msgId; } public String getBornTime() { return bornTime; } public void setBornTime(String bornTime) { this.bornTime = bornTime; } }
(2)字符串簽名類: MD5.java
package com.aliyun.openservice.ons.http.demo; import java.io.UnsupportedEncodingException; import java.nio.charset.Charset; import java.security.MessageDigest; import java.sql.SQLException; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.LoggerFactory; public class MD5 { private static final org.slf4j.Logger log = LoggerFactory.getLogger(MD5.class); private static char[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; private static Map<Character, Integer> rDigits = new HashMap<Character, Integer>(16); static { for (int i = 0; i < digits.length; ++i) { rDigits.put(digits[i], i); } } private static MD5 me = new MD5(); private MessageDigest mHasher; private final ReentrantLock opLock = new ReentrantLock(); private MD5() { try { this.mHasher = MessageDigest.getInstance("md5"); } catch (Exception e) { throw new RuntimeException(e); } } public static MD5 getInstance() { return me; } public String getMD5String(String content) { return this.bytes2string(this.hash(content)); } public String getMD5String(byte[] content) { return this.bytes2string(this.hash(content)); } public byte[] getMD5Bytes(byte[] content) { return this.hash(content); } public byte[] hash(String str) { this.opLock.lock(); try { byte[] bt = this.mHasher.digest(str.getBytes("utf-8")); if (null == bt || bt.length != 16) { throw new IllegalArgumentException("md5 need"); } return bt; } catch (UnsupportedEncodingException e) { throw new RuntimeException("unsupported utf-8 encoding", e); } finally { this.opLock.unlock(); } } public byte[] hash(byte[] data) { this.opLock.lock(); try { byte[] bt = this.mHasher.digest(data); if (null == bt || bt.length != 16) { throw new IllegalArgumentException("md5 need"); } return bt; } finally { this.opLock.unlock(); } } public String bytes2string(byte[] bt) { int l = bt.length; char[] out = new char[l << 1]; for (int i = 0, j = 0; i < l; i++) { out[j++] = digits[(0xF0 & bt[i]) >>> 4]; out[j++] = digits[0x0F & bt[i]]; } if (log.isDebugEnabled()) { log.debug("[hash]" + new String(out)); } return new String(out); } public byte[] string2bytes(String str) { if (null == str) { throw new NullPointerException("Argument is not allowed empty"); } if (str.length() != 32) { throw new IllegalArgumentException("String length must equals 32"); } byte[] data = new byte[16]; char[] chs = str.toCharArray(); for (int i = 0; i < 16; ++i) { int h = rDigits.get(chs[i * 2]).intValue(); int l = rDigits.get(chs[i * 2 + 1]).intValue(); data[i] = (byte) ((h & 0x0F) << 4 | l & 0x0F); } return data; } }
希望本篇文章對(duì)您有所幫助
相關(guān)文章
Java設(shè)計(jì)模式之依賴倒轉(zhuǎn)原則精解
設(shè)計(jì)模式(Design pattern)代表了最佳的實(shí)踐,通常被有經(jīng)驗(yàn)的面向?qū)ο蟮能浖_發(fā)人員所采用。設(shè)計(jì)模式是軟件開發(fā)人員在軟件開發(fā)過程中面臨的一般問題的解決方案。本篇介紹設(shè)計(jì)模式七大原則之一的依賴倒轉(zhuǎn)原則2022-02-02Java使用Thread和Runnable的線程實(shí)現(xiàn)方法比較
這篇文章主要介紹了Java使用Thread和Runnable的線程實(shí)現(xiàn)方法,結(jié)合實(shí)例形式對(duì)比分析了Java使用Thread和Runnable實(shí)現(xiàn)與使用線程的相關(guān)操作技巧,需要的朋友可以參考下2019-10-10Springboot項(xiàng)目中內(nèi)嵌sqlite數(shù)據(jù)庫(kù)的配置流程
這篇文章主要介紹了Springboot項(xiàng)目中內(nèi)嵌sqlite數(shù)據(jù)庫(kù)的配置流程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06Apache Calcite進(jìn)行SQL解析(java代碼實(shí)例)
Calcite是一款開源SQL解析工具, 可以將各種SQL語(yǔ)句解析成抽象語(yǔ)法樹AST(Abstract Syntax Tree), 之后通過操作AST就可以把SQL中所要表達(dá)的算法與關(guān)系體現(xiàn)在具體代碼之中,今天通過代碼實(shí)例給大家介紹Apache Calcite進(jìn)行SQL解析問題,感興趣的朋友一起看看吧2022-01-01Spring,hibernate,struts經(jīng)典面試筆試題(含答案)
這篇文章主要介紹了Spring,hibernate,struts經(jīng)典面試筆試題極其參考含答案,涉及SSH基本概念,原理與使用技巧,需要的朋友可以參考下2016-03-03java比較器Comparable接口與Comaprator接口的深入分析
本篇文章是對(duì)java比較器Comparable接口與Comaprator接口進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-06-06Java實(shí)現(xiàn)猜數(shù)字小游戲詳解流程
猜數(shù)字是興起于英國(guó)的益智類小游戲,起源于20世紀(jì)中期,一般由兩個(gè)人或多人玩,也可以由一個(gè)人和電腦玩。游戲規(guī)則為一方出數(shù)字,一方猜,今天我們來用Java把這個(gè)小游戲?qū)懗鰜砭毦毷?/div> 2021-10-10Java中List集合去除重復(fù)數(shù)據(jù)的方法匯總
這篇文章主要給大家介紹了關(guān)于Java中List集合去除重復(fù)數(shù)據(jù)的方法,文中通過圖文介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02SpringBoot實(shí)現(xiàn)License生成和校驗(yàn)的過程詳解
在我們向客戶銷售商業(yè)軟件的時(shí)候,常常需要對(duì)所發(fā)布的軟件實(shí)行一系列管控措施,諸如驗(yàn)證使用者身份、軟件是否到期,以及保存版權(quán)信息和開發(fā)商詳情等,所以本文給大家介紹了SpringBoot實(shí)現(xiàn)License生成和校驗(yàn)的過程,需要的朋友可以參考下2024-09-09最新評(píng)論