使用Java Spark将数据集保存到Cassandra中
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Java Spark将数据集保存到Cassandra中相关的知识,希望对你有一定的参考价值。
我正在尝试使用Java Spark将数据集保存到cassandra db。我可以使用以下代码将数据成功读取到数据集中]
Dataset<Row> readdf = sparkSession.read().format("org.apache.spark.sql.cassandra") .option("keyspace","dbname") .option("table","tablename") .load();
但是当我尝试编写数据集时,我得到的是[[IOException:无法加载或查找表,在键空间中找到了类似的表
Dataset<Row> dfwrite= readdf.write().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.save();
我在sparksession中设置主机和端口问题是我能够以覆盖和附加模式编写,但无法创建表
我正在使用的版本如下:火花java 2.0spark cassandra连接器2.3
尝试了不同的jar版本,但没有任何效果我也经历了不同的堆栈溢出和github链接
非常感谢您的帮助。
我正在尝试使用Java Spark将数据集保存到cassandra db。我可以使用下面的代码Dataset
readdf = sparkSession.read()。format(“ org.apache ....
write
操作没有可以为您自动创建表的模式-这样做的原因有多种。其中之一是您需要为表定义一个主键,否则,如果您设置了错误的主键,则可能会覆盖数据。因此,Spark Cassandra Connector provides a separate method to create a table based on your dataframe structure,但您需要提供分区和群集键列的列表。在Java中,它将如下所示(完整代码为here):DataFrameFunctions dfFunctions = new DataFrameFunctions(dataset);
Option<Seq<String>> partitionSeqlist = new Some<>(JavaConversions.asScalaBuffer(
Arrays.asList("part")).seq());
Option<Seq<String>> clusteringSeqlist = new Some<>(JavaConversions.asScalaBuffer(
Arrays.asList("clust", "col2")).seq());
CassandraConnector connector = new CassandraConnector(
CassandraConnectorConf.apply(spark.sparkContext().getConf()));
dfFunctions.createCassandraTable("test", "widerows6",
partitionSeqlist, clusteringSeqlist, connector);
以上是关于使用Java Spark将数据集保存到Cassandra中的主要内容,如果未能解决你的问题,请参考以下文章
如何将具有值的列添加到 Spark Java 中的新数据集?
java.sql.SQLException:将 Spark 数据帧保存到 Sybase 时找不到类型“TIMESTAMP”