知识点-Spark小节

Posted Current

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了知识点-Spark小节相关的知识,希望对你有一定的参考价值。

Spark处理字符串日期的max和min的方式
Spark处理数据存储到Hive的方式
Spark处理新增列的方式map和udf、functions
Spark处理行转列pivot的使用
Python 3.5.3
Spark1.6.2

欢迎访问个人主页博客

Spark处理字符串日期的max和min的方式

一般是字符串类型的日期在使用Spark的agg求max时,是不正确的,API显示只支持数值型的max、min
hive的SQL查询引擎是支持字符串日期的max和min的

字符串日期转为时间戳再聚合

unix_timestamp

public static Column unix_timestamp(Column s)
Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds), using the default timezone and the default locale, return null if fail.
Parameters:
s - (undocumented)
Returns:
(undocumented)
Since:
1.5.0
from pyspark.sql import functions as F

df.withColumn(startuptime_stamp, F.unix_timestamp(startuptime))
使用HiveSQL
select device_id, max(startuptime) as max_startuptime, min(startuptime) as min_startuptime from app_table group by device_id

Spark处理数据存储到Hive的方式

通常Spark任务处理后的结果数据会存储到Hive表中,可以先保存至HDFS目录再load、最方便还是直接使用临时表和HiveContext插入数据

saveAsTextFile & load data

repartition根据实际文件大小进行调整,数据比较小时,保存成一个文件

df.map(lambda r: func).repartition(1).saveAsTextFile(data_dir)

先删除分区,如果已经存在的话
再覆盖原来的数据【方便重新重复跑或修复数据】
此处使用shell,也可使用HiveContext的sql

alter table app_table drop if exists partition(datestr=$day_01);
load data inpath hdfs://xx/out/$day_01 overwrite into table app_table partition(datestr=$day_01);
hivectx.sql & insert
app_table1_df.registerTempTable("app_table1_tmp")
app_table2_df.registerTempTable("app_table2_tmp")
hivectx.sql("set spark.sql.shuffle.partitions=1")
hivectx.sql("alter table app_table drop if exists partition(datestr=‘%s‘)" % daystr)
hivectx.sql("insert overwrite table app_table partition(datestr=‘%s‘) select * from app_table1_tmp" % daystr)
hivectx.sql("insert into app_table partition(datestr=‘%s‘) select * from app_table2_tmp" % daystr)

Spark处理新增列的方式map和udf、functions

Spark在处理数据转换时,通常需要使用map、flatmap等操作,其中使用map会产生新的列或修改某列字段的值
Spark同样支持自定义函数UDF以及提供了类似Hive内置函数的各种各样的处理函数

map

需要定义函数和StructType
忽略数值判断细节和精度等

from pyspark.sql.types import *

def a_func(_):
    return _[id], _[cnt1], _[cnt2], _[cnt1] / (_[cnt1] + _[cnt1])

a_schema = StructType([
    StructField(id, StringType(), True),
    StructField(cnt1, IntegerType(), True),
    StructField(cnt2, IntegerType(), True),
    StructField(cnt1_rate, IntegerType(), True)
])

a_new_df = sqlctx.createDataFrame(df.select(id, cnt1, cnt2).map(a_func), a_schema)
udf

需要定义函数和UDF
忽略数值判断细节和精度等

def a_func(cnt1, cnt2):
    return cnt1 / (cnt1 + cnt2)

a_udf = F.udf(a_func, IntegerType())

a_new_df = df.withColumn(cnt1_rate, a_udf(df[cnt1], df[cnt2])
functions

处理类似日期字符串的格式转换、等等等
https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html

Spark处理行转列pivot的使用

在使用SQL查询数据时,很多情况下需要将行转为列,以有利于数据的展示和不同维度需求的利用
一般可采用子查询case when、连续join、字段补全union的形式
Spark的DataFrame中可以通过GroupedData的pivot函数来实现

df.groupBy([course_name]).pivot(daystr).sum(score)

df.groupBy([course_name]).pivot(daystr).count()

转换前

daystr course_name score
2017-11-15 yuwen 1
2017-11-15 yuwen 1
2017-11-15 shuxue 1
2017-11-15 yingyu 2
2017-11-16 yuwen 1
2017-11-16 shuxue 1
2017-11-16 yingyu 2

转换后

course_name 2017-11-15 2017-11-16
yuwen 2 1
shuxue 1 1
yingyu 2 2
course_name 2017-11-15 2017-11-16
yuwen 2 1
shuxue 1 1
yingyu 1 1

原文地址:https://blog.icocoro.me/2017/11/16/1711-zhishidian-spark%E5%B0%8F%E8%8A%8201/index.html

以上是关于知识点-Spark小节的主要内容,如果未能解决你的问题,请参考以下文章

python+spark程序代码片段

spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段

基础乐理

Spark闭包与序列化

android小知识点代码片段

Spark Core任务运行机制和Task源代码浅析1