通过 java 客户端的 Spark Cassandra 连接

Posted

技术标签:

【中文标题】通过 java 客户端的 Spark Cassandra 连接【英文标题】:Spark Cassandra connection through java client 【发布时间】:2021-09-06 22:12:04 【问题描述】:

我想通过 spark 作业连接到我的 scylla db/cassandra 并使用 java 客户端执行查找查询。我试过了

val spark = SparkSession.builder.appName("ScyllaSparkClient")
  .master("local[1]")
  .getOrCreate()


import spark.implicits._
val m = Map( "John" -> 2 )
val df = m.toSeq.toDF("first", "id")
df.show

val vdf = df.mapPartitions(p => 
  val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
  val session = cluster.connect("MyKeySpace")

    val res = p.map(record => 
      val results = session.execute(s"SELECT * FROM MyKeySpace.MyColumns where id='$record.get(1)' and first='$record.get(0)'")
      val row = results.one()
      var scyllaRow: Person = null
      if (row != null) 
        scyllaRow = Person(row.getString("id").toInt, row.getString("first"), row.getString("last"))
      

      scyllaRow
    )

  session.close()
  cluster.close()
  res
)
vdf.show()

但遇到主机不可用异常(虽然没有连接问题,但它与 java 客户端一起工作正常)

Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:210)
at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:46)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:274)
at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:114)
at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:94)
at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
... 27 more

感谢任何帮助。

【问题讨论】:

【参考方案1】:

您需要使用 Spark Cassandra 连接器从 Spark 连接到 Cassandra 数据库。

连接器可从此处获得——https://github.com/datastax/spark-cassandra-connector。但是由于您要连接到 Scylla DB,因此您可能需要使用 Scylla 的连接器分支。干杯!

【讨论】:

Datastax 或 ScyllaDB 连接器都可以正常工作。但建议使用 Scylla 连接器,因为它是“分片感知”的,从而产生更好的吞吐量和更低的延迟。 github.com/scylladb/scylla-migrator【参考方案2】:

使用 com.datastax.spark.connector.cql.CassandraConnector 中的“CassandraConnector 它将负责每个分区的会话管理。

def main(args: Array[String]): Unit = 

val spark = SparkSession.builder.appName("ScyllaSparkClient")
  .config("spark.cassandra.connection.host", "localhost")
  .master("local[1]")
  .getOrCreate()


import spark.implicits._
val m = Map( "John" -> 2 )
val df = m.toSeq.toDF("first", "id")
df.show

val connector = CassandraConnector(spark.sparkContext.getConf)

val vdf = df.mapPartitions(p => 
  connector.withSessionDo  session =>
    val res = p.map(record => 
      val results = session.execute(s"SELECT * FROM MyKeySpace.MyColumns where id='$record.get(1)' and first='$record.get(0)'")
      val row = results.one()
      var scyllaRow: Person = null
      if (row != null) 
        scyllaRow = Person(row.getString("id").toInt, row.getString("first"), row.getString("last"))
      
      scyllaRow
    )
    res
  
)
vdf.show()

它会起作用的!

【讨论】:

以上是关于通过 java 客户端的 Spark Cassandra 连接的主要内容,如果未能解决你的问题,请参考以下文章

EMR LinkageError 上的 Spark + Cassandra

通过 Dataflow Java API 返回客户端的不一致 BigQuery 数据类型

Spark从入门到精通7:Spark客户端之Spark Submit的使用

Java获取客户端IP

java cxf动态调用服务端的webservices方法

redis在java客户端的操作