Spark SQL 具有相同行的不同分区

Posted

技术标签:

【中文标题】Spark SQL 具有相同行的不同分区【英文标题】:Spark SQL different partitions with the same Row 【发布时间】:2017-04-04 18:58:50 【问题描述】:

我正在尝试使用如下代码将 Spark SQL Row 值插入数据库:

final Broadcast<String> jdbcUrl = sc.broadcast(config.jdbcUrl());
df.foreachPartition((final Iterator<Row> it) -> 
    final Sql2o sql2o = new Sql2o(jdbcUrl.value(), null, null, new NoQuirks());
    try (final Connection conn = sql2o.beginTransaction()) 
        final String sql = "INSERT INTO Table (Id, Value) VALUES (:id, :value)";
        final Query query = conn.createQuery(sql, false);
        int batchSize = 0;
        while (it.hasNext()) 
            final Row row = it.next();
            query.addParameter("id", row.getLong(0))
                .addParameter("value", row.get(1));
                .addToBatch();
            if (++batchSize == 1000) 
                query.executeBatch();
                conn.commit();
                batchSize = 0;
            
        
        query.executeBatch();
        conn.commit();
    
);

我收到主键违规错误:

java.sql.BatchUpdateException:违反主键约束 'PK_Table'。无法在对象“表”中插入重复键。这 重复键值为 42。

我添加了一些调试日志记录代码,并验证了两个不同的执行程序试图插入相同的行(具有相同的 id 和值)。

在 Spark SQL 开始插入 Row 值之前,该表为空。此外,我尝试在 DataFrame 上调用 foreachPartition() 之前调用 distinct()persist(),但我仍然遇到问题。

同一个DataFrame的不同分区不应该有不同的数据吗?分区器不总是保证吗?

编辑:

我在 DataFrame 上运行 df.groupBy(df.col("id")).count().filter(col("count").gt(1)).show(); 并且没有分组到多个行的 id:

+--+-----+
|id|count|
+--+-----+
+--+-----+

据我所知,看起来同一个分区正在不同的执行程序中同时迭代。怎么会?

【问题讨论】:

如果这不是 SQL Server 问题,为什么还要标记 Microsoft SQL-Server? @pmbAustin,很公平。我删除了标签。 正如您的错误代码所说:42。这就是答案;)好的,现在真正的建议:distinct() 生成不同的行。这意味着,您可以有 2 个相同的 ID,但第二个值会不同。尝试做groupBy('id).count @T.Gawęda,并将其与 df.count 进行比较? 否,将其过滤为大于 1 的值 【参考方案1】:

也许,列索引是错误的; “id”不在 0 列中。更好:

userow.getAs[Long]("id")

而不是 row.getLong(0)

【讨论】:

感谢您的回答。试过了,我得到了这个问题。 :- (

以上是关于Spark SQL 具有相同行的不同分区的主要内容,如果未能解决你的问题,请参考以下文章

Hive 分区到 Spark 分区

Spark parquet 模式演变

sparksql怎么批量删除分区

Spark调研笔记第3篇 - Spark集群相应用的调度策略简单介绍

Spark读取HDFS数据分区参考

Spark-sql读取hive分区表限制分区过滤条件及限制分区数量