皮尔逊相关系数简介 概述 皮尔逊相关系数(Pearson correlation coefficient)也称皮尔逊积矩相关系数(Pearson product-moment correlation coefficient),是一种线性相关系数。皮尔逊相关系数是用来反映两个变量线性相关程度的统计量。
其中相关系数用 r 表示,n为样本量,分别为两个变量的观测值和均值。r 描述的是两个变量间线性相关强弱的程度。r 的绝对值越大、表明相关性越强。可以简单理解相关系数r为分别对x和y基于自身总体标准化后计算空间向量的余弦夹角。
这里的 1/(n-1) 因子在分子和分母中同时出现,可以约掉,所以得到更下面化简后的公式。
公式 原公式
分子分母相约后的公式
环境说明 TDH 475 版本,不是 5 以上容器的版本,也不是 6 版本的 Kubernetes 上。如果报错,可以去具体 Spark SQL 的日志里面去看什么错误。
Hive UDAF 执行流程 Model 和 Evaluator 的关系。来源于
Mode 各个阶段对应的 Evaluator 方法调用。
Evaluator 各个阶段下处理 MapReduce 的流程
代码 编写 UDAF 函数

package me.young1lin.hive;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

import java.util.ArrayList;
import java.util.List;

import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;

/**
 * UDAF that computes the Pearson correlation coefficient between two columns.
 *
 * <p>Called as {@code regression(x, avg(x), y, avg(y))}: per-row values and the
 * precomputed column means are all passed as DOUBLE arguments. Each task
 * accumulates three partial sums — sum((x-xBar)*(y-yBar)), sum((x-xBar)^2) and
 * sum((y-yBar)^2) — and the final result is
 * sum((x-xBar)*(y-yBar)) / sqrt(sum((x-xBar)^2) * sum((y-yBar)^2)).
 */
@Description(name = "regression",
        value = "_FUNC_(double x, avg(x), double y, avg(y)) - computes the Pearson correlation coefficient")
public class LinearRegressionUDAF extends AbstractGenericUDAFResolver {

