ODPS Spark PySpark分组排序打序号并自关联(包含中文乱码问题解决)

Posted 二两窝子面

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ODPS Spark PySpark分组排序打序号并自关联(包含中文乱码问题解决)相关的知识,希望对你有一定的参考价值。

        尝试在ODPS上用Spark,由于java和scala都需要装SDK和配置POM来编写ODPS Spark脚本(其实就是自己被各版本号弄懵了没装明白),而Dataworks中可以直接在线编写Python资源并提交使用,因此测试了Pyspark。

        数据集操作逻辑:跟最早时候在ODPS上测试使用MR一样,习惯性地做分组排序并自关联的测试——将数据集做分组排序并打上row_number,再对rn加个偏移量进行自关联,最后写入目标表。

        中文乱码问题:由于工作原因,Dataworks中的很多模块比较落后,因此被编码问题困扰了很久。具体表现就是中文插入目标表后是乱码的情况。

        经测试,reload(sys)\\sys.setdefaultencoding('utf8')是没用的。。。官方说的添加配置项也是没用的。。。最后想着先无脑decode一下,发现竟然解决了问题,说明ODPS Spark使用的是unicode?最后结论就是注意使用decode,示例如下:

# -*- coding: utf-8 -*-
import sys
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import random

reload(sys)
sys.setdefaultencoding('utf8')

if __name__ == "__main__":
    spark = SparkSession.builder\\
        .appName("spark sql")\\
        .config("spark.sql.broadcastTimeout", 20 * 60)\\
        .config("spark.sql.crossJoin.enabled", True)\\
        .config("odps.exec.dynamic.partition.mode", "nonstrict")\\
        .config("spark.sql.catalogImplementation", "odps")\\
        .getOrCreate()

#创建\\导入
    #创建一个可迭代对象
    data = [i for i in range(0, 100)]
    #调用sparkContext将dara转化为一个RDD,并通过map函数生成两个字段
    rdd0 = spark.sparkContext.parallelize(data, 2).map(lambda s: (random.choice(["杭州","宁波","温州"]).decode("utf-8"), s))
    #通过toDF转换成结构化DataFrame
    df0 = rdd0.toDF("city: string, num: int")
    #或者直接通过结构化接口创建DataFrame
    # df0 = spark.createDataFrame()
    #或者读取已有结构化数据
    #sourcetable="project.input_source_table"
    #df0 = spark.sql("select city,num from %s" % sourcetable)

#计算
    #分组排序打序号-结构化接口DF操作方式(用开窗函数),也可以直接跑SQL会更简单一些,效率上没有差别
    window = Window.partitionBy('city').orderBy(df0.num) #传入df0.num.desc()则为倒序
    df1 = df0.withColumn('rn', F.row_number().over(window))

    #分组排序打序号-低级接口RDD操作方式
    #注意:这里groupByKey没有传入分区器,shuffle应该没有排序,原生sorted操作使内存消耗变大,所以可能比结构化接口低效
    # rdd1 = rdd0.groupByKey().flatMap(lambda row: list(map(lambda x: (row[0],x[1],x[0]),enumerate(sorted(row[1]),1))))
    # # groupByKey返回的是RDD[K,Iterator[V]],不熟的话会有点小绕,可以先用Python原生map梳理下转化操作的逻辑:
    # # data=[('a',[1,5,4,6,3,2]),('b',[11,15,14,16,13,12])]
    # # ans=map(lambda row: list(map(lambda x: (row[0],x[0],x[1]),enumerate(sorted(row[1]),1))),data)
    # # 没有原生flatMap可以使用reduce,比如:reduce(lambda x,y:x+y,ans)
    # df1 = rdd1.toDF("city: string, num: int ,rn: int")

    #对某列进行加工
    df = df1.withColumn("city",F.decode(F.format_string("%s市",df1.city),"utf-8"))
    # #数据过滤
    # df = df.filter(df.city=="杭州市")

#关联
    df_right = df.withColumn("rn",df.rn+1)
    df = df.join(df_right,["city","rn"],'left')
    df.show(10)
    df = df.toDF("city","num","rn","num2")
    #RDD的关联接口不算复杂,注意需要通过map重新将需关联的字段组成key,在此不做示例


#写出
    #创建目标表
    tablename="project.output_target_table"
    spark.sql("drop table if exists %s" %(tablename))
    spark.sql("create table % s (city string COMMENT '地市',num bigint comment '号',rn bigint comment '序',num2 bigint comment '号2')"%(tablename))
    # df.show(10)
    #写入目标表
    df.write.mode('overwrite').option("encoding","utf-8").insertInto(tablename)
    #临时视图方式写入目标表
    #tmpview=tablename+"_tmp"
    # df.createOrReplaceTempView(tmpview)
    # spark.sql("insert overwrite table %s from %s" %(tablename,tmpview))

以上是关于ODPS Spark PySpark分组排序打序号并自关联(包含中文乱码问题解决)的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:排序/排序,然后分组和连接字符串

Pyspark - 使用 count() 分组数据并可以排序?

(Pyspark - 在一段时间内按用户分组

Pyspark 分组和结构化数据

PySpark - 获取组中每一行的行号

(pySpark 中分组数据的模式