自己學習排序和二次排序的知識整理如下。
1.Hadoop的序列化格式介紹:Writable
2.Hadoop的key排序邏輯
3.全排序
4.如何自定義自己的Writable類型
5.如何實現二次排序
1.Hadoop的序列化格式介紹:Writable
要了解和編寫MR實現排序必須要知道的第一個知識點就是Writable相關的接口和類,這些是HADOOP自己的序列化格式。更多的可能是要關注他的Subinterfaces:WritableComparable<T>。他是繼承Writable和Comparable<T>接口,繼而WritableComparable<T>的實現除了具有序列化特性,更重要的是具有了比較的特性,而比較的特性在MapReduce里是很重要的,因為MR中有個基于鍵的排序過程,所以可以作為鍵的類型必須具有Comparable<T>的特性。
除了WritableComparable接口外,還有一個接口RawComparaotor。
WritableComparable和RawComparator兩個接口的區別是:
WritableComparable是需要把數據流反序列化為對象后,然后做對象之間的比較,而RawComparator是直接比較數據流的數據,不需要數據流反序列化成對象,省去了新建對象的開銷。
2.Hadoop的key排序邏輯
Hadoop本身Key的數據類型的排序邏輯其實就是依賴于Hadoop本身的繼承與WritableComparable<T>的基本數據類型和其他類型(相關類型可參考《Hadoop權威指南》第二版的90頁)的compareTo方法的定義。
Key排序的規則:
1.如果調用jobconf的setOutputKeyComparatorClass()設置mapred.output.key.comparator.class
2.否則,使用key已經登記的comparator
3.否則,實現接口WritableComparable的compareTo()函數來操作
例如IntWritable的比較算法如下:
- public int compareTo(Objecto){
- int thisValue= this .value;
- int thatValue=((IntWritable)o).value;
- return (thisValue<thatValue?- 1 :(thisValue==thatValue? 0 : 1 ));
- }
可以修改compareTo來實現自己所需的比較算法。
雖然我們知道是compareTo這個方法實現Key的排序,但其實我們在使用Hadoop的基本數據類型時不需要關注這個排序如何實現,因為Hadoop的框架會自動調用compareTo這個方法實現key的排序。但是這個排序只是局限在map或者reduce內部。針對于map與map,reduce與reduce之間的排序compareTo就管不著了,雖然這種情況不常出現,但是確實存在這種問題的,而且確實有適用場景,比如說全排序。
3.全排序
這里就需要關注Partition這個階段,Partition階段是針對每個Reduce,需要創建一個分區,然后把Map的輸出結果映射到特定的分區中。這個分區中可能會有N個Key對應的數據,但是一個Key的所有數據只能在一個分區中。在實現全排序的過程中,如果只有一個reduce,也就是只有一個Partition,那么所有Map的輸出都會經過一個Partition到一個reduce里,在一個reduce里可以根據compareTo(也可以采用其他比較算法)來排序,實現全排序。但是這種情況就讓MapReduce失去了分布式計算的光環。
所以全排序的大概思路為:確保Partition之間是有序的就OK了,即保證Partition1的最大值小于Partition2的最小值就OK了,即便這樣做也還是有個問題:Partition的分布不均,可能導致某些Partition處理的數據量遠大于其他Partition處理的數據量。而實現全排序的核心步驟為:取樣和Partition。
先“取樣”,保證Partition得更均勻:
1) 對Math.min(10, splits.length)個split(輸入分片)進行隨機取樣,對每個split取10000個樣,總共10萬個樣
2) 10萬個樣排序,根據reducer的數量(n),取出間隔平均的n-1個樣
3) 將這個n-1個樣寫入partitionFile(_partition.lst,是一個SequenceFile),key是取的樣,值是nullValue
4) 將partitionFile寫入DistributedCache
2) 10萬個樣排序,根據reducer的數量(n),取出間隔平均的n-1個樣
3) 將這個n-1個樣寫入partitionFile(_partition.lst,是一個SequenceFile),key是取的樣,值是nullValue
4) 將partitionFile寫入DistributedCache
整個全排序的詳細介紹可參照:
http://www.iteye.com/topic/709986
4.如何自定義自己的Writable類型
自定義自己的Writable類型的場景應該很簡單:Hadoop自帶的數據類型要么在功能上不能滿足需求,要么在性能上滿足需求,畢竟Hadoop還在發展,不是所有情況都考慮的,但是他提供了自主的框架實現我們想要的功能。
定義自己的Writable類型需要實現:
a.重載構造函數
b.實現set和get方法
c.實現接口的方法:write()、readFields()、compareTo()
d.(可選)相當于JAVA構造的對象,重寫java.lang.Object的hashCode()、equals()、toString()。Partition階段默認的hashpartitioner會根據hashCode()來選擇分區,如果不要對自定義類型做key進行分區,hashCode()可不實現
具體例子可參考hadoop的基本類型IntWritable的實現
- public class IntWritable implements WritableComparable{
- private int value;
- public IntWritable(){}
- public IntWritable( int value){set(value);}
- /**SetthevalueofthisIntWritable.*/
- public void set( int value){ this .value=value;}
- /**ReturnthevalueofthisIntWritable.*/
- public int get(){ return value;}
- public void readFields(DataInputin) throws IOException{
- value=in.readInt();
- }
- public void write(DataOutputout) throws IOException{
- out.writeInt(value);
- }
- /**Returnstrueiff<code>o</code>isaIntWritablewiththesamevalue.*/
- public boolean equals(Objecto){
- if (!(o instanceof IntWritable))
- return false ;
- IntWritableother=(IntWritable)o;
- return this .value==other.value;
- }
- public int hashCode(){
- return value;
- }
- /**ComparestwoIntWritables.*/
- public int compareTo(Objecto){
- int thisValue= this .value;
- int thatValue=((IntWritable)o).value;
- return (thisValue<thatValue?- 1 :(thisValue==thatValue? 0 : 1 ));
- }
- public StringtoString(){
- return Integer.toString(value);
- }
- }
5.如何實現二次排序
二次排序的工作原理涉及到如下幾方面:
a.創建key的數據類型,key要包括兩次排序的元素
b.setPartitionerClass(Class<? extends Partitioner> theClass)
hadoop0.20.0以后的函數為setPartitionerClass
c.setOutputKeyComparatorClass(Class<? extends RawComparator> theClass)
hadoop0.20.0以后的函數為 setSortComparatorClass
d.setOutputValueGroupingComparator(Class<? extends RawComparator> theClass)
hadoop0.20.0以后的函數為 setGroupingComparatorClass
根據hadoop自己提供的example:org.apache.hadoop.examplesSecondarySort來說明二次排序具體是如何實現的.
SecondarySort實現IntPair、FirstPartitioner、FirstGroupingComparator、MapClass、Reduce這幾個內部類,然后在main函數中調用。先說明下main函數中有哪些地方和普通的MR代碼不同。
不同點是多了這兩個set:
job.setPartitionerClass(FirstPartitioner.class);
設置自定義的Partition操作,在此是調用我們自定義的內部類 FirstPartitioner
job.setGroupingComparatorClass(FirstGroupingComparator.class);
設置哪些value進入哪些key的迭代器中,在此是調用自定義的內部類 FirstGroupingComparator
具體的操作邏輯為:
a.定義一個作為key的類型IntPair,在IntPair中有兩個變量first、second,SecondarySort就是在對first排序后再對second再排序處理
b.定義分區函數類FirstPartitioner,Key的第一次排序。在FirstPartitioner實現如何處理key的first,把key對應的數據劃分到不同的分區中。這樣key中first相同的value會被放在同一個reduce中,在reduce中再做第二次排序
c(代碼沒有實現,其實內部是有處理).key比較函數類,key的第二次排序,是繼承WritableComparator的一個比較器。
setSortComparatorClass可以實現。
為什么沒有使用 setSortComparatorClass()是因為hadoop對key排序的規則(參看 2.Hadoop的key排序邏輯 )決定的。由于我們在IntPair中已經定義了compareTo()函數。
d.定義分組函數類
FirstGroupingComparator,
保證只要key的的第一部分相同,value就進入key的value迭代器中。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
