欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解Spark?Sql在UDF中如何引用外部數(shù)據(jù)

 更新時(shí)間:2023年02月01日 11:02:15   作者:KYs_Daddy  
這篇文章主要為大家介紹了詳解Spark?Sql在UDF中如何引用外部數(shù)據(jù)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前言

Spark Sql可以通過UDF來對DataFrame的Column進(jìn)行自定義操作。在特定場景下定義UDF可能需要用到Spark Context以外的資源或數(shù)據(jù)。比如從List或Map中取值,或是通過連接池從外部的數(shù)據(jù)源中讀取數(shù)據(jù),然后再參與Column的運(yùn)算。

Excutor中每個(gè)task的工作線程都會(huì)對UDF的call進(jìn)行調(diào)用,外部資源的使用發(fā)生在Excutor端,而資源加載既能發(fā)生在Driver端,也可以發(fā)生在Excutor端。如果外部資源對象能序列化,我們可以在Driver端進(jìn)行初始化,然后廣播(broadcast)到Excutor端參與運(yùn)算。對于不能進(jìn)行序列化的對象,如JedisPool(redis連接池),只能在Excutor端進(jìn)行初始化。

因此,在UDF中引用外部資源有以下兩類方法:

  • 能序列化:在Driver端進(jìn)行初始化,然后通過spark的broadcast方法廣播到Excutor上進(jìn)行使用;
  • 不能序列化:在Excutor端進(jìn)行初始化然后使用。

下面我們將用一個(gè)實(shí)際例子對上述兩種方法進(jìn)行詳細(xì)介紹。

本文使用環(huán)境:Spark-2.3.0,Java 8。

場景介紹

我們以一個(gè)DataFrame(兩個(gè)字段node_1、node_2)作為原始數(shù)據(jù);一棵二叉搜索樹(BST)作為Spark外部被引用數(shù)據(jù);目標(biāo)是定義一個(gè)UDF來判斷:BST中是否剛好存在一個(gè)父節(jié)點(diǎn),它的左右子節(jié)點(diǎn)值與node_1、node_2兩個(gè)字段值相同。然后將判斷結(jié)果輸出到新列is_bro。其中DataFrame:

BST:

輸出DataFrame:

二叉樹的定義與判斷是否為父節(jié)點(diǎn)的左右子節(jié)點(diǎn)的邏輯如下:

import java.io.Serializable;
/**
 * @author wangjiahui
 * @create 2021-03-14-10:57
 */
public class TreeNode implements Serializable{
    private Integer val;
    private TreeNode left;
    private TreeNode right;
    public TreeNode() {
    }
    public TreeNode(Integer val) {
        this.val = val;
    }
    public TreeNode(Integer val, TreeNode left, TreeNode right) {
        this.val = val;
        this.left = left;
        this.right = right;
    }
    public Integer getVal() {
        return val;
    }
    public void setVal(Integer val) {
        this.val = val;
    }
    public TreeNode getLeft() {
        return left;
    }
    public void setLeft(TreeNode left) {
        this.left = left;
    }
    public TreeNode getRight() {
        return right;
    }
    public void setRight(TreeNode right) {
        this.right = right;
    }
    /**
     * 判斷是否剛好有一個(gè)父節(jié)點(diǎn)的左、右子節(jié)點(diǎn)值與num1、num2相同
     * @param num1
     * @param num2
     * @return
     */
    public Boolean isBro( Integer num1, Integer num2) {
        if (null == getLeft()||null == getRight()) {
            return false;
        }
        if (getLeft().getVal().compareTo(num1)==0 && getRight().getVal().compareTo(num2)==0) {
            return true;
        }
        return getLeft().isBro(num1, num2) || getRight().isBro(num1, num2);
    }
}

生成上圖所示BST的方法createTree()如下:

public static TreeNode createTree(){
    TreeNode[] treeNodes = new TreeNode[8];
    for(int i=1; i<=7; i++){
        treeNodes[i] =  new TreeNode(i);
    }
    treeNodes[2].setLeft(treeNodes[1]);
    treeNodes[2].setRight(treeNodes[3]);
    treeNodes[6].setLeft(treeNodes[5]);
    treeNodes[6].setRight(treeNodes[7]);
    treeNodes[4].setLeft(treeNodes[2]);
    treeNodes[4].setRight(treeNodes[6]);
    return treeNodes[4];
}

方法一 Driver端加載

在Driver端完成初始化并定義UDF

JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
//  初始化樹
TreeNode tree = createTree();
//  broadcast
Broadcast<TreeNode> broadcastTree = javaSparkContext.broadcast(tree);
//  lambda表達(dá)式定義udf
UserDefinedFunction udf = functions.udf((Integer num1, Integer num2) -> {
    return broadcastTree.getValue().isBro(num1,num2);
}, BooleanType);
//  注冊udf
spark.udf().register("isBro",udf);
//  使用udf
df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));

方法二 Excutor端加載

如果我們直接在call中進(jìn)行初始化會(huì)存在問題:由于多個(gè)task的線程會(huì)在同一時(shí)刻對UDF中的call進(jìn)行調(diào)用,導(dǎo)致資源對象在同一時(shí)刻被初始化多次,造成Excutor內(nèi)存資源浪費(fèi)。此外,如果外部資源為連接池對象,在同一時(shí)刻初始化多次會(huì)建立多個(gè)連接,增加外部數(shù)據(jù)源的訪問壓力。

