Apache Spark 选择所有行
Posted
技术标签:
【中文标题】Apache Spark 选择所有行【英文标题】:Apache Spark selects all rows 【发布时间】:2018-01-18 14:31:03 【问题描述】:当我使用 JDBC 连接来提供 spark 时,即使我在数据帧上使用过滤;当我检查我的 oracle 数据源上的查询日志时,我看到 spark 正在执行:
SELECT [column_names] FROM MY_TABLE
参考https://***.com/a/40870714/1941560,
我期待 Spark 懒惰地计划查询并执行如下;
SELECT [column_names] FROM MY_TABLE WHERE [filter_predicate]
但 spark 没有这样做。之后它会获取所有数据和过滤器。我需要这种行为,因为我不想每 x 分钟检索一次所有表,而只想更改行(UPDATE_DATE
的增量过滤)。
有没有办法做到这一点?
这是我的python代码:
df = ...
lookup_seconds = 5 * 60;
now = datetime.datetime.now(pytz.timezone("some timezone"))
max_lookup_datetime = now - datetime.timedelta(seconds=lookup_seconds)
df.where(df.UPDATE_DATE > max_lookup_datetime).explain()
解释结果:
Physical Plan == *Filter (isnotnull(UPDATE_DATE#21) && (UPDATE_DATE#21 > 1516283483208806)) +- Scan ExistingRDD[NO#19,AMOUNT#20,UPDATE_DATE#21,CODE#22,AMOUNT_OLD#23]
编辑:完整答案是here
【问题讨论】:
您能否展示一下如何创建df
和扩展执行计划(explain(True)
)。
这将有助于了解您如何准确过滤数据。您是否使用 JDBC 连接器创建数据框,然后进行过滤?如果是这样,则 JDBC 连接器中没有谓词下推,@Guitao 的答案是正确的。但是,如果您使用 dbtable 属性进行过滤,请添加代码 sn-p 以便我们提供帮助。
@LiorChaga Spark 支持使用 JDBC 源的谓词下推(我已经发布了与 OP 完全相同逻辑的计划,其中过滤器被下推)。不需要为琐碎的谓词手动过滤。仍然 - 没有minimal reproducible example,我们只能猜测发生了什么。
【参考方案1】:
这里最可能的情况是您cache
输入DataFrame
。在这种情况下,Spark 不会尝试选择或投影下推,而是将数据提取到集群并在本地处理。
很容易说明这种行为:
df = spark.read.jdbc(url, table, properties=)
df
DataFrame[id: int, UPDATE_DATE: timestamp]
如果数据没有被缓存:
df.select("UPDATE_DATE").where(df.UPDATE_DATE > max_lookup_datetime).explain(True)
== Parsed Logical Plan ==
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Analyzed Logical Plan ==
UPDATE_DATE: timestamp
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Optimized Logical Plan ==
Project [UPDATE_DATE#1]
+- Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Physical Plan ==
*Scan JDBCRelation(df) [numPartitions=1] [UPDATE_DATE#1] PushedFilters: [*IsNotNull(UPDATE_DATE), *GreaterThan(UPDATE_DATE,2018-01-18 15:35:13.07596)], ReadSchema: struct<UPDATE_DATE:timestamp>
选择和投影都被向下推。但是,如果你cache
df
,并再次检查执行计划:
df.cache()
DataFrame[id: int, UPDATE_DATE: timestamp]
df.select("UPDATE_DATE").where(df.UPDATE_DATE > max_lookup_datetime).explain(True)max_lookup_datetime).explain(True)
== Parsed Logical Plan ==
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Analyzed Logical Plan ==
UPDATE_DATE: timestamp
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Optimized Logical Plan ==
Project [UPDATE_DATE#1]
+- Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
+- InMemoryRelation [id#0, UPDATE_DATE#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Scan JDBCRelation(df) [numPartitions=1] [id#0,UPDATE_DATE#1] ReadSchema: struct<id:int,UPDATE_DATE:timestamp>
== Physical Plan ==
*Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
+- InMemoryTableScan [UPDATE_DATE#1], [isnotnull(UPDATE_DATE#1), (UPDATE_DATE#1 > 1516289713075960)]
+- InMemoryRelation [id#0, UPDATE_DATE#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Scan JDBCRelation(df) [numPartitions=1] [id#0,UPDATE_DATE#1] ReadSchema: struct<id:int,UPDATE_DATE:timestamp>
投影和选择都会延迟。
【讨论】:
我对此表示赞同,因为 api v2 必须解决 ***.com/questions/32573991/… 中提到的这个问题 顺便说一句,我没有使用缓存。 API V2 没有添加任何新内容,这将涵盖这种情况。重申一下 - 这应该在当前的 API 中工作,除非你做了一些直接阻止它的事情。缓存只是一种可能的选择,但再一次 - 如果没有看到完整的代码和执行计划,就无法确定答案。【参考方案2】:来自官方文档1:
dbtable 应该读取的 JDBC 表。请注意,可以使用在 SQL 查询的 FROM 子句中有效的任何内容。例如,您也可以在括号中使用子查询,而不是完整的表。
您可以将 JDBC 选项 dbtable 设置为子查询 SQL。例如:
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "(select * from tbl where UPDATE_DATE > max_lookup_datetime) t") \
.option("user", "username") \
.option("password", "password") \
.load()
【讨论】:
以上是关于Apache Spark 选择所有行的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 spark 进行 map-reduce 流选择 N 列,文件夹下所有 csv 文件的前 M 行?