亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

Map/Reduce中Join查詢實現

系統 2041 0

http://www.cnblogs.com/MengYan-LongYou/p/3360613.html

?

在做這個 Join 查詢的時候,必然涉及數據,我這里設計了 2 張表,分別較 data.txt info.txt ,字段之間以 /t 劃分。

data.txt 內容如下:

201001????1003????abc

201002????1005????def

201003????1006????ghi

201004????1003????jkl

201005????1004????mno

201006????1005????pqr

?

info.txt 內容如下:

?

1003????kaka

1004????da

1005????jue

1006????zhao

?

期望輸出結果:

1003????201001????abc????kaka

1003????201004????jkl????kaka

1004????201005????mno????da

1005????201002????def????jue

1005????201006????pqr????jue

1006????201003????ghi????zhao

?

四、 Map 代碼

首先是 map 的代碼,我貼上,然后簡要說說

?

public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {

????????@Override

????????protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

?

????????????// 獲取輸入文件的全路徑和名稱

????????????String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();

?

????????????if (pathName.contains("data.txt")) {

????????????????String values[] = value.toString().split("/t");

????????????????if (values.length < 3) {

????????????????????// data數據格式不規范,字段小于3,拋棄數據

????????????????????return;

????????????????} else {

????????????????????// 數據格式規范,區分標識為1

????????????????????TextPair tp = new TextPair(new Text(values[1]), new Text("1"));

????????????????????context.write(tp, new Text(values[0] + "/t" + values[2]));

????????????????}

????????????}

????????????if (pathName.contains("info.txt")) {

????????????????String values[] = value.toString().split("/t");

????????????????if (values.length < 2) {

????????????????????// data數據格式不規范,字段小于2,拋棄數據

????????????????????return;

????????????????} else {

????????????????????// 數據格式規范,區分標識為0

????????????????????TextPair tp = new TextPair(new Text(values[0]), new Text("0"));

????????????????????context.write(tp, new Text(values[1]));

????????????????}

????????????}

????????}

????}

?

這里需要注意以下部分:

A pathName 是文件在 HDFS 中的全路徑 ( 例如: hdfs://M1:9000/MengYan/join/data/info.txt) ,可以以 endsWith() 的方法來判斷。

B 、資料表,也就是這里的 info.txt 需要放在前面,也就是標識號是 0. 否則無法輸出理想結果。

C Map 執行完成之后,輸出的中間結果如下:

1003,0????kaka

1004,0????da

1005,0????jue

1006,0????zhao

1003,1????201001????abc

1003,1????201004????jkl

1004,1????201005????mon

1005,1????201002????def

1005,1????201006????pqr

1006,1????201003????ghi

?

五、分區和分組

1 map 之后的輸出會進行一些分區的操作,代碼貼出來:

public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {

????????@Override

????????public int getPartition(TextPair key, Text value, int numParititon) {

????????????return Math.abs(key.getFirst().hashCode() * 127) % numParititon;

????????}

????}

分區我在以前的文檔中寫過,這里不做描述了,就說是按照 map 輸出的符合 key 的第一個字段做分區關鍵字。分區之后,相同 key 會劃分到一個 reduce 中去處理(如果 reduce 設置是 1 ,那么就是分區有多個,但是還是在一個 reduce 中處理。但是結果會按照分區的原則排序)。分區后結果大致如下:

?

同一區:

1003,0????kaka

1003,1????201001????abc

1003,1????201004????jkl

?

?

同一區:

1004,0????da

1004,1????201005????mon

?

?

同一區:

1005,0????jue

1005,1????201002????def

1005,1????201006????pqr

?

?

同一區:

1006,0????zhao

1006,1????201003????ghi

?

2 、分組操作,代碼如下

?

public static class Example_Join_01_Comparator extends WritableComparator {

?

????????public Example_Join_01_Comparator() {

????????????super(TextPair.class, true);

????????}

?

????????@SuppressWarnings("unchecked")

????????public int compare(WritableComparable a, WritableComparable b) {

????????????TextPair t1 = (TextPair) a;

????????????TextPair t2 = (TextPair) b;

????????????return t1.getFirst().compareTo(t2.getFirst());

????????}

????}

?

分組操作就是把在相同分區的數據按照指定的規則進行分組的操作,就以上來看,是按照復合 key 的第一個字段做分組原則,達到忽略復合 key 的第二個字段值的目的,從而讓數據能夠迭代在一個 reduce 中。輸出后結果如下:

?

同一組:

1003,0????kaka

1003,0????201001????abc

1003,0????201004????jkl

?

同一組:

1004,0????da

1004,0????201005????mon

?

同一組:

1005,0????jue

1005,0????201002????def

1005,0????201006????pqr

?

同一組:

1006,0????zhao

1006,0????201003????ghi

?

六、 reduce 操作

貼上代碼如下:

public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {

????????protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException,

????????????????InterruptedException {

????????????Text pid = key.getFirst();

????????????String desc = values.iterator().next().toString();

????????????while (values.iterator().hasNext()) {

????????????????context.write(pid, new Text(values.iterator().next().toString() + "/t" + desc));

????????????}

????????}

????}

1 、代碼比較簡單,首先獲取關鍵的 ID 值,就是 key 的第一個字段。

2 、獲取公用的字段,通過排組織后可以看到,一些共有字段是在第一位,取出來即可。

3 、遍歷余下的結果,輸出。

七、其他的支撐代碼

1 、首先是 TextPair 代碼,沒有什么可以細說的,貼出來:

public class TextPair implements WritableComparable<TextPair> {

????private Text first;

????private Text second;

?

????public TextPair() {

????????set(new Text(), new Text());

????}

?

????public TextPair(String first, String second) {

????????set(new Text(first), new Text(second));

????}

?

????public TextPair(Text first, Text second) {

????????set(first, second);

????}

?

????public void set(Text first, Text second) {

????????this.first = first;

????????this.second = second;

????}

?

????public Text getFirst() {

????????return first;

????}

?

????public Text getSecond() {

????????return second;

????}

?

????public void write(DataOutput out) throws IOException {

????????first.write(out);

????????second.write(out);

????}

?

????public void readFields(DataInput in) throws IOException {

????????first.readFields(in);

????????second.readFields(in);

????}

?

????public int compareTo(TextPair tp) {

????????int cmp = first.compareTo(tp.first);

????????if (cmp != 0) {

????????????return cmp;

????????}

????????return second.compareTo(tp.second);

????}

}

2 Job 的入口函數

public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException {

????????Configuration conf = new Configuration();

????????GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);

????????String[] otherArgs = parser.getRemainingArgs();

????????if (agrs.length < 3) {

????????????System.err.println("Usage: Example_Join_01 <in_path_one> <in_path_two> <output>");

????????????System.exit(2);

????????}

?

????????//conf.set("hadoop.job.ugi", "root,hadoop");

?

????????Job job = new Job(conf, "Example_Join_01");

????????// 設置運行的job

????????job.setJarByClass(Example_Join_01.class);

????????// 設置Map相關內容

????????job.setMapperClass(Example_Join_01_Mapper.class);

????????// 設置Map的輸出

????????job.setMapOutputKeyClass(TextPair.class);

????????job.setMapOutputValueClass(Text.class);

????????// 設置partition

????????job.setPartitionerClass(Example_Join_01_Partitioner.class);

????????// 在分區之后按照指定的條件分組

????????job.setGroupingComparatorClass(Example_Join_01_Comparator.class);

????????// 設置reduce

????????job.setReducerClass(Example_Join_01_Reduce.class);

????????// 設置reduce的輸出

????????job.setOutputKeyClass(Text.class);

????????job.setOutputValueClass(Text.class);

????????// 設置輸入和輸出的目錄

????????FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

????????FileInputFormat.addInputPath(job, new Path(otherArgs[1]));

????????FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

????????// 執行,直到結束就退出

????????System.exit(job.waitForCompletion(true) ? 0 : 1);

?

????}

