将从joinWithCassandraTable获取的CassandraRow转换为DataFrame

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了将从joinWithCassandraTable获取的CassandraRow转换为DataFrame相关的知识,希望对你有一定的参考价值。

case class SourcePartition(id: String, host:String ,bucket: Int)
joinedRDDs =partitions.joinWithCassandraTable("db_name","table_name")
joinedRDDs.values.foreach(println)

我必须使用joinWithCassandraTable,如何将结果CassandraRow转换为DataFrame?或者与DataFrame有任何等价的joinWithCassandraTable?

我必须一次读取很多分区,我知道Datastax Cassandra连接器Predicate按下,但它允许一次只拉一个分区(它似乎不允许IN运算符,只有似乎支持)

答案
val spark: SparkSession = SparkSession.builder().master("local[4]").appName("RDD2DF").getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._

    val internalJoinRDD = spark.sparkContext.cassandraTable("test", "test_table_1").joinWithCassandraTable("test", "table_table_2")
    internalJoin.toDebugString

    internalJoinRDD.toDF()

你能试试上面的代码片段吗?

如果您有数据架构,则可以使用

def createDataFrame(internalJoinRDD: RDD[Row], schema: StructType): DataFrame

以上是关于将从joinWithCassandraTable获取的CassandraRow转换为DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

巧用数字营销,Livi Bank 解决获客难题

巧用数字营销,Livi Bank 解决获客难题

Base64解码没有结果

将数组传递给组件

GitLab:从回购断开文件

进程_线程 之 --- 生产者消费者