ODPS Spark PySpark分组排序打序号并自关联(包含中文乱码问题解决)
Posted 二两窝子面
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ODPS Spark PySpark分组排序打序号并自关联(包含中文乱码问题解决)相关的知识,希望对你有一定的参考价值。
尝试在ODPS上用Spark,由于java和scala都需要装SDK和配置POM来编写ODPS Spark脚本(其实就是自己被各版本号弄懵了没装明白),而Dataworks中可以直接在线编写Python资源并提交使用,因此测试了Pyspark。
数据集操作逻辑:跟最早时候在ODPS上测试使用MR一样,习惯性地做分组排序并自关联的测试——将数据集做分组排序并打上row_number,再对rn加个偏移量进行自关联,最后写入目标表。
中文乱码问题:由于工作原因,Dataworks中的很多模块比较落后,因此被编码问题困扰了很久。具体表现就是中文插入目标表后是乱码的情况。
经测试,reload(sys)\\sys.setdefaultencoding('utf8')是没用的。。。官方说的添加配置项也是没用的。。。最后想着先无脑decode一下,发现竟然解决了问题,说明ODPS Spark使用的是unicode?最后结论就是注意使用decode,示例如下:
# -*- coding: utf-8 -*-
import sys
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import random
reload(sys)
sys.setdefaultencoding('utf8')
if __name__ == "__main__":
spark = SparkSession.builder\\
.appName("spark sql")\\
.config("spark.sql.broadcastTimeout", 20 * 60)\\
.config("spark.sql.crossJoin.enabled", True)\\
.config("odps.exec.dynamic.partition.mode", "nonstrict")\\
.config("spark.sql.catalogImplementation", "odps")\\
.getOrCreate()
#创建\\导入
#创建一个可迭代对象
data = [i for i in range(0, 100)]
#调用sparkContext将dara转化为一个RDD,并通过map函数生成两个字段
rdd0 = spark.sparkContext.parallelize(data, 2).map(lambda s: (random.choice(["杭州","宁波","温州"]).decode("utf-8"), s))
#通过toDF转换成结构化DataFrame
df0 = rdd0.toDF("city: string, num: int")
#或者直接通过结构化接口创建DataFrame
# df0 = spark.createDataFrame()
#或者读取已有结构化数据
#sourcetable="project.input_source_table"
#df0 = spark.sql("select city,num from %s" % sourcetable)
#计算
#分组排序打序号-结构化接口DF操作方式(用开窗函数),也可以直接跑SQL会更简单一些,效率上没有差别
window = Window.partitionBy('city').orderBy(df0.num) #传入df0.num.desc()则为倒序
df1 = df0.withColumn('rn', F.row_number().over(window))
#分组排序打序号-低级接口RDD操作方式
#注意:这里groupByKey没有传入分区器,shuffle应该没有排序,原生sorted操作使内存消耗变大,所以可能比结构化接口低效
# rdd1 = rdd0.groupByKey().flatMap(lambda row: list(map(lambda x: (row[0],x[1],x[0]),enumerate(sorted(row[1]),1))))
# # groupByKey返回的是RDD[K,Iterator[V]],不熟的话会有点小绕,可以先用Python原生map梳理下转化操作的逻辑:
# # data=[('a',[1,5,4,6,3,2]),('b',[11,15,14,16,13,12])]
# # ans=map(lambda row: list(map(lambda x: (row[0],x[0],x[1]),enumerate(sorted(row[1]),1))),data)
# # 没有原生flatMap可以使用reduce,比如:reduce(lambda x,y:x+y,ans)
# df1 = rdd1.toDF("city: string, num: int ,rn: int")
#对某列进行加工
df = df1.withColumn("city",F.decode(F.format_string("%s市",df1.city),"utf-8"))
# #数据过滤
# df = df.filter(df.city=="杭州市")
#关联
df_right = df.withColumn("rn",df.rn+1)
df = df.join(df_right,["city","rn"],'left')
df.show(10)
df = df.toDF("city","num","rn","num2")
#RDD的关联接口不算复杂,注意需要通过map重新将需关联的字段组成key,在此不做示例
#写出
#创建目标表
tablename="project.output_target_table"
spark.sql("drop table if exists %s" %(tablename))
spark.sql("create table % s (city string COMMENT '地市',num bigint comment '号',rn bigint comment '序',num2 bigint comment '号2')"%(tablename))
# df.show(10)
#写入目标表
df.write.mode('overwrite').option("encoding","utf-8").insertInto(tablename)
#临时视图方式写入目标表
#tmpview=tablename+"_tmp"
# df.createOrReplaceTempView(tmpview)
# spark.sql("insert overwrite table %s from %s" %(tablename,tmpview))
以上是关于ODPS Spark PySpark分组排序打序号并自关联(包含中文乱码问题解决)的主要内容,如果未能解决你的问题,请参考以下文章