Spark/Hive - Group data into a "pivot-table" format
【Posted】: 2017-09-22 20:42:25
【Question】: I have a set of very annoying files structured as follows:
userId string,
eventType string,
source string,
errorCode string,
startDate timestamp,
endDate timestamp
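Each file can contain an arbitrary number of records per userId, with varying eventTypes and sources, and with different error codes and start/end dates.
Is there a way in Hive or Spark to group all of these together on userId, somewhat like a key-value store, where the value is the list of all fields associated with that userId? Specifically, I would like it keyed by eventType and source. Basically I want to trade table length for width, like a pivot table. My goal is to eventually store the result in Apache Parquet or Avro format for faster analysis later.
Here is an example:
Source data: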
userId, eventType, source, errorCode, startDate, endDate
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'
552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'
552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'
284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'
552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'
284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777'
Goal:
userId, eventTypeAckProvider, sourceAckProvider, errorCodeAckProvider, startDateAckProvider, endDateAckProvider, eventTypeTradeMerch, sourceTradeMerch, errorCodeTradeMerch, startDateTradeMerch, endDateTradeMerch, eventTypeChargeMerch, sourceChargeMerch, errorCodeChargeMerch, startDateChargeMerch, endDateChargeMerch, eventTypeCloseProvider, sourceCloseProvider, errorCodeCloseProvider, startDateCloseProvider, endDateCloseProvider, eventTypeRefundMerch, sourceRefundMerch, errorCodeRefundMerch, startDateRefundMerch, endDateRefundMerch
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623', NULL, NULL, NULL, NULL, NULL
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'
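Field names and column order do not matter, as long as I can tell the fields apart.
I have already tried two approaches to get this working:
- Manually selecting each combination from the table and joining it back to the main dataset. This works well and parallelizes well, but it does not allow an arbitrary number of values for the key fields, and it requires the schema to be predefined.
- Using Spark to build a key-value dictionary in which each value is itself a dictionary. Basically, loop over the dataset, add a new key to the outer dictionary if it does not exist yet, and add a new field to that entry's value dictionary if it does not exist yet. This works, but it is very slow and parallelizes poorly, if at all. I am also not sure whether the result is an Avro/Parquet-compatible format.
Are there any alternatives to these two approaches? Or even a better structure than the one I am aiming for?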
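To pin down the reshaping being asked for, here is a minimal plain-Python sketch (no Spark, so it says nothing about performance at scale). The helper name pivot_wide and the column-naming scheme are assumptions made for illustration; the (eventType, source) pairs are discovered from the data instead of being predefined.

```python
FIELDS = ("eventType", "source", "errorCode", "startDate", "endDate")

ROWS = [
    (552113, "ACK", "PROVIDER", 0, "2017-09-01 12:01:45.432", "2017-09-01 12:01:45.452"),
    (284723, "ACK", "PROVIDER", 0, "2017-09-01 12:01:45.675", "2017-09-01 12:01:45.775"),
    (552113, "TRADE", "MERCH", 0, "2017-09-01 12:01:47.221", "2017-09-01 12:01:46.229"),
    (552113, "CHARGE", "MERCH", 0, "2017-09-01 12:01:48.123", "2017-09-01 12:01:48.976"),
    (284723, "REFUND", "MERCH", 1, "2017-09-01 12:01:48.275", "2017-09-01 12:01:48.947"),
    (552113, "CLOSE", "PROVIDER", 0, "2017-09-01 12:01:49.908", "2017-09-01 12:01:50.623"),
    (284723, "CLOSE", "PROVIDER", 0, "2017-09-01 12:01:50.112", "2017-09-01 12:01:50.777"),
]


def pivot_wide(rows):
    """Return (columns, wide_rows) with one wide row per userId."""
    keys = []      # distinct (eventType, source) pairs, in first-seen order
    by_user = {}   # userId -> {(eventType, source): [field values]}
    for user_id, *values in rows:
        key = (values[0], values[1])
        if key not in keys:
            keys.append(key)
        # Assumption: if a user repeats a pair, the last record wins.
        by_user.setdefault(user_id, {})[key] = values

    columns = ["userId"] + [
        f"{field}{etype.capitalize()}{src.capitalize()}"
        for etype, src in keys
        for field in FIELDS
    ]
    wide = []
    for user_id, events in by_user.items():
        row = [user_id]
        for key in keys:
            # Pad with None where the user has no record for this pair.
            row.extend(events.get(key, [None] * len(FIELDS)))
        wide.append(row)
    return columns, wide


columns, wide = pivot_wide(ROWS)
```

The column order follows the order in which pairs first appear in the input, which is acceptable here because the question states that field order does not matter.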
【Question comments】:
【Answer 1】: Is something like this what you want?
from pyspark.sql.functions import struct, col, create_map, collect_list

# sc is the SparkContext that the pyspark shell provides
df = sc.parallelize([
    ['552113', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'],
    ['284723', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'],
    ['552113', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'],
    ['552113', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'],
    ['284723', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'],
    ['552113', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'],
    ['284723', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777']
]).toDF(('userId', 'eventType', 'source', 'errorCode', 'startDate', 'endDate'))
df.show()

# Map key: (eventType, source); map value: (errorCode, startDate, endDate)
new_df = df.withColumn("eventType_source", struct([col('eventType'), col('source')])).\
    withColumn("errorCode_startEndDate", struct([col('errorCode'), col('startDate'), col('endDate')]))
# One row per userId, holding a list of single-entry maps
new_df = new_df.groupBy('userId').agg(collect_list(create_map(col('eventType_source'), col('errorCode_startEndDate'))).alias('event_detail'))
new_df.show()
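The event_detail column built above holds, for each userId, a list of single-entry maps keyed by (eventType, source). As a rough illustration of how such a structure can be queried once it is in Python, here is a plain-Python sketch. The tuples stand in for the struct keys and values, and find_event is a hypothetical helper, not a Spark API.

```python
def find_event(event_detail, event_type, source):
    """Return the value stored under (event_type, source), or None if absent."""
    for entry in event_detail:  # each entry is a one-key map
        value = entry.get((event_type, source))
        if value is not None:
            return value
    return None


# Stand-in for the event_detail value of userId 552113.
event_detail_552113 = [
    {("ACK", "PROVIDER"): (0, "2017-09-01 12:01:45.432", "2017-09-01 12:01:45.452")},
    {("TRADE", "MERCH"): (0, "2017-09-01 12:01:47.221", "2017-09-01 12:01:46.229")},
    {("CHARGE", "MERCH"): (0, "2017-09-01 12:01:48.123", "2017-09-01 12:01:48.976")},
    {("CLOSE", "PROVIDER"): (0, "2017-09-01 12:01:49.908", "2017-09-01 12:01:50.623")},
]

charge = find_event(event_detail_552113, "CHARGE", "MERCH")
refund = find_event(event_detail_552113, "REFUND", "MERCH")
```

Merging the single-entry maps into one dictionary per user would turn the scan into a single key lookup, at the cost of keeping only one record per (eventType, source) pair.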
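【Comments】:
Thanks! This seems to work! I tried it on my live dataset, and in terms of grouping it returned almost exactly what I wanted. I am not familiar with the "list of maps" data structure, though, and could not find any documentation on how to work with it. So a follow-up question: how do I interact with this data structure? For example, how would I get the CHARGE/MERCH attributes for a particular user?
Glad it helped! I think this may help you get started on the follow-up question:
from itertools import chain
new_df.printSchema()
rdd1 = new_df.where(col('userId') == '552113').select('event_detail').rdd.flatMap(lambda x: chain(*(x)))
keys = rdd1.map(lambda x: list(x.keys())).collect()
values = rdd1.map(lambda x: list(x.values())).collect()
keys and values are the things to look into.
【Answer 2】: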
You can try this and share your comments:
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import *
>>> spark = SparkSession.builder.getOrCreate()
>>> l=[(552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'),(284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'),(552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'),(552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'),(284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'),(552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'),(284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777')]
>>> df = spark.createDataFrame(l,['userId', 'eventType', 'source', 'errorCode', 'startDate','endDate'])
>>> df.show(10,False)
+------+---------+--------+---------+-----------------------+-----------------------+
|userId|eventType|source |errorCode|startDate |endDate |
+------+---------+--------+---------+-----------------------+-----------------------+
|552113|ACK |PROVIDER|0 |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452|
|284723|ACK |PROVIDER|0 |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775|
|552113|TRADE |MERCH |0 |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229|
|552113|CHARGE |MERCH |0 |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976|
|284723|REFUND |MERCH |1 |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947|
|552113|CLOSE |PROVIDER|0 |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623|
|284723|CLOSE |PROVIDER|0 |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777|
+------+---------+--------+---------+-----------------------+-----------------------+
>>> myudf = F.udf(lambda *cols: cols, ArrayType(StringType()))  # pack each row's fields into one list
>>> df1 = df.select('userId', myudf('eventType', 'source', 'errorCode', 'startDate', 'endDate').alias('val_list'))
>>> df2 = df1.groupby('userId').agg(F.collect_list('val_list').alias('agg_list'))  # grouped on userId; the alias is referenced below
>>> eventtypes = ['ACK', 'TRADE', 'CHARGE', 'CLOSE', 'REFUND']  # event types, in the order required in the output
>>> def f(Vals):
...     aggVals = [typ for x in eventtypes for typ in Vals if typ[0] == x]  # order the grouped rows by eventtypes above
...     if len(aggVals) == len(eventtypes):
...         return aggVals
...     else:
...         present = [v[0] for v in aggVals]  # works on Python 2 and 3, unlike zip(*aggVals)[0]
...         missngval = [(idx, val) for idx, val in enumerate(eventtypes) if val not in present]  # missing event types with their index
...         for idx, val in missngval:
...             aggVals.insert(idx, [None] * 5)  # pad the missing event type with nulls
...         return aggVals
...
>>> myudf2 = F.udf(f, ArrayType(ArrayType(StringType())))
>>> df3 = df2.select('userId', myudf2('agg_list').alias('values'))
>>> df4 = df3.select(['userId'] + [df3['values'][i][x] for i in range(5) for x in range(5)])  # select from Array[Array]
>>> oldnames = df4.columns
>>> destnames = ['userId', 'eventTypeAckProvider', 'sourceAckProvider', 'errorCodeAckProvider', 'startDateAckProvider', 'endDateAckProvider', 'eventTypeTradeMerch', 'sourceTradeMerch', 'errorCodeTradeMerch', 'startDateTradeMerch', 'endDateTradeMerch', 'eventTypeChargeMerch', 'sourceChargeMerch', 'errorCodeChargeMerch', 'startDateChargeMerch', 'endDateChargeMerch', 'eventTypeCloseProvider', 'sourceCloseProvider', 'errorCodeCloseProvider', 'startDateCloseProvider', 'endDateCloseProvider', 'eventTypeRefundMerch', 'sourceRefundMerch', 'errorCodeRefundMerch', 'startDateRefundMerch', 'endDateRefundMerch']
>>> from functools import reduce  # reduce is not a builtin on Python 3
>>> finalDF = reduce(lambda d, idx: d.withColumnRenamed(oldnames[idx], destnames[idx]), range(len(oldnames)), df4)  # rename the columns
>>> finalDF.show()
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+
|userId|eventTypeAckProvider|sourceAckProvider|errorCodeAckProvider|startDateAckProvider |endDateAckProvider |eventTypeTradeMerch|sourceTradeMerch|errorCodeTradeMerch|startDateTradeMerch |endDateTradeMerch |eventTypeChargeMerch|sourceChargeMerch|errorCodeChargeMerch|startDateChargeMerch |endDateChargeMerch |eventTypeCloseProvider|sourceCloseProvider|errorCodeCloseProvider|startDateCloseProvider |endDateCloseProvider |eventTypeRefundMerch|sourceRefundMerch|errorCodeRefundMerch|startDateRefundMerch |endDateRefundMerch |
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+
|284723|ACK |PROVIDER |0 |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775|null |null |null |null |null |null |null |null |null |null |CLOSE |PROVIDER |0 |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777|REFUND |MERCH |1 |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947|
|552113|ACK |PROVIDER |0 |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452|TRADE |MERCH |0 |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229|CHARGE |MERCH |0 |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976|CLOSE |PROVIDER |0 |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623|null |null |null |null |null |
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+
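The ordering-and-padding step performed inside f can be checked outside Spark. Below is a minimal plain-Python sketch of that step, assuming at most one record per event type per user; the names order_and_pad and flatten are illustrative and do not appear in the answer.

```python
EVENT_TYPES = ["ACK", "TRADE", "CHARGE", "CLOSE", "REFUND"]
N_FIELDS = 5  # eventType, source, errorCode, startDate, endDate


def order_and_pad(vals, event_types=EVENT_TYPES, n_fields=N_FIELDS):
    """Order one user's records by event type, padding absent types with None."""
    # Assumption: if an event type repeats, the last record wins.
    by_type = {v[0]: list(v) for v in vals}
    return [by_type.get(t, [None] * n_fields) for t in event_types]


def flatten(user_id, padded):
    """Turn the padded list of records into a single wide row."""
    return [user_id] + [field for event in padded for field in event]


# Records of userId 284723, as strings (matching ArrayType(StringType())).
vals_284723 = [
    ["ACK", "PROVIDER", "0", "2017-09-01 12:01:45.675", "2017-09-01 12:01:45.775"],
    ["REFUND", "MERCH", "1", "2017-09-01 12:01:48.275", "2017-09-01 12:01:48.947"],
    ["CLOSE", "PROVIDER", "0", "2017-09-01 12:01:50.112", "2017-09-01 12:01:50.777"],
]

padded = order_and_pad(vals_284723)
wide_row = flatten(284723, padded)
```

Building the result from a dictionary lookup per event type avoids the index bookkeeping of inserting placeholders into an already ordered list.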
【Comments】: