This post is distilled from more than twenty Chinese and English blog posts plus the official documentation (detailed links at the bottom). It works through the full development of a Hive UDAF.
Execution flow

(Figure: the relationship between Mode and the Evaluator; source given in the links at the bottom.)
(Figure: the Evaluator methods called at each Mode stage.)
(Figure: how the Evaluator handles the MapReduce flow at each stage.)

In short: PARTIAL1 is the map phase, going from raw rows to a partial aggregation via iterate() and terminatePartial(); PARTIAL2 is the combine step, going from partial to partial via merge() and terminatePartial(); FINAL is the reduce phase, going from partials to the full result via merge() and terminate(); COMPLETE is the map-only case, going straight from raw rows to the full result via iterate() and terminate().
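Before the full implementation, here is a bare skeleton (my own sketch, not from the original post) mapping each GenericUDAFEvaluator callback to the Mode stages that invoke it; all aggregation logic is omitted:

```java
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class SkeletonEvaluator extends GenericUDAFEvaluator {

    /** Per-group mutable state lives in the buffer. */
    static class Buffer extends AbstractAggregationBuffer {
    }

    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        super.init(m, parameters);
        // PARTIAL1/COMPLETE: parameters describe the raw input columns.
        // PARTIAL2/FINAL:    parameters[0] describes the partial aggregation.
        // The returned ObjectInspector describes what this stage emits.
        return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        return new Buffer();
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
        // Clear the buffer so Hive can reuse it for the next group.
    }

    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        // PARTIAL1 and COMPLETE: absorb one raw input row.
    }

    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
        // PARTIAL1 and PARTIAL2: emit the intermediate state (map/combine output).
        return null;
    }

    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        // PARTIAL2 and FINAL: fold another task's intermediate state into agg.
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        // FINAL and COMPLETE: emit the final aggregate value.
        return null;
    }
}
```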
The formula to compute
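Judging from terminate() in the code below, the quantity computed is the Pearson sample correlation coefficient, and the three sums it is built from are exactly the three fields of the aggregation buffer:

$$
r \;=\; \frac{\sum_{i=1}^{n}(x_i-\bar{x})(y_i-\bar{y})}
{\sqrt{\sum_{i=1}^{n}(x_i-\bar{x})^{2}\,\sum_{i=1}^{n}(y_i-\bar{y})^{2}}}
$$

The means $\bar{x}$ and $\bar{y}$ are passed in as the second and fourth arguments (computed by avg(x) and avg(y) in the calling query), since a single aggregation pass cannot know the means up front.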
Code: writing the UDAF function

```java
package me.young1lin.hive;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;

import java.util.ArrayList;
import java.util.List;

import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;

@Description(name = "regression",
        value = "_FUNC_(double x,avg(x),double y,avg(y)) - computes the simple linear regression")
public class LinearRegressionUDAF extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        ObjectInspector[] inputOIs = info.getParameterObjectInspectors();
        if (inputOIs.length != 4) {
            throw new UDFArgumentLengthException("expected 4 params, but got " + inputOIs.length);
        }
        // All four arguments must be primitive doubles.
        for (ObjectInspector tmp : inputOIs) {
            if (tmp.getCategory() != ObjectInspector.Category.PRIMITIVE
                    || ((PrimitiveObjectInspector) tmp).getPrimitiveCategory()
                            != PrimitiveObjectInspector.PrimitiveCategory.DOUBLE) {
                throw new UDFArgumentException("only support double");
            }
        }
        return new LinearRegressionUDAFEvaluator();
    }

    private static class LinearRegressionUDAFEvaluator extends GenericUDAFEvaluator {

        /** OI for the intermediate result: a list holding the three running sums. */
        private StandardListObjectInspector javaDoubleListInspector =
                ObjectInspectorFactory.getStandardListObjectInspector(javaDoubleObjectInspector);

        /** OI for the final result: a single double. */
        private DoubleObjectInspector doubleObjectInspector =
                PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;

        /** OIs for the four raw input columns: x, avg(x), y, avg(y). */
        private DoubleObjectInspector[] originalDataOIs = new DoubleObjectInspector[4];

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            ObjectInspector result;
            if (Mode.PARTIAL1.equals(m)) {
                // Map phase: raw rows in, partial list out.
                processOriginalDataObjectInspector(parameters);
                result = javaDoubleListInspector;
            } else if (Mode.PARTIAL2.equals(m)) {
                // Combine phase: partial list in, partial list out.
                result = javaDoubleListInspector;
            } else if (Mode.FINAL.equals(m)) {
                // Reduce phase: partial lists in, final double out.
                result = doubleObjectInspector;
            } else {
                // COMPLETE: raw rows in, final double out. iterate() also runs in
                // this mode, so the input OIs must be captured here as well.
                processOriginalDataObjectInspector(parameters);
                result = doubleObjectInspector;
            }
            return result;
        }

        private void processOriginalDataObjectInspector(ObjectInspector[] parameters) {
            originalDataOIs[0] = (DoubleObjectInspector) parameters[0];
            originalDataOIs[1] = (DoubleObjectInspector) parameters[1];
            originalDataOIs[2] = (DoubleObjectInspector) parameters[2];
            originalDataOIs[3] = (DoubleObjectInspector) parameters[3];
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() {
            LinearRegressionAggregationBuffer buffer = new LinearRegressionAggregationBuffer();
            buffer.reset();
            return buffer;
        }

        @Override
        public void reset(AggregationBuffer agg) {
            ((LinearRegressionAggregationBuffer) agg).reset();
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] inputs) {
            LinearRegressionAggregationBuffer buffer = (LinearRegressionAggregationBuffer) agg;
            double x = PrimitiveObjectInspectorUtils.getDouble(inputs[0], originalDataOIs[0]);
            double xBar = PrimitiveObjectInspectorUtils.getDouble(inputs[1], originalDataOIs[1]);
            double y = PrimitiveObjectInspectorUtils.getDouble(inputs[2], originalDataOIs[2]);
            double yBar = PrimitiveObjectInspectorUtils.getDouble(inputs[3], originalDataOIs[3]);
            // Accumulate the three sums in the correlation coefficient formula.
            buffer.xBarSubReMultiplyYBarSubReSum += buffer.reminderMultiply(x, xBar, y, yBar);
            buffer.xBarSubReSquareSum += buffer.reminderSquare(x, xBar);
            buffer.yBarSubReSquareSum += buffer.reminderSquare(y, yBar);
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) {
            LinearRegressionAggregationBuffer buffer = (LinearRegressionAggregationBuffer) agg;
            ArrayList<Double> list = new ArrayList<>(3);
            list.add(buffer.xBarSubReMultiplyYBarSubReSum);
            list.add(buffer.xBarSubReSquareSum);
            list.add(buffer.yBarSubReSquareSum);
            return list;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void merge(AggregationBuffer agg, Object param) {
            LinearRegressionAggregationBuffer buffer = (LinearRegressionAggregationBuffer) agg;
            // The partial arrives as the three-element list emitted by terminatePartial().
            List<DoubleWritable> list = (List<DoubleWritable>) param;
            buffer.xBarSubReMultiplyYBarSubReSum += list.get(0).get();
            buffer.xBarSubReSquareSum += list.get(1).get();
            buffer.yBarSubReSquareSum += list.get(2).get();
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            LinearRegressionAggregationBuffer buffer = (LinearRegressionAggregationBuffer) agg;
            // r = sum((x-xBar)(y-yBar)) / sqrt(sum((x-xBar)^2) * sum((y-yBar)^2))
            double value = buffer.xBarSubReMultiplyYBarSubReSum
                    / Math.sqrt(buffer.xBarSubReSquareSum * buffer.yBarSubReSquareSum);
            return new DoubleWritable(value);
        }

        static class LinearRegressionAggregationBuffer extends AbstractAggregationBuffer {

            /** Running sum of (x - xBar)(y - yBar). */
            double xBarSubReMultiplyYBarSubReSum;
            /** Running sum of (x - xBar)^2. */
            double xBarSubReSquareSum;
            /** Running sum of (y - yBar)^2. */
            double yBarSubReSquareSum;

            double reminderMultiply(double x, double xBar, double y, double yBar) {
                return (x - xBar) * (y - yBar);
            }

            double reminderSquare(double v, double vBar) {
                return (v - vBar) * (v - vBar);
            }

            void reset() {
                xBarSubReMultiplyYBarSubReSum = 0;
                xBarSubReSquareSum = 0;
                yBarSubReSquareSum = 0;
            }
        }
    }
}
```
Package the class above into a jar. You can build it with Maven (no shade needed); set the artifact name in the <build> section via <finalName>your-final-jar-name</finalName>. Any name works; I used custom_function_tools.jar. You can also use the IDE's Build -> Build Artifacts.
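For reference, a minimal sketch of the relevant pom.xml fragment (the name matches the jar used below; note that Maven appends the .jar extension to finalName by itself):

```xml
<build>
  <!-- Maven appends ".jar", producing target/custom_function_tools.jar -->
  <finalName>custom_function_tools</finalName>
</build>
```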
Upload to HDFS

```bash
hadoop fs -put custom_function_tools.jar /tmp
```

The general form is hadoop fs -put <your packaged jar> <target HDFS directory>.
If the /tmp directory does not exist yet, create it first with the following command:
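```bash
hadoop fs -mkdir -p /tmp
```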
Test SQL

Create the table:

```sql
create table linear(
  s string DEFAULT NULL COMMENT 'description',
  x double DEFAULT 0 COMMENT 'value',
  y double DEFAULT 0 COMMENT 'value'
)
COMMENT 'test table for the linear regression correlation coefficient'
CLUSTERED BY (s) INTO 23 BUCKETS
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES ('serialization.format' = '1')
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
TBLPROPERTIES (
  'numFiles' = '23',
  'transactional' = 'true',
  'COLUMN_STATS_ACCURATE' = 'true',
  'totalSize' = '7075',
  'numRows' = '1',
  'rawDataSize' = '1'
)
```
Insert data:

```sql
insert into table linear values ('1', 5, 9);
insert into table linear values ('2', 4, 10);
insert into table linear values ('3', 12, 20);
insert into table linear values ('4', 59.555, 40.2111);
insert into table linear values ('5', 6.42, 34.14);
insert into table linear values ('6', 9, 27);
```
Create a temporary function (pick one of the two; recommended for testing)

A temporary function is convenient for testing: it is the kind that disappears after a restart. You can also create a permanent function instead.

```sql
create TEMPORARY FUNCTION regression as 'me.young1lin.hive.LinearRegressionUDAF' using JAR 'hdfs:///tmp/custom_function_tools.jar';
```

Note that hdfs:///tmp/custom_function_tools.jar has three slashes: hdfs: together with the first two slashes is the protocol, and the last slash is the root path. The file name is the name of the jar you just uploaded.
Create a permanent function (pick one of the two)

```sql
create PERMANENT FUNCTION regression as 'me.young1lin.hive.LinearRegressionUDAF' using JAR 'hdfs:///tmp/custom_function_tools.jar';
```
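To confirm the function registered, the standard Hive statements can be used (the pattern and name below assume the function name regression from above):

```sql
show functions like '*regression*';  -- the new function should be listed
describe function regression;        -- prints the @Description text
```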
Query example

```sql
select regression(l.x, xx.xbar, l.y, xx.ybar)
from linear l,
     (select avg(x) as xbar, avg(y) as ybar from linear) as xx;
```
The query returns a single correlation value. (Figure: query result screenshot.) You can also group by one of the fields to get different per-group results; the effect shows up when x or y contains repeated values. A grouped variant is sketched below.
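This is my own illustration, not from the original post; it assumes grouping by the s column, with the averages then also computed per group:

```sql
select l.s,
       regression(l.x, xx.xbar, l.y, xx.ybar) as r
from linear l
join (select s, avg(x) as xbar, avg(y) as ybar
      from linear
      group by s) xx
  on l.s = xx.s
group by l.s;
```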
Environment notes: this ran on TDH 475, not the 5.x-and-later releases packaged as containers, and not the 6.x release running on Kubernetes. If anything errors out, look in the SparkSQL logs for the concrete error.