Spark cassandra 连接器 + 加入超时

Posted

技术标签:

【中文标题】Spark cassandra 连接器 + 加入超时【英文标题】:Spark cassandra connector + join time outs 【发布时间】:2020-11-30 16:07:33 【问题描述】:

我需要加入两个 Spark 数据帧并将结果写回 Hive。以下是数据框:

Dataframe1:Cassandra 表 - 分区和聚类键:(ID,PART_NBR)

val df1 = spark.read.format("org.apache.spark.sql.cassandra")
    .option("keyspace", "mykeyspace")
    .option("table", "mytable")
    .load

**Dataframe2:从其他来源获取的键(即上表中的 ID 列的分区键)的数据帧 - 该表中不同键的数量约为 15 万个**

val df2 = spark.read
    .format("jdbc")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("url","****")
    .option("dbtable","table")
    .option("user", "username")
    .option("password", "password123")
    .load()

  val joinExpr = df1.col("ID") === df2.col("ID")

  val res = df1.join(df2,joinExpr)

  res.write.mode(SaveMode.Append).format("orc")
    .saveAsTable("targetTable")

现在此代码总是导致 "com.datastax.oss.driver.api.core.servererrors.ReadFailureException: Cassandra 在读取查询期间以一致性 LOCAL_ONE 失败(需要 1 个响应,但只有 0 个副本响应,1 个失败)".

即使失败,也将 LOCAL_ONE 更改为 QUORUM。

我什至尝试将键数据帧拆分为 20 个键的批次(数据帧中的 20 个 ID 值),然后加入 cassandra 表 - 即使这样也失败了。

我什至尝试过 IN 子句,尽管它可以工作 DBA 限制我们运行它,因为它会加载 Cassandra 并导致 CPU 峰值。

在与 Cassandra DBA 核对时,他们要求进行有针对性的查询,因为上述查询会导致大令牌范围扫描并导致失败。但是个别有针对性的查询会导致 15 万次往返 Cassandra(这需要几个小时才能完成),而且成本太高。

为什么会导致如此庞大的令牌范围扫描?我们如何解决这个问题?我的替代方案是什么?

pom.xml 依赖

 <scala.version>2.11.12</scala.version>
 <spark.version>2.2.0</spark.version>

        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.5.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>$spark.version</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>$spark.version</version>
            <scope>provided</scope>
        </dependency>



尝试了以下但它没有直接加入。我有什么遗漏吗?

spark-submit --class ExampleCassandra --deploy-mode client --num-executors 15 --executor-memory 4g  --driver-memory=1g  --conf spark.sql.shuffle.partitions=25 --conf spark.executor.heartbeatInterval=100s --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --jars spark-sql_2.11-2.4.0.jar,spark-core_2.11-2.4.0.jar,spark-hive_2.11-2.4.0.jar,mysql-connector-java-8.0.18.jar,spark-cassandra-connector_2.11-2.5.1.jar ExampleCassandra-bundled-1.0-SNAPSHOT.jar

代码中打印的 Spark 版本 => spark.sparkContext.version = 2.4.0

最终的计划

== Physical Plan ==
*(8) SortMergeJoin [item_nbr#31], [item_nbr#24], Inner
:- *(2) Sort [item_nbr#31 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(item_nbr#31, 25)
:     +- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [item_nbr#31,planNum#32,strN#33,currTail#34,currTailTy#35,hor#36,prNbr#37,revSce#38,stckHnad#39] PushedFilters: [], ReadSchema: struct<item_nbr:int,planNum:int,strN:int,currTail:decimal(38,18),currTailTy:s...
+- *(7) Sort [item_nbr#24 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(item_nbr#24, 25)
      +- *(6) HashAggregate(keys=[item_nbr#21], functions=[])
         +- Exchange hashpartitioning(item_nbr#21, 25)
            +- *(5) HashAggregate(keys=[item_nbr#21], functions=[])
               +- *(5) Filter (NOT (trim(lower(item_nbr#21), None) = null) && isnotnull(cast(trim(item_nbr#21, None) as int)))
                  +- Generate explode(split(items#4, ,)), false, [item_nbr#21]
                     +- *(4) Project [items#4]
                        +- *(4) BroadcastHashJoin [planNum#0], [planNum#2], Inner, BuildRight
                           :- *(4) Scan JDBCRelation(( select planNum from QAMdPlans.Plan where plan_type = 'MBM' order by planNum desc ) t) [numPartitions=1] [planNum#0] PushedFilters: [*IsNotNull(planNum)], ReadSchema: struct<planNum:int>
                           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                              +- *(3) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [planNum#2,items#4] PushedFilters: [], ReadSchema: struct<planNum:int,items:string>

【问题讨论】:

什么是 spark cassandra 连接器版本? com.datastax.sparkspark-cassandra-connector_2.112.0.5 【参考方案1】:

问题是版本 2.0.5 没有优化 Dataframes 的连接 - 如果你这样做 res.explain 你会看到 Spark 将执行从 Cassandra 读取所有数据,然后在 Spark 上执行连接等级。优化连接仅在 RDD API 中可用,为leftJoinWithCassandraTablejoinWithCassandraTable

release of the Spark Cassandra Connector 2.5 改变了这种情况,现在包括针对 Dataframe API 的优化连接(但您需要启用 Spark SQL 扩展才能使其正常工作)。因此,您需要将连接器升级到 2.5.latest(目前为 2.5.1),或使用 RDD API 中的连接功能。

附:我最近 wrote a detailed blog post 使用 Dataframe 和 RDD API 与 Spark 中的 Cassandra 表中的数据进行有效连接。

【讨论】:

谢谢你,Alex,它的信息量很大的博客,你有它。但是 2.5.1 am 面临以下问题。错误ApplicationMaster:用户类抛出异常:java.lang.NoSuchMethodError:org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats()Lorg/apache/spark/sql/catalyst/plans/logical/Statistics; 在原始问题中添加了 pom 依赖项,我猜是当前问题 User class throw exception: java.lang.NoSuchMethodError: 这可能是由于依赖项,任何建议@Alex Ott 啊,SCC 2.5.1 需要 Spark 2.4(或至少 2.3)。 Spark 2.2 没有数据帧连接优化所需的功能 Alex,为问题添加了更多细节。尝试使用 spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions 和 spark 版本 2.4.0,但它没有直接加入。有什么建议吗? 另一个表有多少数据?因为如果是 Cassandra 中 > 90% 的数据,那么它将使用 scan + spark join。

以上是关于Spark cassandra 连接器 + 加入超时的主要内容,如果未能解决你的问题,请参考以下文章

如何用Cassandra连接火花

Spark Cassandra 连接器找不到 java.time.LocalDate

Spark Cassandra 连接器 - perPartitionLimit

Spark Cassandra 连接器 - where 子句

RDD 不可序列化 Cassandra/Spark 连接器 java API

Spark Cassandra 连接器:SQLContext.read + SQLContext.write 与手动解析和插入(JSON -> Cassandra)