?

八、總結

1 、這是個簡單的 join 查詢,可以看到,我在處理輸入源的時候是在 map 端做來源判斷。其實在 0.19 可以用 MultipleInputs.addInputPath() 的方法,但是它用了 JobConf 做參數。這個方法原理是多個數據源就采用多個 map 來處理。方法各有優劣。

2 、對于資源表,如果我們采用 0 1 這樣的模式來區分,資源表是需要放在前的。例如本例中 info.txt 就是資源表,所以標識位就是 0. 如果寫為 1 的話,可以試下,在分組之后,資源表對應的值放在了迭代器最后一位,無法追加在最后所有的結果集合中。

3 、關于分區,并不是所有的 map 都結束才開始的,一部分數據完成就會開始執行。同樣,分組操作在一個分區內執行,如果分區完成,分組將會開始執行,也不是等所有分區完成才開始做分組的操作。

Map/Reduce中Join查詢實現


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 午夜国产精品影院在线观看 | 老子影院午夜伦不卡亚洲 | 狠狠色伊人亚洲综合成人 | 伊人久久香蕉 | 香蕉免费一区二区三区在线观看 | 国产综合成人亚洲区 | 久久精品在 | 国产综合精品在线 | 在线观看中文字幕国产 | 91欧美| 久久99综合 | 亚洲欧美日韩国产精品第不页 | 四虎黄色影院 | 日韩在线观看中文字幕 | 二级毛片在线观看 | 欧美日韩中文亚洲v在线综合 | 国产亚洲欧美一区二区三区 | 日韩一区二区久久久久久 | 视频播放在线观看精品视频 | 精品久久网站 | 成人国产精品一区二区网站 | 欧美日本激情 | 日韩精品一区二区三区中文在线 | 亚洲国产二区 | 欧美ⅹxxxx18性欧美 | 成人在线亚洲 | 国产资源精品一区二区免费 | 色拍999| 91在线网| 精品一区二区三区的国产在线观看 | 久久人人精品 | 久久国产欧美另类久久久 | 亚洲伊人色一综合网 | 国产精品福利自产拍网站 | 亚洲黄色成人 | 久久亚洲综合中文字幕 | 亚洲午夜一区二区三区 | 尹人久久久香蕉精品 | 真实偷清晰对白在线视频 | 成人久草 | 国产美女午夜精品福利视频 |