一种从Mysql读取表数据到Pig的方法
Posted
技术标签:
【中文标题】一种从Mysql读取表数据到Pig的方法【英文标题】:A way to read table data from Mysql to Pig 【发布时间】:2012-06-08 03:30:19 【问题描述】:大家都知道小猪支持DBStorage, 但它们只支持从 Pig 到 mysql 的加载结果
STORE data INTO DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'INSERT ...');
但是请告诉我如何从 mysql 中读取表格
data = LOAD 'my_table' AS DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'SELECT * FROM my_table');
这是我的代码
public class DBLoader extends LoadFunc
private final Log log = LogFactory.getLog(getClass());
private ArrayList mProtoTuple = null;
private Connection con;
private String jdbcURL;
private String user;
private String pass;
private int batchSize;
private int count = 0;
private String query;
ResultSet result;
protected TupleFactory mTupleFactory = TupleFactory.getInstance();
public DBLoader()
public DBLoader(String driver, String jdbcURL, String user, String pass,
String query)
try
Class.forName(driver);
catch (ClassNotFoundException e)
log.error("can't load DB driver:" + driver, e);
throw new RuntimeException("Can't load DB Driver", e);
this.jdbcURL = jdbcURL;
this.user = user;
this.pass = pass;
this.query = query;
@Override
public InputFormat getInputFormat() throws IOException
// TODO Auto-generated method stub
return new TextInputFormat();
@Override
public Tuple getNext() throws IOException
// TODO Auto-generated method stub
boolean next = false;
try
next = result.next();
catch (SQLException e)
// TODO Auto-generated catch block
e.printStackTrace();
if (!next)
return null;
int numColumns = 0;
// Get result set meta data
ResultSetMetaData rsmd;
try
rsmd = result.getMetaData();
numColumns = rsmd.getColumnCount();
catch (SQLException e)
// TODO Auto-generated catch block
e.printStackTrace();
for (int i = 0; i < numColumns; i++)
try
Object field = result.getObject(i);
switch (DataType.findType(field))
case DataType.NULL:
mProtoTuple.add(null);
break;
case DataType.BOOLEAN:
mProtoTuple.add((Boolean) field);
break;
case DataType.INTEGER:
mProtoTuple.add((Integer) field);
break;
case DataType.LONG:
mProtoTuple.add((Long) field);
break;
case DataType.FLOAT:
mProtoTuple.add((Float) field);
break;
case DataType.DOUBLE:
mProtoTuple.add((Double) field);
break;
case DataType.BYTEARRAY:
byte[] b = ((DataByteArray) field).get();
mProtoTuple.add(b);
break;
case DataType.CHARARRAY:
mProtoTuple.add((String) field);
break;
case DataType.BYTE:
mProtoTuple.add((Byte) field);
break;
case DataType.MAP:
case DataType.TUPLE:
case DataType.BAG:
throw new RuntimeException("Cannot store a non-flat tuple "
+ "using DbStorage");
default:
throw new RuntimeException("Unknown datatype "
+ DataType.findType(field));
catch (Exception ee)
throw new RuntimeException(ee);
Tuple t = mTupleFactory.newTuple(mProtoTuple);
mProtoTuple.clear();
return t;
@Override
public void prepareToRead(RecordReader arg0, PigSplit arg1)
throws IOException
con = null;
if (query == null)
throw new IOException("SQL Insert command not specified");
try
if (user == null || pass == null)
con = DriverManager.getConnection(jdbcURL);
else
con = DriverManager.getConnection(jdbcURL, user, pass);
con.setAutoCommit(false);
result = con.createStatement().executeQuery(query);
catch (SQLException e)
log.error("Unable to connect to JDBC @" + jdbcURL);
throw new IOException("JDBC Error", e);
count = 0;
@Override
public void setLocation(String location, Job job) throws IOException
// TODO Auto-generated method stub
//TextInputFormat.setInputPaths(job, location);
class MyDBInputFormat extends InputFormat<NullWritable, NullWritable>
@Override
public RecordReader<NullWritable, NullWritable> createRecordReader(
InputSplit arg0, TaskAttemptContext arg1) throws IOException,
InterruptedException
// TODO Auto-generated method stub
return null;
@Override
public List<InputSplit> getSplits(JobContext arg0) throws IOException,
InterruptedException
// TODO Auto-generated method stub
return null;
我多次尝试写UDF但没有成功.....
【问题讨论】:
【参考方案1】:如你所说,DBStorage
仅支持将结果保存到数据库。
要从 MySQL 加载数据,您可以查看名为 sqoop 的项目(将数据从数据库复制到 HDFS),或者您可以执行 mysql 转储,然后将文件复制到 HDFS。这两种方式都需要一些交互,并且不能在 Pig 内部直接使用。
第三种选择是考虑编写 Pig LoadFunc(您说您尝试编写 UDF)。这应该不会太难,您需要传递与 DBStorage 相同的选项(驱动程序、连接凭据和要执行的 SQL 查询),并且您也可以使用一些结果集元数据检查来自动生成模式。
【讨论】:
嗨,谢谢。但正如我之前提到的,我只想从 Pig 内部直接加载使用。正如您所说,我发布了从 LoadFunc 扩展的代码。当我从 big 运行代码时,它总是抛出异常。 那么请添加您看到的异常 @phuongdo 您是否成功编写了用于从 mysql 加载数据的 Pig LoadFunc?以上是关于一种从Mysql读取表数据到Pig的方法的主要内容,如果未能解决你的问题,请参考以下文章