使用Kotlin+RocketMQ實現(xiàn)延時消息的示例代碼
一. 延時消息
延時消息是指消息被發(fā)送以后,并不想讓消費者立即拿到消息,而是等待指定時間后,消費者才拿到這個消息進行消費。
使用延時消息的典型場景,例如:
- 在電商系統(tǒng)中,用戶下完訂單30分鐘內(nèi)沒支付,則訂單可能會被取消。
- 在電商系統(tǒng)中,用戶七天內(nèi)沒有評價商品,則默認好評。
這些場景對應的解決方案,包括:
- 輪詢遍歷數(shù)據(jù)庫記錄
- JDK 的 DelayQueue
- ScheduledExecutorService
- 基于 Quartz 的定時任務
- 基于 Redis 的 zset 實現(xiàn)延時隊列。
除此之外,還可以使用消息隊列來實現(xiàn)延時消息,例如 RocketMQ。
二. RocketMQ
RocketMQ 是一個分布式消息和流數(shù)據(jù)平臺,具有低延遲、高性能、高可靠性、萬億級容量和靈活的可擴展性。RocketMQ 是2012年阿里巴巴開源的第三代分布式消息中間件。

三. RocketMQ 實現(xiàn)延時消息
3.1 業(yè)務背景
我們的系統(tǒng)完成某項操作之后,會推送事件消息到業(yè)務方的接口。當我們調用業(yè)務方的通知接口返回值為成功時,表示本次推送消息成功;當返回值為失敗時,則會多次推送消息,直到返回成功為止(保證至少成功一次)。
當我們推送失敗后,雖然會進行多次推送消息,但并不是立即進行。會有一定的延遲,并按照一定的規(guī)則進行推送消息。
例如:1小時后嘗試推送、3小時后嘗試推送、1天后嘗試推送、3天后嘗試推送等等。因此,考慮使用延時消息實現(xiàn)該功能。
3.2 生產(chǎn)者(Producer)
生產(chǎn)者負責產(chǎn)生消息,生產(chǎn)者向消息服務器發(fā)送由業(yè)務應用程序系統(tǒng)生成的消息。
首先,定義一個支持延時發(fā)送的 AbstractProducer。
abstract class AbstractProducer :ProducerBean() {
var producerId: String? = null
var topic: String? = null
var tag: String?=null
var timeoutMillis: Int? = null
var delaySendTimeMills: Long? = null
val log = LogFactory.getLog(this.javaClass)
open fun sendMessage(messageBody: Any, tag: String) {
val msgBody = JSON.toJSONString(messageBody)
val message = Message(topic, tag, msgBody.toByteArray())
if (delaySendTimeMills != null) {
val startDeliverTime = System.currentTimeMillis() + delaySendTimeMills!!
message.startDeliverTime = startDeliverTime
log.info( "send delay message producer startDeliverTime:${startDeliverTime}currentTime :${System.currentTimeMillis()}")
}
val logMessageId = buildLogMessageId(message)
try {
val sendResult = send(message)
log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)
} catch (e: Exception) {
log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)
}
}
fun buildLogMessageId(message: Message): String {
return "topic: " + message.topic + "\n" +
"producer: " + producerId + "\n" +
"tag: " + message.tag + "\n" +
"key: " + message.key + "\n"
}
}
根據(jù)業(yè)務需要,增加一個支持重試機制的 Producer
@Component
@ConfigurationProperties("mqs.ons.producers.xxx-producer")
@Configuration
@Data
class CleanReportPushEventProducer :AbstractProducer() {
lateinit var delaySecondList:List<Long>
fun sendMessage(messageBody: CleanReportPushEventMessage){
//重試超過次數(shù)之后不再發(fā)事件
if (delaySecondList!=null) {
if(messageBody.times>=delaySecondList.size){
return
}
val msgBody = JSON.toJSONString(messageBody)
val message = Message(topic, tag, msgBody.toByteArray())
val delayTimeMills = delaySecondList[messageBody.times]*1000L
message.startDeliverTime = System.currentTimeMillis() + delayTimeMills
log.info( "messageBody: " + msgBody+ "startDeliverTime: "+message.startDeliverTime )
val logMessageId = buildLogMessageId(message)
try {
val sendResult = send(message)
log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)
} catch (e: Exception) {
log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)
}
}
}
}
在 CleanReportPushEventProducer 中,超過了重試的次數(shù)就不會再發(fā)送消息了。
每一次延時消息的時間也會不同,因此需要根據(jù)重試的次數(shù)來獲取這個delayTimeMills 。
通過 System.currentTimeMillis() + delayTimeMills 可以設置 message 的 startDeliverTime。然后調用 send(message) 即可發(fā)送延時消息。
我們使用商用版的 RocketMQ,因此支持精度為秒級別的延遲消息。在開源版本中,RocketMQ 只支持18個特定級別的延遲消息。:(
3.3 消費者(Consumer)
消費者負責消費消息,消費者從消息服務器拉取信息并將其輸入用戶應用程序。
定義 Push 類型的 AbstractConsumer:
@Data
abstract class AbstractConsumer ():MessageListener{
var consumerId: String? = null
lateinit var subscribeOptions: List<SubscribeOptions>
var threadNums: Int? = null
val log = LogFactory.getLog(this.javaClass)
override fun consume(message: Message, context: ConsumeContext): Action {
val logMessageId = buildLogMessageId(message)
val body = String(message.body)
try {
log.info(logMessageId + " body: " + body)
val result = consumeInternal(message, context, JSON.parseObject(body, getMessageBodyType(message.tag)))
log.info(logMessageId + " result: " + result.name)
return result
} catch (e: Exception) {
if (message.reconsumeTimes >= 3) {
log.error(logMessageId + " error: " + e.message, e)
}
return Action.ReconsumeLater
}
}
abstract fun getMessageBodyType(tag: String): Type?
abstract fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action
protected fun buildLogMessageId(message: Message): String {
return "topic: " + message.topic + "\n" +
"consumer: " + consumerId + "\n" +
"tag: " + message.tag + "\n" +
"key: " + message.key + "\n" +
"MsgId:" + message.msgID + "\n" +
"BornTimestamp" + message.bornTimestamp + "\n" +
"StartDeliverTime:" + message.startDeliverTime + "\n" +
"ReconsumeTimes:" + message.reconsumeTimes + "\n"
}
}
再定義具體的消費者,并且在消費失敗之后能夠再發(fā)送一次消息。
@Configuration
@ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer")
@Data
class CleanReportPushEventConsumer(val cleanReportService: CleanReportService,val eventProducer:CleanReportPushEventProducer):AbstractConsumer() {
val logger: Logger = LoggerFactory.getLogger(this.javaClass)
override fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action {
if(obj is CleanReportPushEventMessage){
//清除事件
logger.info("consumer clean-report event report_id:${obj.id} ")
//消費失敗之后再發(fā)送一次消息
if(!cleanReportService.sendCleanReportEvent(obj.id)){
val times = obj.times+1
eventProducer.sendMessage(CleanReportPushEventMessage(obj.id,times))
}
}
return Action.CommitMessage
}
override fun getMessageBodyType(tag: String): Type? {
return CleanReportPushEventMessage::class.java
}
}
其中,cleanReportService 的 sendCleanReportEvent() 會通過 http 的方式調用業(yè)務方提供的接口,進行事件消息的推送。如果推送失敗了,則會進行下一次的推送。(這里使用了 eventProducer 的 sendMessage() 方法再次投遞消息,是因為要根據(jù)調用的http接口返回的內(nèi)容來判斷消息是否發(fā)送成功。)
最后,定義 ConsumerFactory
@Component
class ConsumerFactory(val consumers: List<AbstractConsumer>,val aliyunOnsOptions: AliyunOnsOptions) {
val logger: Logger = LoggerFactory.getLogger(this.javaClass)
@PostConstruct
fun start() {
CompletableFuture.runAsync{
consumers.stream().forEach {
val properties = buildProperties(it.consumerId!!, it.threadNums)
val consumer = ONSFactory.createConsumer(properties)
if (it.subscribeOptions != null && !it.subscribeOptions!!.isEmpty()) {
for (options in it.subscribeOptions!!) {
consumer.subscribe(options.topic, options.tag, it)
}
consumer.start()
val message = "\n".plus(
it.subscribeOptions!!.stream().map{ a -> String.format("topic: %s, tag: %s has been started", a.topic, a.tag)}
.collect(Collectors.toList<Any>()))
logger.info(String.format("consumer: %s\n", message))
}
}
}
}
private fun buildProperties(consumerId: String,threadNums: Int?): Properties {
val properties = Properties()
properties.put(PropertyKeyConst.ConsumerId, consumerId)
properties.put(PropertyKeyConst.AccessKey, aliyunOnsOptions.accessKey)
properties.put(PropertyKeyConst.SecretKey, aliyunOnsOptions.secretKey)
if (StringUtils.isNotEmpty(aliyunOnsOptions.onsAddr)) {
properties.put(PropertyKeyConst.ONSAddr, aliyunOnsOptions.onsAddr)
} else {
// 測試環(huán)境接入RocketMQ
properties.put(PropertyKeyConst.NAMESRV_ADDR, aliyunOnsOptions.nameServerAddress)
}
properties.put(PropertyKeyConst.ConsumeThreadNums, threadNums!!)
return properties
}
}
四. 總結
正如本文開頭曾介紹過,可以使用多種方式來實現(xiàn)延時消息。然而,我們的系統(tǒng)本身就大量使用了 RocketMQ,借助成熟的 RocketMQ 實現(xiàn)延時消息不失為一種可靠而又方便的方式。
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
android基于ListView和CheckBox實現(xiàn)多選和全選記錄的功能
本篇文章主要介紹了android基于ListView和CheckBox實現(xiàn)多選和全選記錄的功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下。2016-11-11
快速解決android webview https圖片不顯示的問題
今天小編就為大家分享一篇快速解決android webview https圖片不顯示的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-07-07
Android 開發(fā)中l(wèi)ayout下的子文件夾
這篇文章主要介紹了android 開發(fā)中l(wèi)ayout下的子文件夾,需要的朋友可以參考下2017-12-12
Android編程實現(xiàn)WebView全屏播放的方法(附源碼)
這篇文章主要介紹了Android編程實現(xiàn)WebView全屏播放的方法,結合實例形式較為詳細的分析了Android實現(xiàn)WebView全屏播放的布局與功能相關技巧,需要的朋友可以參考下2015-11-11

