欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spark刪除redis千萬級別set集合數據實現分析

 更新時間:2023年06月20日 10:08:24   作者:spark打醬油  
這篇文章主要為大家介紹了Spark刪除redis千萬級別set集合數據實現過程分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

1.使用pipline的原因

Redis 使用的是客戶端-服務器(CS)模型和請求/響應協議的 TCP 服務器。

這意味著通常情況下一個請求會遵循以下步驟:

  • 客戶端向服務端發(fā)送一個查詢請求,并監(jiān)聽 Socket 返回,通常是以阻塞模式,等待服務端響應。
  • 服務端處理命令,并將結果返回給客戶端。
  • 管道(pipeline)可以一次性發(fā)送多條命令并在執(zhí)行完后一次性將結果返回,pipeline 通過減少客戶端與 redis 的通信次數來實現降低往返延時時間,而且 Pipeline 實現的原理是隊列,而隊列的原理是時先進先出,這樣就保證數據的順序性。

通俗點:

  • pipeline就是把一組命令進行打包,然后一次性通過網絡發(fā)送到Redis。同時將執(zhí)行的結果批量的返回回來
  • pipelined.sync()表示我一次性的異步發(fā)送到redis,不關注執(zhí)行結果。
  • pipeline.syncAndReturnAll ();將返回執(zhí)行過的命令結果返回到List列表中

2.方法

2.1寫入redis的方法

2.1.1參數說明

sc:SparkContext Spark上下文
spark:SparkSession 使用Dataset和DataFrame API編程Spark的入口點

def writeRedis(sc: SparkContext,spark: SparkSession): Unit ={
    // spark讀取數據集
     val df: DataFrame = spark.read.parquet("file:///F://delRedisData//1//delData.snappy.parquet")
    df.show(1,false)
    val rdd: RDD[String] = df.rdd.map(x=>x.getAs[String]("r"))
       // 這個集合寫的是2000多萬的數據
    sc.toRedisSET(rdd,"test:task:deplicate")
  }

2.2讀取本地待刪除數據的方法

2.2.1參數說明

sc:SparkContext Spark上下文

spark:SparkSession 使用Dataset和DataFrame API編程Spark的入口點

def readParquet(spark: SparkSession,path:String): RDD[String] ={
    val df: DataFrame = spark.read.parquet(path)
    val strRDD: RDD[String] = df.rdd.map(_.getAs[String]("r"))
    // 返回String類型的RDD
    strRDD
  }

2.3調用pipline刪除的方法

2.3.1參數說明

collectionName 其中redis set集合的名稱

num是要刪除的數據量是多少

arr是要刪除的數據存放的是set集合的key

jedis是redis的客戶端

def delPipleine(collectionName:String,num:Int,arr:Array[String],jedis:Jedis):Unit = {
    try{
        val pipeline: Pipeline = jedis.pipelined()
        // 選擇數據庫  默認為 0
        pipeline.select(1)
        for(i <- 0 to (num - 1) ){
          pipeline.srem(collectionName,arr(i))
        }
        //表示我一次性的異步發(fā)送到redis,不關注執(zhí)行結果
        pipeline.sync()
    }catch {
      case e : JedisException => e.printStackTrace()
    }finally if(jedis !=null) jedis.close()
  }

3.完整代碼

import com.redislabs.provider.redis._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.exceptions.JedisException
import redis.clients.jedis.{Jedis, Pipeline}
/**
  * Date 2022/5/25 17:57
  */
object DelRedis {
def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // 驅動進程使用的內核數,僅在集群模式下使用。
      .set("spark.driver.cores","5")
      /**
        * 驅動進程使用的內存數量,也就是SparkContext初始化的地方,
        * 其格式與JVM內存字符串具有大小單位后綴(“k”,“m”,“g”或“t”)(例如512m, 2g)相同。
        * 注意:在客戶端模式下,不能直接在應用程序中通過SparkConf設置此配置,因為此時驅動程
        * 序JVM已經啟動。相反,請通過——driver-memory命令行選項或在默認屬性文件中設置。
        */
      .set("spark.driver.memory","5g")
      /**
        * 限制每個Spark操作(例如collect)的所有分區(qū)的序列化結果的總大小(以字節(jié)為單位)。
        * 應該至少是1M,或者0表示無限制。如果總大小超過此限制,則作業(yè)將被終止。
        * 過高的限制可能會導致驅動程序內存不足錯誤(取決于spark.driver.memory和JVM中對象的內存開銷)。
        * 設置適當的限制可以防止驅動程序出現內存不足的錯誤。
        */
      .set("spark.driver.maxResultSize","10g")
      /**
        * 每個執(zhí)行程序進程使用的內存數量,
        * 格式與帶有大小單位后綴(“k”,“m”,“g”或“t”)的JVM內存字符串相同(例如512m, 2g)。
        *
        */
      .set("spark.executor.memory","5g")
      /**
        * 默認 1在YARN模式下,worker上所有可用的內核在standalone和Mesos粗粒度模式下。
        */
      .set("spark.executor.cores","5")
    val spark: SparkSession = SparkSession.builder().appName("DelRedis").master("local[*]")
      .config("spark.redis.host","192.168.100.201")
      .config("spark.redis.port","6379")
      .config("spark.redis.db","1")     // 可選的數據庫編號。避免使用它,尤其是在集群模式下,redisRedis默認支持16個數據庫,默認是選擇數據庫0,這里設置為1。
      .config("spark.redis.timeout","2000000")   // 連接超時,以毫秒為單位,默認為 2000 毫秒
      .config(conf)
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
     //1.寫入數據集
   writeRedis(sc,spark)
    // 2.讀取待刪除的數據key
      val path = "file:///F://delRedisData//test.parquet"
    val rdd: RDD[String] = readParquet(spark,path)
    //3.使用redis 中的 pipeline 方法 進行刪除操作
    rdd.foreachPartition(iter=>{
      // 連接redis客戶端
      val jedis = new Jedis("192.168.100.201",6379)
      val array: Array[String] = iter.toArray
      val length: Int = array.length
      val beginTime: Long = System.currentTimeMillis()
      delPipleine(collectionName,length,array,jedis)
      val endTime: Long = System.currentTimeMillis()
      println("刪除:"+length+"條數據,耗時:"+(endTime-beginTime)/1000+"秒")
    })
    sc.stop()
    spark.stop()
  }
