spark如何在cassandra表之间复制数据?

Posted

技术标签:

【中文标题】spark如何在cassandra表之间复制数据?【英文标题】:How does spark copy data between cassandra tables? 【发布时间】:2018-11-11 03:25:43 【问题描述】:

当从一个表读取数据并在 cassandra 中将其写入另一个表时,谁能解释一下 spark 的内部工作原理。

这是我的用例:

我正在通过 kafka 主题将来自 IOT 平台的数据摄取到 cassandra。我有一个小的 python 脚本,它解析来自 kafka 的每条消息以获取它所属的表名,准备一个查询并使用 datastax 的 cassandra-driver for python 将其写入 cassandra。使用该脚本,我可以每分钟将大约 300000 条记录 摄取到 cassandra 中。但是我的传入数据速率是每分钟 510000 条记录,因此 kafka 消费者延迟不断增加。

Python 脚本已经在对 cassandra 进行并发调用。如果我增加 python 执行器的数量,cassandra-driver 开始失败,因为 cassandra 节点对其不可用。我假设我在那里打的每秒 cassandra 调用次数是有限制的。这是我收到的错误消息:

ERROR Operation failed: ('Unable to complete the operation against any hosts', <Host: 10.128.1.3 datacenter1>: ConnectionException('Pool is shutdown',), <Host: 10.128.1.1 datacenter1>: ConnectionException('Pool is shutdown',))"

最近,我运行了一个 pyspark 作业,将数据从一个表中的几列复制到另一个。该表中有大约 1.68 亿条记录。 Pyspark 作业在大约 5 小时内完成。因此它每分钟处理超过 550000 条记录

这是我正在使用的 pyspark 代码:

df = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table=sourcetable, keyspace=sourcekeyspace)\
    .load().cache()

df.createOrReplaceTempView("data")

query = ("select dev_id,datetime,DATE_FORMAT(datetime,'yyyy-MM-dd') as day, " + field + " as value  from data  " )

vgDF = spark.sql(query)
vgDF.show(50)
vgDF.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table=newtable, keyspace=newkeyspace)\
    .save()

版本:

卡桑德拉 3.9。 Spark 2.1.0。 Datastax 的 spark-cassandra-connector 2.0.1 Scala 2.11 版

集群:

具有 3 个工作节点和 1 个主节点的 Spark 设置。 3 个工作节点也安装了 cassandra 集群。 (每个 cassandra 节点都有一个 spark 工作节点) 允许每个工作人员使用 10 GB 内存和 3 个内核。

所以我想知道:

spark 是否首先从 cassandra 读取所有数据,然后将其写入新表,或者 spark cassandra 连接器中是否有某种优化,允许它在 cassandra 表中移动数据而不读取所有记录?

如果我将我的 python 脚本替换为一个 spark 流作业,在该作业中我解析数据包以获取 cassandra 的表名,这将有助于我更快地将数据摄取到 cassandra 中吗?

【问题讨论】:

【参考方案1】:

Spark 连接器经过优化,因为它可以并行处理数据并将数据读取/插入到拥有数据的节点中。使用 Cassandra Spark 连接器可能会获得更好的吞吐量,但这需要更多资源。

谈论您的任务 - 300000 次插入/分钟是 5000 次/秒,坦率地说,这不是一个很大的数字 - 您可以通过进行不同的优化来提高吞吐量:

使用asynchronous calls 提交请求。您只需要确保提交更多可以由一个连接处理的请求(但您也可以增加此数量 - 我不确定如何在 Python 中执行此操作,但请查看Java driver doc 了解一下) . 使用正确的一致性级别(LOCAL_ONE 应该会给你很好的性能) 使用正确的load balancing policy 您可以并行运行多个脚本副本,确保它们都在同一个 Kafka 消费者组中。

【讨论】:

谢谢。我将尝试这些优化。我已经在使用异步调用。我也在使用 python 的 concurrent 库,它允许我在同一个脚本中拥有多个执行器。但是,当我从某个数量(比如 7 个)增加执行程序的数量时,我开始收到 'Unable to complete the operation against any hosts' 错误。就好像节点已经离线一样。可能是因为我遇到了一些 cassandra 调用限制吗? 您需要检查 Cassandra 端的日志以查看其原因。很可能是节点过载了,但这可能有很多原因 - 这取决于您用于写入的一致性级别等。

以上是关于spark如何在cassandra表之间复制数据?的主要内容,如果未能解决你的问题,请参考以下文章

如何在spark中读写cassandra数据

当表在集群中的多个节点之间复制时,COPY 如何在 cassandra 中工作?

SPARK SQL 和 Cassandra 之间的时区不匹配

Spark + cassandra:如何创建键空间?

Spark 和 Cassandra:推荐的接口方式

如何在 Spark 中从 cassandra datastax 云中读取数据