亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

Hive UDAF開發(fā)之同時(shí)計(jì)算最大值與最小值

系統(tǒng) 4539 0

卷首語

前一篇文章 hive UDAF開發(fā)入門和運(yùn)行過程詳解(轉(zhuǎn)) 里面講過UDAF的開發(fā)過程,其中說到如果要深入理解UDAF的執(zhí)行,可以看看求平均值的UDF的源碼

本人在看完源碼后,也還是沒能十分理解里面的內(nèi)容,于是動(dòng)手再自己開發(fā)一個(gè)新的函數(shù),試圖多實(shí)踐中理解它

?

函數(shù)功能介紹

函數(shù)的功能比較蛋疼,我們都知道Hive中有幾個(gè)常用的聚合函數(shù):sum,max,min,avg

現(xiàn)在要用一個(gè)函數(shù)來同時(shí)實(shí)現(xiàn)倆個(gè)不同的功能,對(duì)于同一個(gè)key,要求返回指定value集合中的最大值與最小值

這里面涉及到一個(gè)難點(diǎn),函數(shù)接收到的數(shù)據(jù)只有一個(gè),但是要同時(shí)產(chǎn)生出倆個(gè)新的數(shù)據(jù)出來,且具備一定的邏輯關(guān)系

語言描述這東西我不大懂,想了好久,還是直接上代碼得了。。。。。。。。。。。。。

?

源碼

?

      
        package
      
      
         org.juefan.udaf;


      
      
        import
      
      
         java.util.ArrayList;


      
      
        import
      
      
         org.apache.commons.logging.Log;

      
      
        import
      
      
         org.apache.commons.logging.LogFactory;

      
      
        import
      
      
         org.apache.hadoop.hive.ql.exec.Description;

      
      
        import
      
      
         org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;

      
      
        import
      
      
         org.apache.hadoop.hive.ql.metadata.HiveException;

      
      
        import
      
      
         org.apache.hadoop.hive.ql.parse.SemanticException;

      
      
        import
      
      
         org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;

      
      
        import
      
      
         org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;

      
      
        import
      
      
         org.apache.hadoop.hive.serde2.io.DoubleWritable;

      
      
        import
      
      
         org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

      
      
        import
      
      
         org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

      
      
        import
      
      
         org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;

      
      
        import
      
      
         org.apache.hadoop.hive.serde2.objectinspector.StructField;

      
      
        import
      
      
         org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

      
      
        import
      
      
         org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;

      
      
        import
      
      
         org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;

      
      
        import
      
      
         org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

      
      
        import
      
      
         org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;

      
      
        import
      
      
         org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;

      
      
        import
      
      
         org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

      
      
        import
      
      
         org.apache.hadoop.io.LongWritable;

      
      
        import
      
      
         org.apache.hadoop.io.Text;

      
      
        import
      
      
         org.apache.hadoop.util.StringUtils;


      
      
        /**
      
      
        
 * GenericUDAFMaxMin.
 
      
      
        */
      
      
        
