Spark刪除redis千萬級(jí)別set集合數(shù)據(jù)實(shí)現(xiàn)分析
1.使用pipline的原因
Redis 使用的是客戶端-服務(wù)器(CS)模型和請(qǐng)求/響應(yīng)協(xié)議的 TCP 服務(wù)器。
這意味著通常情況下一個(gè)請(qǐng)求會(huì)遵循以下步驟:
- 客戶端向服務(wù)端發(fā)送一個(gè)查詢請(qǐng)求,并監(jiān)聽 Socket 返回,通常是以阻塞模式,等待服務(wù)端響應(yīng)。
- 服務(wù)端處理命令,并將結(jié)果返回給客戶端。
- 管道(pipeline)可以一次性發(fā)送多條命令并在執(zhí)行完后一次性將結(jié)果返回,pipeline 通過減少客戶端與 redis 的通信次數(shù)來實(shí)現(xiàn)降低往返延時(shí)時(shí)間,而且 Pipeline 實(shí)現(xiàn)的原理是隊(duì)列,而隊(duì)列的原理是時(shí)先進(jìn)先出,這樣就保證數(shù)據(jù)的順序性。
通俗點(diǎn):
- pipeline就是把一組命令進(jìn)行打包,然后一次性通過網(wǎng)絡(luò)發(fā)送到Redis。同時(shí)將執(zhí)行的結(jié)果批量的返回回來
- pipelined.sync()表示我一次性的異步發(fā)送到redis,不關(guān)注執(zhí)行結(jié)果。
- pipeline.syncAndReturnAll ();將返回執(zhí)行過的命令結(jié)果返回到List列表中
2.方法
2.1寫入redis的方法
2.1.1參數(shù)說明
sc:SparkContext Spark上下文
spark:SparkSession 使用Dataset和DataFrame API編程Spark的入口點(diǎn)
def writeRedis(sc: SparkContext,spark: SparkSession): Unit ={ // spark讀取數(shù)據(jù)集 val df: DataFrame = spark.read.parquet("file:///F://delRedisData//1//delData.snappy.parquet") df.show(1,false) val rdd: RDD[String] = df.rdd.map(x=>x.getAs[String]("r")) // 這個(gè)集合寫的是2000多萬的數(shù)據(jù) sc.toRedisSET(rdd,"test:task:deplicate") }
2.2讀取本地待刪除數(shù)據(jù)的方法
2.2.1參數(shù)說明
sc:SparkContext Spark上下文
spark:SparkSession 使用Dataset和DataFrame API編程Spark的入口點(diǎn)
def readParquet(spark: SparkSession,path:String): RDD[String] ={ val df: DataFrame = spark.read.parquet(path) val strRDD: RDD[String] = df.rdd.map(_.getAs[String]("r")) // 返回String類型的RDD strRDD }
2.3調(diào)用pipline刪除的方法
2.3.1參數(shù)說明
collectionName 其中redis set集合的名稱
num是要?jiǎng)h除的數(shù)據(jù)量是多少
arr是要?jiǎng)h除的數(shù)據(jù)存放的是set集合的key
jedis是redis的客戶端
def delPipleine(collectionName:String,num:Int,arr:Array[String],jedis:Jedis):Unit = { try{ val pipeline: Pipeline = jedis.pipelined() // 選擇數(shù)據(jù)庫 默認(rèn)為 0 pipeline.select(1) for(i <- 0 to (num - 1) ){ pipeline.srem(collectionName,arr(i)) } //表示我一次性的異步發(fā)送到redis,不關(guān)注執(zhí)行結(jié)果 pipeline.sync() }catch { case e : JedisException => e.printStackTrace() }finally if(jedis !=null) jedis.close() }
3.完整代碼
import com.redislabs.provider.redis._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.{SparkConf, SparkContext} import redis.clients.jedis.exceptions.JedisException import redis.clients.jedis.{Jedis, Pipeline} /** * Date 2022/5/25 17:57 */ object DelRedis { def main(args: Array[String]): Unit = { val conf = new SparkConf() // 驅(qū)動(dòng)進(jìn)程使用的內(nèi)核數(shù),僅在集群模式下使用。 .set("spark.driver.cores","5") /** * 驅(qū)動(dòng)進(jìn)程使用的內(nèi)存數(shù)量,也就是SparkContext初始化的地方, * 其格式與JVM內(nèi)存字符串具有大小單位后綴(“k”,“m”,“g”或“t”)(例如512m, 2g)相同。 * 注意:在客戶端模式下,不能直接在應(yīng)用程序中通過SparkConf設(shè)置此配置,因?yàn)榇藭r(shí)驅(qū)動(dòng)程 * 序JVM已經(jīng)啟動(dòng)。相反,請(qǐng)通過——driver-memory命令行選項(xiàng)或在默認(rèn)屬性文件中設(shè)置。 */ .set("spark.driver.memory","5g") /** * 限制每個(gè)Spark操作(例如collect)的所有分區(qū)的序列化結(jié)果的總大小(以字節(jié)為單位)。 * 應(yīng)該至少是1M,或者0表示無限制。如果總大小超過此限制,則作業(yè)將被終止。 * 過高的限制可能會(huì)導(dǎo)致驅(qū)動(dòng)程序內(nèi)存不足錯(cuò)誤(取決于spark.driver.memory和JVM中對(duì)象的內(nèi)存開銷)。 * 設(shè)置適當(dāng)?shù)南拗瓶梢苑乐跪?qū)動(dòng)程序出現(xiàn)內(nèi)存不足的錯(cuò)誤。 */ .set("spark.driver.maxResultSize","10g") /** * 每個(gè)執(zhí)行程序進(jìn)程使用的內(nèi)存數(shù)量, * 格式與帶有大小單位后綴(“k”,“m”,“g”或“t”)的JVM內(nèi)存字符串相同(例如512m, 2g)。 * */ .set("spark.executor.memory","5g") /** * 默認(rèn) 1在YARN模式下,worker上所有可用的內(nèi)核在standalone和Mesos粗粒度模式下。 */ .set("spark.executor.cores","5") val spark: SparkSession = SparkSession.builder().appName("DelRedis").master("local[*]") .config("spark.redis.host","192.168.100.201") .config("spark.redis.port","6379") .config("spark.redis.db","1") // 可選的數(shù)據(jù)庫編號(hào)。避免使用它,尤其是在集群模式下,redisRedis默認(rèn)支持16個(gè)數(shù)據(jù)庫,默認(rèn)是選擇數(shù)據(jù)庫0,這里設(shè)置為1。 .config("spark.redis.timeout","2000000") // 連接超時(shí),以毫秒為單位,默認(rèn)為 2000 毫秒 .config(conf) .getOrCreate() val sc: SparkContext = spark.sparkContext //1.寫入數(shù)據(jù)集 writeRedis(sc,spark) // 2.讀取待刪除的數(shù)據(jù)key val path = "file:///F://delRedisData//test.parquet" val rdd: RDD[String] = readParquet(spark,path) //3.使用redis 中的 pipeline 方法 進(jìn)行刪除操作 rdd.foreachPartition(iter=>{ // 連接redis客戶端 val jedis = new Jedis("192.168.100.201",6379) val array: Array[String] = iter.toArray val length: Int = array.length val beginTime: Long = System.currentTimeMillis() delPipleine(collectionName,length,array,jedis) val endTime: Long = System.currentTimeMillis() println("刪除:"+length+"條數(shù)據(jù),耗時(shí):"+(endTime-beginTime)/1000+"秒") }) sc.stop() spark.stop() } def delPipleine(collectionName:String,num:Int,arr:Array[String],jedis:Jedis):Unit = { try{ val pipeline: Pipeline = jedis.pipelined() // 選擇數(shù)據(jù)庫 默認(rèn)為 0 pipeline.select(1) for(i <- 0 to (num - 1) ){ pipeline.srem(collectionName,arr(i)) } //表示我一次性的異步發(fā)送到redis,不關(guān)注執(zhí)行結(jié)果 pipeline.sync() }catch { case e : JedisException => e.printStackTrace() }finally if(jedis !=null) jedis.close() } def writeRedis(sc: SparkContext,spark: SparkSession): Unit ={ // spark讀取數(shù)據(jù)集 val df: DataFrame = spark.read.parquet("file:///F://delRedisData//1//delData.snappy.parquet") df.show(1,false) val rdd: RDD[String] = df.rdd.map(x=>x.getAs[String]("r")) // 這個(gè)集合寫的是2000多萬的數(shù)據(jù) sc.toRedisSET(rdd,"test:task:deplicate") } def readParquet(spark: SparkSession,path:String): RDD[String] ={ val df: DataFrame = spark.read.parquet(path) val strRDD: RDD[String] = df.rdd.map(_.getAs[String]("r")) // 返回String類型的RDD strRDD } }
4.總結(jié)
經(jīng)檢測:redis 的 pipeline(管道)方法 ,經(jīng)單機(jī)版的redis測試 ,百萬級(jí)別數(shù)據(jù)刪除僅需要1分鐘左右與硬件有關(guān),還包括讀取數(shù)據(jù)的時(shí)長等方面原因
以上就是Spark刪除redis千萬級(jí)別set集合數(shù)據(jù)實(shí)現(xiàn)分析的詳細(xì)內(nèi)容,更多關(guān)于Spark刪除redis set集合的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java實(shí)現(xiàn)多級(jí)緩存的方法詳解
對(duì)于高并發(fā)系統(tǒng)來說,有三個(gè)重要的機(jī)制來保障其高效運(yùn)行,它們分別是:緩存、限流和熔斷,所以本文就來和大家探討一下多級(jí)緩存的實(shí)現(xiàn)方法,希望對(duì)大家有所幫助2024-02-02一文詳解如何停止/重啟/啟動(dòng)Redis服務(wù)
Redis是當(dāng)前比較熱門的NOSQL系統(tǒng)之一,它是一個(gè)key-value存儲(chǔ)系統(tǒng),這篇文章主要給大家介紹了關(guān)于如何停止/重啟/啟動(dòng)Redis服務(wù)的相關(guān)資料,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-03-03Redis設(shè)置Hash數(shù)據(jù)類型的過期時(shí)間
在Redis中,我們可以使用Hash數(shù)據(jù)結(jié)構(gòu)來存儲(chǔ)一組鍵值對(duì),而有時(shí)候,我們可能需要設(shè)置這些鍵值對(duì)的過期時(shí)間,本文主要介紹了Redis設(shè)置Hash數(shù)據(jù)類型的過期時(shí)間,具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01Redis面試必備之緩存設(shè)計(jì)規(guī)范與性能優(yōu)化詳解
你是否在使用Redis時(shí),不清楚Redis應(yīng)該遵循的設(shè)計(jì)規(guī)范而苦惱,你是否在Redis出現(xiàn)性能問題時(shí),不知道該如何優(yōu)化而發(fā)愁,快跟隨小編一起學(xué)習(xí)起來吧2024-03-03Redis+Caffeine兩級(jí)緩存的實(shí)現(xiàn)
本文主要介紹了Redis+Caffeine兩級(jí)緩存的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06