Spark Cassandra 连接器:SQLContext.read + SQLContext.write 与手动解析和插入(JSON -> Cassandra)
Posted
技术标签:
【中文标题】Spark Cassandra 连接器:SQLContext.read + SQLContext.write 与手动解析和插入(JSON -> Cassandra)【英文标题】:Spark Cassandra Connector: SQLContext.read + SQLContext.write vs. manual parsing and inserting (JSON -> Cassandra) 【发布时间】:2016-07-01 06:52:42 【问题描述】:早上好,
我刚开始研究 Apache Spark 和 Apache Cassandra。第一步是一个真正简单的用例:获取一个包含例如的文件。客户+分数。
Cassandra 表将客户作为 PrimaryKey。 Cassandra 只是在本地运行(所以根本没有集群!)。
所以 SparkJob(独立本地 [2])正在解析 JSON 文件,然后将整个内容写入 Cassandra。
第一个解决方案是
val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
val sc = new SparkContext(conf)
val cass = CassandraConnector(conf)
val customerScores = sc.textFile(file).cache()
val customerScoreRDD = customerScores.mapPartitions(lines =>
val mapper = new ObjectMapper with ScalaObjectMapper
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
lines
.map(line =>
mapper.readValue(line, classOf[CustomerScore])
)
//Filter corrupt ones: empty values
.filter(customerScore => customerScore.customer != null && customerScore.score != null)
)
customerScoreRDD.foreachPartition(rows => cass.withSessionDo(session =>
val statement: PreparedStatement = session.prepare("INSERT INTO playground.customer_score (customer,score) VALUES (:customer,:score)")
rows.foreach(row =>
session.executeAsync(statement.bind(row.customer.asInstanceOf[Object], row.score))
)
))
sc.stop()
意味着手动完成所有操作,解析行,然后插入 Cassandra。
对于 10000000 条记录,这大约需要 714020 毫秒(包括创建 SparkContext 等...)。
然后我阅读了有关 spark-cassandra-connector 的信息并做了以下事情:
val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
val sc = new SparkContext(conf)
var sql = new SQLContext(sc)
val customerScores = sql.read.json(file)
val customerScoresCorrected = customerScores
//Filter corrupt ones: empty values
.filter("customer is not null and score is not null")
//Filter corrupt ones: invalid properties
.select("customer", "score")
customerScoresCorrected.write
.format("org.apache.spark.sql.cassandra")
.mode(SaveMode.Append)
.options(Map("keyspace" -> "playground", "table" -> "customer_score"))
.save()
sc.stop()
在需要的代码和使用给定的 API 方面要简单得多。
对于 10000000 条记录,此解决方案大约需要 1232871 毫秒(再次强调,测量点相同)。
(也有第三种解决方案,手动解析并使用 saveToCassandra
,这需要 1530877 毫秒)
现在是我的问题:
哪种方式是实现此用例的“正确”方式,那么哪种方式是当今的“最佳实践”(在实际场景中,集群 cassandra 和 spark,性能最好的一种)?
因为从我的结果来看,我会使用“手动”的东西而不是 SQLContext.read
+ SQLContext.write
。
提前感谢您的 cmets 和提示。
【问题讨论】:
我们在编写 RDD(使用saveToCassandra
)时使用 Cassandra 连接器取得了不错的效果。使用 RDDs 而不是 DataFrames 可以让您根据 Cassandra 令牌范围(使用 repartitionByCassandraReplica
)重新分区,这将导致大多数写入是本地的,从而避免大量 Cassandra 协调器工作。
感谢@LiMuBei,这实际上减少了saveToCassandra
,即使在本地测试用例中也是如此。总的来说,“手动”解决方案(第一个代码 sn-p)似乎仍然是最快的解决方案。
在您的第一个解决方案中,您是否不等待异步操作实际完成?在我看来,这种方式可能无法保证所有插入操作都成功完成。
嗯......你在我的例子中是对的我不知道一切是否正常。
【参考方案1】:
其实玩了这么久,还是要考虑跟风的。
当然是数据量 您的数据类型:尤其是分区键的多样性(每个分区键不同与大量重复) 环境:Spark 执行器、Cassandra 节点、复制...对于我的 UseCase 玩弄
def initSparkContext: SparkContext =
val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
// since we have nearly totally different PartitionKeys, default: 1000
.set("spark.cassandra.output.batch.grouping.buffer.size", "1")
// write as much concurrently, default: 5
.set("spark.cassandra.output.concurrent.writes", "1024")
// batch same replica, default: partition
.set("spark.cassandra.output.batch.grouping.key", "replica_set")
val sc = new SparkContext(conf)
sc
在我的本地跑步中确实显着提高了速度。
因此,非常需要尝试各种参数以获得最佳方法。至少这是我得到的结论。
【讨论】:
以上是关于Spark Cassandra 连接器:SQLContext.read + SQLContext.write 与手动解析和插入(JSON -> Cassandra)的主要内容,如果未能解决你的问题,请参考以下文章
Spark Cassandra 连接器找不到 java.time.LocalDate
Spark Cassandra 连接器 - perPartitionLimit
Spark Cassandra 连接器 - where 子句
RDD 不可序列化 Cassandra/Spark 连接器 java API
Spark Cassandra 连接器:SQLContext.read + SQLContext.write 与手动解析和插入(JSON -> Cassandra)