如何在 2.2.0 中获取给定 Apache Spark Dataframe 的 Cassandra cql 字符串?
Posted
技术标签:
【中文标题】如何在 2.2.0 中获取给定 Apache Spark Dataframe 的 Cassandra cql 字符串?【英文标题】:How to get Cassandra cql string given a Apache Spark Dataframe in 2.2.0? 【发布时间】:2018-01-25 12:25:29 【问题描述】:我正在尝试获取给定 Dataframe 的 cql 字符串。我遇到了这个function
我可以在哪里做这样的事情
TableDef.fromDataFrame(df, "test", "hello", ProtocolVersion.NEWEST_SUPPORTED).cql()
在我看来,该库使用第一列作为分区键并且不关心集群键,那么我如何指定使用 Dataframe 的特定列集作为 PartitionKey 和 ParticularSet 列作为集群键?
看起来我可以创建一个新的 TableDef,但是我必须自己完成整个映射,并且在某些情况下,Java 中无法访问像 ColumnType 这样的必要函数。例如,我尝试创建一个新的 ColumnDef,如下所示
new ColumnDef("col5", new PartitionKeyColumn(), ColumnType is not accessible in Java)
目标:从 Spark DataFrame 中获取 CQL 创建语句。
输入 我的数据框可以有任意数量的列以及它们各自的 Spark 类型。所以说我有一个包含 100 列的 Spark 数据框,其中我的数据框的 col8、col9 对应于 cassandra partitionKey 列,我的 column10 对应于 cassandra 聚类键列
col1| col2| ...|col100
现在我想使用 spark-cassandra-connector 库给我一个 CQL 创建表语句给定上面的信息。
期望的输出
create table if not exists test.hello (
col1 bigint, (whatever column1 type is from my dataframe I just picked bigint randomly)
col2 varchar,
col3 double,
...
...
col100 bigint,
primary key(col8,col9)
) WITH CLUSTERING ORDER BY (col10 DESC);
【问题讨论】:
【参考方案1】:由于必需的组件(PartitionKeyColumn
和ColumnType
的实例)是 Scala 中的对象,因此您需要使用以下语法来访问它们的实例:
// imports
import com.datastax.spark.connector.cql.ColumnDef;
import com.datastax.spark.connector.cql.PartitionKeyColumn$;
import com.datastax.spark.connector.types.TextType$;
// actual code
ColumnDef a = new ColumnDef("col5",
PartitionKeyColumn$.MODULE$, TextType$.MODULE$);
查看ColumnRole 和PrimitiveTypes 的代码以查找对象/类名称的完整列表。
附加要求后更新:代码很长,但应该可以工作...
SparkSession spark = SparkSession.builder()
.appName("Java Spark SQL example").getOrCreate();
Set<String> partitionKeys = new TreeSet<String>()
add("col1");
add("col2");
;
Map<String, Integer> clustereingKeys = new TreeMap<String, Integer>()
put("col8", 0);
put("col9", 1);
;
Dataset<Row> df = spark.read().json("my-test-file.json");
TableDef td = TableDef.fromDataFrame(df, "test", "hello",
ProtocolVersion.NEWEST_SUPPORTED);
List<ColumnDef> partKeyList = new ArrayList<ColumnDef>();
List<ColumnDef> clusterColumnList = new ArrayList<ColumnDef>();
List<ColumnDef> regColulmnList = new ArrayList<ColumnDef>();
scala.collection.Iterator<ColumnDef> iter = td.allColumns().iterator();
while (iter.hasNext())
ColumnDef col = iter.next();
String colName = col.columnName();
if (partitionKeys.contains(colName))
partKeyList.add(new ColumnDef(colName,
PartitionKeyColumn$.MODULE$, col.columnType()));
else if (clustereingKeys.containsKey(colName))
int idx = clustereingKeys.get(colName);
clusterColumnList.add(new ColumnDef(colName,
new ClusteringColumn(idx), col.columnType()));
else
regColulmnList.add(new ColumnDef(colName,
RegularColumn$.MODULE$, col.columnType()));
TableDef newTd = new TableDef(td.keyspaceName(), td.tableName(),
(scala.collection.Seq<ColumnDef>) partKeyList,
(scala.collection.Seq<ColumnDef>) clusterColumnList,
(scala.collection.Seq<ColumnDef>) regColulmnList,
td.indexes(), td.isView());
String cql = newTd.cql();
System.out.println(cql);
【讨论】:
非常感谢亚历克斯!我不是 Scala 人,但现在你说的很有意义!那么我是否应该假设在给定数据帧的情况下没有简单的方法来获取 cql 字符串?因为我觉得 TabelDef.fromDataFrame 差不多了!! 我需要进一步了解...据我了解,您想重用现有定义? 所以我的 spark 数据框中有大约 100 列(col1、col2、...col100)。现在我想从中获取一个 cql create 语句..我可以使用TableDef.fromDataFrame(df, "test", "hello", ProtocolVersion.NEWEST_SUPPORTED).cql()
来获取一个 CQL create 语句,但唯一的问题是我没有看到一种方法来指定说使用 col8、col9 作为 ParitionKey 和使用 col10 作为 clusterKey。如果我需要创建新的 TableDef,那么我需要为我所有的 100 列创建一个 ColDef
,这有点乏味,我会作为最后的手段。如果您能告诉我,那将是很大的帮助!
您能否更新问题,详细说明您最终想要实现的目标?我正确理解您想从表定义中生成 CQL 语句?
嗨,亚历克斯!我刚刚编辑了我的问题并添加了所有详细信息。以上是关于如何在 2.2.0 中获取给定 Apache Spark Dataframe 的 Cassandra cql 字符串?的主要内容,如果未能解决你的问题,请参考以下文章
如何在执行 sp_executesql 中获取输出参数的值?
在 spark 版本 2.2.0 中使用 python(pyspark) 从 mqtt 获取数据流
如何使用带有 sql server 2000 的 vb6 为表或 sp 获取“创建表”脚本
如何在 Apache 2.2.9 中使用 mod_ssl 将 openssl 0.9.8 升级到 1.0.2
如何从数据库表中获取数据并将该数据更新到休眠中的另一个表中?我已经在 JSP 中完成了,但我想在 Hibernate 中完成