Apache Spark 选择所有行

Posted

技术标签:

【中文标题】Apache Spark 选择所有行【英文标题】:Apache Spark selects all rows 【发布时间】:2018-01-18 14:31:03 【问题描述】:

当我使用 JDBC 连接来提供 spark 时,即使我在数据帧上使用过滤;当我检查我的 oracle 数据源上的查询日志时,我看到 spark 正在执行:

SELECT [column_names] FROM MY_TABLE

参考https://***.com/a/40870714/1941560,

我期待 Spark 懒惰地计划查询并执行如下;

SELECT [column_names] FROM MY_TABLE WHERE [filter_predicate]

但 spark 没有这样做。之后它会获取所有数据和过滤器。我需要这种行为,因为我不想每 x 分钟检索一次所有表,而只想更改行(UPDATE_DATE 的增量过滤)。

有没有办法做到这一点?

这是我的python代码:

df = ...
lookup_seconds = 5 * 60;
now = datetime.datetime.now(pytz.timezone("some timezone"))
max_lookup_datetime = now - datetime.timedelta(seconds=lookup_seconds)
df.where(df.UPDATE_DATE > max_lookup_datetime).explain()

解释结果:

Physical Plan == *Filter (isnotnull(UPDATE_DATE#21) && (UPDATE_DATE#21 > 1516283483208806)) +- Scan ExistingRDD[NO#19,AMOUNT#20,UPDATE_DATE#21,CODE#22,AMOUNT_OLD#23] 

编辑:完整答案是here

【问题讨论】:

您能否展示一下如何创建df 和扩展执行计划(explain(True))。 这将有助于了解您如何准确过滤数据。您是否使用 JDBC 连接器创建数据框,然后进行过滤?如果是这样,则 JDBC 连接器中没有谓词下推,@Guitao 的答案是正确的。但是,如果您使用 dbtable 属性进行过滤,请添加代码 sn-p 以便我们提供帮助。 @LiorChaga Spark 支持使用 JDBC 源的谓词下推(我已经发布了与 OP 完全相同逻辑的计划,其中过滤器被下推)。不需要为琐碎的谓词手动过滤。仍然 - 没有minimal reproducible example,我们只能猜测发生了什么。 【参考方案1】:

这里最可能的情况是您cache 输入DataFrame。在这种情况下,Spark 不会尝试选择或投影下推,而是将数据提取到集群并在本地处理。

很容易说明这种行为:

df = spark.read.jdbc(url, table, properties=)
df
DataFrame[id: int, UPDATE_DATE: timestamp]

如果数据没有被缓存:

df.select("UPDATE_DATE").where(df.UPDATE_DATE > max_lookup_datetime).explain(True)
== Parsed Logical Plan ==
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
   +- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]

== Analyzed Logical Plan ==
UPDATE_DATE: timestamp
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
   +- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]

== Optimized Logical Plan ==
Project [UPDATE_DATE#1]
+- Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
   +- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]

== Physical Plan ==
*Scan JDBCRelation(df) [numPartitions=1] [UPDATE_DATE#1] PushedFilters: [*IsNotNull(UPDATE_DATE), *GreaterThan(UPDATE_DATE,2018-01-18 15:35:13.07596)], ReadSchema: struct<UPDATE_DATE:timestamp>

选择和投影都被向下推。但是,如果你cachedf,并再次检查执行计划:

df.cache()
DataFrame[id: int, UPDATE_DATE: timestamp]
df.select("UPDATE_DATE").where(df.UPDATE_DATE > max_lookup_datetime).explain(True)max_lookup_datetime).explain(True)
== Parsed Logical Plan ==
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
   +- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]

== Analyzed Logical Plan ==
UPDATE_DATE: timestamp
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
   +- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]

== Optimized Logical Plan ==
Project [UPDATE_DATE#1]
+- Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
   +- InMemoryRelation [id#0, UPDATE_DATE#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Scan JDBCRelation(df) [numPartitions=1] [id#0,UPDATE_DATE#1] ReadSchema: struct<id:int,UPDATE_DATE:timestamp>

== Physical Plan ==
*Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
+- InMemoryTableScan [UPDATE_DATE#1], [isnotnull(UPDATE_DATE#1), (UPDATE_DATE#1 > 1516289713075960)]
      +- InMemoryRelation [id#0, UPDATE_DATE#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Scan JDBCRelation(df) [numPartitions=1] [id#0,UPDATE_DATE#1] ReadSchema: struct<id:int,UPDATE_DATE:timestamp>

投影和选择都会延迟。

【讨论】:

我对此表示赞同,因为 api v2 必须解决 ***.com/questions/32573991/… 中提到的这个问题 顺便说一句,我没有使用缓存。 API V2 没有添加任何新内容,这将涵盖这种情况。重申一下 - 这应该在当前的 API 中工作,除非你做了一些直接阻止它的事情。缓存只是一种可能的选择,但再一次 - 如果没有看到完整的代码和执行计划,就无法确定答案。【参考方案2】:

来自官方文档1:

dbtable 应该读取的 JDBC 表。请注意,可以使用在 SQL 查询的 FROM 子句中有效的任何内容。例如,您也可以在括号中使用子查询,而不是完整的表。

您可以将 JDBC 选项 dbtable 设置为子查询 SQL。例如:

jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "(select * from tbl where UPDATE_DATE > max_lookup_datetime) t") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

【讨论】:

以上是关于Apache Spark 选择所有行的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 spark 进行 map-reduce 流选择 N 列,文件夹下所有 csv 文件的前 M 行?

Apache Spark 如何在内存中工作?

选择最大查询返回 Apache Hive 中表中的所有行

forEach Spark Scala 中的错误:值选择不是 org.apache.spark.sql.Row 的成员

带有替代方法的重载方法值选择

对同一个 apache Spark RDD 的操作会导致所有语句重新执行