Spark/Hive - Group data into a "pivot-table" format


Posted 2017-09-22 20:42:25

Question:

I have a very annoying set of files, structured as follows:

userId string,
eventType string,
source string,
errorCode string,
startDate timestamp,
endDate timestamp

Each file can contain any number of records per eventId, with different eventTypes and sources, as well as different codes and start/end dates in each file.

Is there a way in Hive or Spark to group all of these together on userId, sort of like a key-value pair, where the value is a list of all the fields associated with that userId? Specifically, I'd like it to be keyed by eventType and source. Basically I want to trade table length for width, kind of like a pivot table. My goal is to eventually store this in Apache Parquet or Avro file format for more rapid analysis in the future.

Here's an example:

Source data:

userId, eventType, source, errorCode, startDate, endDate
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'
552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'
552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'
284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'
552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'
284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777'

Target:

userId, eventTypeAckProvider, sourceAckProvider, errorCodeAckProvider, startDateAckProvider, endDateAckProvider, eventTypeTradeMerch, sourceTradeMerch, errorCodeTradeMerch, startDateTradeMerch, endDateTradeMerch, eventTypeChargeMerch, sourceChargeMerch, errorCodeChargeMerch, startDateChargeMerch, endDateChargeMerch, eventTypeCloseProvider, sourceCloseProvider, errorCodeCloseProvider, startDateCloseProvider, endDateCloseProvider, eventTypeRefundMerch, sourceRefundMerch, errorCodeRefundMerch, startDateRefundMerch, endDateRefundMerch
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623', NULL, NULL, NULL, NULL, NULL
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'

The field names and their order don't matter, as long as I can tell them apart.

I've already tried two methods to get this to work:

1. Manually selecting each combination from the table and joining it back to a master dataset. This works fine and parallelizes well, but it doesn't allow an arbitrary number of values for the key fields and it requires a predefined schema (a rough sketch of this approach is shown after this list).
2. Using Spark to build a key-value dictionary where each value is itself a dictionary. Basically, iterate over the dataset, add a new key to the dictionary if it doesn't already exist, and for that entry add a new field to the value dictionary if that doesn't exist. This works as well, but it is very slow and doesn't parallelize well, if at all. I'm also not sure whether the result is an Avro/Parquet-compatible format.
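For reference, the first approach looked roughly like the sketch below. This is a minimal illustration assuming a DataFrame df with the columns above; the ack/trade names are made up and the remaining combinations are elided:

from pyspark.sql.functions import col

# one sub-frame per (eventType, source) combination, with columns renamed to carry the combination
ack = df.filter((col('eventType') == 'ACK') & (col('source') == 'PROVIDER')) \
        .select(col('userId'),
                col('errorCode').alias('errorCodeAckProvider'),
                col('startDate').alias('startDateAckProvider'),
                col('endDate').alias('endDateAckProvider'))
trade = df.filter((col('eventType') == 'TRADE') & (col('source') == 'MERCH')) \
          .select(col('userId'),
                  col('errorCode').alias('errorCodeTradeMerch'),
                  col('startDate').alias('startDateTradeMerch'),
                  col('endDate').alias('endDateTradeMerch'))
# ...repeat for CHARGE/MERCH, CLOSE/PROVIDER and REFUND/MERCH, then left-join everything back on userId
wide = df.select('userId').distinct().join(ack, 'userId', 'left').join(trade, 'userId', 'left')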

Is there an alternative to either of these two approaches? Or even a better structure than my target?


Answer 1:

Do you want something like this?

from pyspark.sql.functions import struct, col, create_map, collect_list

df = sc.parallelize([
    ['552113', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'],
    ['284723', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'],
    ['552113', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'],
    ['552113', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'],
    ['284723', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'],
    ['552113', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'],
    ['284723', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777']
]).toDF(('userId', 'eventType', 'source', 'errorCode', 'startDate', 'endDate'))
df.show()

# pack the key columns and the value columns into two structs
new_df = df.withColumn("eventType_source", struct([col('eventType'), col('source')])).\
    withColumn("errorCode_startEndDate", struct([col('errorCode'), col('startDate'), col('endDate')]))

# build one map per row (key struct -> value struct) and collect them into a list per userId
new_df = new_df.groupBy('userId').agg(collect_list(create_map(col('eventType_source'), col('errorCode_startEndDate'))).alias('event_detail'))
new_df.show()
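One way to poke at that list-of-maps column afterwards is to explode it. A minimal sketch (the exploded/kv names are just for illustration):

from pyspark.sql.functions import explode

# one row per map in the collected list, then one row per (key, value) entry of each map
exploded = new_df.select('userId', explode('event_detail').alias('detail'))
kv = exploded.select('userId', explode('detail'))  # exploding a map yields `key` and `value` struct columns
# e.g. the CHARGE/MERCH attributes for user 552113
kv.filter((col('userId') == '552113') &
          (col('key.eventType') == 'CHARGE') &
          (col('key.source') == 'MERCH')).select('userId', 'value.*').show(truncate=False)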

Comments:

Thanks! This seems to work! I tried it on my live dataset and, as far as the grouping goes, it returns almost exactly what I want. However, I'm not familiar with the "list of maps" data structure and can't find any documentation on how to operate on it. I guess a follow-up question would be: how do I interact with this data structure? For example, how do I get the CHARGE/MERCH attributes for a specific user?

Glad it helped! I think this might help you get started on the follow-up question: from itertools import chain; new_df.printSchema(); rdd1 = new_df.where(col('userId') == '552113').select('event_detail').rdd.flatMap(lambda x: chain(*(x))); keys = rdd1.map(lambda x: x.keys()).collect(); values = rdd1.map(lambda x: x.values()).collect(); keys and values are what you'll want to dig into.

Answer 2:

Give this a try and let me know your comments,

>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import *

>>> spark = SparkSession.builder.getOrCreate()

>>> l=[(552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'),(284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'),(552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'),(552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'),(284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'),(552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'),(284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777')]

>>> df = spark.createDataFrame(l,['userId', 'eventType', 'source', 'errorCode', 'startDate','endDate'])
>>> df.show(10,False)
+------+---------+--------+---------+-----------------------+-----------------------+
|userId|eventType|source  |errorCode|startDate              |endDate                |
+------+---------+--------+---------+-----------------------+-----------------------+
|552113|ACK      |PROVIDER|0        |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452|
|284723|ACK      |PROVIDER|0        |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775|
|552113|TRADE    |MERCH   |0        |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229|
|552113|CHARGE   |MERCH   |0        |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976|
|284723|REFUND   |MERCH   |1        |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947|
|552113|CLOSE    |PROVIDER|0        |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623|
|284723|CLOSE    |PROVIDER|0        |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777|
+------+---------+--------+---------+-----------------------+-----------------------+

>>> myudf = F.udf(lambda *cols : cols,ArrayType(StringType())) # pack each row's event columns into a single array
>>> df1 = df.select('userId',myudf('eventType', 'source', 'errorCode','startDate', 'endDate').alias('val_list'))

>>> df2 = df1.groupby('userId').agg(F.collect_list('val_list').alias('agg_list')) # grouped on userId; aliased so 'agg_list' can be referenced below

>>> eventtypes = ['ACK','TRADE','CHARGE','CLOSE','REFUND'] # eventtypes and the order required in output

>>> def f(Vals):
        aggVals = [typ for x in eventtypes for typ in Vals if typ[0] == x] # order the grouped rows by the eventtypes list above
        if len(aggVals) == 5:
            return aggVals
        else:
            missingval = [(idx,val) for idx,val in enumerate(eventtypes) if val not in list(zip(*aggVals))[0]] # eventtypes missing for this user, with their index, to fill with nulls
            for idx,val in missingval:
                aggVals.insert(idx,[None]*5)
            return aggVals

>>> myudf2 = F.udf(f,ArrayType(ArrayType(StringType())))
>>> df3 = df2.select('userId',myudf2('agg_list').alias('values'))

>>> df4 = df3.select(['userId']+[df3['values'][i][x] for i in range(5) for x in range(5)]) # flatten the Array[Array] into 25 separate columns

>>> oldnames = df4.columns
>>> destnames = ['userId', 'eventTypeAckProvider', 'sourceAckProvider', 'errorCodeAckProvider', 'startDateAckProvider', 'endDateAckProvider', 'eventTypeTradeMerch', 'sourceTradeMerch', 'errorCodeTradeMerch', 'startDateTradeMerch', 'endDateTradeMerch', 'eventTypeChargeMerch', 'sourceChargeMerch', 'errorCodeChargeMerch', 'startDateChargeMerch', 'endDateChargeMerch', 'eventTypeCloseProvider', 'sourceCloseProvider', 'errorCodeCloseProvider', 'startDateCloseProvider', 'endDateCloseProvider', 'eventTypeRefundMerch', 'sourceRefundMerch', 'errorCodeRefundMerch', 'startDateRefundMerch', 'endDateRefundMerch']

>>> from functools import reduce # needed on Python 3
>>> finalDF = reduce(lambda d,idx : d.withColumnRenamed(oldnames[idx],destnames[idx]),range(len(oldnames)),df4) # rename the generated columns to the target names
>>> finalDF.show()    
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+
|userId|eventTypeAckProvider|sourceAckProvider|errorCodeAckProvider|startDateAckProvider   |endDateAckProvider     |eventTypeTradeMerch|sourceTradeMerch|errorCodeTradeMerch|startDateTradeMerch    |endDateTradeMerch      |eventTypeChargeMerch|sourceChargeMerch|errorCodeChargeMerch|startDateChargeMerch   |endDateChargeMerch     |eventTypeCloseProvider|sourceCloseProvider|errorCodeCloseProvider|startDateCloseProvider |endDateCloseProvider   |eventTypeRefundMerch|sourceRefundMerch|errorCodeRefundMerch|startDateRefundMerch   |endDateRefundMerch     |
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+
|284723|ACK                 |PROVIDER         |0                   |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775|null               |null            |null               |null                   |null                   |null                |null             |null                |null                   |null                   |CLOSE                 |PROVIDER           |0                     |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777|REFUND              |MERCH            |1                   |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947|
|552113|ACK                 |PROVIDER         |0                   |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452|TRADE              |MERCH           |0                  |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229|CHARGE              |MERCH            |0                   |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976|CLOSE                 |PROVIDER           |0                     |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623|null                |null             |null                |null                   |null                   |
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+
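Since the stated goal is Parquet/Avro, the wide result can then be written out directly; for example (the output path is illustrative):

>>> finalDF.write.mode('overwrite').parquet('/tmp/user_events_wide')
>>> # Avro needs the external spark-avro package, e.g. .format('com.databricks.spark.avro').save(path)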

