Scala:使用火花从 scylla 获取数据

Posted

技术标签:

【中文标题】Scala:使用火花从 scylla 获取数据【英文标题】:Scala: get data from scylla using spark 【发布时间】:2021-07-16 04:22:22 【问题描述】:

scala/spark 新手在这里。我继承了一个旧代码,我已经重构并尝试使用它来从 Scylla 检索数据。代码如下:

val TEST_QUERY = s"SELECT user_id FROM test_table WHERE name = ? AND id_type = 'test_type';"

var selectData = List[Row]()
dataRdd.foreachPartition 
  iter => 
    // Build up a cluster that we can connect to
    // Start a session with the cluster by connecting to it.
    val cluster = ScyllaConnector.getCluster(clusterIpString, scyllaPreferredDc, scyllaUsername, scyllaPassword)
    var batchCounter = 0

    val session = cluster.connect(tableConfig.keySpace)
    val preparedStatement: PreparedStatement = session.prepare(TEST_QUERY)

    iter.foreach 
      case (test_name: String) => 
        // Get results
        val testResults = session.execute(preparedStatement.bind(test_name))
        if (testResults != null)
          val testResult = testResults.one()
          if(testResult != null)
            val user_id = testResult.getString("user_id")
            selectData ::= Row(user_id, test_name)
          
        
      
    
    session.close()
    cluster.close()
  


println("Head is =======> ")
println(selectData.head)

上面没有返回任何数据,并且由于空指针异常而失败,因为selectedData 列表是空的,尽管其中肯定有与 select 语句匹配的数据。我觉得我的做法不正确,但不知道需要改变什么才能解决这个问题,因此非常感谢任何帮助。

PS:我使用列表来保存结果的整个想法是,我可以使用该列表来创建数据框。如果您能在这里指出正确的方向,我将不胜感激。

【问题讨论】:

【参考方案1】:

如果你查看foreachPartition function 的定义,你会发现它的定义不能返回任何东西,因为它的返回类型是void

无论如何,这是从 Spark 中查询 Cassandra/Scylla 数据的一种非常糟糕的方式。为此存在Spark Cassandra Connector,由于协议兼容性,它也应该能够与 Scylla 一起使用。

致 read a dataframe from Cassandra 就行了:

spark.read
  .format("cassandra")
  .option("keyspace", "ksname")
  .option("table", "tab")
  .load()

Documentation 写的很详细,看完就好了。

【讨论】:

以上是关于Scala:使用火花从 scylla 获取数据的主要内容,如果未能解决你的问题,请参考以下文章

如何使用火花流处理实时流数据/日志?

火花避免收集尽可能

遍历火花数据框中的列并计算最小值最大值

火花数据集:如何从列中获取唯一值的出现次数

从火花数据帧中读取结构[重复]

使用 scala 使用布尔运算折叠火花数据框中的列