在上一篇文章:“用 Hadoop 進(jìn)行分布式并行編程 第一部分 基本概念與安裝部署”中,介紹了 MapReduce 計(jì)算模型,分布式文件系統(tǒng) HDFS,分布式并行計(jì)算等的基本原理, 并且詳細(xì)介紹了如何安裝 Hadoop,如何運(yùn)行基于 Hadoop 的并行程序。在本文中,將針對(duì)一個(gè)具體的計(jì)算任務(wù),介紹如何基于 Hadoop 編寫并行程序,如何使用 IBM 開(kāi)發(fā)的 Hadoop Eclipse plugin 在 Eclipse 環(huán)境中編譯并運(yùn)行程序。
我們先來(lái)看看 Hadoop 自帶的示例程序 WordCount,這個(gè)程序用于統(tǒng)計(jì)一批文本文件中單詞出現(xiàn)的頻率,完整的代碼可在下載的 Hadoop 安裝包中得到(在 src/examples 目錄中)。
見(jiàn)代碼清單1。這個(gè)類實(shí)現(xiàn) Mapper 接口中的 map 方法,輸入?yún)?shù)中的 value 是文本文件中的一行,利用 StringTokenizer 將這個(gè)字符串拆成單詞,然后將輸出結(jié)果 <單詞,1> 寫入到 org.apache.hadoop.mapred.OutputCollector 中。OutputCollector 由 Hadoop 框架提供, 負(fù)責(zé)收集 Mapper 和 Reducer 的輸出數(shù)據(jù),實(shí)現(xiàn) map 函數(shù)和 reduce 函數(shù)時(shí),只需要簡(jiǎn)單地將其輸出的 <key,value> 對(duì)往 OutputCollector 中一丟即可,剩余的事框架自會(huì)幫你處理好。
代碼中 LongWritable, IntWritable, Text 均是 Hadoop 中實(shí)現(xiàn)的用于封裝 Java 數(shù)據(jù)類型的類,這些類都能夠被串行化從而便于在分布式環(huán)境中進(jìn)行數(shù)據(jù)交換,你可以將它們分別視為 long, int, String 的替代品。Reporter 則可用于報(bào)告整個(gè)應(yīng)用的運(yùn)行進(jìn)度,本例中未使用。
代碼清單1
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } } |
見(jiàn)代碼清單 2。這個(gè)類實(shí)現(xiàn) Reducer 接口中的 reduce 方法, 輸入?yún)?shù)中的 key, values 是由 Map 任務(wù)輸出的中間結(jié)果,values 是一個(gè) Iterator, 遍歷這個(gè) Iterator, 就可以得到屬于同一個(gè) key 的所有 value. 此處,key 是一個(gè)單詞,value 是詞頻。只需要將所有的 value 相加,就可以得到這個(gè)單詞的總的出現(xiàn)次數(shù)。
代碼清單 2
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } |
在 Hadoop 中一次計(jì)算任務(wù)稱之為一個(gè) job, 可以通過(guò)一個(gè) JobConf 對(duì)象設(shè)置如何運(yùn)行這個(gè) job。此處定義了輸出的 key 的類型是 Text, value 的類型是 IntWritable, 指定使用代碼清單1中實(shí)現(xiàn)的 MapClass 作為 Mapper 類, 使用代碼清單2中實(shí)現(xiàn)的 Reduce 作為 Reducer 類和 Combiner 類, 任務(wù)的輸入路徑和輸出路徑由命令行參數(shù)指定,這樣 job 運(yùn)行時(shí)會(huì)處理輸入路徑下的所有文件,并將計(jì)算結(jié)果寫到輸出路徑下。
然后將 JobConf 對(duì)象作為參數(shù),調(diào)用 JobClient 的 runJob, 開(kāi)始執(zhí)行這個(gè)計(jì)算任務(wù)。至于 main 方法中使用的 ToolRunner 是一個(gè)運(yùn)行 MapReduce 任務(wù)的輔助工具類,依樣畫葫蘆用之即可。
代碼清單 3
public int run(String[] args) throws Exception { JobConf conf = new JobConf(getConf(), WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args[0])); conf.setOutputPath(new Path(args[1])); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { if(args.length != 2){ System.err.println("Usage: WordCount <input path> <output path>"); System.exit(-1); } int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); } } |
以上就是 WordCount 程序的全部細(xì)節(jié),簡(jiǎn)單到讓人吃驚,您都不敢相信就這么幾行代碼就可以分布式運(yùn)行于大規(guī)模集群上,并行處理海量數(shù)據(jù)集。
4. 通過(guò) JobConf 定制計(jì)算任務(wù)
通過(guò)上文所述的 JobConf 對(duì)象,程序員可以設(shè)定各種參數(shù),定制如何完成一個(gè)計(jì)算任務(wù)。這些參數(shù)很多情況下就是一個(gè) java 接口,通過(guò)注入這些接口的特定實(shí)現(xiàn),可以定義一個(gè)計(jì)算任務(wù)( job )的全部細(xì)節(jié)。了解這些參數(shù)及其缺省設(shè)置,您才能在編寫自己的并行計(jì)算程序時(shí)做到輕車熟路,游刃有余,明白哪些類是需要自己實(shí)現(xiàn)的,哪些類用 Hadoop 的缺省實(shí)現(xiàn)即可。表一是對(duì) JobConf 對(duì)象中可以設(shè)置的一些重要參數(shù)的總結(jié)和說(shuō)明,表中第一列中的參數(shù)在 JobConf 中均會(huì)有相應(yīng)的 get/set 方法,對(duì)程序員來(lái)說(shuō),只有在表中第三列中的缺省值無(wú)法滿足您的需求時(shí),才需要調(diào)用這些 set 方法,設(shè)定合適的參數(shù)值,實(shí)現(xiàn)自己的計(jì)算目的。針對(duì)表格中第一列中的接口,除了第三列的缺省實(shí)現(xiàn)之外,Hadoop 通常還會(huì)有一些其它的實(shí)現(xiàn),我在表格第四列中列出了部分,您可以查閱 Hadoop 的 API 文檔或源代碼獲得更詳細(xì)的信息,在很多的情況下,您都不用實(shí)現(xiàn)自己的 Mapper 和 Reducer, 直接使用 Hadoop 自帶的一些實(shí)現(xiàn)即可。
表一 JobConf 常用可定制參數(shù)
InputFormat OutputFormat OutputKeyClass OutputValueClass MapperClass CombinerClass ReducerClass InputPath OutputPath MapOutputKeyClass MapOutputValueClass OutputKeyComparator PartitionerClass 參數(shù) 作用 缺省值 其它實(shí)現(xiàn)
將輸入的數(shù)據(jù)集切割成小數(shù)據(jù)集 InputSplits, 每一個(gè) InputSplit 將由一個(gè) Mapper 負(fù)責(zé)處理。此外 InputFormat 中還提供一個(gè) RecordReader 的實(shí)現(xiàn), 將一個(gè) InputSplit 解析成 <key,value> 對(duì)提供給 map 函數(shù)。 |
TextInputFormat
(針對(duì)文本文件,按行將文本文件切割成 InputSplits, 并用 LineRecordReader 將 InputSplit 解析成 <key,value> 對(duì),key 是行在文件中的位置,value 是文件中的一行) |
SequenceFileInputFormat |
提供一個(gè) RecordWriter 的實(shí)現(xiàn),負(fù)責(zé)輸出最終結(jié)果 |
TextOutputFormat
(用 LineRecordWriter 將最終結(jié)果寫成純文件文件,每個(gè) <key,value> 對(duì)一行,key 和 value 之間用 tab 分隔) |
SequenceFileOutputFormat |
輸出的最終結(jié)果中 key 的類型 | LongWritable | |
輸出的最終結(jié)果中 value 的類型 | Text | |
Mapper 類,實(shí)現(xiàn) map 函數(shù),完成輸入的 <key,value> 到中間結(jié)果的映射 |
IdentityMapper
(將輸入的 <key,value> 原封不動(dòng)的輸出為中間結(jié)果) |
LongSumReducer,
LogRegexMapper, InverseMapper |
實(shí)現(xiàn) combine 函數(shù),將中間結(jié)果中的重復(fù) key 做合并 |
null
(不對(duì)中間結(jié)果中的重復(fù) key 做合并) |
|
Reducer 類,實(shí)現(xiàn) reduce 函數(shù),對(duì)中間結(jié)果做合并,形成最終結(jié)果 |
IdentityReducer
(將中間結(jié)果直接輸出為最終結(jié)果) |
AccumulatingReducer, LongSumReducer |
設(shè)定 job 的輸入目錄, job 運(yùn)行時(shí)會(huì)處理輸入目錄下的所有文件 | null | |
設(shè)定 job 的輸出目錄,job 的最終結(jié)果會(huì)寫入輸出目錄下 | null | |
設(shè)定 map 函數(shù)輸出的中間結(jié)果中 key 的類型 | 如果用戶沒(méi)有設(shè)定的話,使用 OutputKeyClass | |
設(shè)定 map 函數(shù)輸出的中間結(jié)果中 value 的類型 | 如果用戶沒(méi)有設(shè)定的話,使用 OutputValuesClass | |
對(duì)結(jié)果中的 key 進(jìn)行排序時(shí)的使用的比較器 | WritableComparable | |
對(duì)中間結(jié)果的 key 排序后,用此 Partition 函數(shù)將其劃分為R份,每份由一個(gè) Reducer 負(fù)責(zé)處理。 |
HashPartitioner
(使用 Hash 函數(shù)做 partition) |
KeyFieldBasedPartitioner PipesPartitioner |
現(xiàn)在你對(duì) Hadoop 并行程序的細(xì)節(jié)已經(jīng)有了比較深入的了解,我們來(lái)把 WordCount 程序改進(jìn)一下,目標(biāo): (1)原 WordCount 程序僅按空格切分單詞,導(dǎo)致各類標(biāo)點(diǎn)符號(hào)與單詞混雜在一起,改進(jìn)后的程序應(yīng)該能夠正確的切出單詞,并且單詞不要區(qū)分大小寫。(2)在最終結(jié)果中,按單詞出現(xiàn)頻率的降序進(jìn)行排序。
1.修改 Mapper 類,實(shí)現(xiàn)目標(biāo)(1)
實(shí)現(xiàn)很簡(jiǎn)單,見(jiàn)代碼清單4中的注釋。
代碼清單 4
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private String pattern="[^\\w]"; //正則表達(dá)式,代表不是0-9, a-z, A-Z的所有其它字符 public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString().toLowerCase(); //全部轉(zhuǎn)為小寫字母 line = line.replaceAll(pattern, " "); //將非0-9, a-z, A-Z的字符替換為空格 StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } } |
用一個(gè)并行計(jì)算任務(wù)顯然是無(wú)法同時(shí)完成單詞詞頻統(tǒng)計(jì)和排序的,這時(shí)我們可以利用 Hadoop 的任務(wù)管道能力,用上一個(gè)任務(wù)(詞頻統(tǒng)計(jì))的輸出做為下一個(gè)任務(wù)(排序)的輸入,順序執(zhí)行兩個(gè)并行計(jì)算任務(wù)。主要工作是修改代碼清單3中的 run 函數(shù),在其中定義一個(gè)排序任務(wù)并運(yùn)行之。
在 Hadoop 中要實(shí)現(xiàn)排序是很簡(jiǎn)單的,因?yàn)樵?MapReduce 的過(guò)程中,會(huì)把中間結(jié)果根據(jù) key 排序并按 key 切成 R 份交給 R 個(gè) Reduce 函數(shù),而 Reduce 函數(shù)在處理中間結(jié)果之前也會(huì)有一個(gè)按 key 進(jìn)行排序的過(guò)程,故 MapReduce 輸出的最終結(jié)果實(shí)際上已經(jīng)按 key 排好序。詞頻統(tǒng)計(jì)任務(wù)輸出的 key 是單詞,value 是詞頻,為了實(shí)現(xiàn)按詞頻排序,我們指定使用 InverseMapper 類作為排序任務(wù)的 Mapper 類( sortJob.setMapperClass(InverseMapper.class );),這個(gè)類的 map 函數(shù)簡(jiǎn)單地將輸入的 key 和 value 互換后作為中間結(jié)果輸出,在本例中即是將詞頻作為 key,單詞作為 value 輸出, 這樣自然就能得到按詞頻排好序的最終結(jié)果。我們無(wú)需指定 Reduce 類,Hadoop 會(huì)使用缺省的 IdentityReducer 類,將中間結(jié)果原樣輸出。
還有一個(gè)問(wèn)題需要解決: 排序任務(wù)中的 Key 的類型是 IntWritable, (sortJob.setOutputKeyClass(IntWritable.class)), Hadoop 默認(rèn)對(duì) IntWritable 按升序排序,而我們需要的是按降序排列。因此我們實(shí)現(xiàn)了一個(gè) IntWritableDecreasingComparator 類, 并指定使用這個(gè)自定義的 Comparator 類對(duì)輸出結(jié)果中的 key (詞頻)進(jìn)行排序:sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class)
詳見(jiàn)代碼清單 5 及其中的注釋。
代碼清單 5
public int run(String[] args) throws Exception { Path tempDir = new Path("wordcount-temp-" + Integer.toString( new Random().nextInt(Integer.MAX_VALUE))); //定義一個(gè)臨時(shí)目錄 JobConf conf = new JobConf(getConf(), WordCount.class); try { conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args[0])); conf.setOutputPath(tempDir); //先將詞頻統(tǒng)計(jì)任務(wù)的輸出結(jié)果寫到臨時(shí)目 //錄中, 下一個(gè)排序任務(wù)以臨時(shí)目錄為輸入目錄。 conf.setOutputFormat(SequenceFileOutputFormat.class); JobClient.runJob(conf); JobConf sortJob = new JobConf(getConf(), WordCount.class); sortJob.setJobName("sort"); sortJob.setInputPath(tempDir); sortJob.setInputFormat(SequenceFileInputFormat.class); sortJob.setMapperClass(InverseMapper.class); sortJob.setNumReduceTasks(1); //將 Reducer 的個(gè)數(shù)限定為1, 最終輸出的結(jié)果 //文件就是一個(gè)。 sortJob.setOutputPath(new Path(args[1])); sortJob.setOutputKeyClass(IntWritable.class); sortJob.setOutputValueClass(Text.class); sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class); JobClient.runJob(sortJob); } finally { FileSystem.get(conf).delete(tempDir); //刪除臨時(shí)目錄 } return 0; } private static class IntWritableDecreasingComparator extends IntWritable.Comparator { public int compare(WritableComparable a, WritableComparable b) { return -super.compare(a, b); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -super.compare(b1, s1, l1, b2, s2, l2); } } |
在 Eclipse 環(huán)境下進(jìn)行開(kāi)發(fā)和調(diào)試
在 Eclipse 環(huán)境下可以方便地進(jìn)行 Hadoop 并行程序的開(kāi)發(fā)和調(diào)試。推薦使用 IBM MapReduce Tools for Eclipse, 使用這個(gè) Eclipse plugin 可以簡(jiǎn)化開(kāi)發(fā)和部署 Hadoop 并行程序的過(guò)程。基于這個(gè) plugin, 可以在 Eclipse 中創(chuàng)建一個(gè) Hadoop MapReduce 應(yīng)用程序,并且提供了一些基于 MapReduce 框架的類開(kāi)發(fā)的向?qū)В梢源虬?JAR 文件,部署一個(gè) Hadoop MapReduce 應(yīng)用程序到一個(gè) Hadoop 服務(wù)器(本地和遠(yuǎn)程均可),可以通過(guò)一個(gè)專門的視圖 ( perspective ) 查看 Hadoop 服務(wù)器、Hadoop 分布式文件系統(tǒng)( DFS )和當(dāng)前運(yùn)行的任務(wù)的狀態(tài)。
可在 IBM alphaWorks 網(wǎng)站下載這個(gè) MapReduce Tool , 或在本文的下載清單中下載。將下載后的壓縮包解壓到你 Eclipse 安裝目錄,重新啟動(dòng) Eclipse 即可使用了。
點(diǎn)擊 Eclipse 主菜單上 Windows->Preferences, 然后在左側(cè)選擇 Hadoop Home Directory,設(shè)定你的 Hadoop 主目錄,如圖一所示:
圖 1

