输入数据框的 Spark Udf 函数

Posted

技术标签:

【中文标题】输入数据框的 Spark Udf 函数【英文标题】:Spark Udf function with Dataframe in input 【发布时间】:2017-01-10 19:39:25 【问题描述】:

我必须使用 python 开发一个 Spark 脚本来检查一些日志并验证用户是否在两个事件之间更改了他的 IP 所在的国家/地区。我有一个 csv 文件,其中包含保存在 HDFS 上的 IP 范围和相关国家/地区,如下所示:

startIp, endIp, country
0.0.0.0, 10.0.0.0, Italy
10.0.0.1, 20.0.0.0, England
20.0.0.1, 30.0.0.0, Germany

还有一个日志 csv 文件:

userId, timestamp, ip, event
1, 02-01-17 20:45:18, 10.5.10.3, login
24, 02-01-17 20:46:34, 54.23.16.56, login

我使用 Spark Dataframe 加载这两个文件,并且我已经修改了包含带有滞后函数的日志的文件,添加了一个包含 previousIp 的列。我认为的解决方案是将 ip 和 previousIp 替换为关联的国家/地区,以便比较它们并使用 dataFrame.filter("previousIp" != "ip")。 我的问题是,有没有办法在 Spark 中做到这一点?比如:

dataFrame = dataFrame.select("userId", udfConvert("ip",countryDataFrame).alias("ip"), udfConvert("previousIp",countryDataFrame).alias("previousIp"),...)

为了有这样的数据框:

userId, timestamp, ip, event, previousIp
1, 02-01-17 20:45:18, England, login, Italy

如果没有,我该如何解决我的问题?谢谢

【问题讨论】:

【参考方案1】:

如果你先将IP地址转换为数字,实际上很容易。您可以编写自己的 UDF 或使用来自 petrabarus 的代码并像这样注册函数:

spark.sql("CREATE TEMPORARY FUNCTION iptolong as 'net.petrabarus.hiveudfs.IPToLong'")

然后将国家/地区 csv 映射到带有数字的数据框:

>>> ipdb = spark.read.csv('ipdb.csv', header=True).select(
             expr('iptolong(startIp)').alias('ip_from'),
             expr('iptolong(endIp)').alias('ip_to'), 
             'country')
>>> ipdb.show()
+---------+---------+-------+
|  ip_from|    ip_to|country|
+---------+---------+-------+
|        0|167772160|  Italy|
|167772161|335544320|England|
|335544321|503316480|Germany|
+---------+---------+-------+

另外,将您的日志数据框映射到数字:

>>> log = spark.createDataFrame([('15.0.0.1',)], ['ip']) \
            .withColumn('ip', expr('iptolong(ip)'))
>>> log.show()
+---------+
|       ip|
+---------+
|251658241|
+---------+

然后您可以使用between 条件加入此数据框:

>>> log.join(broadcast(ipdb), log.ip.between(ipdb.ip_from, ipdb.ip_to)).show()
+---------+---------+---------+-------+
|       ip|  ip_from|    ip_to|country|
+---------+---------+---------+-------+
|251658241|167772161|335544320|England|
+---------+---------+---------+-------+

【讨论】:

以上是关于输入数据框的 Spark Udf 函数的主要内容,如果未能解决你的问题,请参考以下文章

使用 udf 选择数据框的列

Apache Spark - 注册 UDF - 返回数据帧

数据帧上的 spark GROUPED_MAP udf 是不是并行运行?

spark scala - UDF 用于创建新列

spark自定义UDF为啥参数最多21个

在 Spark 数据帧 udf 中,像 struct(col1,col2) 这样的函数参数的类型是啥?