皮尔逊相关系数简介

概述

皮尔逊相关系数(Pearson correlation coefficient)也称皮尔逊积矩相关系数(Pearson product-moment correlation coefficient),是一种线性相关系数。皮尔森相关系数是用来反映两个变量线性相关程度的统计量。

其中相关系数用r表示,n为样本量,分别为两个变量的观测值和均值。r描述的是两个变量间线性相关强弱的程度。r的绝对值越大、表明相关性越强。可以简单理解相关系数r为分别对x和y基于自身总体标准化后计算空间向量的余弦夹角。

这里的 1/n-1 可以约掉,所以是更下面的图。

公式

原公式

公式.png

分子分母相约后的公式

公式2.png

环境说明

TDH

475 版本,不是 5 以上容器的版本,也不是 6 版本的 Kubernetes 上。如果报错,可以去具体 Spark SQL 的日志里面去看什么错误。

Hive UDAF 执行流程

Model 和 Evaluator 的关系。来源于

Mode 各个阶段对应的 Evaluator 方法调用。

Model 和 Evaluator 的关系.png

Evaluator 各个阶段下处理 MapReduce 的流程

Evaluator 各个阶段下处理 MapReduce 的流程.png

代码

编写 UDAF 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
package me.young1lin.hive;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

import java.util.ArrayList;
import java.util.List;

import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;

/**
* @author young1lin
* @version 1.0
* @date 2020/10/30 2:52 下午
*/
@Description(name = "regression", value = "_FUNC_(double x,avg(x),double y,avg(y)) - computes the simple linear regression")
public class LinearRegressionUDAF extends AbstractGenericUDAFResolver {

@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
ObjectInspector[] inputOIs = info.getParameterObjectInspectors();
// 参数个数校验
if (inputOIs.length != 4) {
throw new UDFArgumentLengthException("except 4 params,but " + inputOIs.length);
}
// 判断参数是否是 double 类型
for (ObjectInspector tmp : inputOIs) {
if (tmp.getCategory() != ObjectInspector.Category.PRIMITIVE
|| ((PrimitiveObjectInspector) tmp).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.DOUBLE) {
throw new UDFArgumentException("only support double");
}
}

return new LinearRegressionUDAFEvaluator();
}

// 二选一,就是最开始的函数入参校验的,我比较喜欢用上面那个
// @Override
// public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
// return super.getEvaluator(info);
// }

private static class LinearRegressionUDAFEvaluator extends GenericUDAFEvaluator {
/**
* 仅用于 init 方法中返回类型声明
*/
private StandardListObjectInspector javaDoubleListInspector = ObjectInspectorFactory.getStandardListObjectInspector(javaDoubleObjectInspector);

private DoubleObjectInspector doubleObjectInspector = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;

private DoubleObjectInspector[] originalDataOIs = new DoubleObjectInspector[4];

/**
* 这个函数一定要重写,如果是自定义 UDAF 函数,Mode 的几个状态一定要了解的
*
* @param m 状态
* @param parameters 入参
* @return 当前阶段该接受什么类型的 Inspector
*/
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
super.init(m, parameters);
ObjectInspector result;
if (Mode.PARTIAL1.equals(m)) {
processOriginalDataObjectInspector(parameters);
result = javaDoubleListInspector;
} else if (Mode.PARTIAL2.equals(m)) {
result = javaDoubleListInspector;
} else if (Mode.FINAL.equals(m)) {
result = doubleObjectInspector;
} else {
// COMPLETE 阶段
result = doubleObjectInspector;
}
return result;
}

private void processOriginalDataObjectInspector(ObjectInspector[] parameters) {
originalDataOIs[0] = (DoubleObjectInspector) parameters[0];
originalDataOIs[1] = (DoubleObjectInspector) parameters[1];
originalDataOIs[2] = (DoubleObjectInspector) parameters[2];
originalDataOIs[3] = (DoubleObjectInspector) parameters[3];
}

/**
* 从这往下方法,依次在不同阶段执行
*
* @return AggregationBuffer 这个被标记弃用, 打算用 {@link AbstractAggregationBuffer}这个代替,
* 因为内存的一些原因,未来会隐藏这个废弃的接口
* @deprecated use {@link AbstractAggregationBuffer} instead
*
*/
@Override
public AggregationBuffer getNewAggregationBuffer() {
LinearRegressionAggregationBuffer buffer = new LinearRegressionAggregationBuffer();
buffer.reset();
return buffer;
}

@Override
public void reset(AggregationBuffer agg) {
LinearRegressionAggregationBuffer buffer = (LinearRegressionAggregationBuffer) agg;
buffer.reset();
}

@Override
public void iterate(AggregationBuffer agg, Object[] inputs) {
LinearRegressionAggregationBuffer buffer = (LinearRegressionAggregationBuffer) agg;

double x = PrimitiveObjectInspectorUtils.getDouble(inputs[0], originalDataOIs[0]);
double xBar = PrimitiveObjectInspectorUtils.getDouble(inputs[1], originalDataOIs[1]);
double y = PrimitiveObjectInspectorUtils.getDouble(inputs[2], originalDataOIs[2]);
double yBar = PrimitiveObjectInspectorUtils.getDouble(inputs[3], originalDataOIs[3]);

buffer.xBarSubReMultiplyYBarSubReSum += buffer.reminderMultiply(x, xBar, y, yBar);
buffer.xBarSubReSquareSum += buffer.reminderSquare(x, xBar);
buffer.yBarSubReSquareSum += buffer.reminderSquare(y, yBar);

}

