public class WordCountApp { // 可以指定目錄,目錄下如果有二級目錄的話,是不會執(zhí)行的,只會執(zhí)行一級目錄. private static final String INPUT_PATH = "hdfs://hadoop1:9000/abd"; // 輸入路徑 private static final String OUT_PATH = "hdfs://hadoop1:9000/out"; // 輸出路徑,reduce作業(yè)輸出的結(jié)果是一個(gè)目錄 // _SUCCESS:在linux中,帶下劃線的這些文件一般都是被忽略不去處理的.表示作業(yè)執(zhí)行成功. // _logs:產(chǎn)生的日志文件. // part-r-00000:產(chǎn)生的是我們的輸出的文件.開始以part開始.r:reduce輸出的結(jié)果,map輸出的結(jié)果是m,00000是序號 public static void main(String[] args) { Configuration conf = new Configuration(); // 配置對象 try { FileSystem fileSystem = FileSystem.get( new URI(OUT_PATH), conf); fileSystem.delete( new Path(OUT_PATH), true ); Job job = new Job(conf, WordCountApp. class .getSimpleName()); // jobName:作業(yè)名稱 job.setJarByClass(WordCountApp. class ); FileInputFormat.setInputPaths(job, INPUT_PATH); // 指定數(shù)據(jù)的輸入 job.setMapperClass(MyMapper. class ); // 指定自定義map類 job.setMapOutputKeyClass(Text. class ); // 指定map輸出key的類型 job.setMapOutputValueClass(LongWritable. class ); // 指定map輸出value的類型 job.setReducerClass(MyReducer. class ); // 指定自定義Reduce類 job.setOutputKeyClass(Text. class ); // 設(shè)置Reduce輸出key的類型 job.setOutputValueClass(LongWritable. class ); // 設(shè)置Reduce輸出的value類型 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); // Reduce輸出完之后,就會產(chǎn)生一個(gè)最終的輸出,指定最終輸出的位置 job.waitForCompletion( true ); // 提交給jobTracker并等待結(jié)束 } catch (Exception e) { e.printStackTrace(); } } /** * 輸入的key標(biāo)示偏移量:這一行開始的字節(jié). 輸入的value:當(dāng)前的行文本的內(nèi)容. MapReduce執(zhí)行過程: * 在這里邊,我們的數(shù)據(jù)輸入來自于原始文件,數(shù)據(jù)輸出寫出到hdfs, 中間的一堆都是map輸出產(chǎn)生的臨時(shí)結(jié)果.存放在map運(yùn)行的linux磁盤上的, * 當(dāng)經(jīng)過shuffle時(shí),reduce就會通過http把map端的對應(yīng)數(shù)據(jù)給取過來. * mapred-default.xml中mapredcue.jobtracker * .root.dir,mapred.tmp.dir存儲map產(chǎn)生的結(jié)果. 作業(yè)運(yùn)行時(shí)產(chǎn)生這個(gè)目錄,作業(yè)運(yùn)行完之后它會刪除目錄. */ public static class MyMapper extends Mapper <LongWritable, Text, Text, LongWritable> { // 源文件有兩行記錄,解析源文件會產(chǎn)生兩個(gè)鍵值對.分別是<0,hello you>,<10,hello me>,所以map函數(shù)會被調(diào)用兩次. // 在計(jì)算機(jī)存儲的時(shí)候,是一維的結(jié)構(gòu). @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 為什么要把hadoop類型轉(zhuǎn)換為java類型? String line = value.toString(); String[] splited = line.split("\t" ); // 使用hashMap寫出去的優(yōu)勢:減少鍵值對出現(xiàn)的個(gè)數(shù). Map<String, Integer> hashMap = new HashMap<String, Integer> (); for (String word : splited) { // 在for循環(huán)體內(nèi),臨時(shí)變量word出現(xiàn)的此時(shí)是常量1 context.write( new Text(word), new LongWritable(1)); // 把每個(gè)單詞出現(xiàn)的次數(shù)1寫出去. } } } // map函數(shù)執(zhí)行結(jié)束后,map輸出的<k,v>一共有4個(gè).<hello,1>,<you,1>,<hello,1>,<me,1> // map把數(shù)據(jù)處理完之后,就會進(jìn)入reduce. // 在進(jìn)入shuffle之前,數(shù)據(jù)需要先進(jìn)行分區(qū).默認(rèn)只有一個(gè)區(qū). // 對每個(gè)不同分區(qū)中的數(shù)據(jù)進(jìn)行排序,分組. // 排序后的結(jié)果:<hello,1>,<hello,1>,<me,1>,<you,1> // 分組后的結(jié)果(相同key的value放在一個(gè)集合中):<hello,{1,1}>,<me,{1}>,<you,{1}> // 規(guī)約(可選) // map中的數(shù)據(jù)分發(fā)到reduce的過程稱作shuffle public static class MyReducer extends Reducer <Text, LongWritable, Text, LongWritable> { // 每一組調(diào)用一次reduce函數(shù),一共調(diào)用了三次 @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { // count標(biāo)示單詞key在整個(gè)文件出現(xiàn)的次數(shù) // 分組的數(shù)量與reduce函數(shù)調(diào)用次數(shù)是相等的. // reduce函數(shù)調(diào)用次數(shù)與產(chǎn)生的<k,v>的數(shù)量拋開業(yè)務(wù),沒有任何關(guān)系! long count = 0L ; for (LongWritable times : values) { count += times.get(); } context.write(key, new LongWritable(count)); } } }
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
