使用Java Spark将数据集保存到Cassandra中

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Java Spark将数据集保存到Cassandra中相关的知识,希望对你有一定的参考价值。

我正在尝试使用Java Spark将数据集保存到cassandra db。我可以使用以下代码将数据成功读取到数据集中]

Dataset<Row> readdf = sparkSession.read().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.load();

但是当我尝试编写数据集时,我得到的是[[IOException:无法加载或查找表,在键空间中找到了类似的表

Dataset<Row> dfwrite= readdf.write().format("org.apache.spark.sql.cassandra") .option("keyspace","dbname") .option("table","tablename") .save();
我在sparksession中设置主机和端口问题是我能够以覆盖和附加模式编写,但无法创建表

我正在使用的版本如下:火花java 2.0spark cassandra连接器2.3

尝试了不同的jar版本,但没有任何效果我也经历了不同的堆栈溢出和github链接

非常感谢您的帮助。

我正在尝试使用Java Spark将数据集保存到cassandra db。我可以使用下面的代码Dataset

readdf = sparkSession.read()。format(“ org.apache ....

答案
Spark中的write操作没有可以为您自动创建表的模式-这样做的原因有多种。其中之一是您需要为表定义一个主键,否则,如果您设置了错误的主键,则可能会覆盖数据。因此,Spark Cassandra Connector provides a separate method to create a table based on your dataframe structure,但您需要提供分区和群集键列的列表。在Java中,它将如下所示(完整代码为here):

DataFrameFunctions dfFunctions = new DataFrameFunctions(dataset); Option<Seq<String>> partitionSeqlist = new Some<>(JavaConversions.asScalaBuffer( Arrays.asList("part")).seq()); Option<Seq<String>> clusteringSeqlist = new Some<>(JavaConversions.asScalaBuffer( Arrays.asList("clust", "col2")).seq()); CassandraConnector connector = new CassandraConnector( CassandraConnectorConf.apply(spark.sparkContext().getConf())); dfFunctions.createCassandraTable("test", "widerows6", partitionSeqlist, clusteringSeqlist, connector);

以上是关于使用Java Spark将数据集保存到Cassandra中的主要内容,如果未能解决你的问题,请参考以下文章

将字段附加到 JSON 数据集 Java-Spark

如何将具有值的列添加到 Spark Java 中的新数据集?

java.sql.SQLException:将 Spark 数据帧保存到 Sybase 时找不到类型“TIMESTAMP”

在 Spark 中,将数据集写入数据库时​​,保存操作需要一些预先假定的时间

Spark SQL - 无法将所有记录写入配置单元表

将模式应用于 Java 对象的 Spark 数据集