Pyspark_结构化流2
Posted 陈万君Allen
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pyspark_结构化流2相关的知识,希望对你有一定的参考价值。
Pyspark
注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下Pyspark_结构化流2
#博学谷IT学习技术支持
文章目录
前言
接下次继续Pyspark_结构化流,今天是Sink(输出)操作。
一、输出模式
在结构化流中定义好df 或者 处理好df后, 调用 writeStream 完成数据写出操作, 在写出的过程中, 同样也可以设置一些相关的属性, 启动流式应用运行
输出模式:在进行输出的时候, 必须通过outPutMode来设置输出方案, 输出模式共提供三种输出模式
- 1- append 模式: 增量模式
- 指的: 当流处理管道中, 有了新的数据后,才会触发输出, 而且Append模式仅支持追加, 不支持聚合操作, 如果执行聚合会直接报错,同时不支持排序操作. 如果存在排序, 也会直接报错
- 2- complete模式: 完全(全量)模式
- 指的: 每一次都是针对整个所有批次数据进行处理, 由于数据会越来越多, 要求必须对数据进行聚合操作, 否则会直接报错
- 3- update模式: 更新模式
- 指的: 当处理模式中没有聚合操作的时候, 此模式与append基本是一致的, 但是如果有了聚合操作, 仅输出变更和新增的数据, 但是不支持排序操作
1.Append 模式
说明: 当有了聚合计算操作后, 会直接报出错误
说明: 在执行排序操作后, 会直接报出错误
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
if __name__ == '__main__':
print("spark streaming append")
spark = SparkSession.builder.appName("spark streaming append").master("local[*]") \\
.config('spark.sql.shuffle.partitions', 4) \\
.getOrCreate()
df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()
df = df.withColumn("words", F.explode(F.split("value", " "))).select("words")
df.writeStream.format("console").outputMode("append").start().awaitTermination()
2.Complete模式
说明: 当没有聚合操作的时候, 会报出错误
说明: 完全模式下, 支持对数据进行排序操作
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
if __name__ == '__main__':
print("spark streaming complete")
spark = SparkSession.builder.appName("spark streaming complete").master("local[*]") \\
.config('spark.sql.shuffle.partitions', 4) \\
.getOrCreate()
df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()
df = df.withColumn("words", F.explode(F.split("value", " "))).groupBy("words").agg(
F.count("words").alias("word_count")
).orderBy("word_count", ascending=False)
df.writeStream.format("console").outputMode("complete").start().awaitTermination()
3.Update模式
说明: 当对数据进行排序操作的时候, 会直接报出错误
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
if __name__ == '__main__':
print("spark streaming update")
spark = SparkSession.builder.appName("spark streaming update").master("local[*]") \\
.config('spark.sql.shuffle.partitions', 4) \\
.getOrCreate()
df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()
df = df.withColumn("words", F.explode(F.split("value", " "))).select("words").groupBy("words").agg(
F.count("words").alias("word_count")
)
df.writeStream.format("console").outputMode("update").start().awaitTermination()
二、输出位置
默认情况下, Spark的结构化流支持多种输出的方案:
1- console sink: 将结果输出到控制台, 主要是应用测试中 支持三种方式
2- File Sink: 文件接收器, 将结果输出到某个目录下, 形成文件数据, 仅支持追加方式
3- foreach Sink 或 froeachBatch Sink : 将数据进行遍历处理, 遍历后输出到任何位置 三种方式均支持
4- MemMory Sink: 将结果输出到内存中, 主要目的是进行再次迭代计算, 数据集不能过大, 仅支持append 和 complete模式
5- Kafka sink 将结果输出到Kafka 类似是Kafka的生产者 三种模式都支持
1.File Sink
文件输出方案:
一个批次对应一个文件, 有多少个批次, 就会产生多少个文件
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Window as win
if __name__ == '__main__':
print("spark file sink")
spark = SparkSession.builder.appName("spark file sink").master("local[*]") \\
.config('spark.sql.shuffle.partitions', 4) \\
.getOrCreate()
df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()
df = df.withColumn("words", F.explode(F.split("value", " ")))
df.writeStream.format("csv").outputMode("append") \\
.option("header", True).option("sep", "\\001") \\
.option("checkpointLocation", "hdfs://node1:8020/structuredStreaming/chk") \\
.start("hdfs://node1:8020/structuredStreaming/output1").awaitTermination()
可以通过设置触发器, 调整每一批次产生间隔时间
from pyspark.sql import SparkSession
if __name__ == '__main__':
print("spark streaming trigger")
spark = SparkSession.builder.appName("spark streaming trigger").master("local[*]") \\
.config('spark.sql.shuffle.partitions', 4) \\
.getOrCreate()
df = spark.readStream \\
.format('rate') \\
.option('rowsPerSecond', 1) \\
.option('rampUpTime', 0) \\
.option('numPartitions', 1) \\
.load()
df.writeStream.format("csv") \\
.outputMode("append") \\
.option("header", True).option("sep", "\\001") \\
.option("checkpointLocation", "hdfs://node1:8020/structuredStreaming/chk") \\
.trigger(processingTime="5 seconds") \\
.start("hdfs://node1:8020/structuredStreaming/output1")
df.writeStream.format("console") \\
.outputMode("append") \\
.trigger(processingTime="5 seconds") \\
.start().awaitTermination()
2.Foreach Sink
foreach sink: 对输出的数据, 一个个进行处理操作
方法一:通过process_row函数方式
from pyspark.sql import SparkSession
def process_row(row):
print(f"row.name,row.address")
if __name__ == '__main__':
print("spark streaming foreach")
spark = SparkSession.builder.appName("spark streaming foreach").master("local[*]") \\
.config('spark.sql.shuffle.partitions', 4) \\
.getOrCreate()
df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()
df.createTempView("t1")
df_res = spark.sql("""
select
split(value,"-")[0] as name,
split(value,"-")[1] as address
from t1
""")
df_res.writeStream.foreach(process_row).outputMode("append").trigger(
processingTime="10 seconds").start().awaitTermination()
方法二:通过自定义ForeachWriter类的方式
from pyspark.sql import SparkSession
class ForeachWriter(object):
def open(self, partition_id, epoch_id):
print(f'open方法正在执行, 正在初始化; partition_id,epoch_id')
return True
def process(self, row):
print(f"row.name,row.address")
def stop(self):
print('释放资源')
if __name__ == '__main__':
print("spark streaming foreach")
spark = SparkSession.builder.appName("spark streaming foreach").master("local[1]") \\
.config('spark.sql.shuffle.partitions', 4) \\
.getOrCreate()
df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()
df.createTempView("t1")
df_res = spark.sql("""
select
split(value,"-")[0] as name,
split(value,"-")[1] as address
from t1
""")
df_res.writeStream.foreach(ForeachWriter()).outputMode("append").trigger(
processingTime="10 seconds").start().awaitTermination()
3.ForeachBatch Sink
对输出的数据, 进行一批一批的处理操作
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
def foreachBatch(df, batch_id):
print(f"第batch_id批数据")
df = df.withColumn("new_address", F.concat("name", "address"))
df.show()
if __name__ == '__main__':
print("spark streaming foreachBatch")
spark = SparkSession.builder.appName("spark streaming foreachBatch").master("local[*]") \\
.config('spark.sql.shuffle.partitions', 4) \\
.getOrCreate()
df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()
df.createTempView("t1")
df_res = spark.sql("""
select
split(value,"-")[0] as name,
split(value,"-")[1] as address
from t1
""")
df_res.writeStream.foreachBatch(foreachBatch).outputMode("append").trigger(
processingTime="10 seconds").start().awaitTermination()
4.Memory Sink
内存输出 基于内存的数据进行二次迭代计算
import time
from pyspark.sql import SparkSession
if __name__ == '__main__':
print("spark streaming memory sink")
# 1- 创建SparkSession镀锡
spark = SparkSession.builder.appName("spark streaming memory sink").master("local[*]") \\
.config('spark.sql.shuffle.partitions', 4) \\
.getOrCreate()
# 2- 读取外部数据源: 监听某一个端口号
df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()
df.createTempView('t1')
# 3- 处理数据
df_res = spark.sql("""
select
split(value,'-')[0] as name,
split(value,'-')[1] as address
from t1
""")
df_res.writeStream.format("memory").queryName("t2").outputMode("append").trigger(
processingTime="10 seconds").start()
while True:
spark.sql("""
select * from t2
""").show()
time.sleep(5)
总结
今天主要和大家分享了输出操作,包括输出模式和输出位置。
在结构化流 API (pyspark) 中使用 redshift 作为 readStream 的 JDBC 源
【中文标题】在结构化流 API (pyspark) 中使用 redshift 作为 readStream 的 JDBC 源【英文标题】:Using redshift as a JDBC source for readStream in the Structured Streaming API (pyspark) 【发布时间】:2020-11-17 18:26:40 【问题描述】:我正在寻找一个包,或者以前使用 redshift 作为结构化流数据帧源的实现。
spark.readStream \
.format("io.github.spark_redshift_community.spark.redshift") \
.option('url', redshift_url) \
.option('forward_spark_s3_credentials', 'true') \
.load()
使用下面的格式会在读取时出错。如:
Data source io.github.spark_redshift_community.spark.redshift does not support streamed reading
如果您从 Spark 3 降级并使用:com.databricks.spark.redshift
,则会出现同样的错误
是否有已知的解决方法或方法/模式我可以用来实现(在 pyspark 中)redshift 作为 readStream 数据源
【问题讨论】:
【参考方案1】:正如错误所说,这个库不支持对 Redshift 进行流式读取/写入。
可以从项目源at link 确认。该格式不扩展或实现 Micro/Continuous 流读取器和写入器。
没有true streaming
简单的方法可以做到这一点。您可以探索以下途径,
-
探索第 3 方库。搜索
JDBC streaming spark
。免责声明:我没有使用过这些库,因此不认可这些库。
在自定义检查点机制上创建微批处理策略。
扩展说明: AFAIK,Spark JDBC 接口不支持结构化流。
【讨论】:
以上是关于Pyspark_结构化流2的主要内容,如果未能解决你的问题,请参考以下文章
使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问
我可以使用spark 2.3.0和pyspark从Kafka进行流处理吗?
基于Python Spark的大数据分析_pyspark实战项目课程