詳解Spark?Sql在UDF中如何引用外部數(shù)據(jù)
前言
Spark Sql可以通過UDF來對DataFrame的Column進(jìn)行自定義操作。在特定場景下定義UDF可能需要用到Spark Context以外的資源或數(shù)據(jù)。比如從List或Map中取值,或是通過連接池從外部的數(shù)據(jù)源中讀取數(shù)據(jù),然后再參與Column的運(yùn)算。
Excutor中每個(gè)task的工作線程都會(huì)對UDF的call進(jìn)行調(diào)用,外部資源的使用發(fā)生在Excutor端,而資源加載既能發(fā)生在Driver端,也可以發(fā)生在Excutor端。如果外部資源對象能序列化,我們可以在Driver端進(jìn)行初始化,然后廣播(broadcast)到Excutor端參與運(yùn)算。對于不能進(jìn)行序列化的對象,如JedisPool(redis連接池),只能在Excutor端進(jìn)行初始化。
因此,在UDF中引用外部資源有以下兩類方法:
- 能序列化:在Driver端進(jìn)行初始化,然后通過spark的broadcast方法廣播到Excutor上進(jìn)行使用;
- 不能序列化:在Excutor端進(jìn)行初始化然后使用。
下面我們將用一個(gè)實(shí)際例子對上述兩種方法進(jìn)行詳細(xì)介紹。
本文使用環(huán)境:Spark-2.3.0,Java 8。
場景介紹
我們以一個(gè)DataFrame(兩個(gè)字段node_1、node_2)作為原始數(shù)據(jù);一棵二叉搜索樹(BST)作為Spark外部被引用數(shù)據(jù);目標(biāo)是定義一個(gè)UDF來判斷:BST中是否剛好存在一個(gè)父節(jié)點(diǎn),它的左右子節(jié)點(diǎn)值與node_1、node_2兩個(gè)字段值相同。然后將判斷結(jié)果輸出到新列is_bro。其中DataFrame:
BST:
輸出DataFrame:
二叉樹的定義與判斷是否為父節(jié)點(diǎn)的左右子節(jié)點(diǎn)的邏輯如下:
import java.io.Serializable; /** * @author wangjiahui * @create 2021-03-14-10:57 */ public class TreeNode implements Serializable{ private Integer val; private TreeNode left; private TreeNode right; public TreeNode() { } public TreeNode(Integer val) { this.val = val; } public TreeNode(Integer val, TreeNode left, TreeNode right) { this.val = val; this.left = left; this.right = right; } public Integer getVal() { return val; } public void setVal(Integer val) { this.val = val; } public TreeNode getLeft() { return left; } public void setLeft(TreeNode left) { this.left = left; } public TreeNode getRight() { return right; } public void setRight(TreeNode right) { this.right = right; } /** * 判斷是否剛好有一個(gè)父節(jié)點(diǎn)的左、右子節(jié)點(diǎn)值與num1、num2相同 * @param num1 * @param num2 * @return */ public Boolean isBro( Integer num1, Integer num2) { if (null == getLeft()||null == getRight()) { return false; } if (getLeft().getVal().compareTo(num1)==0 && getRight().getVal().compareTo(num2)==0) { return true; } return getLeft().isBro(num1, num2) || getRight().isBro(num1, num2); } }
生成上圖所示BST的方法createTree()如下:
public static TreeNode createTree(){ TreeNode[] treeNodes = new TreeNode[8]; for(int i=1; i<=7; i++){ treeNodes[i] = new TreeNode(i); } treeNodes[2].setLeft(treeNodes[1]); treeNodes[2].setRight(treeNodes[3]); treeNodes[6].setLeft(treeNodes[5]); treeNodes[6].setRight(treeNodes[7]); treeNodes[4].setLeft(treeNodes[2]); treeNodes[4].setRight(treeNodes[6]); return treeNodes[4]; }
方法一 Driver端加載
在Driver端完成初始化并定義UDF
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext()); // 初始化樹 TreeNode tree = createTree(); // broadcast Broadcast<TreeNode> broadcastTree = javaSparkContext.broadcast(tree); // lambda表達(dá)式定義udf UserDefinedFunction udf = functions.udf((Integer num1, Integer num2) -> { return broadcastTree.getValue().isBro(num1,num2); }, BooleanType); // 注冊udf spark.udf().register("isBro",udf); // 使用udf df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));
方法二 Excutor端加載
如果我們直接在call中進(jìn)行初始化會(huì)存在問題:由于多個(gè)task的線程會(huì)在同一時(shí)刻對UDF中的call進(jìn)行調(diào)用,導(dǎo)致資源對象在同一時(shí)刻被初始化多次,造成Excutor內(nèi)存資源浪費(fèi)。此外,如果外部資源為連接池對象,在同一時(shí)刻初始化多次會(huì)建立多個(gè)連接,增加外部數(shù)據(jù)源的訪問壓力。
為此,我們可以借助單例模式中的懶漢式實(shí)現(xiàn),讓資源在每個(gè)Excutor中只被初始化一次。懶漢式的實(shí)現(xiàn)需要新建一個(gè)類(命名為IsBroUDF2)并實(shí)現(xiàn)UDF2<Integer, Integer, Boolean>接口,重寫UDF2的call方法:
import org.apache.spark.sql.api.java.UDF2; /** * @author wangjiahui * @create 2021-03-14-14:25 */ public class IsBroUDF2 implements UDF2<Integer,Integer,Boolean> { // 定義靜態(tài)的TreeNode成員變量 private static volatile TreeNode treeNode; public IsBroUDF2() { } @Override public Boolean call(Integer num1, Integer num2) throws Exception { // 懶漢式 二次判定 if(null==treeNode){ synchronized (IsBroUDF2.class){ if(null==treeNode){ treeNode=createTree(); } } } return treeNode.isBro(num1,num2); } // 輔助方法 public static TreeNode createTree(){ TreeNode[] treeNodes = new TreeNode[8]; for(int i=1; i<=7; i++){ treeNodes[i] = new TreeNode(i); } treeNodes[2].setLeft(treeNodes[1]); treeNodes[2].setRight(treeNodes[3]); treeNodes[6].setLeft(treeNodes[5]); treeNodes[6].setRight(treeNodes[7]); treeNodes[4].setLeft(treeNodes[2]); treeNodes[4].setRight(treeNodes[6]); return treeNodes[4]; } }
然后注冊和使用UDF
// 注冊udf spark.udf().register("isBro",new IsBroUDF2(), BooleanType); // 使用udf df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));
在call方法中通過加鎖可以實(shí)現(xiàn)TreeNode資源在同一個(gè)Excutor中只被初始化一次。除了上面介紹的這種懶漢式的寫法之外,還可以通過靜態(tài)內(nèi)部類懶加載、枚舉等方式實(shí)現(xiàn)TreeNode資源在Excutor端只被初始化一次。
小結(jié)
想要在Spark Sql的UDF中使用Spark外的資源和數(shù)據(jù)進(jìn)行運(yùn)算,我們既可以在Driver端預(yù)先進(jìn)行初始化然后廣播到各Excutor上(要求對象能序列化),也可以直接在Excutor端進(jìn)行加載;如果在Excutor端加載要保證外部資源對象只被初始化一次。
以上就是詳解Spark Sql在UDF中如何引用外部數(shù)據(jù)的詳細(xì)內(nèi)容,更多關(guān)于Spark Sql UDF引用外部數(shù)據(jù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
在Java的Struts中判斷是否調(diào)用AJAX及用攔截器對其優(yōu)化
這篇文章主要介紹了在Java的Struts中判斷是否調(diào)用AJAX及用攔截器對其優(yōu)化的方法,Struts框架是Java的SSH三大web開發(fā)框架之一,需要的朋友可以參考下2016-01-01Mybatis如何按順序查詢出對應(yīng)的數(shù)據(jù)字段
這篇文章主要介紹了Mybatis如何按順序查詢出對應(yīng)的數(shù)據(jù)字段,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01Springboot自動(dòng)掃描包路徑來龍去脈示例詳解
這篇文章主要介紹了Springboot自動(dòng)掃描包路徑來龍去脈示例詳解,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12基于注解的springboot+mybatis的多數(shù)據(jù)源組件的實(shí)現(xiàn)代碼
這篇文章主要介紹了基于注解的springboot+mybatis的多數(shù)據(jù)源組件的實(shí)現(xiàn),會(huì)使用到多個(gè)數(shù)據(jù)源,文中通過代碼講解的非常詳細(xì),需要的朋友可以參考下2021-04-04解決Springboot中Feignclient調(diào)用時(shí)版本問題
這篇文章主要介紹了解決Springboot中Feign?client調(diào)用時(shí)版本問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03