Spark SQL 具有相同行的不同分区
Posted
技术标签:
【中文标题】Spark SQL 具有相同行的不同分区【英文标题】:Spark SQL different partitions with the same Row 【发布时间】:2017-04-04 18:58:50 【问题描述】:我正在尝试使用如下代码将 Spark SQL Row 值插入数据库:
final Broadcast<String> jdbcUrl = sc.broadcast(config.jdbcUrl());
df.foreachPartition((final Iterator<Row> it) ->
final Sql2o sql2o = new Sql2o(jdbcUrl.value(), null, null, new NoQuirks());
try (final Connection conn = sql2o.beginTransaction())
final String sql = "INSERT INTO Table (Id, Value) VALUES (:id, :value)";
final Query query = conn.createQuery(sql, false);
int batchSize = 0;
while (it.hasNext())
final Row row = it.next();
query.addParameter("id", row.getLong(0))
.addParameter("value", row.get(1));
.addToBatch();
if (++batchSize == 1000)
query.executeBatch();
conn.commit();
batchSize = 0;
query.executeBatch();
conn.commit();
);
我收到主键违规错误:
java.sql.BatchUpdateException:违反主键约束 'PK_Table'。无法在对象“表”中插入重复键。这 重复键值为 42。
我添加了一些调试日志记录代码,并验证了两个不同的执行程序试图插入相同的行(具有相同的 id 和值)。
在 Spark SQL 开始插入 Row 值之前,该表为空。此外,我尝试在 DataFrame 上调用 foreachPartition()
之前调用 distinct()
和 persist()
,但我仍然遇到问题。
同一个DataFrame的不同分区不应该有不同的数据吗?分区器不总是保证吗?
编辑:
我在 DataFrame 上运行 df.groupBy(df.col("id")).count().filter(col("count").gt(1)).show();
并且没有分组到多个行的 id:
+--+-----+
|id|count|
+--+-----+
+--+-----+
据我所知,看起来同一个分区正在不同的执行程序中同时迭代。怎么会?
【问题讨论】:
如果这不是 SQL Server 问题,为什么还要标记 Microsoft SQL-Server? @pmbAustin,很公平。我删除了标签。 正如您的错误代码所说:42。这就是答案;)好的,现在真正的建议:distinct()
生成不同的行。这意味着,您可以有 2 个相同的 ID,但第二个值会不同。尝试做groupBy('id).count
@T.Gawęda,并将其与 df.count
进行比较?
否,将其过滤为大于 1 的值
【参考方案1】:
也许,列索引是错误的; “id”不在 0 列中。更好:
userow.getAs[Long]("id")
而不是 row.getLong(0)
【讨论】:
感谢您的回答。试过了,我得到了这个问题。 :- (以上是关于Spark SQL 具有相同行的不同分区的主要内容,如果未能解决你的问题,请参考以下文章