使用 json 模式更新 spark 数据框中的列

Posted

技术标签:

【中文标题】使用 json 模式更新 spark 数据框中的列【英文标题】:Updating column in spark dataframe with json schema 【发布时间】:2016-11-14 07:18:48 【问题描述】:

我有 json 文件,我正在尝试使用 SHA 256 对其中的一个字段进行哈希处理。这些文件位于 AWS S3 上。我目前在 Apache Zeppelin 上使用带有 python 的 spark。

这是我的 json 架构,我正在尝试散列“mac”字段;

 |-- Document: struct (nullable = true)
 |    |-- data: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- mac: string (nullable = true)

我已经尝试了几件事;

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
import hashlib  

hcData = sqlc.read.option("inferSchema","true").json(inputPath)
hcData.registerTempTable("hcData")


name = 'Document'
udf = UserDefinedFunction(lambda x: hashlib.sha256(str(x).encode('utf-8')).hexdigest(), StringType())
new_df = hcData.select(*[udf(column).alias(name) if column == name else column for column in hcData.columns])

此代码运行良好。但是,当我尝试对 mac 字段进行哈希处理并更改名称变量时,什么也没有发生;

name = 'Document.data[0].mac'
name = 'mac'

我猜是因为找不到具有给定名称的列。

我尝试稍微修改一下代码;

def valueToCategory(value):
    return hashlib.sha256(str(value).encode('utf-8')).hexdigest()


udfValueToCategory = udf(valueToCategory, StringType())
df = hcData.withColumn("Document.data[0].mac",udfValueToCategory("Document.data.mac"))

此代码散列“Document.data.mac”并创建新列,其中包含散列的 mac 地址。我想更新现有的列。对于那些没有嵌套的变量可以更新,没有问题,但是对于嵌套的变量我找不到更新的方法。

所以基本上,我想用 spark python 散列嵌套 json 文件中的一个字段。谁能知道如何使用 schema 更新 spark 数据框?

【问题讨论】:

【参考方案1】:

下面是我的问题的 python 解决方案。

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
import hashlib  
import re


def find(s, r):
    l = re.findall(r, s)
    if(len(l)!=0):
        return l
    else:
        lis = ["null"]
        return lis



def hash(s):
    return hashlib.sha256(str(s).encode('utf-8')).hexdigest()



def hashAll(s, r):
    st = s
    macs = re.findall(r, s)
    for mac in macs:
        st = st.replace(mac, hash(mac))
    return st


rdd = sc.textFile(inputPath)

regex = "([0-9A-Z]1,2[:-])5([0-9A-Z]1,2)"
hashed_rdd = rdd.map(lambda line: hashAll(line, regex))

hashed_rdd.saveAsTextFile(outputPath)

【讨论】:

【参考方案2】:

好吧,我已经找到了解决我的问题的方法 scala。可能有冗余代码,但它仍然有效。

import scala.util.matching.Regex
import java.security.MessageDigest

val inputPath = ""
val outputPath = ""

//finds mac addresses with given regex
def find(s: String, r: Regex): List[String] = 
    val l = r.findAllIn(s).toList
    if(!l.isEmpty) 
        return l
     else 
        val lis: List[String] = List("null")
        return lis
    


//hashes given string with sha256
def hash(s: String): String = 
    return MessageDigest.getInstance("SHA-256").digest(s.getBytes).map(0xFF & _).map  "%02x".format(_) .foldLeft("")_ + _


//hashes given line
def hashAll(s: String, r:Regex): String = 
    var st = s
    val macs = find(s, r)
    for (mac <- macs)
        st = st.replaceAll(mac, hash(mac))
    
    return st


//read data
val rdd = sc.textFile(inputPath)

//mac address regular expression
val regex = "(([0-9A-Z]1,2[:-])5([0-9A-Z]1,2))".r

//hash data
val hashed_rdd = rdd.map(line => hashAll(line, regex))

//write hashed data
hashed_rdd.saveAsTextFile(outputPath)

【讨论】:

以上是关于使用 json 模式更新 spark 数据框中的列的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 将 Spark 数据框中的列转换为数组 [重复]

Spark基于其他数据框中的列对数据框中的列进行重复数据删除

将前导零添加到 Spark 数据框中的列 [重复]

Scala(Spark)连接数据框中的列[重复]

Spark - 使用 JSON 文件的许可模式将所有记录移动到损坏的列

spark中的isNullOrEmpty函数检查数据框中的列是不是为空或空字符串