http://www.cnblogs.com/MengYan-LongYou/p/3360613.html
?
在做這個 Join 查詢的時候,必然涉及數據,我這里設計了 2 張表,分別較 data.txt 和 info.txt ,字段之間以 /t 劃分。
data.txt 內容如下:
201001????1003????abc
201002????1005????def
201003????1006????ghi
201004????1003????jkl
201005????1004????mno
201006????1005????pqr
?
info.txt 內容如下:
?
1003????kaka
1004????da
1005????jue
1006????zhao
?
期望輸出結果:
1003????201001????abc????kaka
1003????201004????jkl????kaka
1004????201005????mno????da
1005????201002????def????jue
1005????201006????pqr????jue
1006????201003????ghi????zhao
?
四、 Map 代碼
首先是 map 的代碼,我貼上,然后簡要說說
?
public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {
????????@Override
????????protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
?
????????????// 獲取輸入文件的全路徑和名稱
????????????String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
?
????????????if (pathName.contains("data.txt")) {
????????????????String values[] = value.toString().split("/t");
????????????????if (values.length < 3) {
????????????????????// data數據格式不規范,字段小于3,拋棄數據
????????????????????return;
????????????????} else {
????????????????????// 數據格式規范,區分標識為1
????????????????????TextPair tp = new TextPair(new Text(values[1]), new Text("1"));
????????????????????context.write(tp, new Text(values[0] + "/t" + values[2]));
????????????????}
????????????}
????????????if (pathName.contains("info.txt")) {
????????????????String values[] = value.toString().split("/t");
????????????????if (values.length < 2) {
????????????????????// data數據格式不規范,字段小于2,拋棄數據
????????????????????return;
????????????????} else {
????????????????????// 數據格式規范,區分標識為0
????????????????????TextPair tp = new TextPair(new Text(values[0]), new Text("0"));
????????????????????context.write(tp, new Text(values[1]));
????????????????}
????????????}
????????}
????}
?
這里需要注意以下部分:
A 、 pathName 是文件在 HDFS 中的全路徑 ( 例如: hdfs://M1:9000/MengYan/join/data/info.txt) ,可以以 endsWith() 的方法來判斷。
B 、資料表,也就是這里的 info.txt 需要放在前面,也就是標識號是 0. 否則無法輸出理想結果。
C 、 Map 執行完成之后,輸出的中間結果如下:
1003,0????kaka
1004,0????da
1005,0????jue
1006,0????zhao
1003,1????201001????abc
1003,1????201004????jkl
1004,1????201005????mon
1005,1????201002????def
1005,1????201006????pqr
1006,1????201003????ghi
?
五、分區和分組
1 、 map 之后的輸出會進行一些分區的操作,代碼貼出來:
public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {
????????@Override
????????public int getPartition(TextPair key, Text value, int numParititon) {
????????????return Math.abs(key.getFirst().hashCode() * 127) % numParititon;
????????}
????}
分區我在以前的文檔中寫過,這里不做描述了,就說是按照 map 輸出的符合 key 的第一個字段做分區關鍵字。分區之后,相同 key 會劃分到一個 reduce 中去處理(如果 reduce 設置是 1 ,那么就是分區有多個,但是還是在一個 reduce 中處理。但是結果會按照分區的原則排序)。分區后結果大致如下:
?
同一區:
1003,0????kaka
1003,1????201001????abc
1003,1????201004????jkl
?
?
同一區:
1004,0????da
1004,1????201005????mon
?
?
同一區:
1005,0????jue
1005,1????201002????def
1005,1????201006????pqr
?
?
同一區:
1006,0????zhao
1006,1????201003????ghi
?
2 、分組操作,代碼如下
?
public static class Example_Join_01_Comparator extends WritableComparator {
?
????????public Example_Join_01_Comparator() {
????????????super(TextPair.class, true);
????????}
?
????????@SuppressWarnings("unchecked")
????????public int compare(WritableComparable a, WritableComparable b) {
????????????TextPair t1 = (TextPair) a;
????????????TextPair t2 = (TextPair) b;
????????????return t1.getFirst().compareTo(t2.getFirst());
????????}
????}
?
分組操作就是把在相同分區的數據按照指定的規則進行分組的操作,就以上來看,是按照復合 key 的第一個字段做分組原則,達到忽略復合 key 的第二個字段值的目的,從而讓數據能夠迭代在一個 reduce 中。輸出后結果如下:
?
同一組:
1003,0????kaka
1003,0????201001????abc
1003,0????201004????jkl
?
同一組:
1004,0????da
1004,0????201005????mon
?
同一組:
1005,0????jue
1005,0????201002????def
1005,0????201006????pqr
?
同一組:
1006,0????zhao
1006,0????201003????ghi
?
六、 reduce 操作
貼上代碼如下:
public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {
????????protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException,
????????????????InterruptedException {
????????????Text pid = key.getFirst();
????????????String desc = values.iterator().next().toString();
????????????while (values.iterator().hasNext()) {
????????????????context.write(pid, new Text(values.iterator().next().toString() + "/t" + desc));
????????????}
????????}
????}
1 、代碼比較簡單,首先獲取關鍵的 ID 值,就是 key 的第一個字段。
2 、獲取公用的字段,通過排組織后可以看到,一些共有字段是在第一位,取出來即可。
3 、遍歷余下的結果,輸出。
七、其他的支撐代碼
1 、首先是 TextPair 代碼,沒有什么可以細說的,貼出來:
public class TextPair implements WritableComparable<TextPair> {
????private Text first;
????private Text second;
?
????public TextPair() {
????????set(new Text(), new Text());
????}
?
????public TextPair(String first, String second) {
????????set(new Text(first), new Text(second));
????}
?
????public TextPair(Text first, Text second) {
????????set(first, second);
????}
?
????public void set(Text first, Text second) {
????????this.first = first;
????????this.second = second;
????}
?
????public Text getFirst() {
????????return first;
????}
?
????public Text getSecond() {
????????return second;
????}
?
????public void write(DataOutput out) throws IOException {
????????first.write(out);
????????second.write(out);
????}
?
????public void readFields(DataInput in) throws IOException {
????????first.readFields(in);
????????second.readFields(in);
????}
?
????public int compareTo(TextPair tp) {
????????int cmp = first.compareTo(tp.first);
????????if (cmp != 0) {
????????????return cmp;
????????}
????????return second.compareTo(tp.second);
????}
}
2 、 Job 的入口函數
public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException {
????????Configuration conf = new Configuration();
????????GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);
????????String[] otherArgs = parser.getRemainingArgs();
????????if (agrs.length < 3) {
????????????System.err.println("Usage: Example_Join_01 <in_path_one> <in_path_two> <output>");
????????????System.exit(2);
????????}
?
????????//conf.set("hadoop.job.ugi", "root,hadoop");
?
????????Job job = new Job(conf, "Example_Join_01");
????????// 設置運行的job
????????job.setJarByClass(Example_Join_01.class);
????????// 設置Map相關內容
????????job.setMapperClass(Example_Join_01_Mapper.class);
????????// 設置Map的輸出
????????job.setMapOutputKeyClass(TextPair.class);
????????job.setMapOutputValueClass(Text.class);
????????// 設置partition
????????job.setPartitionerClass(Example_Join_01_Partitioner.class);
????????// 在分區之后按照指定的條件分組
????????job.setGroupingComparatorClass(Example_Join_01_Comparator.class);
????????// 設置reduce
????????job.setReducerClass(Example_Join_01_Reduce.class);
????????// 設置reduce的輸出
????????job.setOutputKeyClass(Text.class);
????????job.setOutputValueClass(Text.class);
????????// 設置輸入和輸出的目錄
????????FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
????????FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
????????FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
????????// 執行,直到結束就退出
????????System.exit(job.waitForCompletion(true) ? 0 : 1);
?
????}
?
八、總結
1 、這是個簡單的 join 查詢,可以看到,我在處理輸入源的時候是在 map 端做來源判斷。其實在 0.19 可以用 MultipleInputs.addInputPath() 的方法,但是它用了 JobConf 做參數。這個方法原理是多個數據源就采用多個 map 來處理。方法各有優劣。
2 、對于資源表,如果我們采用 0 和 1 這樣的模式來區分,資源表是需要放在前的。例如本例中 info.txt 就是資源表,所以標識位就是 0. 如果寫為 1 的話,可以試下,在分組之后,資源表對應的值放在了迭代器最后一位,無法追加在最后所有的結果集合中。
3 、關于分區,并不是所有的 map 都結束才開始的,一部分數據完成就會開始執行。同樣,分組操作在一個分區內執行,如果分區完成,分組將會開始執行,也不是等所有分區完成才開始做分組的操作。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
