大淘金|DataJoin多数据源的Reduce端链接实现方法

更新时间:2019-08-08    来源:js教程    手机版     字体:

【www.bbyears.com--js教程】

为了完成不同数据源的链接,首先,需要为不同数据源下的每个记录定义一个数据源标签(Tag),接着,为了表示每个数据源下的不同记录并且完成连接处理,需要为每个数据记录设置一个主键(GroupKey),然后,DataJoin类库分别在Map阶段和Reduce阶段提供一个处理框架,仅仅留下一些任务有程序员完成。下面是处理过程:

DataJoin多数据源的Reduce端链接实现方法

DataJoin多数据源的Reduce端链接实现方法

从上述过程可以看到,多数据源的数据会首先被处理成多个数据记录,这些记录是带有标签Tag和主键Group Key的记录,因此使用DataJoin时,我们需要实现generateInputTag(String inputFile)方法和generateTaggedMapOutput(Object value)和generateGroupKey(TaggedMapOutput aRecord)方法,在这个过程中,出现了一个新的类(即带有标签的记录类),因此我们也要实现自定义的记录类。在combine过程中,我们会对笛卡尔积的结果进行整合(这也是为何我们把DataJoin叫做Reduce端连接),因此我们需要实现一个combine(Object[] tags,Object[] values)方法,注意这个combine和MapReduce框架中的combine是两个完全不同的东西,忌混淆。

 

 

 代码如下 import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;


public class DataJoin {
    public static class DataJoinMapper extends DataJoinMapperBase {
        public Text generateInputTag(String inputFiles) {
            return new Text(inputFiles); 
        }
        public Text generateGroupKey(TaggedMapOutput aRecord) {
            return new Text(((Text)aRecord.getData()).toString().split(",")[0]);
        }
        public TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable ret = new TaggedWritable((Text)value); 
            ret.setTag(this.inputTag);
            return ret;
        }
    }
    public static class TaggedWritable extends TaggedMapOutput {
        private Writable data;
        public TaggedWritable() {
            this.tag = new Text("");
            this.data = new Text("");
        }
       
        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }
       
        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.data.readFields(in);
            this.tag.readFields(in);
        }

        public Writable getData() {
            return data;
        }
        public void setData(Writable data){
            this.data=data;
        }
    }
   
    public static class DataJoinReducer extends DataJoinReducerBase {

        @Override
        public TaggedMapOutput combine(Object[] tags, Object[] values) {
            if (tags.length < 2) {
                return null;
            }
            StringBuffer joinedStr = new StringBuffer("");
            for (int i = 0; i < values.length; i++) {
                TaggedWritable tw=(TaggedWritable)values[i];
                String str=((Text)tw.getData()).toString();   
                if(i==0)
                    joinedStr.append(str);
                else
                    joinedStr.append(str.split(",",2)[1]);
                if(i                     joinedStr.append(",");
            }
            TaggedWritable ret = new TaggedWritable(new Text(joinedStr.toString()));
            ret.setTag((Text)tags[0]);
            return ret;
        }
    }
   
    public static void main(String[] args) throws Exception {
       
            Configuration conf = new Configuration();
            JobConf job = new JobConf(conf);
            job.setJarByClass(DataJoin.class);
       
            Path in = new Path(args[0]);
            FileInputFormat.addInputPath(job, in);
           
            Path out = new Path(args[1]);
            FileOutputFormat.setOutputPath(job, out);
           
            job.setMapperClass(DataJoinMapper.class);
            job.setReducerClass(DataJoinReducer.class);
           
            job.setInputFormat(TextInputFormat.class);
            job.setOutputFormat(TextOutputFormat.class);
           
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(TaggedWritable.class);
            //设置输出文本中key与value之间的符号,默认为制表符Tab
            job.set("mapred.textoutputformat.separator","=");
           
            JobClient.runJob(job);
        }   
}
 

 

本文来源:http://www.bbyears.com/wangyezhizuo/60775.html

热门标签

更多>>

本类排行