RabbitMQ?延遲隊(duì)列實(shí)現(xiàn)訂單支付結(jié)果異步階梯性通知(實(shí)例代碼)
在第三方支付中,例如支付寶、或者微信,對(duì)于訂單請(qǐng)求,第三方支付系統(tǒng)采用的是消息同步返回、異步通知+主動(dòng)補(bǔ)償查詢的補(bǔ)償機(jī)制。
由于互聯(lián)網(wǎng)通信的不可靠性,例如雙方網(wǎng)絡(luò)、服務(wù)器、應(yīng)用等因素的影響,不管是同步返回、異步通知、主動(dòng)查詢報(bào)文都可能出現(xiàn)超時(shí)無(wú)響應(yīng)、報(bào)文丟失等情況,所以像支付業(yè)務(wù),對(duì)結(jié)果的通知一般采用幾種方案結(jié)合的補(bǔ)償機(jī)制,不能完全依賴某一種機(jī)制。
例如一個(gè)支付結(jié)果的通知,一方面會(huì)在支付頁(yè)面跳轉(zhuǎn)時(shí)候返回支付結(jié)果(一般只用作前端展示使用,非最終狀態(tài)),同時(shí)會(huì)采用后臺(tái)異步通知機(jī)制(有前臺(tái)、后臺(tái)通知的,以后臺(tái)異步通知結(jié)果為準(zhǔn)),但由于前臺(tái)跳轉(zhuǎn)、后臺(tái)結(jié)果通知都可能失效,因此還以定時(shí)補(bǔ)單+請(qǐng)求方主動(dòng)查詢接口作為輔助手段。
常見(jiàn)的補(bǔ)單操作,任務(wù)調(diào)度策略一般設(shè)定30秒、60秒、3分鐘、6分鐘、10分鐘調(diào)度多次(以自己業(yè)務(wù)需要),如果調(diào)度接收到響應(yīng)確認(rèn)報(bào)文,補(bǔ)單成功,則中止對(duì)應(yīng)訂單的調(diào)度任務(wù);如果超過(guò)補(bǔ)單上限次數(shù),則停止補(bǔ)單,避免無(wú)謂的資源浪費(fèi)。請(qǐng)求端隨時(shí)可以發(fā)起請(qǐng)求報(bào)文查詢對(duì)應(yīng)訂單的狀態(tài)。在日常開(kāi)發(fā)中,對(duì)于網(wǎng)站前端來(lái)說(shuō),支付計(jì)費(fèi)中心對(duì)于訂單請(qǐng)求信息的處理也是通過(guò)消息同步返回、異步通知+主動(dòng)補(bǔ)償查詢相結(jié)合的機(jī)制,其中對(duì)于訂單的異步通知,目前的通知策略為3s、30s、60s、120s、180、300s的階梯性通知。返回成功情況下就不繼續(xù)通知了,本來(lái)打算使用將失敗的消息寫到數(shù)據(jù)庫(kù)等待發(fā)送,然后每秒查詢數(shù)據(jù)庫(kù)獲取消息通知前端。但覺(jué)得這樣的處理方式太粗暴。
存在以下缺點(diǎn):
1 、每秒請(qǐng)求有點(diǎn)兒浪費(fèi)資源;
2 、通知方式不穩(wěn)定;
3 、無(wú)法承受大數(shù)據(jù)量等等
所以最終打算使用rabbitmq的消息延遲+死信隊(duì)列來(lái)實(shí)現(xiàn)。消息模型如下:

