亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

在map端使用關(guān)聯(lián)數(shù)組實現(xiàn)wordcount

系統(tǒng) 2100 0

  今天看Data-Intensive Text Processing with MapReduce 這本書的第三章的時候,里面有寫到在map端優(yōu)化wordcount。

  對數(shù)據(jù)密集型數(shù)據(jù)進(jìn)行分布式處理的時候,影響數(shù)據(jù)處理速度的非常重要的一個方面就是map的輸出中間結(jié)果,在傳送到reduce的過程中,很多的中間數(shù)據(jù)需要進(jìn)行交換以及包括一些相應(yīng)的處理,然后再交給相應(yīng)的reduce。其中中間數(shù)據(jù)需要在網(wǎng)絡(luò)中傳輸,另外中間數(shù)據(jù)在發(fā)送到網(wǎng)絡(luò)上之前還要寫到本地磁盤上,因為網(wǎng)絡(luò)帶寬和磁盤I/O是非常耗時的相比與其他的操作,所以減少中間數(shù)據(jù)的傳輸將會增加算法的執(zhí)行效率,通過使用combiner函數(shù)或者其他的方式減少key-value對的個數(shù)。下面是一個改進(jìn)的wordcount算法。

 基本的思想是:

  在map處理的時候定義一個關(guān)聯(lián)數(shù)組,然后對文檔進(jìn)行處理,將<word,次數(shù)>加入到關(guān)聯(lián)數(shù)組中,word存在,則將相應(yīng)的次數(shù)加1,不存在則直接加入到關(guān)聯(lián)數(shù)組中。所有的map任務(wù)結(jié)束后,然后再在run函數(shù)中輸出處理結(jié)果。

偽代碼:

class Mapper

  method Map(docid a,doc d)

??????????? H =new AssociativeArray

     for all term t 屬于doc? d? do

???????????????????? H{t}=H{t}+1;

???????????????? for all term t 屬于 H do

??????????????? EMIT(term t,count H{t})

class REDUCER

???? method REDUCE(term t,counts[c1,c2,...])

??????????????? sum=0

?????????????? for? all count c 屬于 counts[c1,c2,...]? do

?????????????????? sum+=c

???????????? EMIT(term t,count sum)

代碼如下:

      
        import
      
       java.io.IOException;
      
import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;


public class Mapper extends
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {

int c;
HashMap<String,IntWritable> map= new HashMap<String,IntWritable>();
@Override
protected void map(LongWritable key, Text value,
Context context)
throws IOException, InterruptedException {
String str=value.toString();
StringTokenizer token= new StringTokenizer(str);
while (token.hasMoreTokens()){
String value1=token.nextToken();
if (map.containsKey(value1)){
// System.out.println("ni");
int p=map.get(value1).get()+1;
map.remove(value1);
map.put(value1, new IntWritable(p));
}
else {
// System.out.println("ni");
map.put(value1, new IntWritable(1));
}
}
// TODO Auto-generated method stub

c++;
System.out.println(c);



}
@Override
protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
System.out.println("cleanup");
super .cleanup(context);
}

@Override
public void run(Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
super .run(context);
System.out.println("run");
Iterator it=map.entrySet().iterator();
while (it.hasNext()){
// System.out.println("nihe");
Map.Entry<String, IntWritable> entry=(Map.Entry<String, IntWritable>) it.next();
// System.out.println("nihe");
context.write( new Text(entry.getKey()), entry.getValue());

}

}

@Override
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
// System.out.println(context.getInputSplit().toString());
? ? ? ? // System.out.println(context.getJobID());
? // FileSplit input=(FileSplit)context.getInputSplit();
// String path=input.getPath().toString();
// Configuration conf= new Configuration();
? // System.out.println(input.getPath().toString());
?? // FileSystem fs=FileSystem.get(URI.create(path), conf);
// FSDataInputStream filein=fs.open(input.getPath());
?? //? LineReader in= new LineReader(filein,conf);
// Text line= new Text();
// ? int cd=in.readLine(line);
//?? System.out.println(line);
???? }
?}



      
        import
      
       java.io.IOException;
      

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;


public class Reducer extends
org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {

@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
int sum=0;
for (IntWritable it:values){
sum+=it.get();
}
context.write(key, new IntWritable(sum));
}




}



      
        import
      
       java.io.IOException;
      
import java.net.URI;



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class Word {

/**
*
@param args
*
@throws IOException
*
@throws ClassNotFoundException
*
@throws InterruptedException
*/
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// TODO Auto-generated method stub
Job job= new Job();
Configuration conf= new Configuration();

Path in= new Path(args[0]);
Path out= new Path(args[1]);

FileSystem fs=FileSystem.get(URI.create(args[1]), conf);
fs.delete(out);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(Mapper. class );
job.setMapOutputKeyClass(Text. class );
job.setMapOutputValueClass(IntWritable. class );



job.waitForCompletion( false );



}

}




在map端使用關(guān)聯(lián)數(shù)組實現(xiàn)wordcount


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 狠狠色噜噜狠狠狠狠69 | 国产亚洲在线 | 亚洲视频免费在线看 | 久久久久久久久综合 | 四虎精品 | 在线不卡免费视频 | 丁香久久 | 99久久这里只精品麻豆 | 亚洲国产欧美久久香综合 | 毛片高清 | 在线视频免费国产成人 | 国产精品亚洲精品观看不卡 | 亚洲欧美日韩一区二区在线观看 | jazzjazz国产精品久久 | 久久精品免看国产成 | 99久久做夜夜爱天天做精品 | 精品综合久久久久久97超人该 | 5060网午夜一级毛片在线看 | 黄色影院在线观看视频 | 九九热精品视频在线观看 | 色人阁网站 | 国产精品麻豆久久久 | 久久精品美女 | 中文字幕亚洲综合 | 一级毛片大全 | 国产日产欧产麻豆精品精品推荐 | 成人看片免费无限观看视频 | 日韩女同视频 | 色一情 | 这里只有精品22 | 青青草好吊色 | 日韩视频区 | 婷婷色九月综合激情丁香 | 色骚综合 | aaaa级片| 久久久精品波多野结衣 | 99久久精品国产麻豆 | 国产 日韩 欧美 亚洲 | 亚洲观看视频 | 综合亚洲网 | 国产精品久久精品福利网站 |