Pyspark 最近使用的一些有趣姿势的梳理

Posted piperck

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pyspark 最近使用的一些有趣姿势的梳理相关的知识,希望对你有一定的参考价值。

之前对 SQL 还是不是非常熟悉的,但是现在或多或少还是会写一些计算任务。比如最近在推送将所有天级的耗时任务都从传统关系型数据库迁移至 Spark 集群当中进行计算,中间遇到一些有趣的小问题在这里记录一下。

 

Q: 我想按照某个字段分组并且把一组查询字段连起来得到一个 json 然后把结果作为一个字段应该怎么弄?

A: 这里我的思路是将我们需要 dumps 的字段给拼接起来,然后使用列表将同一个分组里面的是数据组合起来。然后过一个 udf 把列表中的记录处理成数组最后 json.dumps 一下即可。来看个栗子

# 先查询出要操作的目标信息 这一步可以和下面的操作合并,我这里为了方便看拆开了
df = ss.sql("""
                        SELECT 
                            t1.pay_id, 
                            t1.pay_money, 
                            t1.user_id
                        FROM
                            analytics_db.hd_day_order_record t1 
                    """)

# 拼接目标字符串并且组合
df = df.select(
               df.pay_id,
               df.pay_money,
               df.pay_user_id,
               f.concat_ws(\001, df.pay_id,  df.pay_user_id, df.pay_money).alias(sku_buys))
)

# 拼接一个重复 user_id 的 list
df = df.groupBy(df.pay_user_id).agg(f.collect_list(sku_buys).alias(sku_buys))

# 将 sku_buys 丢给一个 jsondump 的 udf 就可以得到结果了
df = df.select(df.pay_user_id, sb_json(df.sku_buys).alias(‘sku_buys‘))

 

Q: 如果我想对目标进行分组,并且让他在组内有序应该怎么做?

A: 这通常被称为进行组内排序。其实我之前一直尝试用类似的语法来达到这种效果

df = ss.sql("""
        SELECT
            first(t1.sku_mode) AS sku_mode,
            first(t1.exchange_type_t01) AS exchange_type_t01,
            first(t1.user_id) AS user_id,
            first(t1.pay_id) AS pay_id,
            first(t1.charge_time) AS charge_time,
            first(t2.has_yxs_payment) AS has_yxs_payment,
            first(t2.has_sxy_payment) AS has_sxy_payment,
            first(t2.has_cxy_payment) AS has_cxy_payment,
            first(t2.has_sxy19_payment) AS has_sxy19_payment,
            first(t2.sxy19_join_time) AS sxy19_join_time,
            first(t2.yxs_join_time) AS yxs_join_time
        FROM
            d_exchange_info t1
        JOIN
            analytics_db.md_day_dump_users t2
        ON
            t2.the_day = {}
            AND t1.user_id = t2.user_id
        GROUP BY
            t1.user_id
        ORDER BY
            charge_time
""".format(st))

其实这是错的,这里的 order by 并不能达到一个组内排序的效果,而是一个外部排序。所以这里取 first 是一个不稳定的结果。有时候你拿到的是一个结果,也许有时候你拿到的是另外一个结果。要进行组内排序,我们可以先用这样的思路,先对需要 order by 字段的表进行组内排序,然后再让他与其他表 join 获得更多的信息,这样就能保证是有序的。

这里我引用一个窗口函数来达到这样的效果。

        _df = ss.sql("""
                        SELECT 
                            t1.user_id,
                            t1.pay_id,
                            t1.sku_mode,
                            t1.charge_time,
                            t1.exchange_type_t01,
                            ROW_NUMBER() OVER(PARTITION BY t1.user_id ORDER BY t1.charge_time)
                        FROM 
                            {} t1 
                        WHERE 
                            t1.refund_state = 0
                    """.format(exchange_info_table))

我先使用窗口函数 ROW_NUMBER 以 user_id 分组并且根据 charge_time 对表一进行组内排序。得到结果之后,再与另外一张表 join 得到补充信息就能达到想要的效果。

 

Q: 我想对结果进行转列应该怎么做?

A: 行转列 列转行可能是 SQL 计算里面会经常使用到的方法,但是对于 SQL 并不熟悉的同学(比如我)就不知道该怎么整来看个例子

df = ss.sql("""
    SELECT
        user_id,
        sku_mode,
        credit_score
    FROM
        analytics_db.hd_day_user_credit
    WHERE
        gain_time >= {}
        AND gain_time < {}
        AND the_day = {}
""".format(start_time, end_time, st))
# df.show(10)

展示的数据类似于

+--------------------+--------+------------+
|             user_id|sku_mode|credit_score|
+--------------------+--------+------------+
|d394899919216bc10...|     yxs|           3|
|625002ad625bc9a69...|     yxs|           3|
|8dd11e29bf50cb8c8...|     cxy|           3|
|0f0b88ff589cb46cd...|     yxs|           3|
|eeb8e839139876971...|     yxs|           1|
|f63b2b9c8340d3c80...|     cxy|           1|
|806c9f0349e7e8389...|     cxy|           1|
|bf312181eaaa0ec9e...|     yxs|           1|
|ee4a7984dc40cabbf...|     yxs|           3|
|7a6b15f16c1f782de...|   sxy19|           3|
+--------------------+--------+------------+
only showing top 10 rows

我们可以基于此将 sku_mode 一样的类型合并进行行转列变换

df = df.groupby(user_id).pivot(
    sku_mode, [yxs, cxy, sxy, sxy19]
).agg(
    f.sum(credit_score)
).fillna(0)

这句话的意思是根据 user_id 进行分组,并且将 sku_mode 的行转列,需要转列的类型需要在后面的 list 中添加,并且列里记录 各sku_mode credit_score 汇总的量。

+--------------------+---+---+---+-----+
|             user_id|yxs|cxy|sxy|sxy19|
+--------------------+---+---+---+-----+
|5ec336994e7b5d73f...|  0|  0|  0|    2|
|06b1120a4544b1b8a...|  0|  0|  0|    2|
|6fe19e193ad43bafc...|  0|  0|  0|    3|
|3e5f9fc4550ae7cba...|  1|  0|  0|    0|
|b1d1d856e28908f5a...|  1|  0|  0|    3|
|7a065e02ed1693cf4...|  2|  0|  0|    0|
|651f9f0b11de08003...|  0|  0|  0|    3|
|d02491502946aba2f...|  0|  0|  0|    2|
|e24b58cb87762b2da...|  0|  6|  0|   15|
|925f6a832b1a95c45...|  1|  0|  0|    0|
+--------------------+---+---+---+-----+
only showing top 10 rows

 

以上是关于Pyspark 最近使用的一些有趣姿势的梳理的主要内容,如果未能解决你的问题,请参考以下文章

设计安全的账号系统的正确姿势

设计安全的账号系统的正确姿势

设计安全的账号系统的正确姿势转

学习vue.js的正确姿势(转载)

NSnotificationCenter 正确使用姿势, removeObject 探索

ConfigurationManager姿势快闪