為此,我們可以借助單例模式中的懶漢式實(shí)現(xiàn),讓資源在每個(gè)Excutor中只被初始化一次。懶漢式的實(shí)現(xiàn)需要新建一個(gè)類(命名為IsBroUDF2)并實(shí)現(xiàn)UDF2<Integer, Integer, Boolean>接口,重寫UDF2的call方法:

import org.apache.spark.sql.api.java.UDF2;
/**
 * @author wangjiahui
 * @create 2021-03-14-14:25
 */
public class IsBroUDF2 implements UDF2<Integer,Integer,Boolean> {
    // 定義靜態(tài)的TreeNode成員變量
    private static volatile TreeNode treeNode;
    public IsBroUDF2() {
    }
    @Override
    public Boolean call(Integer num1, Integer num2) throws Exception {
//        懶漢式 二次判定
        if(null==treeNode){
            synchronized (IsBroUDF2.class){
                if(null==treeNode){
                    treeNode=createTree();
                }
            }
        }
        return treeNode.isBro(num1,num2);
    }
    // 輔助方法
    public static TreeNode createTree(){
        TreeNode[] treeNodes = new TreeNode[8];
        for(int i=1; i<=7; i++){
            treeNodes[i] =  new TreeNode(i);
        }
        treeNodes[2].setLeft(treeNodes[1]);
        treeNodes[2].setRight(treeNodes[3]);
        treeNodes[6].setLeft(treeNodes[5]);
        treeNodes[6].setRight(treeNodes[7]);
        treeNodes[4].setLeft(treeNodes[2]);
        treeNodes[4].setRight(treeNodes[6]);
        return treeNodes[4];
    }
}

然后注冊和使用UDF

//  注冊udf
spark.udf().register("isBro",new IsBroUDF2(), BooleanType);
//  使用udf
df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));

在call方法中通過加鎖可以實(shí)現(xiàn)TreeNode資源在同一個(gè)Excutor中只被初始化一次。除了上面介紹的這種懶漢式的寫法之外,還可以通過靜態(tài)內(nèi)部類懶加載、枚舉等方式實(shí)現(xiàn)TreeNode資源在Excutor端只被初始化一次。

小結(jié)

想要在Spark Sql的UDF中使用Spark外的資源和數(shù)據(jù)進(jìn)行運(yùn)算,我們既可以在Driver端預(yù)先進(jìn)行初始化然后廣播到各Excutor上(要求對象能序列化),也可以直接在Excutor端進(jìn)行加載;如果在Excutor端加載要保證外部資源對象只被初始化一次。

以上就是詳解Spark Sql在UDF中如何引用外部數(shù)據(jù)的詳細(xì)內(nèi)容,更多關(guān)于Spark Sql UDF引用外部數(shù)據(jù)的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 詳解java中DelayQueue的使用

    詳解java中DelayQueue的使用

    這篇文章主要介紹了java中DelayQueue的使用,幫助大家更好的理解和學(xué)習(xí)Java,感興趣的朋友可以了解下
    2020-10-10
  • 在Java的Struts中判斷是否調(diào)用AJAX及用攔截器對其優(yōu)化

    在Java的Struts中判斷是否調(diào)用AJAX及用攔截器對其優(yōu)化

    這篇文章主要介紹了在Java的Struts中判斷是否調(diào)用AJAX及用攔截器對其優(yōu)化的方法,Struts框架是Java的SSH三大web開發(fā)框架之一,需要的朋友可以參考下
    2016-01-01
  • selenium+java環(huán)境搭建過程推薦

    selenium+java環(huán)境搭建過程推薦

    這篇文章主要介紹了selenium+java環(huán)境搭建過程推薦,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-10-10
  • java實(shí)現(xiàn)文件復(fù)制上傳操作

    java實(shí)現(xiàn)文件復(fù)制上傳操作

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)文件復(fù)制上傳操作,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2016-11-11
  • Mybatis如何按順序查詢出對應(yīng)的數(shù)據(jù)字段

    Mybatis如何按順序查詢出對應(yīng)的數(shù)據(jù)字段

    這篇文章主要介紹了Mybatis如何按順序查詢出對應(yīng)的數(shù)據(jù)字段,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • Java8中CompletableFuture的用法全解

    Java8中CompletableFuture的用法全解

    這篇文章主要給大家介紹了關(guān)于Java8中CompletableFuture用法的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2022-01-01
  • Springboot自動(dòng)掃描包路徑來龍去脈示例詳解

    Springboot自動(dòng)掃描包路徑來龍去脈示例詳解

    這篇文章主要介紹了Springboot自動(dòng)掃描包路徑來龍去脈示例詳解,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-12-12
  • 匯總java調(diào)用python方法

    匯總java調(diào)用python方法

    這篇文章主要為大家詳細(xì)介紹了java調(diào)用python的方法,文章中介紹了三種java調(diào)用python方法,感興趣的朋友可以參考一下
    2016-02-02
  • 基于注解的springboot+mybatis的多數(shù)據(jù)源組件的實(shí)現(xiàn)代碼

    基于注解的springboot+mybatis的多數(shù)據(jù)源組件的實(shí)現(xiàn)代碼

    這篇文章主要介紹了基于注解的springboot+mybatis的多數(shù)據(jù)源組件的實(shí)現(xiàn),會(huì)使用到多個(gè)數(shù)據(jù)源,文中通過代碼講解的非常詳細(xì),需要的朋友可以參考下
    2021-04-04
  • 解決Springboot中Feignclient調(diào)用時(shí)版本問題

    解決Springboot中Feignclient調(diào)用時(shí)版本問題

    這篇文章主要介紹了解決Springboot中Feign?client調(diào)用時(shí)版本問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-03-03

最新評論