通过 Spark SQL 查询 Cassandra UDT
Posted
技术标签:
【中文标题】通过 Spark SQL 查询 Cassandra UDT【英文标题】:Query Cassandra UDT via Spark SQL 【发布时间】:2019-05-13 15:44:21 【问题描述】:我们想通过 SparkSQL 从 Cassandra DB 查询数据。问题是数据作为 UDT 存储在 cassandra 中。 UDT 的结构嵌套很深,它包含可变长度的数组,因此很难将数据分解为平面结构。 我找不到任何工作示例如何通过 SparkSQL 查询此类 UDT - 特别是根据 UDT 值过滤结果。
或者,您能否建议不同的 ETL 管道(查询引擎、存储引擎等),哪个更适合我们的用例?
我们的 ETL 管道:
Kafka(重复事件)-> Spark 流式传输-> Cassandra(去重以仅存储最新事件)
到目前为止我们尝试过的解决方案:
1) Kafka -> Spark -> Parquet
一切正常,我们可以查询和过滤数组和嵌套数据结构。
问题:无法删除重复数据(使用最新事件重写 parquet 文件)
2) Kafka -> Spark -> Cassandra
通过重复数据删除解决了问题 1)。
问题:Presto 不支持 UDT 类型(presto doc、presto issue)
我们的主要要求是:
支持重复数据删除。我们可能会收到许多具有相同 ID 的事件,我们只需要存储最新的一个。 用数组存储深度嵌套的数据结构 分布式存储,可扩展以适应未来扩展 具有类似 SQL 查询支持的分布式查询引擎(用于与 Zeppelin、Tableau、Qlik 等连接)。查询不必实时运行,几分钟的延迟是可以接受的。 支持模式演变(AVRO 风格)感谢您的任何建议
【问题讨论】:
【参考方案1】:您可以只使用点语法对嵌套元素执行查询。例如,如果我有以下 CQL 定义:
cqlsh> use test;
cqlsh:test> create type t1 (id int, t text);
cqlsh:test> create type t2 (id int, t1 frozen<t1>);
cqlsh:test> create table nudt (id int primary key, t2 frozen<t2>);
cqlsh:test> insert into nudt (id, t2) values (1, id: 1, t1: id: 1, t: 't1');
cqlsh:test> insert into nudt (id, t2) values (2, id: 2, t1: id: 2, t: 't2');
cqlsh:test> SELECT * from nudt;
id | t2
----+-------------------------------
1 | id: 1, t1: id: 1, t: 't1'
2 | id: 2, t1: id: 2, t: 't2'
(2 rows)
然后我可以按如下方式加载该数据:
scala> val data = spark.read.format("org.apache.spark.sql.cassandra").
options(Map( "table" -> "nudt", "keyspace" -> "test")).load()
data: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]
scala> data.cache
res0: data.type = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]
scala> data.show
+---+----------+
| id| t2|
+---+----------+
| 1|[1,[1,t1]]|
| 2|[2,[2,t2]]|
+---+----------+
然后查询数据只选择UDT中字段的特定值:
scala> val res = spark.sql("select * from test.nudt where t2.t1.t = 't1'")
res: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]
scala> res.show
+---+----------+
| id| t2|
+---+----------+
| 1|[1,[1,t1]]|
+---+----------+
您可以使用spark.sql
或相应的.filter
函数 - 取决于您的编程风格。这种技术适用于任何结构类型的数据,来自不同的来源,如 JSON 等。
但请注意,您不会像通过分区键/集群列查询时那样从 Cassandra 连接器获得优化
【讨论】:
您好 Alex,感谢您的回答,这正是我想要的。当我启动 Spark Thrift Server 时,我什至可以通过 JDBC(与分析工具连接)进行查询。我很担心你关于性能的最后一句话。当我在选择中包含分区键(或分区键列表)时,我希望 Spark 将这些“下推”到 Cassandra ? 是的,如果您将分区键和/或集群列包含在条件中,那么连接器会将过滤器下推到 Cassandra 中,并且 UDT 的过滤将在缩减的数据集上执行,这与 UDT 上的过滤相反只有这样才需要读取所有数据并在 Spark 级别执行过滤。 @AlexOtt 如果我执行“选择 t2.t1.id”并且我想从那里创建数据框,这个示例似乎可以工作。有什么解决方法 这段代码是在真机上执行的。你能把它作为一个单独的问题发布,你得到的错误,实际代码等。以上是关于通过 Spark SQL 查询 Cassandra UDT的主要内容,如果未能解决你的问题,请参考以下文章
用于 Cassandra 的 Spark2 会话,sql 查询
将 Spark SQL Hive 服务器连接到 Cassandra?