将从joinWithCassandraTable获取的CassandraRow转换为DataFrame
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了将从joinWithCassandraTable获取的CassandraRow转换为DataFrame相关的知识,希望对你有一定的参考价值。
case class SourcePartition(id: String, host:String ,bucket: Int)
joinedRDDs =partitions.joinWithCassandraTable("db_name","table_name")
joinedRDDs.values.foreach(println)
我必须使用joinWithCassandraTable,如何将结果CassandraRow转换为DataFrame?或者与DataFrame有任何等价的joinWithCassandraTable?
我必须一次读取很多分区,我知道Datastax Cassandra连接器Predicate按下,但它允许一次只拉一个分区(它似乎不允许IN运算符,只有似乎支持)
答案
val spark: SparkSession = SparkSession.builder().master("local[4]").appName("RDD2DF").getOrCreate()
val sc: SparkContext = spark.sparkContext
import spark.implicits._
val internalJoinRDD = spark.sparkContext.cassandraTable("test", "test_table_1").joinWithCassandraTable("test", "table_table_2")
internalJoin.toDebugString
internalJoinRDD.toDF()
你能试试上面的代码片段吗?
如果您有数据架构,则可以使用
def createDataFrame(internalJoinRDD: RDD[Row], schema: StructType): DataFrame
以上是关于将从joinWithCassandraTable获取的CassandraRow转换为DataFrame的主要内容,如果未能解决你的问题,请参考以下文章