组合并计算火花数据框中的 json 列

Posted

技术标签:

【中文标题】组合并计算火花数据框中的 json 列【英文标题】:combine and count a json column in spark dataframe 【发布时间】:2020-07-24 23:28:28 【问题描述】:

我想在 spark 数据框和 hive 表中聚合列值 (json)。

例如

 year,   month,    val (json)
 2010    01        ["a_id":"caes","a_id":"rgvtsa","a_id":"btbsdv"]
 2010    01        ["a_id":"caes","a_id":"uktf","a_id":"ohcwa"]
 2008    10        ["a_id":"rfve","a_id":"yjndf","a_id":"onbds"]
 2008    10        ["a_id":"fvds","a_id":"yjndf","a_id":"yesva"]

我需要:

 year,   month,    val (json),                                                          num (int)
 2010    01        ["a_id":"caes","a_id":"rgvtsa","a_id":"btbsdv,"a_id":"uktf", "a_id":"ohcwa"]     5

 2008    10        ["a_id":"rfve","a_id":"yjndf","a_id":"onbds","a_id":"yesva"]      4

我需要删除重复项并在其中找到 json 字符串的大小(“a_id”的数量)。

数据保存为 hive 表,因此最好通过 pyspark sql 处理它?

如果将其保存为 spark 数据框,我也想知道如何处理它。

我试过了:

 from pyspark.sql.functions import from_json, col
 from pyspark.sql.types import StructType, StructField, StringType

 schema = StructType(
  [
    StructField('a_id', StringType(), True)
  ]
 )

 df.withColumn("val", from_json("val", schema))\
.select(col('year'), col('month'), col('val.*'))\
.show()

但是,“val1”中的所有值都是空的。

谢谢

UPDTAE 我的蜂巢版本:

%sh
 ls /databricks/hive | grep "hive"
 spark--maven-trees--spark_1.4_hive_0.13

我的 DDL:

import pyspark.sql.functions as F
import pyspark.sql.types as T 
from pyspark.sql.types import *

def concate_elements(val):
   return reduce (lambda x, y:x+y, val)

flatten_array = F.udf(concate_elements, T.ArrayType(T.StringType()))

remove_duplicates = udf(lambda row: list(set(row)), 
ArrayType(StringType()))

#final results
df.select("year","month", flatten_array("val").alias("flattenvalues")).withColumn("uniquevalues", remove_duplicates("flattenvalues")).withColumn("size",F.size("uniquevalues")).show()

【问题讨论】:

我从一串 json 文件中读取它并更新了我的答案,如果您想在 val 列中有一个特定的 JSON,请更新您的问题以读取数据(来自 hive 或其他一些来源),因为我已尽力使用 JSON/非 JSON 列创建 DF,但短时间内无法创建,如果提供的答案无法解决您的场景,那么将寻找更多选项来解决您的问题。 为您的方案更新了答案 -> 从 spark 中的 hive 读取数据 不要频繁更改您的问题,而是提出一个新问题?? 【参考方案1】:

考虑输入数据输入Json文件json-input.json

"year":"2010","month":"01","value":["a_id":"caes","a_id":"uktf","a_id":"ohcwa"]
"year":"2011","month":"01","value":["a_id":"caes","a_id":"uktf","a_id":"uktf","a_id":"sathya"]

方法一、从hive中读取数据

1.将数据插入配置单元

ADD JAR /home/sathya/Downloads/json-serde-1.3.7-jar-with-dependencies.jar
CREATE EXTERNAL TABLE json_table (
year string,
month string,
value array<struct<a_id:string>>)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe';
load data local inpath '/home/sathya/json-input.json' into table json_table;

select * from json_table;
OK
2010    01  ["a_id":"caes","a_id":"uktf","a_id":"ohcwa"]
2011    01  ["a_id":"caes","a_id":"uktf","a_id":"uktf","a_id":"sathya"]

2。从 spark 读取数据:

pyspark --jars /home/sathya/Downloads/json-serde-1.3.7-jar-with-dependencies.jar --driver-class-path  /home/sathya/Downloads/json-serde-1.3.7-jar-with-dependencies.jar

df=spark.sql("select * from default.json_table")
df.show(truncate=False)
'''
+----+-----+----------------------------------+
|year|month|value                             |
+----+-----+----------------------------------+
|2010|01   |[[caes], [uktf], [ohcwa]]         |
|2011|01   |[[caes], [uktf], [uktf], [sathya]]|
+----+-----+----------------------------------+
'''

#UDFs for concatenating the array elements & removing duplicates in an array

def concate_elements(val):
    return reduce (lambda x, y:x+y, val)

flatten_array = F.udf(concate_elements, T.ArrayType(T.StringType()))

remove_duplicates = udf(lambda row: list(set(row)), ArrayType(StringType()))

#final results
df.select("year","month",flattenUdf("value").alias("flattenvalues")).withColumn("uniquevalues", remove_duplicates("flattenvalues")).withColumn("size",size("uniquevalues")).show()

'''
+----+-----+--------------------------+--------------------+----+
|year|month|flattenvalues             |uniquevalues        |size|
+----+-----+--------------------------+--------------------+----+
|2010|01   |[caes, uktf, ohcwa]       |[caes, uktf, ohcwa] |3   |
|2011|01   |[caes, uktf, uktf, sathya]|[caes, sathya, uktf]|3   |
+----+-----+--------------------------+--------------------+----+

'''

方法2 - 直接从输入Json文件json-input.json读取

"year":"2010","month":"01","value":["a_id":"caes","a_id":"uktf","a_id":"ohcwa"]
"year":"2011","month":"01","value":["a_id":"caes","a_id":"uktf","a_id":"uktf","a_id":"sathya"]

您的方案的代码是:

import os
import logging 
from pyspark.sql import SQLContext,SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import functions as F
import pyspark.sql.types as T

df=spark.read.json("file:///home/sathya/json-input.json")

df.show(truncate=False)
'''
+-----+----------------------------------+----+
|month|value                             |year|
+-----+----------------------------------+----+
|01   |[[caes], [uktf], [ohcwa]]         |2010|
|01   |[[caes], [uktf], [uktf], [sathya]]|2011|
+-----+----------------------------------+----+
'''

#UDFs for concatenating the array elements & removing duplicates in an array

def concate_elements(val):
    return reduce (lambda x, y:x+y, val)
 
flatten_array = F.udf(concate_elements, T.ArrayType(T.StringType()))

remove_duplicates = udf(lambda row: list(set(row)), ArrayType(StringType()))

#final results
df.select("year","month",flattenUdf("value").alias("flattenvalues")).withColumn("uniquevalues", remove_duplicates("flattenvalues")).withColumn("size",size("uniquevalues")).show()

'''
+----+-----+--------------------------+--------------------+----+
|year|month|flattenvalues             |uniquevalues        |size|
+----+-----+--------------------------+--------------------+----+
|2010|01   |[caes, uktf, ohcwa]       |[caes, uktf, ohcwa] |3   |
|2011|01   |[caes, uktf, uktf, sathya]|[caes, sathya, uktf]|3   |
+----+-----+--------------------------+--------------------+----+

'''

【讨论】:

我已经尝试过您的“从 spark 读取数据”,我收到错误:org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6484.0 failed 4 次,最近一次失败:在阶段 6484.0 中丢失任务 0.3(TID 1089929、10.71.184.24、执行程序 107):ExecutorLostFailure(执行程序 107 因正在运行的任务之一而退出)原因:远程 RPC 客户端已解除关联。可能是由于容器超过阈值或网络问题。检查驱动程序日志以获取 WARN 消息。我在数据块上运行它。 你能分享我你的hive ddl吗?你使用的是什么版本,它在我的本地对我有用 请在 OP 中查看我的更新。请告诉我什么是“serre”?【参考方案2】:

这是一个适用于 Databricks 的解决方案:

#Import libraries

from pyspark.sql.functions import *
from pyspark.sql.types import *

#Define schema

schema1=StructType([
  StructField('year',IntegerType(),True),
  StructField('month',StringType(),True),
  StructField('val',ArrayType(StructType([
    StructField('a_id',StringType(),True)
  ])))
])

#Test data

rowsArr=[
  [2010,'01',["a_id":"caes","a_id":"rgvtsa","a_id":"btbsdv"]],
  [2010,'01',["a_id":"caes","a_id":"uktf","a_id":"ohcwa"]],
  [2008,'10',["a_id":"rfve","a_id":"yjndf","a_id":"onbds"]],
  [2008,'10',["a_id":"fvds","a_id":"yjndf","a_id":"yesva"]]
]

#Create dataframe

df1=(spark
     .createDataFrame(rowsArr,schema=schema1)
    )

#Create database

spark.sql('CREATE DATABASE IF NOT EXISTS testdb')

#Dump it into hive table

(df1
 .write
 .mode('overwrite')
 .options(schema=schema1)
 .saveAsTable('testdb.testtable')
)

#read from hive table

df_ht=(spark
       .sql('select * from testdb.testtable')
      )

#Perform transformation

df2=(df_ht
     .groupBy('year','month')
     .agg(array_distinct(flatten(collect_list('val'))).alias('val'))
     .withColumn('num',size('val'))
    )

输入 DF:

输出DF:

【讨论】:

以上是关于组合并计算火花数据框中的 json 列的主要内容,如果未能解决你的问题,请参考以下文章

遍历火花数据框中的列并计算最小值最大值

计算火花数据框中所有列(300 列)的每个不同值的出现次数

计算火花数据框中的字数

聚合火花数据框中的多列(所有组合)

如何在火花中合并或连接具有不相等列号的数据框

将行值转换为火花数据框中的列数组