一、UDAF 回顾
- 1.定义:UDAF(User Defined Aggregation Funcation ) 用户自定义聚类方法,和group by联合使用,接受多个输入数据行,并产生一个输出数据行。
- 2.Hive有两种UDAF:简单和通用
简单:利用抽象类UDAF和UDAFEvaluator,使用Java反射导致性能损失,且有些特性不能使用,如可变长度参数列表 。
通用:利用接口GenericUDAFResolver2(或抽象类AbstractGenericUDAFResolver)和抽象类GenericUDAFEvaluator,可以使用所有功能,但比较复杂,不直观。 - 3.一个计算函数必须实现的5个方法的具体含义如下:
init():主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。一般在静态类中定义一个内部字段来存放最终的结果。
iterate():每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输 入值合法或者正确计算了,则就返回true。
terminatePartial():Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。
merge():Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。
terminate():Hive最终聚集结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。二、需求
使用UDAF简单方式实现统计区域产品用户访问排名三、自定义UDAF函数代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88package hive.org.ruozedata;
import java.util.*;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.log4j.Logger;
public class UserClickUDAF extends UDAF {
// 日志对象初始化
public static Logger logger = Logger.getLogger(UserClickUDAF.class);
// 静态类实现UDAFEvaluator
public static class Evaluator implements UDAFEvaluator {
// 设置成员变量,存储每个统计范围内的总记录数
private static Map<String, String> courseScoreMap;
private static Map<String, String> city_info;
private static Map<String, String> product_info;
private static Map<String, String> user_click;
//初始化函数,map和reduce均会执行该函数,起到初始化所需要的变量的作用
public Evaluator() {
init();
}
// 初始化函数间传递的中间变量
public void init() {
courseScoreMap = new HashMap<String, String>();
city_info = new HashMap<String, String>();
product_info = new HashMap<String, String>();
}
//map阶段,返回值为boolean类型,当为true则程序继续执行,当为false则程序退出
public boolean iterate(String pcid, String pcname, String pccount) {
if (pcid == null || pcname == null || pccount == null) {
return true;
}
if (pccount.equals("-1")) {
// 城市表
city_info.put(pcid, pcname);
}
else if (pccount.equals("-2")) {
// 产品表
product_info.put(pcid, pcname);
}
else {
// 处理用户点击关联
unionCity_Prod_UserClic1(pcid, pcname, pccount);
}
return true;
}
// 处理用户点击关联
private void unionCity_Prod_UserClic1(String pcid, String pcname, String pccount) {
if (product_info.containsKey(pcid)) {
if (city_info.containsKey(pcname)) {
String city_name = city_info.get(pcname);
String prod_name = product_info.get(pcid);
String cp_name = city_name + prod_name;
// 如果之前已经Put过Key值为区域信息,则把记录相加处理
if (courseScoreMap.containsKey(cp_name)) {
int pcrn = 0;
String strTemp = courseScoreMap.get(cp_name);
String courseScoreMap_pn
= strTemp.substring(strTemp.lastIndexOf("\t".toString())).trim();
pcrn = Integer.parseInt(pccount) + Integer.parseInt(courseScoreMap_pn);
courseScoreMap.put(cp_name, city_name + "\t" + prod_name + "\t"+ Integer.toString(pcrn));
}
else {
courseScoreMap.put(cp_name, city_name + "\t" + prod_name + "\t"+ pccount);
}
}
}
}
/**
* 类似于combiner,在map范围内做部分聚合,将结果传给merge函数中的形参mapOutput
* 如果需要聚合,则对iterator返回的结果处理,否则直接返回iterator的结果即可
*/
public Map<String, String> terminatePartial() {
return courseScoreMap;
}
// reduce 阶段,用于逐个迭代处理map当中每个不同key对应的 terminatePartial的结果
public boolean merge(Map<String, String> mapOutput) {
this.courseScoreMap.putAll(mapOutput);
return true;
}
// 处理merge计算完成后的结果,即对merge完成后的结果做最后的业务处理
public String terminate() {
return courseScoreMap.toString();
}
}
}
四、创建hive中的临时函数
1 | DROP TEMPORARY FUNCTION user_click; |
五、调用自定义UDAF函数处理数据
1 | insert overwrite directory '/works/tmp1' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' |
六、创建Hive临时外部表
1 | create external table tmp1( |
七、统计最终区域前3产品排名
1 | select * from ( |