@Description(name 
      
      = "maxmin", value = "_FUNC_(x) - Returns the max and min value of a set of numbers"
      
        )

      
      
        public
      
      
        class
      
       GenericUDAFMaxMin 
      
        extends
      
      
         AbstractGenericUDAFResolver {

    
      
      
        static
      
      
        final
      
       Log LOG = LogFactory.getLog(GenericUDAFMaxMin.
      
        class
      
      
        .getName());

    @Override
    
      
      
        public
      
      
         GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            
      
      
        throws
      
      
         SemanticException {
        
      
      
        if
      
       (parameters.length != 1
      
        ) {
            
      
      
        throw
      
      
        new
      
       UDFArgumentTypeException(parameters.length - 1
      
        ,
                    
      
      "Exactly one argument is expected."
      
        );
        }

        
      
      
        if
      
       (parameters[0].getCategory() !=
      
         ObjectInspector.Category.PRIMITIVE) {
            
      
      
        throw
      
      
        new
      
       UDFArgumentTypeException(0
      
        ,
                    
      
      "Only primitive type arguments are accepted but "
                            + parameters[0].getTypeName() + " is passed."
      
        );
        }
        
      
      
        switch
      
       (((PrimitiveTypeInfo) parameters[0
      
        ]).getPrimitiveCategory()) {
        
      
      
        case
      
      
         BYTE:
        
      
      
        case
      
      
         SHORT:
        
      
      
        case
      
      
         INT:
        
      
      
        case
      
      
         LONG:
        
      
      
        case
      
      
         FLOAT:
        
      
      
        case
      
      
         DOUBLE:
        
      
      
        case
      
      
         STRING:
        
      
      
        case
      
      
         TIMESTAMP:
            
      
      
        return
      
      
        new
      
      
         GenericUDAFMaxMinEvaluator();
        
      
      
        case
      
      
         BOOLEAN:
        
      
      
        default
      
      
        :
            
      
      
        throw
      
      
        new
      
       UDFArgumentTypeException(0
      
        ,
                    
      
      "Only numeric or string type arguments are accepted but "
                            + parameters[0].getTypeName() + " is passed."
      
        );
        }
    }

    
      
      
        /**
      
      
        
     * GenericUDAFMaxMinEvaluator.
     *
     
      
      
        */
      
      
        public
      
      
        static
      
      
        class
      
       GenericUDAFMaxMinEvaluator 
      
        extends
      
      
         GenericUDAFEvaluator {

        
      
      
        //
      
      
         For PARTIAL1 and COMPLETE
      
      
                PrimitiveObjectInspector inputOI;

        
      
      
        //
      
      
         For PARTIAL2 and FINAL
      
      
                StructObjectInspector soi;
        
      
      
        //
      
      
         封裝好的序列化數(shù)據(jù)接口,存儲(chǔ)計(jì)算過程中的最大值與最小值
      
      
                StructField maxField;
        StructField minField;
        
      
      
        //
      
      
         存儲(chǔ)數(shù)據(jù),利用get()可直接返回double類型值
      
      
                DoubleObjectInspector maxFieldOI;
        DoubleObjectInspector minFieldOI;

        
      
      
        //
      
      
         For PARTIAL1 and PARTIAL2
        
      
      
        //
      
      
         存儲(chǔ)中間的結(jié)果
      
      
                Object[] partialResult;

        
      
      
        //
      
      
         For FINAL and COMPLETE
        
      
      
        //
      
      
         最終輸出的數(shù)據(jù)
      
      
                Text result;

        @Override
        
      
      
        public
      
      
         ObjectInspector init(Mode m, ObjectInspector[] parameters)
                
      
      
        throws
      
      
         HiveException {
            
      
      
        assert
      
       (parameters.length == 1
      
        );
            
      
      
        super
      
      
        .init(m, parameters);

            
      
      
        //
      
      
         初始化數(shù)據(jù)輸入過程
      
      
        if
      
       (m == Mode.PARTIAL1 || m ==
      
         Mode.COMPLETE) {
                inputOI 
      
      = (PrimitiveObjectInspector) parameters[0
      
        ];
            } 
      
      
        else
      
      
         {
                
      
      
        //
      
      
         如果接收到的數(shù)據(jù)是中間數(shù)據(jù),則轉(zhuǎn)換成相應(yīng)的結(jié)構(gòu)體
      
      
                soi = (StructObjectInspector) parameters[0
      
        ];
                
      
      
        //
      
      
         獲取指定字段的序列化數(shù)據(jù)
      
      
                maxField = soi.getStructFieldRef("max"
      
        );
                minField 
      
      = soi.getStructFieldRef("min"
      
        );
                
      
      
        //
      
      
         獲取指定字段的實(shí)際數(shù)據(jù)
      
      
                maxFieldOI =
      
         (DoubleObjectInspector) maxField.getFieldObjectInspector();
                minFieldOI 
      
      =
      
         (DoubleObjectInspector) minField.getFieldObjectInspector();
            }

            
      
      
        //
      
      
         初始化數(shù)據(jù)輸出過程
      
      
        if
      
       (m == Mode.PARTIAL1 || m ==
      
         Mode.PARTIAL2) {
                
      
      
        //
      
      
         輸出的數(shù)據(jù)是一個(gè)結(jié)構(gòu)體,其中包含了max和min的值
                
      
      
        //
      
      
         存儲(chǔ)結(jié)構(gòu)化數(shù)據(jù)類型
      
      
                ArrayList<ObjectInspector> foi = 
      
        new
      
       ArrayList<ObjectInspector>
      
        ();
                foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
                foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
                
      
      
        //
      
      
         存儲(chǔ)結(jié)構(gòu)化數(shù)據(jù)的字段名稱
      
      
                ArrayList<String> fname = 
      
        new
      
       ArrayList<String>
      
        ();
                fname.add(
      
      "max"
      
        );
                fname.add(
      
      "min"
      
        );
                partialResult 
      
      = 
      
        new
      
       Object[2
      
        ];
                partialResult[
      
      0] = 
      
        new
      
       DoubleWritable(0
      
        );
                partialResult[
      
      1] = 
      
        new
      
       DoubleWritable(0
      
        );
                
      
      
        return
      
      
         ObjectInspectorFactory.getStandardStructObjectInspector(fname,
                        foi);

            } 
      
      
        else
      
      
         {
                
      
      
        //
      
      
         如果執(zhí)行到了最后一步,則指定相應(yīng)的輸出數(shù)據(jù)類型
      
      
                result = 
      
        new
      
       Text(""
      
        );
                
      
      
        return
      
      
         PrimitiveObjectInspectorFactory.writableStringObjectInspector;
            }
        }

        
      
      
        static
      
      
        class
      
       AverageAgg 
      
        implements
      
      
         AggregationBuffer {
            
      
      
        double
      
      
         max;
            
      
      
        double
      
      
         min;
        };

        @Override
        
      
      
        public
      
       AggregationBuffer getNewAggregationBuffer() 
      
        throws
      
      
         HiveException {
            AverageAgg result 
      
      = 
      
        new
      
      
         AverageAgg();
            reset(result);
            
      
      
        return
      
      
         result;
        }

        @Override
        
      
      
        public
      
      
        void
      
       reset(AggregationBuffer agg) 
      
        throws
      
      
         HiveException {
            AverageAgg myagg 
      
      =
      
         (AverageAgg) agg;
            myagg.max 
      
      =
      
         Double.MIN_VALUE;
            myagg.min 
      
      =
      
         Double.MAX_VALUE;
        }

        
      
      
        boolean
      
       warned = 
      
        false
      
      
        ;

        @Override
        
      
      
        public
      
      
        void
      
      
         iterate(AggregationBuffer agg, Object[] parameters)
                
      
      
        throws
      
      
         HiveException {
            
      
      
        assert
      
       (parameters.length == 1
      
        );
            Object p 
      
      = parameters[0
      
        ];
            
      
      
        if
      
       (p != 
      
        null
      
      
        ) {
                AverageAgg myagg 
      
      =
      
         (AverageAgg) agg;
                
      
      
        try
      
      
         {
                    
      
      
        //
      
      
         獲取輸入數(shù)據(jù),并進(jìn)行相應(yīng)的大小判斷
      
      
        double
      
       v =
      
         PrimitiveObjectInspectorUtils.getDouble(p, inputOI);
                    
      
      
        if
      
      (myagg.max <
      
         v){
                        myagg.max 
      
      =
      
         v;
                    }
                    
      
      
        if
      
      (myagg.min >
      
         v){
                        myagg.min 
      
      =
      
         v;
                    }
                } 
      
      
        catch
      
      
         (NumberFormatException e) {
                    
      
      
        if
      
       (!
      
        warned) {
                        warned 
      
      = 
      
        true
      
      
        ;
                        LOG.warn(getClass().getSimpleName() 
      
      + " "
                                +
      
         StringUtils.stringifyException(e));
                        LOG.warn(getClass().getSimpleName()
                                
      
      + " ignoring similar exceptions."
      
        );
                    }
                }
            }
        }

        @Override
        
      
      
        public
      
       Object terminatePartial(AggregationBuffer agg) 
      
        throws
      
      
         HiveException {
            
      
      
        //
      
      
         將中間計(jì)算出的結(jié)果封裝好返回給下一步操作
      
      
            AverageAgg myagg =
      
         (AverageAgg) agg;
            ((DoubleWritable) partialResult[
      
      0
      
        ]).set(myagg.max);
            ((DoubleWritable) partialResult[
      
      1
      
        ]).set(myagg.min);
            
      
      
        return
      
      
         partialResult;
        }

        @Override
        
      
      
        public
      
      
        void
      
      
         merge(AggregationBuffer agg, Object partial)
                
      
      
        throws
      
      
         HiveException {
            
      
      
        if
      
       (partial != 
      
        null
      
      
        ) {
                
      
      
        //
      
      
        此處partial接收到的是terminatePartial的輸出數(shù)據(jù)
      
      
                AverageAgg myagg =
      
         (AverageAgg) agg;
                Object partialmax 
      
      =
      
         soi.getStructFieldData(partial, maxField);
                Object partialmin 
      
      =
      
         soi.getStructFieldData(partial, minField);
                
      
      
        if
      
      (myagg.max <
      
         maxFieldOI.get(partialmax)){
                    myagg.max 
      
      =
      
         maxFieldOI.get(partialmax);
                }
                
      
      
        if
      
      (myagg.min >
      
         minFieldOI.get(partialmin)){
                    myagg.min 
      
      =
      
         minFieldOI.get(partialmin);
                }
            }
        }

        @Override
        
      
      
        public
      
       Object terminate(AggregationBuffer agg) 
      
        throws
      
      
         HiveException {
            
      
      
        //
      
      
         將最終的結(jié)果合并成字符串后輸出
      
      
            AverageAgg myagg =
      
         (AverageAgg) agg;
            
      
      
        if
      
       (myagg.max == 0
      
        ) {
                
      
      
        return
      
      
        null
      
      
        ;
            } 
      
      
        else
      
      
         {
                result.set(myagg.max 
      
      + "\t" +
      
         myagg.min);
                
      
      
        return
      
      
         result;
            }
        }
    }

}
      
    

