亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

hadoop核心邏輯shuffle代碼分析-map端

系統 3906 0

一直對書和各種介紹不太滿意, 終于看到一篇比較好的了,迅速轉載.

首先要推薦一下: http://www.alidata.org/archives/1470

阿里的大牛在上面的文章中比較詳細的介紹了shuffle過程中mapper和reduce的每個過程,強烈推薦先讀一下。

?

不過,上文沒有寫明一些實現的細節,比如:spill的過程,mapper生成文件的 partition是怎么做的等等,相信有很多人跟我一樣在看了上面的文章后還是有很多疑問,我也是帶著疑問花了很久的看了cdh4.1.0版本 shuffle的邏輯,整理成本文,為以后回顧所用。

?

?首先用一張圖展示下map的流程:

?
在上圖中,我們假設此次mapreduce有多個mapper和2個reducer,p0 p1分別代表該數據應該分配到哪個reducer端。我將mapper的過程大致分為5個過程。
?
1.prepare Input。
Mapreduce程序都需要指定輸入文件,輸入的格式有很多種,最常見的是保存在hdfs 上的文本文件。在用戶提交job到jobtrack(ResourceManager)前的job就會根據用戶的輸入文件計算出需要多少mapper,多 少reducer,mapper的輸入InputSplit有多大,block塊名稱等。mapper在prepare input階段只需要根據inputFormat類型創建對應的RecordReader打開對應的inputSplit分片即可。如果job配置了 combiner還需初始化combiner。代碼見MapTask類run方法
?
2.mapper process
這里的mapper指用戶使用或自己繼承的mapper類,這也是所有初學mapreduce的同學首先看到的類。
  1. <span?style= ???*?Called?once?for?each?key/value?pair?in?the?input?split.?Most?applications?
  2. ???*?should?override?this,?but?the?default?is?the?identity?function.?
  3. ???*/ ( )??
  4. protected void throws </span>??
可以看到mapper默認的map方法就是取出key,value并放到context對象中。context對象包裝了一個內存中的buf,下面會介紹。
  1. <span?style= public void throws while ??}</span>??
run方法就是mapper實際運行的過程:不停的從context的inputSplit對象中取出keyvalue對,通過map方法處理再保存到context包裝的內存buf中。
?
3.buffer in memery
key value在寫入context中后實際是寫入MapOutputBuffer類中。在第一個階段的初始化過程中,MapOutputBuffer類會根據配置文件初始化內存buffer,我們來看下都有哪些參數:
  1. <span?style= ??
  2. final float float 0.8 final int );??
  3. if float 1.0 float 0.0 throw new if )?!=?sortmb)?{??
  4. throw new ,??
  5. class class ),?job);</span>??
partition:mapper的數據需要分配到reduce端的個數,由用戶的job指定,默認為1.
spillper:內存buf使用到此比例就會觸發spill,將內存中的數據flush成一個文件。默認為0.8
sortmb:內存buf的大小,默認100MB
indexCacheMemoryLimit:內存index的大小。默認為1024*1024
sorter:對mapper輸出的key的排序,默認是快排
?
內存buffer比較復雜,貼一張圖介紹一下這塊內存buf的結構:
當一對keyvalue寫入時首先會從wrap buf的右側開始往左寫,同時,會把一條keyvalue的meta信息(partition,keystart,valuestart)寫入到最左邊的 index區域。當wrap buf大小達到spill的觸發比例后會block寫入,挖出一部分數據開始spill,直到spill完成后才能繼續寫,不過寫入位置不會置零,而是類 似循環buf那樣,在spill掉數據后可以重復利用內存中的buf區域。
?
這里單獨講一下partition:
  1. <span?style= ??
  2. public void throws ????}</span>??

在keyvalue對寫入MapOutputBuffer時會調用 partitioner.getPartition方法計算partition即應該分配到哪個reducer,這里的partition只是在內存的 buf的index區寫入一條記錄而已,和下一個部分的partition不一樣哦。看下默認的partitioner:HashPartition

  1. <span?style= ??
  2. public int int return ??}</span>??

HashPartition只是把key hash后按reduceTask的個數取模, 因此一般來說,不同的key分配到哪個reducer是隨即的!所以,reducer內的所有數據是有序的,但reducer之間的數據卻是亂序的!要想數據整體排序,要不只設一個reducer,要不使用TotalOrderPartitioner!

?
4.Partition Sort Store
在第四步中,partition是和sort一起做的,負責Spill的線程在拿到一段內存buf后會調用QuickSort的sort方法進行內存中的快排。
  1. <span?style= this ,?mstart,?mend,?reporter);</span>??