def delPipleine(collectionName:String,num:Int,arr:Array[String],jedis:Jedis):Unit = {
    try{
        val pipeline: Pipeline = jedis.pipelined()
        // 選擇數據庫  默認為 0
        pipeline.select(1)
        for(i <- 0 to (num - 1) ){
          pipeline.srem(collectionName,arr(i))
        }
        //表示我一次性的異步發(fā)送到redis,不關注執(zhí)行結果
        pipeline.sync()
    }catch {
      case e : JedisException => e.printStackTrace()
    }finally if(jedis !=null) jedis.close()
  }
def writeRedis(sc: SparkContext,spark: SparkSession): Unit ={
    // spark讀取數據集
    val df: DataFrame = spark.read.parquet("file:///F://delRedisData//1//delData.snappy.parquet")
    df.show(1,false)
    val rdd: RDD[String] = df.rdd.map(x=>x.getAs[String]("r"))
       // 這個集合寫的是2000多萬的數據
    sc.toRedisSET(rdd,"test:task:deplicate")
  }
def readParquet(spark: SparkSession,path:String): RDD[String] ={
    val df: DataFrame = spark.read.parquet(path)
    val strRDD: RDD[String] = df.rdd.map(_.getAs[String]("r"))
    // 返回String類型的RDD
    strRDD
  }
  }

4.總結

經檢測:redis 的 pipeline(管道)方法 ,經單機版的redis測試 ,百萬級別數據刪除僅需要1分鐘左右與硬件有關,還包括讀取數據的時長等方面原因

以上就是Spark刪除redis千萬級別set集合數據實現分析的詳細內容,更多關于Spark刪除redis set集合的資料請關注腳本之家其它相關文章!

相關文章

  • 編譯安裝redisd的方法示例詳解

    編譯安裝redisd的方法示例詳解

    這篇文章主要介紹了編譯安裝redisd的方法示例詳解,本文給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-02-02
  • Java實現多級緩存的方法詳解

    Java實現多級緩存的方法詳解

    對于高并發(fā)系統(tǒng)來說,有三個重要的機制來保障其高效運行,它們分別是:緩存、限流和熔斷,所以本文就來和大家探討一下多級緩存的實現方法,希望對大家有所幫助
    2024-02-02
  • 一文詳解如何停止/重啟/啟動Redis服務

    一文詳解如何停止/重啟/啟動Redis服務

    Redis是當前比較熱門的NOSQL系統(tǒng)之一,它是一個key-value存儲系統(tǒng),這篇文章主要給大家介紹了關于如何停止/重啟/啟動Redis服務的相關資料,文中通過代碼介紹的非常詳細,需要的朋友可以參考下
    2024-03-03
  • Redis如何存儲對象與集合示例詳解

    Redis如何存儲對象與集合示例詳解

    redis是一個key-value存儲系統(tǒng)。和Memcached類似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、 zset(sorted set --有序集合)和hash(哈希類型)本文介紹了關于Redis是如何存儲對象與集合的相關資料,需要的朋友可以參考下
    2018-05-05
  • Redis設置Hash數據類型的過期時間

    Redis設置Hash數據類型的過期時間

    在Redis中,我們可以使用Hash數據結構來存儲一組鍵值對,而有時候,我們可能需要設置這些鍵值對的過期時間,本文主要介紹了Redis設置Hash數據類型的過期時間,具有一定的參考價值,感興趣的可以了解一下
    2024-01-01
  • Redis面試必備之緩存設計規(guī)范與性能優(yōu)化詳解

    Redis面試必備之緩存設計規(guī)范與性能優(yōu)化詳解

    你是否在使用Redis時,不清楚Redis應該遵循的設計規(guī)范而苦惱,你是否在Redis出現性能問題時,不知道該如何優(yōu)化而發(fā)愁,快跟隨小編一起學習起來吧
    2024-03-03
  • Redis+Caffeine兩級緩存的實現

    Redis+Caffeine兩級緩存的實現

    本文主要介紹了Redis+Caffeine兩級緩存的實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-06-06
  • 關于Redis中bitmap的原理和使用詳解

    關于Redis中bitmap的原理和使用詳解

    這篇文章主要介紹了關于Redis中bitmap的原理和使用詳解,BitMap即位圖,使用每個位表示某種狀態(tài),適合處理整型的海量數據,本質上是哈希表的一種應用實現,需要的朋友可以參考下
    2023-05-05
  • 使用Redis實現向量相似度搜索

    使用Redis實現向量相似度搜索

    在自然語言處理領域,有一個常見且重要的任務就是文本相似度搜索,所以本文為大家介紹一下如何利用Redis實現向量相似度搜索,解決文本、圖像和音頻之間的相似度匹配問題,需要的可以了解下
    2023-07-07
  • 詳解redis端口號

    詳解redis端口號

    在本篇內容中我們給大家整理了關于redis端口號的相關知識點內容,有興趣的朋友們學習下。
    2019-06-06

最新評論