Redis+Hbase+RocketMQ?實(shí)際使用問(wèn)題案例講解
需求
- 將Hbase數(shù)據(jù),解析后推送到RocketMQ。
- redis使用list數(shù)據(jù)類(lèi)型,存儲(chǔ)了需要推送的數(shù)據(jù)的RowKey及表名。
簡(jiǎn)單畫(huà)個(gè)流程圖就是:

分析及確定方案
Redis
- 明確list中元素結(jié)構(gòu)
{"rowkey":rowkey,"table":table}解析出rowkey; - 一次取多個(gè)元素加快效率;取了之后放入重試隊(duì)列,并刪除原來(lái)的元素;
- 處理數(shù)據(jù)永遠(yuǎn)是重試隊(duì)列里的,成功之后刪除,失敗就加上重試次數(shù)并重新放回;
- 明確從list中取值所使用的redis命令;
- 范圍獲取
LRANGE; - 范圍刪除(留下指定范圍的數(shù)據(jù))
LTRIM; - 判斷l(xiāng)ist長(zhǎng)度
LLEN; - 加入list
RPUSH;刪除LREM等等; - 從Hbase獲取數(shù)據(jù)失敗和發(fā)送到mq失敗都令重試次數(shù)加一;
- 每次碰到重試次數(shù)不為0的數(shù)據(jù)都休眠1s;
- 設(shè)置最大重試次數(shù),達(dá)到限制后丟棄;
- 考慮客戶(hù)redis部署方式,單機(jī)、主從、集群、哨兵等;
- 選擇合適的客戶(hù)端,Jedis、Redisson、Lettuce等;
- 編寫(xiě)不同的操作代碼,也可以利用配置文件、環(huán)境變量、工廠模式等適配各種部署模式;
Hbase
- 基本理論知識(shí)學(xué)習(xí)(原來(lái)沒(méi)接觸過(guò)),rowkey是沒(méi)條數(shù)據(jù)的主鍵,限定符是字段名,列族是多個(gè)限定名的集合等;
- 當(dāng)時(shí)看這個(gè)覺(jué)得不錯(cuò)http://www.dbjr.com.cn/article/230731.htm因?yàn)槭遣煌Wx取數(shù)據(jù)、鏈接、Table不用close,可以緩存起來(lái),沒(méi)必要每次都創(chuàng)建;
- 確定批量獲取數(shù)據(jù)方式為批量
Get,沒(méi)用scan; - 了解解析方式,一些網(wǎng)上的解析試了之后會(huì)亂碼,這邊用的是它自帶的
CellUtil.clone相關(guān)方法; - 考慮所有都沒(méi)數(shù)據(jù)時(shí)休眠10s;
RocketMQ
- 有現(xiàn)成的發(fā)送代碼,公司封裝好的;
- 調(diào)整發(fā)送的速度、太快了服務(wù)端會(huì)吃不消(獲取Hbase數(shù)據(jù)速度太快了,最開(kāi)始沒(méi)限制一會(huì)兒就入了百萬(wàn)數(shù)據(jù)),設(shè)置超時(shí)時(shí)間(默認(rèn)3s);
- 調(diào)整服務(wù)端的內(nèi)存、線程數(shù)等參數(shù);
實(shí)現(xiàn)
配置
#server configuration server.port=8896 #log config logging.file.path=./logs #redis-standalone redis.standalone.host= redis.standalone.port=6379 redis.standalone.password= redis.standalone.enable=true #redis-cluster redis.cluster.nodes= redis.cluster.password= redis.cluster.timeout=30000 redis.cluster.enable=false # Zookeeper 集群地址,逗號(hào)分隔 hbase.zookeeper.quorum= # Zookeeper 端口 hbase.zookeeper.property.clientPort=2181 # 消息目的rocketmq地址 rocketmq.server.host= # 發(fā)送消息間隔時(shí)間,防止發(fā)送過(guò)快mq受不了 rocketmq.send.interval.millisec=10 # 每次從redis讀取數(shù)據(jù)量限制。 data.access.redisDataSize=100 # 失敗數(shù)據(jù)重試次數(shù),超過(guò)的直接丟棄 data.access.retryNum=10 # 需要接入的表,需要發(fā)送到rocketmq的topic和在redis中的key的映射。xxx.xxx.xxx[topic]=redisKey data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back
部分代碼
獲取配置,其余的直接@Value("${}"):
@Setter
@Getter
@Configuration
@ConfigurationProperties(prefix = "data.access")
public class AccessRedisMqConfig {
/**
* key:topic; value:redis的key
*/
private Map<String, String> topicKeyMap = new HashMap<>();
/**
* 一次從redis中讀取數(shù)據(jù)量限制
*/
private long redisDataSize = 50;
/**
* 失敗數(shù)據(jù)重試次數(shù)
*/
private int retryNum = 10;
}開(kāi)啟接入:
@Component
public class AdapterRunner implements ApplicationRunner {
@Resource
private DataAccessService dataAccessService;
@Override
public void run(ApplicationArguments args) {
System.out.println("項(xiàng)目已啟動(dòng),開(kāi)始接入數(shù)據(jù)到RocketMQ……");
dataAccessService.accessData2Mq();
}
}其他代碼其實(shí)也在分析里了。
踩坑
mq發(fā)送問(wèn)題
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:525) at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:523) at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610) at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167) at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572) at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54) at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Wo
上面分析也說(shuō)了,注意發(fā)送速度,有多少資源就接入多快。還有注意相關(guān)三個(gè)端口是否開(kāi)放。
總結(jié)
程序很簡(jiǎn)單,主要涉及方案的是,獲取redis的list數(shù)據(jù)時(shí),是考慮效率,及加入重試策略,保證數(shù)據(jù)不丟失等。
相關(guān)文章
Redis瞬時(shí)高并發(fā)秒殺方案總結(jié)
本文講述了Redis瞬時(shí)高并發(fā)秒殺方案總結(jié),具有很好的參考價(jià)值,感興趣的小伙伴們可以參考一下,具體如下:2018-05-05
Redis實(shí)現(xiàn)每日簽到功能(大數(shù)據(jù)量)
在面對(duì)百萬(wàn)級(jí)用戶(hù)簽到情況下,傳統(tǒng)數(shù)據(jù)庫(kù)存儲(chǔ)和判斷會(huì)遇到瓶頸,使用Redis的二進(jìn)制數(shù)據(jù)類(lèi)型可實(shí)現(xiàn)高效的簽到功能,示例代碼展示了如何調(diào)用這些功能,包括當(dāng)天簽到、補(bǔ)簽以及查詢(xún)簽到記錄,PHP結(jié)合Redis二進(jìn)制數(shù)據(jù)類(lèi)型可有效處理大數(shù)據(jù)量下的簽到問(wèn)題2024-10-10
redis+mysql+quartz 一種紅包發(fā)送功能的實(shí)現(xiàn)
這篇文章主要介紹了redis+mysql+quartz 一種紅包發(fā)送功能的實(shí)現(xiàn)的相關(guān)資料,需要的朋友可以參考下2017-01-01
Redis?sentinel哨兵集群的實(shí)現(xiàn)步驟
本文主要介紹了Redis?sentinel哨兵集群的實(shí)現(xiàn)步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07

