首先要推薦一下:
http://www.alidata.org/archives/1470
阿里的大牛在上面的文章中比較詳細的介紹了shuffle過程中mapper和reduce的每個過程,強烈推薦先讀一下。
?
不過,上文沒有寫明一些實現的細節,比如:spill的過程,mapper生成文件的 partition是怎么做的等等,相信有很多人跟我一樣在看了上面的文章后還是有很多疑問,我也是帶著疑問花了很久的看了cdh4.1.0版本 shuffle的邏輯,整理成本文,為以后回顧所用。
?
?首先用一張圖展示下map的流程:
在上圖中,我們假設此次mapreduce有多個mapper和2個reducer,p0 p1分別代表該數據應該分配到哪個reducer端。我將mapper的過程大致分為5個過程。
?
1.prepare Input。
Mapreduce程序都需要指定輸入文件,輸入的格式有很多種,最常見的是保存在hdfs 上的文本文件。在用戶提交job到jobtrack(ResourceManager)前的job就會根據用戶的輸入文件計算出需要多少mapper,多 少reducer,mapper的輸入InputSplit有多大,block塊名稱等。mapper在prepare input階段只需要根據inputFormat類型創建對應的RecordReader打開對應的inputSplit分片即可。如果job配置了 combiner還需初始化combiner。代碼見MapTask類run方法
?
2.mapper process
這里的mapper指用戶使用或自己繼承的mapper類,這也是所有初學mapreduce的同學首先看到的類。
-
<span?style=
???*?Called?once?for?each?key/value?pair?in?the?input?split.?Most?applications?
-
???*?should?override?this,?but?the?default?is?the?identity?function.?
-
???*/
(
)??
-
protected
void
throws
</span>??
可以看到mapper默認的map方法就是取出key,value并放到context對象中。context對象包裝了一個內存中的buf,下面會介紹。
-
<span?style=
public
void
throws
while
??}</span>??
run方法就是mapper實際運行的過程:不停的從context的inputSplit對象中取出keyvalue對,通過map方法處理再保存到context包裝的內存buf中。
?
3.buffer in memery
key value在寫入context中后實際是寫入MapOutputBuffer類中。在第一個階段的初始化過程中,MapOutputBuffer類會根據配置文件初始化內存buffer,我們來看下都有哪些參數:
-
<span?style=
??
-
final
float
float
0.8
final
int
);??
-
if
float
1.0
float
0.0
throw
new
if
)?!=?sortmb)?{??
-
throw
new
,??
-
class
class
),?job);</span>??
partition:mapper的數據需要分配到reduce端的個數,由用戶的job指定,默認為1.
spillper:內存buf使用到此比例就會觸發spill,將內存中的數據flush成一個文件。默認為0.8
sortmb:內存buf的大小,默認100MB
indexCacheMemoryLimit:內存index的大小。默認為1024*1024
sorter:對mapper輸出的key的排序,默認是快排
?
內存buffer比較復雜,貼一張圖介紹一下這塊內存buf的結構:
當一對keyvalue寫入時首先會從wrap buf的右側開始往左寫,同時,會把一條keyvalue的meta信息(partition,keystart,valuestart)寫入到最左邊的 index區域。當wrap buf大小達到spill的觸發比例后會block寫入,挖出一部分數據開始spill,直到spill完成后才能繼續寫,不過寫入位置不會置零,而是類 似循環buf那樣,在spill掉數據后可以重復利用內存中的buf區域。
?
這里單獨講一下partition:
-
<span?style=
??
-
public
void
throws
????}</span>??
在keyvalue對寫入MapOutputBuffer時會調用 partitioner.getPartition方法計算partition即應該分配到哪個reducer,這里的partition只是在內存的 buf的index區寫入一條記錄而已,和下一個部分的partition不一樣哦。看下默認的partitioner:HashPartition
-
<span?style=
??
-
public
int
int
return
??}</span>??
HashPartition只是把key hash后按reduceTask的個數取模,
因此一般來說,不同的key分配到哪個reducer是隨即的!所以,reducer內的所有數據是有序的,但reducer之間的數據卻是亂序的!要想數據整體排序,要不只設一個reducer,要不使用TotalOrderPartitioner!
?
4.Partition Sort Store
在第四步中,partition是和sort一起做的,負責Spill的線程在拿到一段內存buf后會調用QuickSort的sort方法進行內存中的快排。
-
<span?style=
this
,?mstart,?mend,?reporter);</span>??
排序的算法是先按keyvalue記錄的partition排序后按key的compare方法:
-
<span?style=
public
int
final
int
final
int
final
int
final
int
final
int
final
int
??
-
if
return
??
-
return
????}</span>??
因此,mapper輸出的keyvalue首先是按partition聚合。而我們如果指定key的compare方法會在這里生效并進行排序。最后,一次spill的輸出文件類似下圖。
在對內存中的buf排序后開始寫文件。
-
<span?style=
for
int
;?i?<?partitions;?++i)?{??
-
null
try
long
new
if
null
??
-
new
while
final
int
else
int
while
??
-
??
-
if
new
????????????}</span>??
如果job沒有定義combiner則直接寫文件,
如果有combiner則在這里進行combine。
在生成spill文件后還會將此次spillRecord的記錄寫在一個index文件中。
-
<span?style=
??????????spillRec.writeToFile(indexFilename,?job);</span>??
-
<span?style=
????????????spillRec.putIndex(rec,?i);</span>??
?
5.merge
當mapper執行完畢后,就進入merge階段。首先看下相關的配置參數:
-
<span?style=
int
);</span>??
mergeFactor:同時merge的文件數。
?
merge階段的目的是將多個spill生成的中間文件合并為一個輸出文件,這里的合并不同 于combiner,無論有沒有配置combiner這里的merge都會執行。merge階段的輸出是一個數據文件 MapFinalOutputFile和一個index文件。看下相關代碼:
-
<span?style=
new
null
??
-
long
new
if
null
else
??????????}</span>??
說下merge的算法。每個spill生成的文件中keyvalue都是有序的,但不同的文 件卻是亂序的,類似多個有序文件的多路歸并算法。Merger分別取出需要merge的spillfile的最小的keyvalue,放入一個內存堆中, 每次從堆中取出一個最小的值,并把此值保存到merge的輸出文件中。這里和hbase中scan的算法非常相似,在分布式系統中多路歸并排序真是當紅小 生啊!
這里merge時不同的partition的key是不會比較的,只有相同的partition的keyvalue才會進行排序和合并。最后的輸出文件類似下圖。
如果用戶定義了combiner,在merge的過程中也會進行combine,因為雖然第 四步中combine過但那只是部分輸入的combine,在merge時仍然需要combine。這里有人問了,既然這里有combiner,為啥在 spill輸出時還要combine納,我認為是因為每次combine都會大大減少輸出文件的大小,spill時就combine能減少一定的IO操 作。
?
在merge完后會把不同partition的信息保存進一個index文件以便之后reducer來拉自己部分的數據。
-
<span?style=
??
-
??????????spillRec.putIndex(rec,?parts);</span>??
最后,我們再對mapper過程中的要點總結一下:
1.對map輸出<key,value>的分區(partition)是在寫入內存buf前就做好的了,方法是對key的hash。我們可以通過繼承Partitioner類自己實現分區,將自己想要的數據分到同一個reducer中。
2.寫入內存buf速度是非常快的,但spill過程會block寫入。因此,對內存buf相關參數的調優是mapreduce調優的重點之一。
3.對數據的排序是基于MapOutKey排序的,因此,我們可以重載對應的方法實現customize的排序順序
4.combine在spill和merge中都是進行。多次的combine會減少mapreduce中的IO操作,如果使用得當會很好的提高性能。但需要注意的是要深刻理解combine的意義,比如平均值就不適合用combine。