Trying to convert time bins to mins and hours array in Pyspark with JSON structure

Posted 2020-11-17 10:54:54

【Question】: My data is in the following format:
| id | field_2 | field_3 | date | a_blob |
|----|---------|---------|------|--------|
| 1 | some_data | some_data | 11/1/2020 | {"name": "abc1", "usage_count": {"bin102": 1, "bin103": 1, "bin104": 1, "bin105": 1, "bin110": 1, "bin112": 1, "bin120": 1, "bin121": 1, "bin122": 1, "bin123": 1, "bin124": 1, "bin136": 2, "bin137": 1, "bin138": 1, "bin139": 1, "bin140": 1, "bin141": 2, "bin142": 2}, "usage_min": {"bin102": 7.7, "bin103": 10, "bin104": 10, "bin105": 2.5, "bin110": 0.1, "bin112": 0.8, "bin120": 6.8, "bin121": 10, "bin122": 10, "bin123": 10, "bin124": 4.3, "bin136": 2.5, "bin137": 10, "bin138": 10, "bin139": 10, "bin140": 10, "bin141": 9.3, "bin142": 3.8}, "p_name": "abc_1"} |
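For readability, one a_blob value presumably parses into a nested structure along these lines (the nesting is implied by the json.loads call in the code further down; the sketch is abbreviated, and the 10-minutes-per-bin reading is an inference from the 144 bins per day used below):

```python
# Assumed, abbreviated shape of one parsed a_blob value (not a verbatim record).
# "binNNN" keys index 10-minute slots (144 per day), so bin // 6 gives the hour of day.
a_blob_parsed = {
    "name": "abc1",
    "usage_count": {"bin102": 1, "bin103": 1},    # usage count per active 10-minute bin
    "usage_min":   {"bin102": 7.7, "bin103": 10}, # minutes of usage per active 10-minute bin
    "p_name": "abc_1",
}
```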
I want to convert it into the format below:
| id | field_2 | field_3 | date | mins_arr | cnt_arr |
|----|---------|---------|------|----------|---------|
| 1 | some_data | some_data | 11/1/2020 | 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,24.9,50.0,9.9,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0 | 0,0,0,0,0,0,0,0,0,0,0,3,5,1,0,0,0,0,0,0,0,0,0,0 |
I am doing this conversion with the following code:
```python
import json

def convert_to_dense_bins(u_count, u_minutes):
    # Expand the sparse "binNNN" dicts into dense 144-element (10-minute bin) arrays.
    count = [0] * 144
    minutes = [0.0] * 144
    for key in u_count:
        bin_number = key.split("bin")[1]
        count[int(bin_number, 10)] = u_count[key]
        bin_minutes = u_minutes[key]
        minutes[int(bin_number, 10)] = bin_minutes
    return count, minutes

def aggregate_to_hourly_bins(count_bin, minutes_bin):
    # Sum every 6 consecutive 10-minute bins into one hourly bin (24 values per day).
    count = []
    minutes = []
    for i in range(0, 144, 6):
        hour_count = sum(count_bin[i:i + 6])
        count.append(str(hour_count))
        hour_minutes = sum(minutes_bin[i:i + 6])
        minutes.append(str(hour_minutes))
    return count, minutes

def transform(row):
    e_data = json.loads(row[4])  # a_blob is the 5th column
    p_name = e_data["p_name"]
    name = e_data["name"]
    count_bin, minutes_bin = convert_to_dense_bins(e_data["usage_count"],
                                                   e_data["usage_min"])
    count_hourly, minutes_hourly = aggregate_to_hourly_bins(count_bin, minutes_bin)
    return (row.id, name, row.feature_1, row.feature_2, p_name, row.date,
            ','.join(minutes_hourly), ','.join(count_hourly))

new_columns = ["id", "name", "feature_1", "feature_2", "p_name", "date", "mins_arr", "cnt_arr"]

df = df_old.rdd \
    .filter(some_filter_function) \
    .map(transform) \
    .toDF(new_columns)
```
As my data grows, this code is taking too long. I am looking for a more efficient way to do this transformation in PySpark. Because of the complexity of the JSON structure that sits in the data as a string, I cannot use window functions and the like. Any help is appreciated.
【Comments】:

Thanks for pointing me to that question, @jxc. However, I am trying to find a solution for Spark version 2.3.1; the question above answers it for Spark 2.4+. Do you know what equivalent I can use on the lower Spark version?

【Answer 1】: For Spark 2.3.1, use pandas_udf, see below:
Step 1: Use the json_tuple function to retrieve usage_count and usage_min as StringType fields:
```python
from pyspark.sql import functions as F
import numpy as np
import pandas as pd

j1 = """{"name": "abc1", "usage_count": {"bin102": 1, "bin103": 1, "bin104": 1, "bin105": 1, "bin110": 1, "bin112": 1, "bin120": 1, "bin121": 1, "bin122": 1, "bin123": 1, "bin124": 1, "bin136": 2, "bin137": 1, "bin138": 1, "bin139": 1, "bin140": 1, "bin141": 2, "bin142": 2}, "usage_min": {"bin102": 7.7, "bin103": 10, "bin104": 10, "bin105": 2.5, "bin110": 0.1, "bin112": 0.8, "bin120": 6.8, "bin121": 10, "bin122": 10, "bin123": 10, "bin124": 4.3, "bin136": 2.5, "bin137": 10, "bin138": 10, "bin139": 10, "bin140": 10, "bin141": 9.3, "bin142": 3.8}, "p_name": "abc_1"}"""

df = spark.createDataFrame([(j1,)], ['e_data'])

cols = ["name", "p_name", "usage_count", "usage_min"]
df1 = df.select(F.json_tuple("e_data", *cols).alias(*cols))
df1.printSchema()
#root
# |-- name: string (nullable = true)
# |-- p_name: string (nullable = true)
# |-- usage_count: string (nullable = true)
# |-- usage_min: string (nullable = true)
```
Note: if you load the data with spark-xml using the line below, then the e_data column above would be the 5th element (df['td'][4]) of a column named td (type = array<string>):
```python
df = spark.read.format("com.databricks.spark.xml").options(rowTag="tr").load('/paths')
```
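A quick sketch (not from the original answer) of pulling the pieces out of that array column; the cell positions are assumptions taken from the table layout above, i.e. cells 0-3 hold id/field_2/field_3/date and cell 4 holds the JSON blob:

```python
# Hypothetical selection from the spark-xml result: expose the 5th <td> cell as e_data.
df = df.select(
    F.col("td").getItem(0).alias("id"),
    F.col("td").getItem(3).alias("date"),
    F.col("td").getItem(4).alias("e_data"),
)
```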
Step 2: Set up the pandas_udf. We use pd.Series.str.findall to pull all bin entries into a list of 2-element tuples (corresponding to index and value), convert/map that into an np.array, and then fill a 144-element 1-D array with those indices and values. Next we use np.array_split to split this 1-D array into 24 chunks and take np.sum(axis=1), returning the result as a pd.Series whose values are lists of floats.
```python
def _pandas_bin_sum(s, N):
    ret = []
    for x in map(np.array, s.str.findall(r'"bin(\d+)":([\d.]+)')):
        try:
            z = np.zeros(144)
            z[x[:,0].astype(np.int)] = x[:,1].astype(np.float)
            ret.append([float(e) for e in np.sum(np.array_split(z, N), axis=1)])
        except:
            ret.append(None)
    return pd.Series(ret)

pandas_bin_sum = F.pandas_udf(lambda x: _pandas_bin_sum(x, 24), "array<float>")
```
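As a quick sanity check, the plain helper can be run with pandas alone, outside Spark. The sample string below is hypothetical and written compactly because the regex above expects no space after the colon; also note that on NumPy 1.24+ you would need plain int/float in place of the removed np.int/np.float aliases:

```python
# Local test of the bin-summing logic; no Spark session needed.
sample = pd.Series(['{"bin102":7.7,"bin103":10,"bin104":10,"bin105":2.5}'])
hourly = _pandas_bin_sum(sample, 24)
# Bins 102-105 all fall into hour 17 (bin // 6), so this prints roughly 7.7 + 10 + 10 + 2.5 = 30.2
print(hourly[0][17])
```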
Step 3: Apply the pandas_udf and convert the two columns with F.concat_ws():
```python
df1.withColumn('usage_count', F.concat_ws(',', pandas_bin_sum('usage_count').astype("array<int>"))) \
   .withColumn('usage_min', F.concat_ws(',', pandas_bin_sum('usage_min'))) \
   .show(1, 100, vertical=True)

-RECORD 0------------------------------------------------------------------------------------------------------
 name        | abc1
 p_name      | abc_1
 usage_count | 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,4,2,0,5,0,3,7
 usage_min   | 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,30.2,0.9,0.0,41.1,0.0,12.5,43.1
only showing top 1 row
```
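Applied to the original table, the same pattern yields the output shape asked for in the question. A sketch (not from the original answer; the column names a_blob, field_2, field_3 are assumptions taken from the question's tables):

```python
# Hypothetical end-to-end version combining Steps 1-3 on the original DataFrame.
result = (
    df_old
    .select("id", "field_2", "field_3", "date",
            F.json_tuple("a_blob", "usage_count", "usage_min").alias("usage_count", "usage_min"))
    .withColumn("cnt_arr", F.concat_ws(",", pandas_bin_sum("usage_count").astype("array<int>")))
    .withColumn("mins_arr", F.concat_ws(",", pandas_bin_sum("usage_min")))
    .drop("usage_count", "usage_min")
)
```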
【Discussion】:

@Falconic, would you mind leaving feedback on any issues you ran into with my answer?