getSplits(JobContextjob)throwsIOException{longminSize=Math.max(getFormatMinSplitSize(),getMinSplitSize(job));longmaxSize=getMaxSplitSize(job);Listsplits=newArrayList();Listfiles=listStatus(job);for(FileStatus" />

亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

Yarn下Map數(shù)控制

系統(tǒng) 1736 0
      
        public
      
       List<InputSplit> getSplits(JobContext job) 
      
        throws
      
      
         IOException {

        
      
      
        long
      
       minSize =
      
         Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

        
      
      
        long
      
       maxSize =
      
         getMaxSplitSize(job);



        List splits 
      
      = 
      
        new
      
      
         ArrayList();

        List files 
      
      =
      
         listStatus(job);

        
      
      
        for
      
      
         (FileStatus file : files) {

            Path path 
      
      =
      
         file.getPath();

            
      
      
        long
      
       length =
      
         file.getLen();

            
      
      
        if
      
       (length != 0L
      
        ) {

                FileSystem fs 
      
      =
      
         path.getFileSystem(job.getConfiguration());

                BlockLocation[] blkLocations 
      
      =
      
         fs.getFileBlockLocations(file,

                        
      
      0L
      
        , length);

                
      
      
        if
      
      
         (isSplitable(job, path)) {

                    
      
      
        long
      
       blockSize =
      
         file.getBlockSize();

                    
      
      
        long
      
       splitSize =
      
         computeSplitSize(blockSize, minSize,

                            maxSize);



                    
      
      
        long
      
       bytesRemaining =
      
         length;

                    
      
      
        while
      
       (bytesRemaining / splitSize > 1.1D
      
        ) {

                        
      
      
        int
      
       blkIndex =
      
         getBlockIndex(blkLocations, length

                                
      
      -
      
         bytesRemaining);

                        splits.add(makeSplit(path, length 
      
      -
      
         bytesRemaining,

                                splitSize, blkLocations[blkIndex].getHosts()));



                        bytesRemaining 
      
      -=
      
         splitSize;

                    }



                    
      
      
        if
      
       (bytesRemaining != 0L
      
        ) {

                        
      
      
        int
      
       blkIndex =
      
         getBlockIndex(blkLocations, length

                                
      
      -
      
         bytesRemaining);

                        splits.add(makeSplit(path, length 
      
      -
      
         bytesRemaining,

                                bytesRemaining,

                                blkLocations[blkIndex].getHosts()));

                    }

                } 
      
      
        else
      
      
         {

                    splits.add(makeSplit(path, 
      
      0L
      
        , length,

                            blkLocations[
      
      0
      
        ].getHosts()));

                }

            } 
      
      
        else
      
      
         {

                splits.add(makeSplit(path, 
      
      0L, length, 
      
        new
      
       String[0
      
        ]));

            }

        }



        job.getConfiguration().setLong(

                
      
      "mapreduce.input.fileinputformat.numinputfiles"
      
        , files.size());

        LOG.debug(
      
      "Total # of splits: " +
      
         splits.size());

        
      
      
        return
      
      
         splits;

    }
      
    

?Yarn 下好像沒了1*下的由用戶設(shè)置預(yù)期的Map數(shù)

      核心代碼



long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));



getFormatMinSplitSize 默認(rèn)返回1,getMinSplitSize 為用戶設(shè)置的最小分片數(shù), 如果用戶設(shè)置的大于1,則為用戶設(shè)置的最小分片數(shù)

long maxSize = getMaxSplitSize(job);



getMaxSplitSize為用戶設(shè)置的最大分片數(shù),默認(rèn)最大為9223372036854775807L



long splitSize = computeSplitSize(blockSize, minSize,

                            maxSize);



protected long computeSplitSize(long blockSize, long minSize, long maxSize) {

        return Math.max(minSize, Math.min(maxSize, blockSize));

    }


    

?

