一种从Mysql读取表数据到Pig的方法

Posted

技术标签:

【中文标题】一种从Mysql读取表数据到Pig的方法【英文标题】:A way to read table data from Mysql to Pig 【发布时间】:2012-06-08 03:30:19 【问题描述】:

大家都知道小猪支持DBStorage, 但它们只支持从 Pig 到 mysql 的加载结果

STORE data INTO DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'INSERT ...');

但是请告诉我如何从 mysql 中读取表格

data = LOAD 'my_table' AS DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'SELECT * FROM my_table');

这是我的代码

public class DBLoader extends LoadFunc 
    private final Log log = LogFactory.getLog(getClass());
    private ArrayList mProtoTuple = null;
    private Connection con;
    private String jdbcURL;
    private String user;
    private String pass;
    private int batchSize;
    private int count = 0;
    private String query;
    ResultSet result;
    protected TupleFactory mTupleFactory = TupleFactory.getInstance();

    public DBLoader() 
    

    public DBLoader(String driver, String jdbcURL, String user, String pass,
            String query) 

        try 
            Class.forName(driver);
         catch (ClassNotFoundException e) 
            log.error("can't load DB driver:" + driver, e);
            throw new RuntimeException("Can't load DB Driver", e);
        
        this.jdbcURL = jdbcURL;
        this.user = user;
        this.pass = pass;
        this.query = query;

    

    @Override
    public InputFormat getInputFormat() throws IOException 
        // TODO Auto-generated method stub
        return new TextInputFormat();
    

    @Override
    public Tuple getNext() throws IOException 
        // TODO Auto-generated method stub
        boolean next = false;

        try 
            next = result.next();
         catch (SQLException e) 
            // TODO Auto-generated catch block
            e.printStackTrace();
        

        if (!next)
            return null;
        int numColumns = 0;
        // Get result set meta data
        ResultSetMetaData rsmd;
        try 
            rsmd = result.getMetaData();
            numColumns = rsmd.getColumnCount();
         catch (SQLException e) 
            // TODO Auto-generated catch block
            e.printStackTrace();
        

        for (int i = 0; i < numColumns; i++) 

            try 
                Object field = result.getObject(i);

                switch (DataType.findType(field)) 
                case DataType.NULL:

                    mProtoTuple.add(null);

                    break;

                case DataType.BOOLEAN:
                    mProtoTuple.add((Boolean) field);

                    break;

                case DataType.INTEGER:
                    mProtoTuple.add((Integer) field);

                    break;

                case DataType.LONG:
                    mProtoTuple.add((Long) field);

                    break;

                case DataType.FLOAT:
                    mProtoTuple.add((Float) field);

                    break;

                case DataType.DOUBLE:
                    mProtoTuple.add((Double) field);

                    break;

                case DataType.BYTEARRAY:
                    byte[] b = ((DataByteArray) field).get();
                    mProtoTuple.add(b);

                    break;
                case DataType.CHARARRAY:
                    mProtoTuple.add((String) field);

                    break;
                case DataType.BYTE:
                    mProtoTuple.add((Byte) field);

                    break;

                case DataType.MAP:
                case DataType.TUPLE:
                case DataType.BAG:
                    throw new RuntimeException("Cannot store a non-flat tuple "
                            + "using DbStorage");

                default:
                    throw new RuntimeException("Unknown datatype "
                            + DataType.findType(field));

                

             catch (Exception ee) 
                throw new RuntimeException(ee);
            
        

        Tuple t = mTupleFactory.newTuple(mProtoTuple);
        mProtoTuple.clear();
        return t;

    

    @Override
    public void prepareToRead(RecordReader arg0, PigSplit arg1)
            throws IOException 

        con = null;
        if (query == null) 
            throw new IOException("SQL Insert command not specified");
        
        try 
            if (user == null || pass == null) 
                con = DriverManager.getConnection(jdbcURL);
             else 
                con = DriverManager.getConnection(jdbcURL, user, pass);
            
            con.setAutoCommit(false);
            result = con.createStatement().executeQuery(query);
         catch (SQLException e) 
            log.error("Unable to connect to JDBC @" + jdbcURL);
            throw new IOException("JDBC Error", e);
        
        count = 0;
    

    @Override
    public void setLocation(String location, Job job) throws IOException 
        // TODO Auto-generated method stub

        //TextInputFormat.setInputPaths(job, location);

    

    class MyDBInputFormat extends InputFormat<NullWritable, NullWritable>

        @Override
        public RecordReader<NullWritable, NullWritable> createRecordReader(
                InputSplit arg0, TaskAttemptContext arg1) throws IOException,
                InterruptedException 
            // TODO Auto-generated method stub
            return null;
        

        @Override
        public List<InputSplit> getSplits(JobContext arg0) throws IOException,
                InterruptedException 
            // TODO Auto-generated method stub
            return null;
        

    


我多次尝试写UDF但没有成功.....

【问题讨论】:

【参考方案1】:

如你所说,DBStorage 仅支持将结果保存到数据库。

要从 MySQL 加载数据,您可以查看名为 sqoop 的项目(将数据从数据库复制到 HDFS),或者您可以执行 mysql 转储,然后将文件复制到 HDFS。这两种方式都需要一些交互,并且不能在 Pig 内部直接使用。

第三种选择是考虑编写 Pig LoadFunc(您说您尝试编写 UDF)。这应该不会太难,您需要传递与 DBStorage 相同的选项(驱动程序、连接凭据和要执行的 SQL 查询),并且您也可以使用一些结果集元数据检查来自动生成模式。

【讨论】:

嗨,谢谢。但正如我之前提到的,我只想从 Pig 内部直接加载使用。正如您所说,我发布了从 LoadFunc 扩展的代码。当我从 big 运行代码时,它总是抛出异常。 那么请添加您看到的异常 @phuongdo 您是否成功编写了用于从 mysql 加载数据的 Pig LoadFunc?

以上是关于一种从Mysql读取表数据到Pig的方法的主要内容,如果未能解决你的问题,请参考以下文章

一种从一个表中选择字段并插入到另一个表的方法?

使用 PIG 将数据写入 HIVE 外部表

hash连接

Pig - 读取存储为 Avro 的 Hive 表

将 JSON 格式表加载到 Pig 中

在单个实例中将 pig 输出存储到 Hive 表中