public List<InputSplit> getSplits(JobContext job) throws IOException { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); List splits = new ArrayList(); List files = listStatus(job); for (FileStatus file : files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0L ) { FileSystem fs = path.getFileSystem(job.getConfiguration()); BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0L , length); if (isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; while (bytesRemaining / splitSize > 1.1D ) { int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); splits.add(makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0L ) { int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts())); } } else { splits.add(makeSplit(path, 0L , length, blkLocations[ 0 ].getHosts())); } } else { splits.add(makeSplit(path, 0L, length, new String[0 ])); } } job.getConfiguration().setLong( "mapreduce.input.fileinputformat.numinputfiles" , files.size()); LOG.debug( "Total # of splits: " + splits.size()); return splits; }
?Yarn 下好像沒了1*下的由用戶設(shè)置預(yù)期的Map數(shù)
核心代碼 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); getFormatMinSplitSize 默認(rèn)返回1,getMinSplitSize 為用戶設(shè)置的最小分片數(shù), 如果用戶設(shè)置的大于1,則為用戶設(shè)置的最小分片數(shù) long maxSize = getMaxSplitSize(job); getMaxSplitSize為用戶設(shè)置的最大分片數(shù),默認(rèn)最大為9223372036854775807L long splitSize = computeSplitSize(blockSize, minSize, maxSize); protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); }
?
測試 文件大小 297M(311349250)
塊大小128M
測試代碼
測試1
?? FileInputFormat.setMinInputSplitSize(job, 301349250);
?? FileInputFormat.setMaxInputSplitSize(job, 10000);
測試后Map個(gè)數(shù)為1,由上面分片公式算出分片大小為301349250, 比?311349250小, 理論應(yīng)該為兩個(gè)map,? 再看分片函數(shù)
while (bytesRemaining / splitSize > 1.1D) {
?? ??? ??? ??? ??? ??? ?int blkIndex = getBlockIndex(blkLocations, length
?? ??? ??? ??? ??? ??? ??? ??? ?- bytesRemaining);
?? ??? ??? ??? ??? ??? ?splits.add(makeSplit(path, length - bytesRemaining,
?? ??? ??? ??? ??? ??? ??? ??? ?splitSize, blkLocations[blkIndex].getHosts()));
?? ??? ??? ??? ??? ??? ?bytesRemaining -= splitSize;
?? ??? ??? ??? ??? ?}
只要剩余的文件大小不超過分片大小的1.1倍, 則會(huì)分到一個(gè)分片中,避免開兩個(gè)MAP, 其中一個(gè)運(yùn)行數(shù)據(jù)太小,浪費(fèi)資源。
?
測試2
FileInputFormat.setMinInputSplitSize(job, 150*1024*1024);
FileInputFormat.setMaxInputSplitSize(job, 10000);
MAP 數(shù)為2
測試3
在原有的輸入目錄下,添加一個(gè)很小的文件,幾K,測試是否會(huì)合并
FileInputFormat.setMinInputSplitSize(job, 150*1024*1024);
FileInputFormat.setMaxInputSplitSize(job, 10000);
Map數(shù)變?yōu)榱?
看源代碼
for (FileStatus file : files) {
..
}
原來輸入是按照文件名來分片的,這個(gè)按照常理也能知道, 不同的文件內(nèi)容格式不同
?
總結(jié),分片過程大概為,先遍歷目標(biāo)文件,過濾部分不符合要求的文件, 然后添加到列表,然后按照文件名來切分分片 (大小為前面計(jì)算分片大小的公式, 最后有個(gè)文件尾可能合并,其實(shí)常寫網(wǎng)絡(luò)程序的都知道), 然后添加到分片列表,然后每個(gè)分片讀取自身對應(yīng)的部分給MAP處理
?
?
?
?
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
