Pig into Cassandra - 使用 python UDF 和 CqlStorage 传递列表对象

Posted

技术标签:

【中文标题】Pig into Cassandra - 使用 python UDF 和 CqlStorage 传递列表对象【英文标题】:Pig into Cassandra - pass list objects using python UDF and CqlStorage 【发布时间】:2014-01-06 15:49:57 【问题描述】:

我正在处理一个数据流,包括 Pig 中的一些聚合步骤以及将步骤存储到 Cassandra 中。我已经能够传递相对简单的数据类型,例如整数、长整数或日期,但找不到如何使用 CqlStorage 将某种列表、集合或元组从 Pig 传递到 Cassandra。

我使用的是 Pig 0.9.2,所以我不能使用 FLATTEN 方法。

问题

如何填充包含 Pig 0.9.2 中的集合或列表等复杂数据类型的 Cassandra 表?

我的具体应用概述:

我根据描述创建了相应的 Cassandra 表:

CREATE TABLE mycassandracf (
my_id int,
date timestamp,
my_count bigint,
grouped_ids list<bigint>,
PRIMARY KEY (my_id, date)); 

以及带有预准备语句的 STORE 指令:

STORE CassandraAggregate
INTO 'cql://test/mycassandracf?output_query=UPDATE+test.mycassandracf+set+my_count+%3D+%3F%2C+grouped_ids+%3D+%3F'
USING CqlStorage;

从“GROUP BY”关系中,我以 cql 友好格式(例如元组)“生成”一个关系,并将其存储到 Cassandra 中。

CassandraAggregate = FOREACH GroupedRelation
    GENERATE TOTUPLE(TOTUPLE('my_id', $0.my_id),
    TOTUPLE('date', ISOToUnix($0.createdAt))),
    TOTUPLE(COUNT($1), $1.grouped_id);

DUMP CassandraAggregate;

(((my_id,30021),(date,1357084800000)),(2,(60128490006325819),(62726281032786005)))
(((my_id,30165),(date,1357084800000)),(1,(60128411174143024)))
(((my_id,30376),(date,1357084800000)),(4,(60128411146211875),(63645100121476995),(60128411146211875),(63645100121476995)))

不出所料,对这种关系使用 STORE 指令会引发异常:

java.lang.ClassCastException: org.apache.pig.data.DefaultDataBag 无法转换为 org.apache.pig.data.DataByteArray

因此,我添加了一个用 python 编写的 UDF,以在 grouped_id 包上应用一些扁平化:

@outputSchema("flat_bag:bag")
def flattenBag(bag):
    return tuple([long(item) for tup in bag for item in tup])

我使用元组是因为使用 python 集和 python 列表最终会出现转换错误。

将它添加到我的管道中,我有:

CassandraAggregate = FOREACH GroupedRelation
    GENERATE TOTUPLE(TOTUPLE('my_id', $0.my_id),
    TOTUPLE('date', ISOToUnix($0.createdAt))),
    TOTUPLE(COUNT($1), py_f.flattenBag($1.grouped_id));

DUMP CassandraAggregate;

(((my_id,30021),(date,1357084800000)),(2,(60128490006325819,62726281032786005)))
(((my_id,31120),(date,1357084800000)),(1,(60128411174143024)))
(((my_id,31120),(date,1357084800000)),(1,(60128411146211875,63645100121476995,6012841114621187563645100121476995)))

对最后一个关系使用 STORE 指令会引发带有错误堆栈的异常:

java.io.IOException: java.io.IOException: org.apache.thrift.transport.TTransportException
at     org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:465)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:428)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:408)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:262)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:652)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
at org.apache.hadoop.mapred.Child$4.run(Child.java:266)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:260)
Caused by: java.io.IOException: org.apache.thrift.transport.TTransportException
at org.apache.cassandra.hadoop.cql3.CqlRecordWriter$RangeClient.run(CqlRecordWriter.java:248)
Caused by: org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TiostreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
at org.apache.cassandra.thrift.Cassandra$Client.recv_execute_prepared_cql3_query(Cassandra.java:1724)
at org.apache.cassandra.thrift.Cassandra$Client.execute_prepared_cql3_query(Cassandra.java:1709)
at org.apache.cassandra.hadoop.cql3.CqlRecordWriter$RangeClient.run(CqlRecordWriter.java:232)

我用简单的数据类型测试了完全相同的工作流程,并且运行良好。我真正在寻找的是用复杂类型(例如 Pig 中的集合或列表)填充 cassandra 表的方法。

非常感谢

【问题讨论】:

【参考方案1】:

经过进一步调查,我在这里找到了解决方案:

https://issues.apache.org/jira/browse/CASSANDRA-5867

基本上,CqlStorage 支持复杂类型。为此,该类型应由元组中的一个元组表示,将数据类型作为字符串作为第一个元素。对于列表,这是这样做的:

# python
@outputSchema("flat_bag:bag")
def flattenBag(bag):
    return ('list',) + tuple([long(item) for tup in bag for item in tup])

因此,咕哝着:

# pig
CassandraAggregate = FOREACH GroupedRelation
    GENERATE TOTUPLE(TOTUPLE('my_id', $0.my_id),
    TOTUPLE('date', ISOToUnix($0.createdAt))),
    TOTUPLE(COUNT($1), py_f.flattenBag($1.grouped_id));

DUMP CassandraAggregate;

(((my_id,30021),(date,1357084800000)),(2,(list, 60128490006325819,62726281032786005)))
(((my_id,31120),(date,1357084800000)),(1,(list, 60128411174143024)))
(((my_id,31120),(date,1357084800000)),(1,(list, 60128411146211875,63645100121476995,6012841114621187563645100121476995)))

然后使用经典的编码准备语句将其存储到 cassandra。

希望这会有所帮助。

【讨论】:

我使用的是 DSE3.2.4,其中是 Pig0.9.2,我在 pig 中没有 ISOToUnix()。那么将日期加载到cassandra的最佳方法是什么?我的日期格式是'yyyy/MM/dd' @sudheer ISOToUnix 实际上是来自 piggybank 的 UDF,而不是内置方法。您需要添加以下内容才能访问该方法:REGISTER /path-to-your-jar/piggybank.jar; DEFINE ISOToUnix org.apache.pig.piggybank.evaluation.datetime.convert.ISOToUnix(); 抱歉,不清楚。 对不起,我找不到 Piggybank.jar,这就是我在上一条消息中试图传达的内容。我正在使用 DSE3.2.4,通常我在 Cloudera 的 contrib 文件夹中找到它,是我在安装过程中遗漏了什么还是 Datastax 没有提供存钱罐? 我不知道 DSE 将它的 pig lib 文件夹放在哪里,但如果你能找到它,请在以下位置查找 jar:/path-to-dse-pig-libs/contrib/ piggybank/java/piggybank.jar 我使用手动安装 hadoop/pig 并且可以在以下位置找到我的猪库:/usr/local/lib/pig-*

以上是关于Pig into Cassandra - 使用 python UDF 和 CqlStorage 传递列表对象的主要内容,如果未能解决你的问题,请参考以下文章

使用 pig 将小数导入 cassandra

将 PIG 与 cassandra 一起使用的限制

在 Pig 中生成 Cassandra 友好关系

如何使用 Pig 从 Cassandra 加载 CF/TABLE

当我尝试运行 pig + cassandra 时出现错误?请帮助

Pig Cassandra 使用 oozie 处理非常缓慢(心跳)