Importing MSSQL Data into HDFS with MapReduce: A Worked Example


Today I wrote a MapReduce program that pulls data out of a SQL Server 2008 database for analysis. When the job was deployed to the Hadoop cluster and run, it failed with a SQLException.

Strange: my SQL statement contains no LIMIT, so where did the LIMIT in the error come from? I dug into the source of the DBInputFormat class:

protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
    Configuration conf) throws IOException {

  @SuppressWarnings("unchecked")
  Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
  try {
    // use database product name to determine appropriate record reader.
    if (dbProductName.startsWith("ORACLE")) {
      // use Oracle-specific db reader.
      return new OracleDBRecordReader<T>(split, inputClass,
          conf, createConnection(), getDBConf(), conditions, fieldNames,
          tableName);
    } else if (dbProductName.startsWith("MYSQL")) {
      // use MySQL-specific db reader.
      return new MySQLDBRecordReader<T>(split, inputClass,
          conf, createConnection(), getDBConf(), conditions, fieldNames,
          tableName);
    } else {
      // Generic reader.
      return new DBRecordReader<T>(split, inputClass,
          conf, createConnection(), getDBConf(), conditions, fieldNames,
          tableName);
    }
  } catch (SQLException ex) {
    throw new IOException(ex.getMessage());
  }
}

And the relevant part of DBRecordReader:

protected String getSelectQuery() {
  StringBuilder query = new StringBuilder();

  // Default codepath for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET for splits.
  if (dbConf.getInputQuery() == null) {
    query.append("SELECT ");

    for (int i = 0; i < fieldNames.length; i++) {
      query.append(fieldNames[i]);
      if (i != fieldNames.length - 1) {
        query.append(", ");
      }
    }

    query.append(" FROM ").append(tableName);
    query.append(" AS ").append(tableName); // in hsqldb this is necessary
    if (conditions != null && conditions.length() > 0) {
      query.append(" WHERE (").append(conditions).append(")");
    }

    String orderBy = dbConf.getInputOrderBy();
    if (orderBy != null && orderBy.length() > 0) {
      query.append(" ORDER BY ").append(orderBy);
    }
  } else {
    // PREBUILT QUERY
    query.append(dbConf.getInputQuery());
  }

  try {
    query.append(" LIMIT ").append(split.getLength());   // <-- here is the problem
    query.append(" OFFSET ").append(split.getStart());
  } catch (IOException ex) {
    // Ignore, will not throw.
  }

  return query.toString();
}

So that is where the LIMIT comes from. Hadoop only ships database-specific record readers for MySQL (MySQLDBRecordReader) and Oracle (OracleDBRecordReader); every other database, SQL Server included, falls through to the generic DBRecordReader, whose getSelectQuery() unconditionally appends LIMIT/OFFSET. SQL Server 2008 does not support LIMIT/OFFSET (its paging idiom is TOP), which is exactly why the job throws a SQLException.

With the cause identified, I implemented a SQL Server-specific input format and record reader, modeled on OracleDBRecordReader. The code is as follows.

MSSQLDBInputFormat:

/**
 *
 */
package org.apache.hadoop.mapreduce.lib.db;

import java.io.IOException;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;

/**
 * @author summer
 *  MICROSOFT SQL SERVER
 */
public class MSSQLDBInputFormat<T extends DBWritable> extends DBInputFormat<T> {

    public static void setInput(Job job,
            Class<? extends DBWritable> inputClass,
            String inputQuery, String inputCountQuery, String rowId) {
        job.setInputFormatClass(MSSQLDBInputFormat.class);
        DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
        dbConf.setInputClass(inputClass);
        dbConf.setInputQuery(inputQuery);
        dbConf.setInputCountQuery(inputCountQuery);
        dbConf.setInputFieldNames(new String[]{rowId});
    }

    @Override
    protected RecordReader<LongWritable, T> createDBRecordReader(
            org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split,
            Configuration conf) throws IOException {

        @SuppressWarnings("unchecked")
        Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
        try {
            // Always use the SQL Server-specific reader.
            return new MSSQLDBRecordReader<T>(split, inputClass,
                conf, createConnection(), getDBConf(), conditions, fieldNames,
                tableName);
        } catch (SQLException ex) {
            throw new IOException(ex.getMessage());
        }
    }

}

MSSQLDBRecordReader:

/**
 *
 */
package org.apache.hadoop.mapreduce.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;

/**
 * @author summer
 *
 */
public class MSSQLDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {

    public MSSQLDBRecordReader(DBInputFormat.DBInputSplit split,
            Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
            String cond, String[] fields, String table) throws SQLException {
        super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
    }

    @Override
    protected String getSelectQuery() {
        StringBuilder query = new StringBuilder();
        DBConfiguration dbConf = getDBConf();
        String conditions = getConditions();
        String tableName = getTableName();
        String[] fieldNames = getFieldNames();

        // SQL Server-specific codepath: page with TOP ... NOT IN instead of LIMIT/OFFSET.
        if (dbConf.getInputQuery() == null) {
            query.append("SELECT ");

            for (int i = 0; i < fieldNames.length; i++) {
                query.append(fieldNames[i]);
                if (i != fieldNames.length - 1) {
                    query.append(", ");
                }
            }

            query.append(" FROM ").append(tableName);
            if (conditions != null && conditions.length() > 0)
                query.append(" WHERE ").append(conditions);
            String orderBy = dbConf.getInputOrderBy();
            if (orderBy != null && orderBy.length() > 0) {
                query.append(" ORDER BY ").append(orderBy);
            }
        } else {
            // PREBUILT QUERY
            query.append(dbConf.getInputQuery());
        }

        try {
            DBInputFormat.DBInputSplit split = getSplit();
            if (split.getLength() > 0) {
                String querystring = query.toString();
                String id = fieldNames[0];
                // Skip the first getStart() rows, then take the next getLength() rows:
                // SELECT TOP <length> * FROM (q) a WHERE id NOT IN (SELECT TOP <start> id FROM (q) b)
                query = new StringBuilder();
                query.append("SELECT TOP " + split.getLength() + " * FROM ( ");
                query.append(querystring);
                query.append(" ) a WHERE " + id + " NOT IN (SELECT TOP ").append(split.getStart());
                query.append(" " + id + " FROM (");
                query.append(querystring);
                query.append(" ) b");
                query.append(" )");
                System.out.println("----------------------MICROSOFT SQL SERVER QUERY STRING---------------------------");
                System.out.println(query.toString());
                System.out.println("----------------------MICROSOFT SQL SERVER QUERY STRING---------------------------");
            }
        } catch (IOException ex) {
            // ignore, will not throw.
        }

        return query.toString();
    }

}
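
To see what this produces: with the inputQuery used by the job below and, purely for illustration, a split that is 100 rows long and starts at row 100, the reader would emit roughly SELECT TOP 100 * FROM ( select sid,source from LawDB.dbo.case_source where sid<1000 ) a WHERE sid NOT IN (SELECT TOP 100 sid FROM ( select sid,source from LawDB.dbo.case_source where sid<1000 ) b ). This is the usual TOP ... NOT IN paging idiom for SQL Server 2008, which has no OFFSET clause (OFFSET/FETCH only arrived in SQL Server 2012). Note that without a stable ORDER BY inside the subqueries the split boundaries are not strictly deterministic.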

The MapReduce job (LawCaseRecord, the value class it reads from the database, is not shown in the original; a sketch follows this listing):

/**
 *
 */
package com.nltk.sns.mapreduce;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.MSSQLDBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.nltk.sns.ETLUtils;

/**
 * @author summer
 *
 */
public class LawDataEtl {

    public static class CaseETLMapper extends
        Mapper<LongWritable, LawCaseRecord, LongWritable, Text> {

        static final int step = 6;

        LongWritable outKey = new LongWritable(1);
        Text outValue = new Text();

        @Override
        protected void map(
                LongWritable key,
                LawCaseRecord lawCaseRecord,
                Context context)
                throws IOException, InterruptedException {

            System.out.println("-----------------------------" + lawCaseRecord + "------------------------------");

            outKey.set(lawCaseRecord.id);
            String source = ETLUtils.format(lawCaseRecord.source);
            List<String> words = ETLUtils.split(source, step);
            for (String w : words) {
                outValue.set(w);
                context.write(outKey, outValue);
            }
        }
    }

    static final String driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
    static final String dbUrl = "jdbc:sqlserver://192.168.0.100:1433;DatabaseName=lawdb";
    static final String uid = "sa";
    static final String pwd = "cistjava";
    static final String inputQuery = "select sid,source from LawDB.dbo.case_source where sid<1000";
    static final String inputCountQuery = "select count(1) from LawDB.dbo.case_source where sid<1000";
    static final String jarClassPath = "/user/lib/sqljdbc4.jar";
    static final String outputPath = "hdfs://ubuntu:9000/user/lawdata";
    static final String rowId = "sid";

    public static Job configureJob(Configuration conf) throws Exception {

        String jobName = "etlcase";
        Job job = Job.getInstance(conf, jobName);

        // Ship the SQL Server JDBC driver jar (already uploaded to HDFS) with the job.
        job.addFileToClassPath(new Path(jarClassPath));
        MSSQLDBInputFormat.setInput(job, LawCaseRecord.class, inputQuery, inputCountQuery, rowId);
        job.setJarByClass(LawDataEtl.class);

        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(CaseETLMapper.class);

        return job;
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path(outputPath), true);

        DBConfiguration.configureDB(conf, driverClass, dbUrl, uid, pwd);
        conf.set(MRJobConfig.NUM_MAPS, String.valueOf(10));
        Job job = configureJob(conf);
        System.out.println("------------------------------------------------");
        System.out.println(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
        System.out.println(conf.get(DBConfiguration.URL_PROPERTY));
        System.out.println(conf.get(DBConfiguration.USERNAME_PROPERTY));
        System.out.println(conf.get(DBConfiguration.PASSWORD_PROPERTY));
        System.out.println("------------------------------------------------");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
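
The LawCaseRecord value class does not appear in the original article. Below is a minimal sketch of what it presumably looks like, inferred from the query select sid,source ... and from the mapper's use of lawCaseRecord.id and lawCaseRecord.source; the field names, types and package are assumptions:

/**
 * Sketch only: a DBWritable record matching "select sid,source from ...".
 */
package com.nltk.sns.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class LawCaseRecord implements Writable, DBWritable {

    long id;        // assumed mapping to the sid column
    String source;  // assumed mapping to the source column

    @Override
    public void readFields(ResultSet rs) throws SQLException {
        id = rs.getLong("sid");
        source = rs.getString("source");
    }

    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setLong(1, id);
        ps.setString(2, source);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readLong();
        source = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(id);
        out.writeUTF(source);
    }

    @Override
    public String toString() {
        return id + "\t" + source;
    }
}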

The helper class:

/**
 *
 */
package com.nltk.sns;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;

/**
 * @author summer
 *
 */
public class ETLUtils {

    public final static String NULL_CHAR = "";
    public final static String PUNCTUATION_REGEX = "[(\\pP)&&[^\\|\\{\\}\\#]]+";
    public final static String WHITESPACE_REGEX = "[\\p{Space}]+";

    /** Strips punctuation (except |, {, }, #) and whitespace. */
    public static String format(String s) {

        return s.replaceAll(PUNCTUATION_REGEX, NULL_CHAR).replaceAll(WHITESPACE_REGEX, NULL_CHAR);
    }

    /** Returns every substring (n-gram) of s with length 1..stepN. */
    public static List<String> split(String s, int stepN) {

        List<String> splits = new ArrayList<String>();
        if (StringUtils.isEmpty(s) || stepN < 1)
            return splits;
        int len = s.length();
        if (len <= stepN)
            splits.add(s);
        else {
            for (int j = 1; j <= stepN; j++)
                for (int i = 0; i <= len - j; i++) {
                    String key = StringUtils.mid(s, i, j);
                    if (StringUtils.isEmpty(key))
                        continue;
                    splits.add(key);
                }
        }
        return splits;
    }

    public static void main(String[] args) {

        String s = "谢婷婷等与姜波等";
        int stepN = 2;
        List<String> splits = split(s, stepN);
        System.out.println(splits);
    }
}

It ran successfully.


This is only a rough implementation, written mainly to satisfy my own needs; adapt it to yours as required.

In fact, the stock DBRecordReader is not especially well designed. Reading the sources of DBRecordReader, MySQLDBRecordReader and OracleDBRecordReader side by side, DBRecordReader and MySQLDBRecordReader are too tightly coupled: the MySQL-style LIMIT/OFFSET paging lives in the base class. Ideally, the generic DBRecordReader should still run without exceptions against databases that have no dedicated implementation, simply by falling back to a single split and a single map task, as sketched below.
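
A minimal sketch of that idea, and explicitly my own illustration rather than anything from the original article: a generic reader that appends no paging clause at all, intended to be used with a single split (leave mapreduce.job.maps at 1), so the lone map task simply reads the whole result set:

package org.apache.hadoop.mapreduce.lib.db;

import java.sql.Connection;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;

public class GenericDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {

    public GenericDBRecordReader(DBInputFormat.DBInputSplit split, Class<T> inputClass,
            Configuration conf, Connection conn, DBConfiguration dbConfig,
            String cond, String[] fields, String table) throws SQLException {
        super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
    }

    @Override
    protected String getSelectQuery() {
        StringBuilder query = new StringBuilder();
        DBConfiguration dbConf = getDBConf();

        if (dbConf.getInputQuery() == null) {
            String[] fieldNames = getFieldNames();
            query.append("SELECT ");
            for (int i = 0; i < fieldNames.length; i++) {
                query.append(fieldNames[i]);
                if (i != fieldNames.length - 1) {
                    query.append(", ");
                }
            }
            query.append(" FROM ").append(getTableName());
            String conditions = getConditions();
            if (conditions != null && conditions.length() > 0) {
                query.append(" WHERE (").append(conditions).append(")");
            }
        } else {
            query.append(dbConf.getInputQuery());
        }

        // No LIMIT/OFFSET or TOP clause: with a single split the whole result
        // set belongs to the one mapper, so no vendor-specific paging is needed.
        return query.toString();
    }
}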




Using MapReduce to import HDFS data into HBase

package com.bank.service;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Bulk import into HBase with MapReduce (a map-only job, no reduce phase)
 * @author mengyao
 *
 */
public class DataImportToHbase extends Configured implements Tool {

    static class DataImportToHbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        private static String familyName = "info";
        private static String[] qualifiers = {"gzh", "currency", "version", "valuta", "qfTime", "flag", "machineID"};
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] values = line.split("\t");
            if (values.length == 7 && values.length == qualifiers.length) {
                // Row key: the first four tab-separated fields joined with "_".
                String row = values[0] + "_" + values[1] + "_" + values[2] + "_" + values[3];
                long timestamp = System.currentTimeMillis();
                ImmutableBytesWritable immutable = new ImmutableBytesWritable(Bytes.toBytes(row));
                Put put = new Put(Bytes.toBytes(row));
                for (int i = 0; i < values.length; i++) {
                    String qualifier = qualifiers[i];
                    String val = values[i];
                    put.add(Bytes.toBytes(familyName), Bytes.toBytes(qualifier), timestamp, Bytes.toBytes(val));
                }
                context.write(immutable, put);
            } else {
                System.err.println("ERROR: value length must equal qualifier length");
            }
        }
    }

    @Override
    public int run(String[] arg0) throws Exception {
        Job job = Job.getInstance(getConf(), DataImportToHbase.class.getSimpleName());
        job.setJarByClass(DataImportToHbase.class);
        
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(arg0[0]));
        
        job.setMapperClass(DataImportToHbaseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        
        TableMapReduceUtil.initTableReducerJob(arg0[1], null, job);        
        job.setNumReduceTasks(0);
        TableMapReduceUtil.addDependencyJars(job);
        
        return job.waitForCompletion(true) ? 0 : 1;
    }
    
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h5:2181,h6:2181,h7:2181");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("dfs.socket.timeout", "3600000");
        String[] otherArgs = new GenericOptionsParser(args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println(" ERROR: ");
            System.exit(2);
        }
        int status = ToolRunner.run(conf, new DataImportToHbase(), otherArgs);
        System.exit(status);
    }
}
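
One practical note: the job writes into column family info of the table named by the second command-line argument, and that table must exist before the job starts. Below is a minimal sketch of pre-creating it, assuming an HBase 0.96/0.98-era client API consistent with the put.add(...) calls above; the table name t_bank is hypothetical:

package com.bank.service;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTargetTable {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h5:2181,h6:2181,h7:2181");

        HBaseAdmin admin = new HBaseAdmin(conf);
        String tableName = "t_bank"; // hypothetical; must match the job's second argument
        if (!admin.tableExists(tableName)) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
            desc.addFamily(new HColumnDescriptor("info")); // familyName used by the mapper
            admin.createTable(desc);
        }
        admin.close();
    }
}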

Source: http://www.bbyears.com/shujuku/89465.html