?

?

?

寫完后還是覺得沒有怎么理解透整個(gè)過程,所以上面的注釋也就將就著看了,不保證一定正確的!

下午加上一些輸出跟蹤一下執(zhí)行過程才行,不過代碼的邏輯是沒有問題的了,本人運(yùn)行過!

Hive UDAF開發(fā)之同時(shí)計(jì)算最大值與最小值


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會(huì)非常 感謝您的哦!??!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 天天谢天天干 | 久久精品亚洲一级毛片 | 日韩欧美国产偷亚洲清高 | 波多野结衣国产一区二区三区 | a一级免费视频 | 午夜一级毛片免费视频 | 在线免费观看一级毛片 | 四虎影院视频在线观看 | 欧美日韩视频一区二区 | 中文字幕一区婷婷久久 | 97影院秋霞国产精品 | 中文字幕三级理论影院 | 国产一区精品在线 | 91精品国产综合久久久久 | 毛片网站免费观看 | 欧美一级视频在线观看 | 成人a视频高清在线观看 | 99久久久久国产精品免费 | 亚洲精品在线网站 | 久久久91精品国产一区二区三区 | 亚洲国产精品成人午夜在线观看 | 奇米77777| 一区二区免费看 | 亚洲日韩在线视频 | 欧美日韩亚 | 人人干夜夜操 | 2021国产精品久久 | 日韩a毛片 | 国产伦精品一区二区三区无广告 | 在线91精品亚洲网站精品成人 | 精品国产网 | 欧美日韩亚洲国产精品 | 日韩精品久久久久久 | a v在线男人的天堂观看免费 | 成人影院在线免费观看 | 97se亚洲综合 | 欧美成人精品福利在线视频 | 中国产一级毛片 | 欧美性猛交xxxxx按摩欧美 | 日韩高清不卡在线 | 无毒不卡在线观看 |