将 RDD 的值作为变量传递给另一个 RDD - Spark #Pyspark [重复]

Posted

技术标签:

【中文标题】将 RDD 的值作为变量传递给另一个 RDD - Spark #Pyspark [重复]【英文标题】:passing value of RDD to another RDD as variable - Spark #Pyspark [duplicate] 【发布时间】:2018-05-22 02:32:50 【问题描述】:

我目前正在探索如何通过 sqlContext 调用大的 hql 文件(包含 100 行 insert into select 语句)。

另外,hqls 文件是参数化的,所以在从 sqlContext 调用时,我也想传递参数。

浏览了大量博客和帖子,但没有找到任何答案。

我正在尝试的另一件事是将 rdd 的输出存储到变量中。

pyspark

max_date=sqlContext.sql("select max(rec_insert_date) from table")

现在想将 max_date 作为变量传递给下一个 rdd

incremetal_data=sqlConext.sql(s"select count(1) from table2 where rec_insert_date > $max_dat")

这不起作用,而且max_date 的值是=

u[row-('20018-05-19 00:00:00')]

现在还不清楚如何修剪这些多余的字符。

【问题讨论】:

您的max_date 是一个DataFrame,它是一个Dataset[Row],所以这显然不会像您希望的那样进行插值。您可以使用collect(),然后将其转换为正常值进行插入。 【参考方案1】:

sql Context 重新定义了一个 Dataset[Row]。你可以从那里获得你的价值

max_date=sqlContext.sql("select count(rec_insert_date) from table").first()[0]

在 Spark 2.0+ 中使用 spark Session 你可以

 max_date=spark.sql("select count(rec_insert_date) from table").rdd.first()[0]

从返回的数据帧中获取底层rdd

【讨论】:

【参考方案2】:

您不应该使用max(rec_insert_date) 而不是count(rec_insert_date) 吗?

将一个查询返回的值传递给另一个查询有两种选择:

    使用collect,它将触发计算并将返回值分配给变量

    max_date = sqlContext.sql("select max(rec_insert_date) from table").collect()[0][0] # max_date has actual date assigned to it incremetal_data = sqlConext.sql(s"select count(1) from table2 where rec_insert_date > ''".format(max_date))

    另一个(更好的)选择是使用 Dataframe API

    from pyspark.sql.functions import col, lit incremental_data = sqlContext.table("table2").filter(col("rec_insert_date") > lit(max_date))

    使用交叉连接 - 如果第一个查询的结果超过 1 个,则应避免使用交叉连接。优点是您不会破坏处理图,因此 Spark 可以优化所有内容。

    max_date_df = sqlContext.sql("select max(rec_insert_date) as max_date from table") # max_date_df is a dataframe with just one row incremental_data = sqlContext.table("table2").join(max_date_df).filter(col("rec_insert_date") > col("max_date"))

至于你第一个问题如何从 Spark 调用大的 hql 文件:

如果您使用的是 Spark 1.6,那么您需要创建一个 HiveContext https://spark.apache.org/docs/1.6.1/sql-programming-guide.html#hive-tables 如果您使用的是 Spark 2.x,那么在创建 SparkSession 时您需要启用 Hive 支持https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables

您可以从将 im 插入 sqlContext.sql(...) 方法开始,根据我的经验,这通常是有效的,并且是将逻辑重写为 DataFrames/Datasets API 的一个很好的起点。在集群中运行它时可能会出现一些问题,因为您的查询将由 Spark 的 SQL 引擎 (Catalyst) 执行,不会传递给 Hive。

【讨论】:

谢谢您的友好回复,正如您正确指出的那样,它应该是 max 。我会尽力让你知道,我相信它会奏效的。 如果我想直接运行.hql文件,可以用下面的命令吗? sqlContext.sql(open("file.hql").read())

以上是关于将 RDD 的值作为变量传递给另一个 RDD - Spark #Pyspark [重复]的主要内容,如果未能解决你的问题,请参考以下文章

Spark基础学习笔记21:RDD检查点与共享变量

RDD编程下(自学四)

如何将值列表传递到 json+sparksql 中的 rdd

在 Spark Streaming 中刷新 RDD

如何将一个 SwiftUI 视图作为变量传递给另一个视图结构

C# - 将变量传递给另一个表单