介紹
hive的用戶自定義聚合函數(UDAF)是一個很好的功能,集成了先進的數據處理。hive有兩種UDAF:簡單和通用。顧名思義,簡單的UDAF,寫的相當簡單的,但因為使用Java反射導致性能損失,而且有些特性不能使用,如可變長度參數列表。通用UDAF可以使用??所有功能,但是UDAF就寫的比較復雜,不直觀。
本文只介紹通用UDAF。
UDAF是需要在hive的sql語句和group by聯合使用,hive的group by對于每個分組,只能返回一條記錄,這點和mysql不一樣,切記。
?
UDAF開發概覽
開發通用UDAF有兩個步驟,第一個是編寫resolver類,第二個是編寫 evaluator 類。 resolver負責類型檢查,操作符重載。 evaluator真正實現UDAF的邏輯。通常來說,頂層UDAF類繼承 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2, 里面編寫嵌套類 evaluator ?實現UDAF的邏輯。
?本文以Hive的內置UDAF sum函數的源代碼作為示例講解。
?
實現?resolver
resolver通常繼承 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2 ,但是我們更建議繼承 AbstractGenericUDAFResolver,隔離將來hive接口的變化。
GenericUDAFResolver和GenericUDAFResolver2接口的區別是,后面的允許evaluator實現可以訪問更多的信息,例如DISTINCT限定符,通配符FUNCTION(*)。
public class GenericUDAFSum extends AbstractGenericUDAFResolver { static final Log LOG = LogFactory.getLog(GenericUDAFSum. class .getName()); @Override public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException { // Type-checking goes here! return new GenericUDAFSumLong();
}
public static class GenericUDAFSumLong extends GenericUDAFEvaluator { // UDAF logic goes here! } }
這個就是 UDAF的代碼骨架,第一行創建LOG對象,用來寫入警告和錯誤到hive的log。 GenericUDAFResolver只需要重寫一個方法: getEvaluator, 它根據SQL傳入的參數類型,返回正確的evaluator。這里最主要是實現操作符的重載。
getEvaluator的完整代碼如下:
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException { if (parameters.length != 1 ) { throw new UDFArgumentTypeException(parameters.length - 1 , "Exactly one argument is expected." ); } if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentTypeException(0 , "Only primitive type arguments are accepted but " + parameters[0].getTypeName() + " is passed." ); } switch (((PrimitiveTypeInfo) parameters[0 ]).getPrimitiveCategory()) { case BYTE: case SHORT: case INT: case LONG: case TIMESTAMP: return new GenericUDAFSumLong(); case FLOAT: case DOUBLE: case STRING: return new GenericUDAFSumDouble(); case BOOLEAN: default : throw new UDFArgumentTypeException(0 , "Only numeric or string type arguments are accepted but " + parameters[0].getTypeName() + " is passed." ); }
這里做了類型檢查,如果不是原生類型(即符合類型,array,map此類),則拋出異常,還實現了操作符重載,對于整數類型,使用GenericUDAFSumLong實現UDAF的邏輯,對于浮點類型,使用GenericUDAFSumDouble實現UDAF的邏輯。
?
實現evaluator
所有 evaluators必須繼承抽象類org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator。子類必須實現它的一些抽象方法,實現UDAF的邏輯。
GenericUDAFEvaluator有一個嵌套類Mode,這個類很重要,它表示了udaf在mapreduce的各個階段,理解Mode的含義,就可以理解了hive的UDAF的運行流程。
public static enum Mode { /** * PARTIAL1: 這個是mapreduce的map階段:從原始數據到部分數據聚合 * 將會調用iterate()和terminatePartial() */ PARTIAL1, /** * PARTIAL2: 這個是mapreduce的map端的Combiner階段,負責在map端合并map的數據::從部分數據聚合到部分數據聚合: * 將會調用merge() 和 terminatePartial() */ PARTIAL2, /** * FINAL: mapreduce的reduce階段:從部分數據的聚合到完全聚合 * 將會調用merge()和terminate() */ FINAL, /** * COMPLETE: 如果出現了這個階段,表示mapreduce只有map,沒有reduce,所以map端就直接出結果了:從原始數據直接到完全聚合 * 將會調用 iterate()和terminate() */ COMPLETE };
一般情況下,完整的UDAF邏輯是一個mapreduce過程,如果有mapper和reducer,就會經歷PARTIAL1(mapper),FINAL(reducer),如果還有combiner,那就會經歷PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。
而有一些情況下的mapreduce,只有mapper,而沒有reducer,所以就會只有COMPLETE階段,這個階段直接輸入原始數據,出結果。
下面以GenericUDAFSumLong的evaluator實現講解
public static class GenericUDAFSumLong extends GenericUDAFEvaluator { private PrimitiveObjectInspector inputOI; private LongWritable result; // 這個方法返回了UDAF的返回類型,這里確定了sum自定義函數的返回類型是Long類型 @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { assert (parameters.length == 1 ); super .init(m, parameters); result = new LongWritable(0 ); inputOI = (PrimitiveObjectInspector) parameters[0 ]; return PrimitiveObjectInspectorFactory.writableLongObjectInspector; } /** 存儲sum的值的類 */ static class SumLongAgg implements AggregationBuffer { boolean empty; long sum; } // 創建新的聚合計算的需要的內存,用來存儲mapper,combiner,reducer運算過程中的相加總和。 @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { SumLongAgg result = new SumLongAgg(); reset(result); return result; } // mapreduce支持mapper和reducer的重用,所以為了兼容,也需要做內存的重用。 @Override public void reset(AggregationBuffer agg) throws HiveException { SumLongAgg myagg = (SumLongAgg) agg; myagg.empty = true ; myagg.sum = 0 ; } private boolean warned = false ; // map階段調用,只要把保存當前和的對象agg,再加上輸入的參數,就可以了。 @Override public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { assert (parameters.length == 1 ); try { merge(agg, parameters[ 0 ]); } catch (NumberFormatException e) { if (! warned) { warned = true ; LOG.warn(getClass().getSimpleName() + " " + StringUtils.stringifyException(e)); } } } // mapper結束要返回的結果,還有combiner結束返回的結果 @Override public Object terminatePartial(AggregationBuffer agg) throws HiveException { return terminate(agg); } // combiner合并map返回的結果,還有reducer合并mapper或combiner返回的結果。 @Override public void merge(AggregationBuffer agg, Object partial) throws HiveException { if (partial != null ) { SumLongAgg myagg = (SumLongAgg) agg; myagg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI); myagg.empty = false ; } } // reducer返回結果,或者是只有mapper,沒有reducer時,在mapper端返回結果。 @Override public Object terminate(AggregationBuffer agg) throws HiveException { SumLongAgg myagg = (SumLongAgg) agg; if (myagg.empty) { return null ; } result.set(myagg.sum); return result; } }
除了GenericUDAFSumLong,還有重載的GenericUDAFSumDouble,以上代碼都在hive的源碼:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum。
?
注意
terminate()返回的數據類型要跟輸入時的數據類型保持一致,不然會報錯!
修改方法注冊
修改 ? ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java文件,加入編寫的 UDAF類,并注冊名字。
FunctionRegistry類包含了hive的所有內置自定義函數。想要更好學習hive的UDAF,建議多看看里面的UDAF。
?
總結
本文的目的是為初學者入門學習udaf,所以介紹了udaf的概覽,尤其是udaf的運行過程,這對初學者是比較大的檻。
考慮入門,本文簡單介紹了sum的UDAF實現,但是如果想要更好理解UDAF的運行過程,建議再看看avg UDAF:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage。avg UDAF對hive的運行流程要控制的更加精細,并判斷當前運行的Mode做一定的邏輯處理。
?
參考? https://cwiki.apache.org/Hive/genericudafcasestudy.html
轉自? http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
