如何在 2.2.0 中获取给定 Apache Spark Dataframe 的 Cassandra cql 字符串?

Posted

技术标签:

【中文标题】如何在 2.2.0 中获取给定 Apache Spark Dataframe 的 Cassandra cql 字符串?【英文标题】:How to get Cassandra cql string given a Apache Spark Dataframe in 2.2.0? 【发布时间】:2018-01-25 12:25:29 【问题描述】:

我正在尝试获取给定 Dataframe 的 cql 字符串。我遇到了这个function

我可以在哪里做这样的事情

TableDef.fromDataFrame(df, "test", "hello", ProtocolVersion.NEWEST_SUPPORTED).cql()

在我看来,该库使用第一列作为分区键并且不关心集群键,那么我如何指定使用 Dataframe 的特定列集作为 PartitionKey 和 ParticularSet 列作为集群键?

看起来我可以创建一个新的 TableDef,但是我必须自己完成整个映射,并且在某些情况下,Java 中无法访问像 ColumnType 这样的必要函数。例如,我尝试创建一个新的 ColumnDef,如下所示

new ColumnDef("col5", new PartitionKeyColumn(), ColumnType is not accessible in Java)

目标:从 Spark DataFrame 中获取 CQL 创建语句。

输入 我的数据框可以有任意数量的列以及它们各自的 Spark 类型。所以说我有一个包含 100 列的 Spark 数据框,其中我的数据框的 col8、col9 对应于 cassandra partitionKey 列,我的 column10 对应于 cassandra 聚类键列

col1| col2| ...|col100

现在我想使用 spark-cassandra-connector 库给我一个 CQL 创建表语句给定上面的信息。

期望的输出

create table if not exists test.hello (
   col1 bigint, (whatever column1 type is from my dataframe I just picked bigint randomly)
   col2 varchar,
   col3 double,
   ...
   ...
   col100 bigint,
   primary key(col8,col9)
) WITH CLUSTERING ORDER BY (col10 DESC);

【问题讨论】:

【参考方案1】:

由于必需的组件(PartitionKeyColumnColumnType 的实例)是 Scala 中的对象,因此您需要使用以下语法来访问它们的实例:

// imports
import com.datastax.spark.connector.cql.ColumnDef;
import com.datastax.spark.connector.cql.PartitionKeyColumn$;
import com.datastax.spark.connector.types.TextType$;

// actual code
ColumnDef a = new ColumnDef("col5",  
      PartitionKeyColumn$.MODULE$, TextType$.MODULE$);

查看ColumnRole 和PrimitiveTypes 的代码以查找对象/类名称的完整列表。

附加要求后更新:代码很长,但应该可以工作...

SparkSession spark = SparkSession.builder()
                .appName("Java Spark SQL example").getOrCreate();

Set<String> partitionKeys = new TreeSet<String>() 
                add("col1");
                add("col2");
        ;
Map<String, Integer> clustereingKeys = new TreeMap<String, Integer>() 
                put("col8", 0);
                put("col9", 1);
        ;

Dataset<Row> df = spark.read().json("my-test-file.json");
TableDef td = TableDef.fromDataFrame(df, "test", "hello", 
                ProtocolVersion.NEWEST_SUPPORTED);

List<ColumnDef> partKeyList = new ArrayList<ColumnDef>();
List<ColumnDef> clusterColumnList = new ArrayList<ColumnDef>();
List<ColumnDef> regColulmnList = new ArrayList<ColumnDef>();

scala.collection.Iterator<ColumnDef> iter = td.allColumns().iterator();
while (iter.hasNext()) 
        ColumnDef col = iter.next();
        String colName = col.columnName();
        if (partitionKeys.contains(colName)) 
                partKeyList.add(new ColumnDef(colName, 
                                PartitionKeyColumn$.MODULE$, col.columnType()));
         else if (clustereingKeys.containsKey(colName)) 
                int idx = clustereingKeys.get(colName);
                clusterColumnList.add(new ColumnDef(colName, 
                                new ClusteringColumn(idx), col.columnType()));
         else 
                regColulmnList.add(new ColumnDef(colName, 
                                RegularColumn$.MODULE$, col.columnType()));
        


TableDef newTd = new TableDef(td.keyspaceName(), td.tableName(), 
                (scala.collection.Seq<ColumnDef>) partKeyList,
                (scala.collection.Seq<ColumnDef>) clusterColumnList, 
                (scala.collection.Seq<ColumnDef>) regColulmnList,
                td.indexes(), td.isView());
String cql = newTd.cql();
System.out.println(cql);

【讨论】:

非常感谢亚历克斯!我不是 Scala 人,但现在你说的很有意义!那么我是否应该假设在给定数据帧的情况下没有简单的方法来获取 cql 字符串?因为我觉得 TabelDef.fromDataFrame 差不多了!! 我需要进一步了解...据我了解,您想重用现有定义? 所以我的 spark 数据框中有大约 100 列(col1、col2、...col100)。现在我想从中获取一个 cql create 语句..我可以使用TableDef.fromDataFrame(df, "test", "hello", ProtocolVersion.NEWEST_SUPPORTED).cql() 来获取一个 CQL create 语句,但唯一的问题是我没有看到一种方法来指定说使用 col8、col9 作为 ParitionKey 和使用 col10 作为 clusterKey。如果我需要创建新的 TableDef,那么我需要为我所有的 100 列创建一个 ColDef,这有点乏味,我会作为最后的手段。如果您能告诉我,那将是很大的帮助! 您能否更新问题,详细说明您最终想要实现的目标?我正确理解您想从表定义中生成 CQL 语句? 嗨,亚历克斯!我刚刚编辑了我的问题并添加了所有详细信息。

以上是关于如何在 2.2.0 中获取给定 Apache Spark Dataframe 的 Cassandra cql 字符串?的主要内容,如果未能解决你的问题,请参考以下文章

如何在执行 sp_executesql 中获取输出参数的值?

在 spark 版本 2.2.0 中使用 python(pyspark) 从 mqtt 获取数据流

如何使用带有 sql server 2000 的 vb6 为表或 sp 获取“创建表”脚本

如何在 Apache 2.2.9 中使用 mod_ssl 将 openssl 0.9.8 升级到 1.0.2

如何从数据库表中获取数据并将该数据更新到休眠中的另一个表中?我已经在 J​​SP 中完成了,但我想在 Hibernate 中完成

如何在 Apache livy 中提交 pyspark 作业?