通过 Spark SQL 查询 Cassandra UDT

Posted

技术标签:

【中文标题】通过 Spark SQL 查询 Cassandra UDT【英文标题】:Query Cassandra UDT via Spark SQL 【发布时间】:2019-05-13 15:44:21 【问题描述】:

我们想通过 SparkSQL 从 Cassandra DB 查询数据。问题是数据作为 UDT 存储在 cassandra 中。 UDT 的结构嵌​​套很深,它包含可变长度的数组,因此很难将数据分解为平面结构。 我找不到任何工作示例如何通过 SparkSQL 查询此类 UDT - 特别是根据 UDT 值过滤结果。

或者,您能否建议不同的 ETL 管道(查询引擎、存储引擎等),哪个更适合我们的用例?

我们的 ETL 管道:

Kafka(重复事件)-> Spark 流式传输-> Cassandra(去重以仅存储最新事件)

到目前为止我们尝试过的解决方案:

1) Kafka -> Spark -> Parquet

一切正常,我们可以查询和过滤数组和嵌套数据结构。

问题:无法删除重复数据(使用最新事件重写 parquet 文件)

2) Kafka -> Spark -> Cassandra

通过重复数据删除解决了问题 1)。

问题:Presto 不支持 UDT 类型(presto doc、presto issue)

我们的主要要求是:

支持重复数据删除。我们可能会收到许多具有相同 ID 的事件,我们只需要存储最新的一个。 用数组存储深度嵌套的数据结构 分布式存储,可扩展以适应未来扩展 具有类似 SQL 查询支持的分布式查询引擎(用于与 Zeppelin、Tableau、Qlik 等连接)。查询不必实时运行,几分钟的延迟是可以接受的。 支持模式演变(AVRO 风格)

感谢您的任何建议

【问题讨论】:

【参考方案1】:

您可以只使用点语法对嵌套元素执行查询。例如,如果我有以下 CQL 定义:

cqlsh> use test;
cqlsh:test> create type t1 (id int, t text);
cqlsh:test> create type t2 (id int, t1 frozen<t1>);
cqlsh:test> create table nudt (id int primary key, t2 frozen<t2>);
cqlsh:test> insert into nudt (id, t2) values (1, id: 1, t1: id: 1, t: 't1');
cqlsh:test> insert into nudt (id, t2) values (2, id: 2, t1: id: 2, t: 't2');
cqlsh:test> SELECT * from nudt;

 id | t2
----+-------------------------------
  1 | id: 1, t1: id: 1, t: 't1'
  2 | id: 2, t1: id: 2, t: 't2'

(2 rows)

然后我可以按如下方式加载该数据:

scala> val data = spark.read.format("org.apache.spark.sql.cassandra").
     options(Map( "table" -> "nudt", "keyspace" -> "test")).load()
data: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> data.cache
res0: data.type = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> data.show
+---+----------+
| id|        t2|
+---+----------+
|  1|[1,[1,t1]]|
|  2|[2,[2,t2]]|
+---+----------+

然后查询数据只选择UDT中字段的特定值:

scala> val res = spark.sql("select * from test.nudt where t2.t1.t = 't1'")
res: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> res.show
+---+----------+
| id|        t2|
+---+----------+
|  1|[1,[1,t1]]|
+---+----------+

您可以使用spark.sql 或相应的.filter 函数 - 取决于您的编程风格。这种技术适用于任何结构类型的数据,来自不同的来源,如 JSON 等。

但请注意,您不会像通过分区键/集群列查询时那样从 Cassandra 连接器获得优化

【讨论】:

您好 Alex,感谢您的回答,这正是我想要的。当我启动 Spark Thrift Server 时,我什至可以通过 JDBC(与分析工具连接)进行查询。我很担心你关于性能的最后一句话。当我在选择中包含分区键(或分区键列表)时,我希望 Spark 将这些“下推”到 Cassandra ? 是的,如果您将分区键和/或集群列包含在条件中,那么连接器会将过滤器下推到 Cassandra 中,并且 UDT 的过滤将在缩减的数据集上执行,这与 UDT 上的过滤相反只有这样才需要读取所有数据并在 Spark 级别执行过滤。 @AlexOtt 如果我执行“选择 t2.t1.id”并且我想从那里创建数据框,这个示例似乎可以工作。有什么解决方法 这段代码是在真机上执行的。你能把它作为一个单独的问题发布,你得到的错误,实际代码等。

以上是关于通过 Spark SQL 查询 Cassandra UDT的主要内容,如果未能解决你的问题,请参考以下文章

用于 Cassandra 的 Spark2 会话,sql 查询

Spark SQL下推Cassandra UDF?

将 Spark SQL Hive 服务器连接到 Cassandra?

SPARK SQL 和 Cassandra 之间的时区不匹配

Spark SQL cassandra 删除记录

14.4 Spark-SQL基于Cassandra数据分析编程实例