Spark Cassandra 连接器:SQLContext.read + SQLContext.write 与手动解析和插入(JSON -> Cassandra)

Posted

技术标签:

【中文标题】Spark Cassandra 连接器:SQLContext.read + SQLContext.write 与手动解析和插入(JSON -> Cassandra)【英文标题】:Spark Cassandra Connector: SQLContext.read + SQLContext.write vs. manual parsing and inserting (JSON -> Cassandra) 【发布时间】:2016-07-01 06:52:42 【问题描述】:

早上好,

我刚开始研究 Apache Spark 和 Apache Cassandra。第一步是一个真正简单的用例:获取一个包含例如的文件。客户+分数。

Cassandra 表将客户作为 PrimaryKey。 Cassandra 只是在本地运行(所以根本没有集群!)。

所以 SparkJob(独立本地 [2])正在解析 JSON 文件,然后将整个内容写入 Cassandra。

第一个解决方案是

val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
val sc = new SparkContext(conf)
val cass = CassandraConnector(conf)

val customerScores = sc.textFile(file).cache()

val customerScoreRDD = customerScores.mapPartitions(lines => 
  val mapper = new ObjectMapper with ScalaObjectMapper
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  lines
    .map(line => 
      mapper.readValue(line, classOf[CustomerScore])
    )
    //Filter corrupt ones: empty values
    .filter(customerScore => customerScore.customer != null && customerScore.score != null)
)


customerScoreRDD.foreachPartition(rows => cass.withSessionDo(session => 
  val statement: PreparedStatement = session.prepare("INSERT INTO playground.customer_score (customer,score) VALUES (:customer,:score)")
  rows.foreach(row => 
    session.executeAsync(statement.bind(row.customer.asInstanceOf[Object], row.score))
  )
))

sc.stop()

意味着手动完成所有操作,解析行,然后插入 Cassandra。

对于 10000000 条记录,这大约需要 714020 毫秒(包括创建 SparkContext 等...)。

然后我阅读了有关 spark-cassandra-connector 的信息并做了以下事情:

val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
val sc = new SparkContext(conf)
var sql = new SQLContext(sc)

val customerScores = sql.read.json(file)

val customerScoresCorrected = customerScores
  //Filter corrupt ones: empty values
  .filter("customer is not null and score is not null")
  //Filter corrupt ones: invalid properties
  .select("customer", "score")

customerScoresCorrected.write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map("keyspace" -> "playground", "table" -> "customer_score"))
  .save()

sc.stop()

在需要的代码和使用给定的 API 方面要简单得多。

对于 10000000 条记录,此解决方案大约需要 1232871 毫秒(再次强调,测量点相同)。

(也有第三种解决方案,手动解析并使用 saveToCassandra,这需要 1530877 毫秒

现在是我的问题:

哪种方式是实现此用例的“正确”方式,那么哪种方式是当今的“最佳实践”(在实际场景中,集群 cassandra 和 spark,性能最好的一种)? 因为从我的结果来看,我会使用“手动”的东西而不是 SQLContext.read + SQLContext.write

提前感谢您的 cmets 和提示。

【问题讨论】:

我们在编写 RDD(使用 saveToCassandra)时使用 Cassandra 连接器取得了不错的效果。使用 RDDs 而不是 DataFrames 可以让您根据 Cassandra 令牌范围(使用 repartitionByCassandraReplica)重新分区,这将导致大多数写入是本地的,从而避免大量 Cassandra 协调器工作。 感谢@LiMuBei,这实际上减少了saveToCassandra,即使在本地测试用例中也是如此。总的来说,“手动”解决方案(第一个代码 sn-p)似乎仍然是最快的解决方案。 在您的第一个解决方案中,您是否不等待异步操作实际完成?在我看来,这种方式可能无法保证所有插入操作都成功完成。 嗯......你在我的例子中是对的我不知道一切是否正常。 【参考方案1】:

其实玩了这么久,还是要考虑跟风的。

当然是数据量 您的数据类型:尤其是分区键的多样性(每个分区键不同与大量重复) 环境:Spark 执行器、Cassandra 节点、复制...

对于我的 UseCase 玩弄

def initSparkContext: SparkContext = 
    val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
        // since we have nearly totally different PartitionKeys, default: 1000
        .set("spark.cassandra.output.batch.grouping.buffer.size", "1")
        // write as much concurrently, default: 5
       .set("spark.cassandra.output.concurrent.writes", "1024")
       // batch same replica, default: partition
       .set("spark.cassandra.output.batch.grouping.key", "replica_set") 
    val sc = new SparkContext(conf)
    sc

在我的本地跑步中确实显着提高了速度。

因此,非常需要尝试各种参数以获得最佳方法。至少这是我得到的结论。

【讨论】:

以上是关于Spark Cassandra 连接器:SQLContext.read + SQLContext.write 与手动解析和插入(JSON -> Cassandra)的主要内容,如果未能解决你的问题,请参考以下文章

如何用Cassandra连接火花

Spark Cassandra 连接器找不到 java.time.LocalDate

Spark Cassandra 连接器 - perPartitionLimit

Spark Cassandra 连接器 - where 子句

RDD 不可序列化 Cassandra/Spark 连接器 java API

Spark Cassandra 连接器:SQLContext.read + SQLContext.write 与手动解析和插入(JSON -> Cassandra)