亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

Hadoop 編寫(xiě)WordCount

系統(tǒng) 2424 0

本文發(fā)表于本人 博客

????前面幾次講了關(guān)于Hadoop的環(huán)境搭建、HDFS操作,今天接著繼續(xù)。本來(lái)Hadoop源碼中就有一個(gè)例子WordCount,但是今天我們來(lái)自己實(shí)現(xiàn)一個(gè)加深對(duì)這個(gè)Mapper、Reducer的理解,如有不對(duì)歡迎指正。

????我們先來(lái)梳理一下思路,對(duì)于自定義Mapper以及Reducer,我們先要覆蓋其map以及reduce函數(shù),然后按照相關(guān)步驟比如設(shè)置輸入文件目錄、輸入文件格式化類、設(shè)置自定義Mapper、分區(qū)、排序、分組、規(guī)約、設(shè)置自定義Reducer等等。這里我們把輸入文件的使用空格分割(也可以用制表符來(lái)),下面是自定義Mapper類MyMapper:

    import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Mapper.Context;



public class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    

    @Override

    protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {

        String[] splied = value.toString().split(" ");

        for (int i = 0; i < splied.length; i++) {

            String lineWord = splied[i];

            context.write(new Text(lineWord), new LongWritable(1));

        }

    }

}
  

這里我選擇的是新的API,相關(guān)庫(kù)基本是在org.apache.hadoop.mapreduce下,舊API是在org.apache.hadoop.mapred下,包括一些引用庫(kù)也是這樣。自定義MyMapper是泛型繼承Mapper,其中參數(shù) key\value 是Hadoop內(nèi)部類型,它不支持java的基本類型這里我們需要注意下為什么不選擇java的基本類型呢,原因是不需要其它額外是操作,而且本身需要序列化反序列化并提升其性能所以加入了hadoop的類型放棄java的基本類型。關(guān)于hadoop key\value 跟java基本類型相互轉(zhuǎn)換的問(wèn)題也很簡(jiǎn)單,從java基本類型轉(zhuǎn)換至hadoop的 key\value 的話直接new帶參就可以了,從hadoop的key\value類型轉(zhuǎn)換至java的基本類型使用get方法就可以了!如:

    LongWritable lw = new LongWritable(1L);

long temp = lw.get();
  

接下來(lái)繼續(xù)看自定義Reducer類MyReduce:

    import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.Reducer.Context;



public class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable> {



    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

        long count = 0L;

        for(LongWritable value: values) {

            count += value.get();

        }

        context.write(key, new LongWritable(count));

    }

}
  

這個(gè)跟上面類似了,再來(lái)看看main方法的如何執(zhí)行的!

        

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.InputFormat;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;



import com.sun.org.apache.xpath.internal.axes.HasPositionalPredChecker;



public class Test {

    static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/mapreduce/output/";

    static final String INPUT_DIR = "hdfs://hadoop-master:9000/mapreduce/input/test.txt";

    

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = new Job(conf, Test.class.getSimpleName());        

        deleteOutputFile(OUTPUT_DIR);

        

        //1設(shè)置輸入目錄

        FileInputFormat.setInputPaths(job, INPUT_DIR);

        //2設(shè)置輸入格式化類

        job.setInputFormatClass(TextInputFormat.class);

        //3設(shè)置自定義Mapper以及鍵值類型

        job.setMapperClass(MyMapper.class);

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(LongWritable.class);

        //4分區(qū)

        job.setPartitionerClass(HashPartitioner.class);

        job.setNumReduceTasks(1);

        //5排序分組

        //6設(shè)置在自定義Reduce以及鍵值類型

        job.setReducerClass(MyReduce.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(LongWritable.class);

        //7設(shè)置輸出目錄

        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));

        //8提交job

        job.waitForCompletion(true);

    }

    

    static void deleteOutputFile(String path) throws Exception{

        Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(new URI(INPUT_DIR),conf);

        if(fs.exists(new Path(path))){

            fs.delete(new Path(path));

        }

    }

}
  

執(zhí)行的時(shí)候先會(huì)輸出上次執(zhí)行過(guò)的輸出目錄。然后就按照步驟:

    1.設(shè)置輸入文件目錄;

2.輸入文件格式化類;

3.設(shè)置自定義Mapper以及其鍵值類型;

4.分區(qū);

5.排序;

6.分組;

7.規(guī)約;

8.設(shè)置自定義Reducer以及其鍵值類型;

9.設(shè)置輸出目錄;

10.代碼提交至JobTracker。
  

