第05章-自定义函数和JSON数据解析
  jOqYTgEXfJbg 2023年11月02日 42 0

目录

5.1 实现自定义UDF 2

5.2 实现自定义UDTF 3

5.3 实现自定义UDAF 4

5.4 解析JSON数据 6

5.4.1 解析OBJECT数据 6

5.4.2 解析ARRAY数据 7

5.4.3 禁止使用get_json_object函数 8

第05章 自定义函数和JSON数据解析

自定义函数简介

有一些sql很难处理的逻辑,我们可以使用自定义函数去处理。比如对一个字符串加密、对字符串解密、解析json,调用外部服务等。

5.1 实现自定义UDF

UDF就是一进一出的函数,类似于Spark SQL中的round(四舍五入)函数。

输入是一行数据的某一个字段,转为某一个值。

先引入jar包

<dependency>
<groupId>
org.apache.hive</groupId>
<artifactId>
hive-exec</artifactId>
<version>
${hive.version}</version>
<scope>
provided</scope>
</dependency>

再开发UDF代码,我们在这里实现一个将字符串转为大写的功能

/**
* 将字符串转为大写字符串
*/
@Description(name = "up",value = "_FUNC_(object) - 将字符串转大写字符串")public class Up extends UDF {public String evaluate(Object key) {if(null==key){return null;}return key.toString().toUpperCase();}
}

再编译代码mvn clean package -DskipTests

并将编译后的代码上传到hdfs上

hadoop fs -put /Users/jack/eclipse-workspace/udf/target/udf-1.0-SNAPSHOT.jar /tmp/hive/

启动spark-sql并创建临时UDF函数

create temporary function up as 'udf.Up' using jar 'hdfs:///tmp/hive/udf-1.0-SNAPSHOT.jar';

使用UDF查询数据

spark-sql> select up("aadbbc");

AADBBC

Time taken: 0.074 seconds, Fetched 1 row(s)

项目github地址:

https://github.com/liukunyuan/udf_new.git

5.2 实现自定义UDTF

UDTF就是一进多出的函数,类似于Spark SQL中的split(对字段分割)函数。将输入的一行数据的某一个字段,转为一个数组。

开发UDTF代码,我们在这里实现一个将字符串转为集合对象的功能

@Description(name = "json_array",value = "_FUNC_(array_string) - 将字符串转为集合对象")public class JsonArray extends UDF {public ArrayList<String> evaluate(String jsonString) throws JSONException {if (StringUtils.isBlank(jsonString)) {return null;}
JSONArray extractObject =
new JSONArray(jsonString);ArrayList<String> result = new ArrayList<String>();
for
(int i = 0; i < extractObject.length(); i++) {
result.add(extractObject.get(i).toString())
;}return result;}
}

再编译代码mvn clean package -DskipTests

并将编译后的代码上传到hdfs上

hadoop fs -put /Users/jack/eclipse-workspace/udf/target/udf-1.0-SNAPSHOT.jar /tmp/hive/

启动spark-sql并创建临时UDTF函数

create temporary function json_array as 'udtf.JsonArray' using jar 'hdfs:///tmp/hive/udf-1.0-SNAPSHOT.jar';

使用UDTF查询JSON数据

spark-sql> select json_array("[{ \"name\": \"XiaoHong\", \"age\":18}]");

["{"name":"XiaoHong","age":18}"]

Time taken: 0.055 seconds, Fetched 1 row(s)

spark-sql>

5.3 实现自定义UDAF

UDAF就是多进一出的函数,类似于Spark SQL中的count(求条数)函数。将输入的多行数据的某一个值,转为一个值。

开发UDAF代码,我们在这里实现一个求平均数的功能。

参考:org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleAvg



