Scala:使用火花从 scylla 获取数据
Posted
技术标签:
【中文标题】Scala:使用火花从 scylla 获取数据【英文标题】:Scala: get data from scylla using spark 【发布时间】:2021-07-16 04:22:22 【问题描述】:scala/spark 新手在这里。我继承了一个旧代码,我已经重构并尝试使用它来从 Scylla 检索数据。代码如下:
val TEST_QUERY = s"SELECT user_id FROM test_table WHERE name = ? AND id_type = 'test_type';"
var selectData = List[Row]()
dataRdd.foreachPartition
iter =>
// Build up a cluster that we can connect to
// Start a session with the cluster by connecting to it.
val cluster = ScyllaConnector.getCluster(clusterIpString, scyllaPreferredDc, scyllaUsername, scyllaPassword)
var batchCounter = 0
val session = cluster.connect(tableConfig.keySpace)
val preparedStatement: PreparedStatement = session.prepare(TEST_QUERY)
iter.foreach
case (test_name: String) =>
// Get results
val testResults = session.execute(preparedStatement.bind(test_name))
if (testResults != null)
val testResult = testResults.one()
if(testResult != null)
val user_id = testResult.getString("user_id")
selectData ::= Row(user_id, test_name)
session.close()
cluster.close()
println("Head is =======> ")
println(selectData.head)
上面没有返回任何数据,并且由于空指针异常而失败,因为selectedData
列表是空的,尽管其中肯定有与 select 语句匹配的数据。我觉得我的做法不正确,但不知道需要改变什么才能解决这个问题,因此非常感谢任何帮助。
PS:我使用列表来保存结果的整个想法是,我可以使用该列表来创建数据框。如果您能在这里指出正确的方向,我将不胜感激。
【问题讨论】:
【参考方案1】:如果你查看foreachPartition function 的定义,你会发现它的定义不能返回任何东西,因为它的返回类型是void
。
无论如何,这是从 Spark 中查询 Cassandra/Scylla 数据的一种非常糟糕的方式。为此存在Spark Cassandra Connector,由于协议兼容性,它也应该能够与 Scylla 一起使用。
致 read a dataframe from Cassandra 就行了:
spark.read
.format("cassandra")
.option("keyspace", "ksname")
.option("table", "tab")
.load()
Documentation 写的很详细,看完就好了。
【讨论】:
以上是关于Scala:使用火花从 scylla 获取数据的主要内容,如果未能解决你的问题,请参考以下文章