當(dāng)然這過(guò)程中有些是可以省略的比如輸出文件格式化類。從這個(gè)例子我們可以得出:既然可以設(shè)置自定義Mapper以及自定義Reducer,那么也應(yīng)該可以設(shè)置自定義的輸入文件格式化類以及分區(qū)、排序、分組、規(guī)約等等,這個(gè)以后會(huì)有相關(guān)的筆記現(xiàn)在這里只是寫(xiě)個(gè)簡(jiǎn)單的例子。我們編寫(xiě)一個(gè)文件如下并把它上傳至hdfs://hadoop-master:9000/mapreduce/input/test.txt:

    luoliang me

asura asura.com luoliang

me
  

然后執(zhí)行main函數(shù),將會(huì)在hdfs://hadoop-master:9000/mapreduce/output/目錄下輸出一個(gè)類似part-*的文件,我們可以使用如下命令查看:

    hadoop fs -text /output/part-*
  

此時(shí)會(huì)輸出:

    asura 1

asura.com 1

luoliang 2

me 2
  

現(xiàn)在文件是輸出了也對(duì)比下是正確,但是腦子還是一片空白,不知道其怎么做到的,那么這個(gè)就是關(guān)于mapreduce的原理了,下面我也說(shuō)說(shuō)大概其原理:從把代碼提交至JobTracker開(kāi)始,它就會(huì)從指定的輸入文件路徑去獲取文件,這里支持多個(gè)文件以及二級(jí)目錄下的多個(gè)文件,這里獲取就是使用的HDFS api來(lái)操作了!把所有文件讀取出來(lái)之后按照指定的大小進(jìn)行分割I(lǐng)nputSplit,把分割好后的鍵值FileSplit(比如:<0,"luoliang me">,<13,"asura asura.com luoliang">)再轉(zhuǎn)化為RecordReader(比如<"luoliang",1>,<"luoliang",1>),此時(shí)全部轉(zhuǎn)換完畢后會(huì)每個(gè)都調(diào)用map函數(shù),map函數(shù)把數(shù)據(jù)寫(xiě)入到Mapper.Context中,再會(huì)對(duì)數(shù)據(jù)進(jìn)行分區(qū)排序分組規(guī)約,最后通過(guò)shuffle到達(dá)reduce端,這其中每個(gè)map的輸出數(shù)量是等于reduce的輸入數(shù)量。到達(dá)reduce端數(shù)據(jù)已經(jīng)發(fā)生了質(zhì)變了不在是<"luoliang",1>而是類似變成<"luoliang",{1,1}>這樣的鍵值數(shù)據(jù),這是我們需要迭代獲取總數(shù)量并在寫(xiě)會(huì)context中,計(jì)算完后輸出到指定的目錄。在這里由于有重復(fù)的單詞所以map函數(shù)的調(diào)用次數(shù)跟reduce函數(shù)調(diào)用次數(shù)是不同的。規(guī)約這個(gè)其實(shí)就是自定義reduce,但是這個(gè)不是必須有的因?yàn)槿绻墙y(tǒng)計(jì)關(guān)于類似平均數(shù)的問(wèn)題,數(shù)據(jù)在map端進(jìn)行規(guī)約了,雖然傳送時(shí)間以及處理時(shí)間減少性能提升了但是對(duì)于最終結(jié)果可能會(huì)有影響,所以這個(gè)規(guī)約要看具體情況才能使用。至于這個(gè) shuffle 一步還不是怎么了解需要多多再看看。

這次先到這里。堅(jiān)持記錄點(diǎn)點(diǎn)滴滴!

Hadoop 編寫(xiě)WordCount


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 真人一级一级特黄高清毛片 | 97久久人人爽人人爽人人 | 亚洲成在人线免费视频 | 亚洲黄色视屏 | 日韩a在线播放 | 中文字幕一区2区 | 国产一区二区在线观看免费 | 欧美男女视频 | 丁香色婷婷 | 热久久国产精品 | 欧美成人久久一级c片免费 欧美成人剧情中文字幕 | 欧美同房免姿势108费视频 | 尤物视频在线播放 | 国产欧美自拍 | 久久精品国产主播一区二区 | 国产精品自在线拍国产 | 国产日韩在线看 | 色久综合| jizz美女| 久久这里只有精品6 | 久久久久一| 亚洲aa在线 | 久久99精品综合国产首页 | 一级高清毛片 | 免费一级成人免费观看 | 亚洲一级毛片免费观看 | 国产成人一区二区三区精品久久 | 日本a毛片在线播放 | 久久免费观看视频 | 亚洲国产清纯 | 欧美爱爱视频网站 | 亚洲性夜夜综合久久麻豆 | 国产一区亚洲二区三区 | 日日噜噜夜夜狠狠久久aⅴ 日日噜噜夜夜狠狠久久丁香 | 色噜噜五月综合激情久久爱 | 欧美日韩一二三 | 日本精品久久久中文字幕 | 黑人一级毛片 | 99精品国产一区二区三区 | 日本高清一级做a爱过程免费视频 | 国产精品98福利小视频 |