    /**
     * Validates the arguments (exactly four DOUBLE primitives) and returns the evaluator.
     *
     * @throws UDFArgumentLengthException if the call does not have four arguments
     * @throws UDFArgumentException if any argument is not a primitive DOUBLE
     */
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        ObjectInspector[] inputOIs = info.getParameterObjectInspectors();
        if (inputOIs.length != 4) {
            // Message fixed: "expected", not "except".
            throw new UDFArgumentLengthException("expected 4 params, but got " + inputOIs.length);
        }
        for (ObjectInspector tmp : inputOIs) {
            if (tmp.getCategory() != ObjectInspector.Category.PRIMITIVE
                    || ((PrimitiveObjectInspector) tmp).getPrimitiveCategory()
                            != PrimitiveObjectInspector.PrimitiveCategory.DOUBLE) {
                throw new UDFArgumentException("only support double");
            }
        }
        return new LinearRegressionUDAFEvaluator();
    }

    private static class LinearRegressionUDAFEvaluator extends GenericUDAFEvaluator {

        /** Partial-result OI: a list of three Java Doubles (the three running sums). */
        private StandardListObjectInspector javaDoubleListInspector =
                ObjectInspectorFactory.getStandardListObjectInspector(javaDoubleObjectInspector);

        /** Final-result OI: a single writable double. */
        private DoubleObjectInspector doubleObjectInspector =
                PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;

        /** OIs of the four original DOUBLE arguments; populated in PARTIAL1/COMPLETE. */
        private DoubleObjectInspector[] originalDataOIs = new DoubleObjectInspector[4];

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            // PARTIAL1 and COMPLETE both read original rows, so both need the input OIs.
            // (The original code captured them only in PARTIAL1, which would NPE inside
            // iterate() when Hive runs the evaluator in COMPLETE mode.)
            if (Mode.PARTIAL1.equals(m) || Mode.COMPLETE.equals(m)) {
                processOriginalDataObjectInspector(parameters);
            }
            // PARTIAL1/PARTIAL2 emit the three partial sums; FINAL/COMPLETE emit the coefficient.
            if (Mode.PARTIAL1.equals(m) || Mode.PARTIAL2.equals(m)) {
                return javaDoubleListInspector;
            }
            return doubleObjectInspector;
        }

        /** Remembers the OIs of the four original DOUBLE arguments for use in iterate(). */
        private void processOriginalDataObjectInspector(ObjectInspector[] parameters) {
            originalDataOIs[0] = (DoubleObjectInspector) parameters[0];
            originalDataOIs[1] = (DoubleObjectInspector) parameters[1];
            originalDataOIs[2] = (DoubleObjectInspector) parameters[2];
            originalDataOIs[3] = (DoubleObjectInspector) parameters[3];
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() {
            LinearRegressionAggregationBuffer buffer = new LinearRegressionAggregationBuffer();
            buffer.reset();
            return buffer;
        }

        @Override
        public void reset(AggregationBuffer agg) {
            ((LinearRegressionAggregationBuffer) agg).reset();
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] inputs) {
            // A NULL in any argument would NPE inside getDouble(); skip such rows,
            // which matches how the built-in corr()/avg() aggregates treat NULLs.
            for (Object input : inputs) {
                if (input == null) {
                    return;
                }
            }
            LinearRegressionAggregationBuffer buffer = (LinearRegressionAggregationBuffer) agg;
            double x = PrimitiveObjectInspectorUtils.getDouble(inputs[0], originalDataOIs[0]);
            double xBar = PrimitiveObjectInspectorUtils.getDouble(inputs[1], originalDataOIs[1]);
            double y = PrimitiveObjectInspectorUtils.getDouble(inputs[2], originalDataOIs[2]);
            double yBar = PrimitiveObjectInspectorUtils.getDouble(inputs[3], originalDataOIs[3]);
            buffer.xBarSubReMultiplyYBarSubReSum += buffer.reminderMultiply(x, xBar, y, yBar);
            buffer.xBarSubReSquareSum += buffer.reminderSquare(x, xBar);
            buffer.yBarSubReSquareSum += buffer.reminderSquare(y, yBar);
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) {
            LinearRegressionAggregationBuffer buffer = (LinearRegressionAggregationBuffer) agg;
            // Order matters: merge() reads the sums back by index 0/1/2.
            ArrayList<Double> list = new ArrayList<>(3);
            list.add(buffer.xBarSubReMultiplyYBarSubReSum);
            list.add(buffer.xBarSubReSquareSum);
            list.add(buffer.yBarSubReSquareSum);
            return list;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void merge(AggregationBuffer agg, Object param) {
            if (param == null) {
                // Hive passes null when a task produced no partial result.
                return;
            }
            LinearRegressionAggregationBuffer buffer = (LinearRegressionAggregationBuffer) agg;
            // NOTE(review): at merge time the serialized partial arrives as a list of
            // DoubleWritable even though the declared partial OI holds Java Doubles —
            // confirm against the target Hive/TDH serde before changing this cast.
            List<DoubleWritable> list = (List<DoubleWritable>) param;
            buffer.xBarSubReMultiplyYBarSubReSum += list.get(0).get();
            buffer.xBarSubReSquareSum += list.get(1).get();
            buffer.yBarSubReSquareSum += list.get(2).get();
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            LinearRegressionAggregationBuffer buffer = (LinearRegressionAggregationBuffer) agg;
            // r = sum((x-xBar)(y-yBar)) / sqrt(sum((x-xBar)^2) * sum((y-yBar)^2)).
            // When either variance is zero the division yields NaN, same as built-in corr().
            double value = buffer.xBarSubReMultiplyYBarSubReSum
                    / Math.sqrt(buffer.xBarSubReSquareSum * buffer.yBarSubReSquareSum);
            return new DoubleWritable(value);
        }

        /** Holds the three running sums needed for the Pearson coefficient. */
        static class LinearRegressionAggregationBuffer extends AbstractAggregationBuffer {

            // sum((x - xBar) * (y - yBar))
            double xBarSubReMultiplyYBarSubReSum;

            // sum((x - xBar)^2)
            double xBarSubReSquareSum;

            // sum((y - yBar)^2)
            double yBarSubReSquareSum;

            // One row's contribution to the cross-product sum.
            double reminderMultiply(double x, double xBar, double y, double yBar) {
                return (x - xBar) * (y - yBar);
            }

            // One row's contribution to a squared-deviation sum.
            double reminderSquare(double v, double vBar) {
                return (v - vBar) * (v - vBar);
            }

            void reset() {
                xBarSubReMultiplyYBarSubReSum = 0;
                xBarSubReSquareSum = 0;
                yBarSubReSquareSum = 0;
            }
        }
    }
}
将上面的类打成 jar 包。可以用 maven 打包(非 shade 方式),名称在 <build> 标签里写 <finalName>这里是你的 jar 的最终名称</finalName>。
名称可以自定义,也可以用我取的名字 custom_function_tools.jar。也可以用 IDE 的 Build -> Build Artifacts 打包。
.
上传至 HDFS 1 hadoop fs -put custom_function_tools.jar /tmp
hadoop fs -put custom_function_tools.jar[这里是你打包好的 jar 包] /tmp[这里是你要传到 HDFS 的文件夹路径] 。
如果 HDFS 上没有 /tmp 文件夹,则先执行 hadoop fs -mkdir /tmp 创建,再执行上面的上传命令。
测试相关 SQL 建表语句 我这边想用里面的 ORC 表来使用事务,但是没用,至少 TDH 的没用,之前也测过,在 Spark SQL 的日志上看了,有日志,但没有回滚。
create table linear(
  s string DEFAULT NULL COMMENT '描述',
  x double DEFAULT 0 COMMENT '数值',
  y double DEFAULT 0 COMMENT '数值'
) COMMENT '测试线性回归方程相关系数表'
CLUSTERED BY (s) INTO 23 BUCKETS
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES ('serialization.format'='1')
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
TBLPROPERTIES(
  'numFiles'='23',
  'transactional'='true',
  'COLUMN_STATS_ACCURATE'='true',
  'totalSize'='7075',
  'numRows'='1',
  'rawDataSize'='1'
);
插入数据