producer發(fā)布消息,通過(guò)exchangeA的消息會(huì)被分發(fā)到QueueA,Consumer監(jiān)聽(tīng)queueA,一旦有消息到來(lái)就被消費(fèi),這邊的消費(fèi)業(yè)務(wù)就是通知前端,如果通知失敗,就創(chuàng)建一個(gè)延遲隊(duì)列declareQueue,設(shè)置每個(gè)消息的ttl然后通過(guò)declare_exchange將消息分發(fā)到declare_queue,因?yàn)閐eclare_queue沒(méi)有consumer并且declare_queue中的消息設(shè)置了ttl,當(dāng)ttl到期后,將通過(guò)DEX路由到queueA,被重新消費(fèi)。代碼如下:DeclareQueue.java
package org.delayQueue;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DeclareQueue {
public static String EXCHANGE_NAME = "notifyExchange";
public static void init() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection connection = null;
try {
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String routingKey = "AliPaynotify";
String message = "http://localhost:8080/BossCenter/payGateway/notifyRecv.jsp?is_success=T?ify_id=4ab9bed148d043d0bf75460706f7774a?ify_time=2014-08-29+16%3A22%3A02?ify_type=trade_status_sync&out_trade_no=1421712120109862&total_fee=424.42&trade_no=14217121201098611&trade_status=TRADE_SUCCESS";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent :" + message);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
public static void main(String args[]) {
init();
}DeclareConsumer.java
package org.delayQueue;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.DefaultHttpClient;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class DeclareConsumer {
public static String EXCHANGE_NAME = "notifyExchange";
public static String QU_declare_15S = "Qu_declare_15s";
public static String EX_declare_15S = "EX_declare_15s";
public static String ROUTINGKEY = "AliPaynotify";
public static Connection connection = null;
public static Channel channel = null;
public static Channel DECLARE_15S_CHANNEL = null;
public static String declare_queue = "init";
public static String originalExpiration = "0";
public static void init() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
connection = factory.newConnection();
channel = connection.createChannel();
DECLARE_15S_CHANNEL = connection.createChannel();
}
public static void consume() {
try {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
final String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, ROUTINGKEY);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
Map<String, Object> headers = properties.getHeaders();
if (headers != null) {
List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");
System.out.println("xDeath--- > " + xDeath);
if (xDeath != null && !xDeath.isEmpty()) {
Map<String, Object> entrys = xDeath.get(0);
// for(Entry<String, Object>
// entry:entrys.entrySet()){
// System.out.println(entry.getKey()+":"+entry.getValue());
// }
originalExpiration = entrys.get("original-expiration").toString();
}
}
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'" + "time" + System.currentTimeMillis());
HttpClient httpClient = new DefaultHttpClient();
HttpPost post = new HttpPost(message);
HttpResponse response = httpClient.execute(post);
BufferedReader inreader = null;
if (response.getStatusLine().getStatusCode() == 200) {
inreader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "UTF-8"));
StringBuffer responseBody = new StringBuffer();
String line = null;
while ((line = inreader.readLine()) != null) {
responseBody.append(line);
if (!responseBody.equals("success")) {
// putDeclre15s(message);
if (originalExpiration.equals("0")) {
putDeclreQueue(message, 3000, QU_declare_15S);
}
if (originalExpiration.equals("3000")) {
putDeclreQueue(message, 30000, QU_declare_15S);
if (originalExpiration.equals("30000")) {
putDeclreQueue(message, 60000, QU_declare_15S);
if (originalExpiration.equals("60000")) {
putDeclreQueue(message, 120000, QU_declare_15S);
if (originalExpiration.equals("120000")) {
putDeclreQueue(message, 180000, QU_declare_15S);
if (originalExpiration.equals("180000")) {
putDeclreQueue(message, 300000, QU_declare_15S);
if (originalExpiration.equals("300000")) {
// channel.basicConsume(QU_declare_300S,true, this);
System.out.println("finish notify");
} else {
System.out.println(response.getStatusLine().getStatusCode());
}
};
channel.basicConsume(queueName, true, consumer);
} catch (Exception e) {
e.printStackTrace();
} finally {
}
static Map<String, Object> xdeathMap = new HashMap<String, Object>();
static List<Map<String, Object>> xDeath = new ArrayList<Map<String, Object>>();
static Map<String, Object> xdeathParam = new HashMap<String, Object>();
public static void putDeclre15s(String message) throws IOException {
channel.exchangeDeclare(EX_declare_15S, "topic");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", EXCHANGE_NAME);// 死信exchange
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("3000").deliveryMode(2);// 設(shè)置消息TTL
AMQP.BasicProperties properties = builder.build();
channel.queueDeclare(QU_declare_15S, false, false, false, args);
channel.queueBind(QU_declare_15S, EX_declare_15S, ROUTINGKEY);
channel.basicPublish(EX_declare_15S, ROUTINGKEY, properties, message.getBytes());
System.out.println("send message in QA_DEFERRED_15S" + message + "time" + System.currentTimeMillis());
public static void putDeclreQueue(String message, int mis, String queue) throws IOException {
builder.expiration(String.valueOf(mis)).deliveryMode(2);// 設(shè)置消息TTL
channel.queueDeclare(queue, false, false, false, args);
channel.queueBind(queue, EX_declare_15S, ROUTINGKEY);
System.out.println("send message in " + queue + message + "time============" + System.currentTimeMillis());
public static void main(String args[]) throws Exception {
init();
consume();
}消息通過(guò)dlx轉(zhuǎn)發(fā)的情況下,header頭部會(huì)帶有x-death的一個(gè)數(shù)組,里面包含消息的各項(xiàng)屬性,比如說(shuō)消息成為死信的原因reason,original-expiration這個(gè)字段表示消息在原來(lái)隊(duì)列中的過(guò)期時(shí)間,根據(jù)這個(gè)值來(lái)確定下一次通知的延遲時(shí)間應(yīng)該是多少秒。運(yùn)行結(jié)果如下:



到此這篇關(guān)于RabbitMQ 延遲隊(duì)列實(shí)現(xiàn)訂單支付結(jié)果異步階梯性通知的文章就介紹到這了,更多相關(guān)RabbitMQ 延遲隊(duì)列實(shí)現(xiàn)訂單支付內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring?Boot中使用Swagger3.0.0版本構(gòu)建RESTful?APIs的方法
Swagger?是一個(gè)規(guī)范和完整的框架,用于生成、描述、調(diào)用和可視化?RESTful?風(fēng)格的?Web?服務(wù),這篇文章主要介紹了Spring?Boot中使用Swagger3.0.0版本構(gòu)建RESTful?APIs的方法,需要的朋友可以參考下2022-11-11
Java sleep方法及中斷方式、yield方法代碼實(shí)例
這篇文章主要介紹了Java sleep方法及中斷方式、yield方法代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04
微信java開(kāi)發(fā)之實(shí)現(xiàn)微信主動(dòng)推送消息
這篇文章主要介紹了微信開(kāi)發(fā)過(guò)程中的使用java實(shí)現(xiàn)微信主動(dòng)推送消息示例,需要的朋友可以參考下2014-03-03
Elasticsearch查詢Range Query語(yǔ)法示例
這篇文章主要為大家介紹了Elasticsearch查詢Range Query語(yǔ)法示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-04-04
SparkStreaming-Kafka通過(guò)指定偏移量獲取數(shù)據(jù)實(shí)現(xiàn)
這篇文章主要為大家介紹了SparkStreaming-Kafka通過(guò)指定偏移量獲取數(shù)據(jù),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-06-06
使用jenkins部署springboot項(xiàng)目的方法步驟
這篇文章主要介紹了使用jenkins部署springboot項(xiàng)目的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04

