Spark 2.4 上带有字典的 UDF

Posted

技术标签:

【中文标题】Spark 2.4 上带有字典的 UDF【英文标题】:UDFs with Dictionaries on Spark 2.4 【发布时间】:2020-01-23 11:03:58 【问题描述】:

我使用的是 Pyspark 2.4.4,我需要使用 UDF 来创建我想要的输出。此 UDF 使用广播字典。首先,看起来我需要修改 UDF 的代码以接受字典。其次,我不确定我正在做的事情是在 Spark 2.4 中最有效的方式。我的代码如下:

# This is a sample of the original Spark dataframe, which I will use to create the dictionary
df = spark.createDataFrame([(220, 2, '2012-11-22 22:03:42'), (2382556,3, '2012-11-23 22:03:42'), (7854140,3,'2012-11-28 22:03:42')], ["user", "preacher", "time"])

# I am converting the above dataframe to pandas dataframe in order to create my dictionary
Dict = df.toPandas().groupby('preacher')['user','time'].apply(lambda g: list(map(tuple, g.values.tolist()))).to_dict()

#Broadcast the dictionary
pcDict = sc.broadcast(Dict)

## Function that calls the dictionary
def example(n):
    nodes = []
    children = [i[0] for i in pcD.value[n]]
    for child in children:
                    nodes.append(child)

    return Row('Out1', 'Out2')(nodes, [(n, n+2), (n, n+4)])

## Convert the Python function to UDF
schema = StructType([
    StructField("Out1", ArrayType(IntegerType()), False),
    StructField("Out2", ArrayType(StructType([StructField("_1", IntegerType(), False), StructField("_2", IntegerType(), False)])))])

example_udf = F.udf(example, schema)

# Create sample dataframe to test the UDF function
testDf = spark.createDataFrame([(3, 4), (220,5)], ["user", "Number"])

### Final output
newDf = testDf.withColumn("Output", F.explode(F.array(example_udf(testDf["user"]))))
newDf = newDf.select("user", "Output.*")

我的第一个问题是关于字典的。我应该使用它还是有其他更有效的方法?我在考虑 collectAsMap(),但鉴于它可用于 rdds,我不确定这是否是 Spark 2.4 中的方式。

第二个问题是,鉴于字典是要走的路,我应该如何修改 udf 函数?

提前致谢!

【问题讨论】:

如果你的字典内存很小并且适合你的执行器内存,那么它很好。 如果不是,您认为最好的选择是什么? 广播是确保静态数据(即只读数据)只发送给执行者一次的正确方法。如果您的 udf 没有修改它,那么它是正确的。 ***.com/questions/38056774/spark-cache-vs-broadcast 再次嗨,我不确定我是否理解这里的一部分。 pcDict 是一个带有键 preacher 的字典,尽管您使用 udf 中的用户 ID 来访问字典。哪一个将成为字典的键?如果 n 与字典键相同,您可以将 UDF 替换为连接以获得更好的性能 对于第二个问题,最好从头开始解释问题,以便通过描述初始数据集和所需输出来提供更好的概述。我的第一个猜测是,应该可以通过避免使用字典来扁平化你的数据,也就是说,你可以只使用 df 和 testDf 之间的连接,而不是按传教士分组。通过这种方式,您可以获得更容易使用的扁平结构 【参考方案1】:

关于第一个问题,我认为 pandas 提供了一种将数据转换为字典的优雅方式。尽管由于 pandas 将在一个节点中执行,您可能需要利用集群的强大功能,因此决定选择 Spark 版本。另一个因素是字典本身的大小。如果您确定字典可以轻松放入一个节点,则可以安全地保留 pandas 版本,否则请尝试下一个 Spark 代码:

from pyspark.sql import functions as F

# This is a sample of the original Spark dataframe, which I will use to create the dictionary
df = spark.createDataFrame([(220, 2, '2012-11-22 22:03:42'), (2382556,3, '2012-11-23 22:03:42'), (7854140,3,'2012-11-28 22:03:42')], ["user", "preacher", "time"])

df = df.rdd.map(lambda r: (r[1], (r[0], r[2]))) \
      .toDF(["preacher", "tuple"]) \
      .groupBy("preacher") \
      .agg(F.collect_list("tuple").alias("tuple"))

dict = 
for k,v in df.rdd.collectAsMap().items():
  dict[k] = list(map(lambda row: (row[0], row[1]), v))

dict
# 3: [(2382556, '2012-11-23 22:03:42'), (7854140, '2012-11-28 22:03:42')],
#  2: [(220, '2012-11-22 22:03:42')]

另外值得一提的是,Spark 将与每个任务一起打包并发送程序中使用的所有局部变量。因此broadcast 适用于应该存储在执行器上的大型变量,以便任何任务都可以轻松访问。

【讨论】:

以上是关于Spark 2.4 上带有字典的 UDF的主要内容,如果未能解决你的问题,请参考以下文章

带有 PySpark 2.4 的 Pandas UDF [重复]

PySpark UDF 优化挑战使用带有正则表达式的字典(Scala?)

如何将 Azure 流分析中的“类字典”结构转换为带有 javascript UDF 的多维数组?

databricks udf 广播字典值返回字典列表;无法访问该列表中字典的值

Pyspark UDF 无法使用大字典

Legacy UDF - 确定字典/地图中的最大值