@Override
public Object terminatePartial(AggregationBuffer agg) {
LinearRegressionAggregationBuffer buffer = (LinearRegressionAggregationBuffer) agg;
ArrayList<Double> list = new ArrayList<>(3);
list.add(buffer.xBarSubReMultiplyYBarSubReSum);
list.add(buffer.xBarSubReSquareSum);
list.add(buffer.yBarSubReSquareSum);
return list;
}

/**
* 合并操作
* @param agg init 获得的 agg 对象
* @param param 上一步返回的对象的包装对象
*/
@Override
public void merge(AggregationBuffer agg, Object param) {
LinearRegressionAggregationBuffer buffer = (LinearRegressionAggregationBuffer) agg;
// terminatePartial 返回的是 List<Double>,所以这里可以直接强转为 List,double 在 Hive 执行过程中会自动转为 DoubleWritable 类型
List<DoubleWritable> list = (List<DoubleWritable>)param;
buffer.xBarSubReMultiplyYBarSubReSum += list.get(0).get();
buffer.xBarSubReSquareSum += list.get(1).get();
buffer.yBarSubReSquareSum += list.get(2).get();
}

@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
LinearRegressionAggregationBuffer buffer = (LinearRegressionAggregationBuffer) agg;
// 就是图中所示的公式
double value = buffer.xBarSubReMultiplyYBarSubReSum / Math.sqrt(buffer.xBarSubReSquareSum * buffer.yBarSubReSquareSum);
return new DoubleWritable(value);
}

static class LinearRegressionAggregationBuffer extends AbstractAggregationBuffer {

/**
* x bar 指的是 x 的平均值
* sun((x - xBar)*(y - yBar))
*/
double xBarSubReMultiplyYBarSubReSum;

/**
* sum((x - xBar)^2)
*/
double xBarSubReSquareSum;

/**
* sum((y - yBar)^2)
*/
double yBarSubReSquareSum;

double reminderMultiply(double x, double xBar, double y, double yBar) {
return (x - xBar) * (y - yBar);
}

/**
* 求两个数,一个是当前值,一个是平均值,他们相减得到的平方值
*
* @param v value
* @param vBar avg(v)
* @return (v - vBar) * (v - vBar)
*/
double reminderSquare(double v, double vBar) {
return (v - vBar) * (v - vBar);
}

void reset() {
xBarSubReMultiplyYBarSubReSum = 0;
xBarSubReSquareSum = 0;
yBarSubReSquareSum = 0;
}
}
}

}

将上面的类打成 jar 包。可以用 maven 打包(非 shade 方式),名称在 <build> 标签里写 <finalName>这里是你的 jar 的最终名称</finalName名称可以自定义,也可以我取的名字 custom_function_tools.jar。也可以用Build -> Build Artifacts.

上传至 HDFS

1
hadoop fs -put custom_function_tools.jar /tmp

hadoop fs -put custom_function_tools.jar[这里是你打包好的 jar 包] /tmp[这里是你要传到 HDFS 的文件夹路径] 。

如果没有 tmp 文件夹,则执行以下命令

1
hadoop fs -mkdir /tmp

测试相关 SQL

建表语句

我这边想用里面的 ORC 表来使用事务,但是没用,至少 TDH 的没用,之前也测过,在 Spark SQL 的日志上看了,有日志,但没有回滚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
create table linear(
s string DEFAULT NULL COMMENT '描述'
x double DEFAULT 0 COMMENT '数值'
y double DEFAULT 0 COMMENT '数值'
)
COMMENT '测试线性回归方程相关系数表'
CLUSTERED BY(s)
INTO 23 BUCKETS
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES('serialization.format'='1')
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
TABLEPROPERTIES(
'numFiles'='23',
'transactional'='true',
'COLUMN_STATS_ACCURATE'='true',
'totalSize'='7075',
'numRows'='1',
'rawDataSize'='1')

插入数据

1
2
3
4
5
6
insert into table linear value('1',5,9);
insert into table linear value('2',4,10);
insert into table linear value('3',12,20);
insert into table linear value('4',59.555,40.2111);
insert into table linear value('5',6.42,34.14);
insert into table linear value('6',9,27);

创建临时函数(二选一,测试时推荐)

可以创建临时函数方便测试,重启后就没的那种,也可以创建永久函数。

1
create TEMPORARY FUNCTION regression as 'me.young1lin.hive.LinearRegressionUDAF' using JAR 'hdfs:///tmp/custom_function_tools.jar'

这里的 hdfs:///tmp/custom_function_tools.jar 是三个 / ,名称是你刚上传的 jar 的名称。

前两个 / 和 hdfs:表示的是协议,最后一个 / 表示根路径,可以加具体的 IP 地址或者域名,因为这个是在同一个集群里面的,所以我这边不用加。

创建永久函数(二选一)

1
create PERMANENT FUNCTION regression as 'me.young1lin.hive.LinearRegressionUDAF' using JAR 'hdfs:///tmp/custom_function_tools.jar'

查询示例

1
select regression(l.x,xx.xbar,l.y,xx.ybar)from linear l,(select avg(x) as xbar,avg(y) ybar from linear) as xx;

查询返回的以下结果。当然你可以 group by 其中一个字段,得出不同的结果,当 x 或者 y 存在相同的数值时,就有了不同效果。

如图所示

image.png