@Description(name = "avg_num",value = "_FUNC_(col) - 计算平均数")public class AvgNum extends UDAF {public static class UDAFAvgState {private long mCount;
private double
mSum;}public static class UDAFExampleAvgEvaluator implements UDAFEvaluator {
UDAFAvgState
state;
public
UDAFExampleAvgEvaluator() {super();state = new UDAFAvgState();init();}public void init() {state.mSum = 0;state.mCount = 0;}/**
* 遍历所有行
*
@param o* @return*/public boolean iterate(Double o) {if (o != null) {state.mSum += o;state.mCount++;}return true;}/**
* 输出部分聚合结果
*
@return*/public UDAFAvgState terminatePartial() {// This is SQL standard - average of zero items should be null.return state.mCount == 0 ? null : state;}/**
* 提前合并部分数据
*
@param o* @return*/public boolean merge(UDAFAvgState o) {if (o != null) {state.mSum += o.mSum;state.mCount += o.mCount;}return true;}/**
* 终止聚合并返回最终结果
*
@return*/public Double terminate() {// This is SQL standard - average of zero items should be null.return state.mCount == 0 ? null : Double.valueOf(state.mSum/ state.mCount);}
}
}

再编译代码mvn clean package -DskipTests

并将编译后的代码上传到hdfs上

hadoop fs -put /Users/jack/eclipse-workspace/udf/target/udf-1.0-SNAPSHOT.jar /tmp/hive/

hadoop fs -put /Users/jack/Desktop/test.csv hdfs:///tmp/hive/csv/

启动spark-sql并创建临时UDAF函数

create table default.csv(data int)

location '/tmp/hive/csv';

create temporary function avg_num as 'udaf.AvgNum' using jar 'hdfs:///tmp/hive/udf-1.0-SNAPSHOT.jar';

使用UDAF查询数据

spark-sql> select avg_num(data) ,avg(data) from default.csv;

93.4 93.4

Time taken: 1.888 seconds, Fetched 1 row(s)

可以发现,我们实现的求平均数和引擎自带的avg功能一致。

5.4 解析JSON数据

在大数据处理中,我们经常会遇到一些json格式的数据,所以我们必须掌握如何在Spark SQL中处理json数据。而json数据分为json object格式和json array格式。而我们将通过如下学习掌握这2种json格式数据的通用解析方法。

5.4.1 解析OBJECT数据

通过lateral view outer json_tuple解析json object类型数据,单次可以解析多个key,性能比get_json_object好。

第05章-自定义函数和JSON数据解析_hive

创建存放json数据的目录

hadoop fs -mkdir hdfs:///tmp/hive/json

将JSON数据上传到hdfs

hadoop fs -put /Users/jack/Desktop/test.json hdfs:///tmp/hive/json/

create table default.test (data string)

location '/tmp/hive/json';

书写sql查询数据里面的所有class

select t2.class

from default.test t

lateral view outer json_tuple(data,'class','cmd') t2 as class,cmd

limit 10;

注意:不要使用get_json_object处理json object数据,如果解析多个字段,会重复解析多次,性能很差。

5.4.2 解析ARRAY数据

通过我们前面编写的自定义函数json_array将字符串转成array类型数据,然后通过LATERAL view explode将array进行行转列。通过这种方式解析json array数据。

书写sql查询第二层的class字段

select t4.cmd_class

from default.test t

lateral view outer json_tuple(data,'class','cmd') t2 as class,cmd

LATERAL view explode(json_array(cmd)) t3 as tmp_object

lateral view outer json_tuple(tmp_object,'class') t4 as cmd_class

limit 10;

5.4.3 禁止使用get_json_object函数

这是我们优化json解析的生产案例:

优化前,解析json需要执行360分钟。优化后,解析json只需要执行24分钟,性能优化了15倍。

这是优化前的sql,有1000个get_json_object解析

select

t1.biz_no,

from_unixtime(cast(t1.date_created/1000 as int),'yyyy-MM-dd') as date_created,

get_json_object(t1.req_data,'$.hfwCredit_cust.casesNum') casesNum,

get_json_object(t1.req_data,'$.hfwCredit_cust.breakFaithNum') breakFaithNum,

get_json_object(t1.req_data,'$.caseInfo.custNo') cust_no,

get_json_object(t1.req_data,'$.caseInfo.adjDate') adjDate

……………………

from hdp_credit.xx t1 where pday='${pday}';

这是优化后的sql,使用lateral view outer json_tuple进行json解析。

select biz_no, pday, date_created, casesNum, breakFaithNum, adjDate, custNo

from (

select t1.biz_no,

pday,

from_unixtime(cast(t1.date_created / 1000 as int), 'yyyy-MM-dd') as date_created, map_array(t1.resp_data, 'dcItem,dcVal') as resp_data_json,

t2.*

t3.*

t4.*

from

(select * from hdp_credit.xx

where pday='20220424' ) t1

lateral view outer json_tuple(t1.req_data,'caseInfo','hfwCredit_cust') t2 as caseInfo,hfwCredit_cust

lateral view outer json_tuple(hfwCredit_cust,'breakFaithNum','casesNum') t3 asbreakFaithNum,casesNum lateral view outer json_tuple(caseInfo,'adjDate','custNo') t4 as adjDate,custNo

)tt

;

来自视频:《Spark SQL性能优化》

链接地址:

https://edu.51cto.com/course/34516.html

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
jOqYTgEXfJbg
最新推荐 更多

2024-05-31