排序的算法是先按keyvalue記錄的partition排序后按key的compare方法:
  1. <span?style= public int final int final int final int final int final int final int ??
  2. if return ??
  3. return ????}</span>??
因此,mapper輸出的keyvalue首先是按partition聚合。而我們如果指定key的compare方法會在這里生效并進行排序。最后,一次spill的輸出文件類似下圖。
在對內存中的buf排序后開始寫文件。
  1. <span?style= for int ;?i?<?partitions;?++i)?{??
  2. null try long new if null ??
  3. new while final int else int while ??
  4. ??
  5. if new ????????????}</span>??
如果job沒有定義combiner則直接寫文件, 如果有combiner則在這里進行combine。
在生成spill文件后還會將此次spillRecord的記錄寫在一個index文件中。
  1. <span?style= ??????????spillRec.writeToFile(indexFilename,?job);</span>??
  1. <span?style= ????????????spillRec.putIndex(rec,?i);</span>??
?
5.merge
當mapper執行完畢后,就進入merge階段。首先看下相關的配置參數:
  1. <span?style= int );</span>??
mergeFactor:同時merge的文件數。
?
merge階段的目的是將多個spill生成的中間文件合并為一個輸出文件,這里的合并不同 于combiner,無論有沒有配置combiner這里的merge都會執行。merge階段的輸出是一個數據文件 MapFinalOutputFile和一個index文件。看下相關代碼:
  1. <span?style= new null ??
  2. long new if null else ??????????}</span>??
說下merge的算法。每個spill生成的文件中keyvalue都是有序的,但不同的文 件卻是亂序的,類似多個有序文件的多路歸并算法。Merger分別取出需要merge的spillfile的最小的keyvalue,放入一個內存堆中, 每次從堆中取出一個最小的值,并把此值保存到merge的輸出文件中。這里和hbase中scan的算法非常相似,在分布式系統中多路歸并排序真是當紅小 生啊!
這里merge時不同的partition的key是不會比較的,只有相同的partition的keyvalue才會進行排序和合并。最后的輸出文件類似下圖。
如果用戶定義了combiner,在merge的過程中也會進行combine,因為雖然第 四步中combine過但那只是部分輸入的combine,在merge時仍然需要combine。這里有人問了,既然這里有combiner,為啥在 spill輸出時還要combine納,我認為是因為每次combine都會大大減少輸出文件的大小,spill時就combine能減少一定的IO操 作。
?
在merge完后會把不同partition的信息保存進一個index文件以便之后reducer來拉自己部分的數據。
  1. <span?style= ??
  2. ??????????spillRec.putIndex(rec,?parts);</span>??

最后,我們再對mapper過程中的要點總結一下:
1.對map輸出<key,value>的分區(partition)是在寫入內存buf前就做好的了,方法是對key的hash。我們可以通過繼承Partitioner類自己實現分區,將自己想要的數據分到同一個reducer中。
2.寫入內存buf速度是非常快的,但spill過程會block寫入。因此,對內存buf相關參數的調優是mapreduce調優的重點之一。
3.對數據的排序是基于MapOutKey排序的,因此,我們可以重載對應的方法實現customize的排序順序
4.combine在spill和merge中都是進行。多次的combine會減少mapreduce中的IO操作,如果使用得當會很好的提高性能。但需要注意的是要深刻理解combine的意義,比如平均值就不適合用combine。

hadoop核心邏輯shuffle代碼分析-map端


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 香蕉在线精品亚洲第一区 | 色婷婷中文网 | 欧美另类交视频 | 国产精品久久久亚洲动漫 | 一级毛片私人影院老司机 | 久久精品国产400部免费看 | 韩国一级理黄论片 | 国产日韩欧美91 | 另类日本人xxxxbbbb | 一级女毛片 | 日本国产一区 | 精品96在线观看影院 | 国产精品视频成人 | 亚洲人成自拍网站在线观看忄 | 日本不卡视频免费 | 四虎影视永久免费 | 久久久精品2021免费观看 | 中国免费毛片 | 国内视频自拍在线视频 | 久草视频网 | 欧美精品免费在线观看 | 亚洲午夜片子大全精品 | 日本一级在线观看视频播放 | 亚洲午夜久久久久中文字幕久 | 一区二区三区四区在线观看视频 | 噜鲁射图片| 亚洲欧美精品中字久久99 | 欧美日韩国产精品va | 欧美wwwwww| 亚洲欧美日韩专区一 | 久久免费国产视频 | 国产精品成人观看视频免费 | 精品在线观看一区 | 国产精品成人va | 99在线观看精品视频 | 色人阁在线 | 9热在线精品视频观看 | 亚洲毛片网站 | 黑人特级粗α级毛片 | 人成午夜免费大片在线观看 | 日本不卡一区二区 |