无法将日志功能应用于 pyspark 数据帧

Posted

技术标签:

【中文标题】无法将日志功能应用于 pyspark 数据帧【英文标题】:Unable to apply log function to a pyspark dataframe 【发布时间】:2020-12-03 07:07:01 【问题描述】:

所以我有一个大型数据集(大约 1 TB+),我必须在其中执行许多操作,我曾考虑使用 pyspark 来加快处理速度。这是我的导入:

import numpy as np
import pandas as pd

try:
    import pyspark
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession, SQLContext
except ImportError as e:
    raise ImportError('PySpark is not Configured')
    
print(f"PySpark Version : pyspark.__version__")
    
# Creating a Spark-Context
sc = SparkContext.getOrCreate(SparkConf().setMaster('local[*]'))
# Spark Builder
spark = SparkSession.builder \
            .appName('MBLSRProcessor') \
            .config('spark.executor.memory', '10GB') \
            .getOrCreate()
# SQL Context - for SQL Query Executions
sqlContext = SQLContext(sc)

>> PySpark Version : 2.4.7

现在,我想在两列上应用log10 函数 - 对于演示,请考虑以下数据:

data = spark.createDataFrame(pd.DataFrame(
    "A" : [1, 2, 3, 4, 5],
    "B" : [4, 3, 6, 1, 8]
))

data.head(5)
>> [Row(A=1, B=4), Row(A=2, B=3), Row(A=3, B=6), Row(A=4, B=1), Row(A=5, B=8)]

这就是我需要的:log10(A + B)log10(6 + 4) = 1,我为此做了一个这样的函数:

def add(a, b):
    # this is for demonstration
    return np.sum([a, b])

data = data.withColumn("ADD", add(data.A, data.B))
data.head(5)

>> [Row(A=1, B=4, ADD=5), Row(A=2, B=3, ADD=5), Row(A=3, B=6, ADD=9), Row(A=4, B=1, ADD=5), Row(A=5, B=8, ADD=13)]

但是,我不能为np.log10做同样的事情:

def np_log(a, b):
    # actual function
    return np.log10(np.sum([a, b]))

data = data.withColumn("LOG", np_log(data.A, data.B))
data.head(5)

TypeError                                 Traceback (most recent call last)
<ipython-input-13-a5726b6c7dc2> in <module>
----> 1 data = data.withColumn("LOG", np_log(data.A, data.B))
      2 data.head(5)

<ipython-input-12-0e020707faae> in np_log(a, b)
      1 def np_log(a, b):
----> 2     return np.log10(np.sum([a, b]))

TypeError: loop of ufunc does not support argument 0 of type Column which has no callable log10 method

【问题讨论】:

【参考方案1】:

最好的方法是使用原生 Spark 函数:

import pyspark.sql.functions as F
import pandas as pd

data = spark.createDataFrame(pd.DataFrame(
    "A" : [1, 2, 3, 4, 5],
    "B" : [4, 3, 6, 1, 8]
))

data = data.withColumn("LOG", F.log10(F.col('A') + F.col('B')))

但如果你愿意,你也可以使用 UDF:

import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
import numpy as np
import pandas as pd

data = spark.createDataFrame(pd.DataFrame(
    "A" : [1, 2, 3, 4, 5],
    "B" : [4, 3, 6, 1, 8]
))

def udf_np_log(a, b):
    # actual function
    return float(np.log10(np.sum([a, b])))

np_log = F.udf(udf_np_log, FloatType())

data = data.withColumn("LOG", np_log(data.A, data.B))

+---+---+---------+
|  A|  B|      LOG|
+---+---+---------+
|  1|  4|  0.69897|
|  2|  3|  0.69897|
|  3|  6|0.9542425|
|  4|  1|  0.69897|
|  5|  8|1.1139433|
+---+---+---------+

有趣的是,它适用于没有 UDF 的 np.sum,因为我猜 np.sum 只是调用 + 运算符,这对 spark 数据框列有效。

【讨论】:

以上是关于无法将日志功能应用于 pyspark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章

python 将yaml文件中定义的过滤器应用于PySpark数据帧

Pyspark:UDF 将正则表达式应用于数据帧中的每一行

如何按行将函数应用于 PySpark 数据帧的一组列?

Pyspark:在数据帧的不同组上应用 kmeans

无法使用 pyspark 将 Xml 数据读取到数据帧

无法在 pyspark 中应用标量 pandas udf