insert into table linear values ('1',5,9);
insert into table linear values ('2',4,10);
insert into table linear values ('3',12,20);
insert into table linear values ('4',59.555,40.2111);
insert into table linear values ('5',6.42,34.14);
insert into table linear values ('6',9,27);
创建临时函数(二选一,测试时推荐) 可以创建临时函数方便测试,重启后就没的那种,也可以创建永久函数。
1 create TEMPORARY FUNCTION regression as 'me.young1lin.hive.LinearRegressionUDAF' using JAR 'hdfs:///tmp/custom_function_tools.jar'
这里的 hdfs:///tmp/custom_function_tools.jar
是三个 / ,名称是你刚上传的 jar 的名称。
前两个 / 与 hdfs: 一起构成协议部分(hdfs://),最后一个 / 表示 HDFS 的根路径。协议与根路径之间可以写具体的 NameNode IP 地址或者域名;因为这里是在同一个集群内执行,所以不用加。
创建永久函数(二选一) 1 create PERMANENT FUNCTION regression as 'me.young1lin.hive.LinearRegressionUDAF' using JAR 'hdfs:///tmp/custom_function_tools.jar'
查询示例

select regression(l.x, xx.xbar, l.y, xx.ybar)
from linear l, (select avg(x) as xbar, avg(y) as ybar from linear) as xx;
查询返回的以下结果。当然你可以 group by 其中一个字段,得出不同的结果,当 x 或者 y 存在相同的数值时 ,就有了不同效果。
如图所示