創(chuàng)立一個(gè) MapReduce Project
點(diǎn)擊 Eclipse 主菜單上 File->New->Project, 在彈出的對(duì)話框中選擇 MapReduce Project, 輸入 project name 如 wordcount, 然后點(diǎn)擊 Finish 即可。,如圖 2 所示:
圖 2

此后,你就可以象一個(gè)普通的 Eclipse Java project 那樣,添加入 Java 類,比如你可以定義一個(gè) WordCount 類,然后將本文代碼清單1,2,3中的代碼寫到此類中,添加入必要的 import 語(yǔ)句 ( Eclipse 快捷鍵 ctrl+shift+o 可以幫你),即可形成一個(gè)完整的 wordcount 程序。
在我們這個(gè)簡(jiǎn)單的 wordcount 程序中,我們把全部的內(nèi)容都放在一個(gè) WordCount 類中。實(shí)際上 IBM MapReduce tools 還提供了幾個(gè)實(shí)用的向?qū)?( wizard ) 工具,幫你創(chuàng)建單獨(dú)的 Mapper 類,Reducer 類,MapReduce Driver 類(就是代碼清單3中那部分內(nèi)容),在編寫比較復(fù)雜的 MapReduce 程序時(shí),將這些類獨(dú)立出來(lái)是非常有必要的,也有利于在不同的計(jì)算任務(wù)中重用你編寫的各種 Mapper 類和 Reducer 類。
如圖三所示,設(shè)定程序的運(yùn)行參數(shù):輸入目錄和輸出目錄之后,你就可以在 Eclipse 中運(yùn)行 wordcount 程序了,當(dāng)然,你也可以設(shè)定斷點(diǎn),調(diào)試程序。
圖 3

更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