測試 文件大小 297M(311349250)

塊大小128M

測試代碼

測試1

?? FileInputFormat.setMinInputSplitSize(job, 301349250);
?? FileInputFormat.setMaxInputSplitSize(job, 10000);

測試后Map個(gè)數(shù)為1,由上面分片公式算出分片大小為301349250, 比?311349250小, 理論應(yīng)該為兩個(gè)map,? 再看分片函數(shù)

while (bytesRemaining / splitSize > 1.1D) {
?? ??? ??? ??? ??? ??? ?int blkIndex = getBlockIndex(blkLocations, length
?? ??? ??? ??? ??? ??? ??? ??? ?- bytesRemaining);
?? ??? ??? ??? ??? ??? ?splits.add(makeSplit(path, length - bytesRemaining,
?? ??? ??? ??? ??? ??? ??? ??? ?splitSize, blkLocations[blkIndex].getHosts()));

?? ??? ??? ??? ??? ??? ?bytesRemaining -= splitSize;
?? ??? ??? ??? ??? ?}

只要剩余的文件大小不超過分片大小的1.1倍, 則會(huì)分到一個(gè)分片中,避免開兩個(gè)MAP, 其中一個(gè)運(yùn)行數(shù)據(jù)太小,浪費(fèi)資源。

?

測試2

FileInputFormat.setMinInputSplitSize(job, 150*1024*1024);

FileInputFormat.setMaxInputSplitSize(job, 10000);

MAP 數(shù)為2

測試3

在原有的輸入目錄下,添加一個(gè)很小的文件,幾K,測試是否會(huì)合并

FileInputFormat.setMinInputSplitSize(job, 150*1024*1024);
FileInputFormat.setMaxInputSplitSize(job, 10000);

Map數(shù)變?yōu)榱?

看源代碼

for (FileStatus file : files) {

..

}

原來輸入是按照文件名來分片的,這個(gè)按照常理也能知道, 不同的文件內(nèi)容格式不同

?

總結(jié),分片過程大概為,先遍歷目標(biāo)文件,過濾部分不符合要求的文件, 然后添加到列表,然后按照文件名來切分分片 (大小為前面計(jì)算分片大小的公式, 最后有個(gè)文件尾可能合并,其實(shí)常寫網(wǎng)絡(luò)程序的都知道), 然后添加到分片列表,然后每個(gè)分片讀取自身對應(yīng)的部分給MAP處理

?

?

?

?

?

Yarn下Map數(shù)控制


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會(huì)非常 感謝您的哦?。?!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 国产日产欧美一区二区三区 | 色综合久久中文字幕网 | 久综合网| 性做久久久久久久久男女 | 亚1洲二区三区四区免费 | 精品国偷自产在线 | 深夜成人影院 | 奇米777色| 一级特黄色大片 | 在线精品亚洲欧洲第一页 | 国产精品欧美一区二区 | 久久久久久网站 | 四虎网址在线观看 | 一区二区三区在线 | 网站 | 国产一区私人高清影院 | 色婷婷综合久久久久中文 | 在线国产中文字幕 | 亚洲精品一级一区二区三区 | 女人十八毛片免费特黄 | 神马影院888不卡院 神马影院不卡不卡在线观看 | 男人的网站在线观看 | 嫩草影院麻豆久久视频 | 国产成人精品男人免费 | 亚洲色婷婷综合开心网 | 在线观看99 | 天天综合天天干 | 国产精品无码久久av | 99久久国产综合精品成人影院 | 日本一区二区三区免费在线观看 | 久久影院在线 | 国产伦理久久精品久久久久 | 日韩va亚洲va欧美va浪潮 | 欧美综合社区 | 天天碰免费视频 | 女人一级毛片免费观看 | 99久久综合 | 青青久草在线视频 | 男人天堂视频在线观看 | 欧美成人毛片 | 久久免费视频1 | 日本一级毛片大片免费 |