Pyspark_结构化流2

Posted 陈万君Allen

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pyspark_结构化流2相关的知识,希望对你有一定的参考价值。

Pyspark

注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下Pyspark_结构化流2
#博学谷IT学习技术支持


文章目录


前言

接下次继续Pyspark_结构化流,今天是Sink(输出)操作。


一、输出模式

在结构化流中定义好df 或者 处理好df后, 调用 writeStream 完成数据写出操作, 在写出的过程中, 同样也可以设置一些相关的属性, 启动流式应用运行

输出模式:在进行输出的时候, 必须通过outPutMode来设置输出方案, 输出模式共提供三种输出模式

  • 1- append 模式: 增量模式
    • 指的: 当流处理管道中, 有了新的数据后,才会触发输出, 而且Append模式仅支持追加, 不支持聚合操作, 如果执行聚合会直接报错,同时不支持排序操作. 如果存在排序, 也会直接报错
  • 2- complete模式: 完全(全量)模式
    • 指的: 每一次都是针对整个所有批次数据进行处理, 由于数据会越来越多, 要求必须对数据进行聚合操作, 否则会直接报错
  • 3- update模式: 更新模式
    • 指的: 当处理模式中没有聚合操作的时候, 此模式与append基本是一致的, 但是如果有了聚合操作, 仅输出变更和新增的数据, 但是不支持排序操作

1.Append 模式

说明: 当有了聚合计算操作后, 会直接报出错误
说明: 在执行排序操作后, 会直接报出错误

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
    print("spark streaming append")

    spark = SparkSession.builder.appName("spark streaming append").master("local[*]") \\
        .config('spark.sql.shuffle.partitions', 4) \\
        .getOrCreate()

    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df = df.withColumn("words", F.explode(F.split("value", " "))).select("words")

    df.writeStream.format("console").outputMode("append").start().awaitTermination()

2.Complete模式

说明: 当没有聚合操作的时候, 会报出错误
说明: 完全模式下, 支持对数据进行排序操作

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
    print("spark streaming complete")

    spark = SparkSession.builder.appName("spark streaming complete").master("local[*]") \\
        .config('spark.sql.shuffle.partitions', 4) \\
        .getOrCreate()

    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df = df.withColumn("words", F.explode(F.split("value", " "))).groupBy("words").agg(
        F.count("words").alias("word_count")
    ).orderBy("word_count", ascending=False)

    df.writeStream.format("console").outputMode("complete").start().awaitTermination()

3.Update模式

说明: 当对数据进行排序操作的时候, 会直接报出错误

from pyspark.sql import SparkSession
import pyspark.sql.functions as F


if __name__ == '__main__':
    print("spark streaming update")

    spark = SparkSession.builder.appName("spark streaming update").master("local[*]") \\
        .config('spark.sql.shuffle.partitions', 4) \\
        .getOrCreate()

    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df = df.withColumn("words", F.explode(F.split("value", " "))).select("words").groupBy("words").agg(
        F.count("words").alias("word_count")
    )

    df.writeStream.format("console").outputMode("update").start().awaitTermination()

二、输出位置

默认情况下, Spark的结构化流支持多种输出的方案:

1- console sink: 将结果输出到控制台, 主要是应用测试中 支持三种方式

2- File Sink: 文件接收器, 将结果输出到某个目录下, 形成文件数据, 仅支持追加方式

3- foreach Sink 或 froeachBatch Sink : 将数据进行遍历处理, 遍历后输出到任何位置 三种方式均支持

4- MemMory Sink: 将结果输出到内存中, 主要目的是进行再次迭代计算, 数据集不能过大, 仅支持append 和 complete模式

5- Kafka sink 将结果输出到Kafka 类似是Kafka的生产者 三种模式都支持

1.File Sink

文件输出方案:
一个批次对应一个文件, 有多少个批次, 就会产生多少个文件

import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Window as win

if __name__ == '__main__':
    print("spark file sink")

    spark = SparkSession.builder.appName("spark file sink").master("local[*]") \\
        .config('spark.sql.shuffle.partitions', 4) \\
        .getOrCreate()

    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df = df.withColumn("words", F.explode(F.split("value", " ")))

    df.writeStream.format("csv").outputMode("append") \\
        .option("header", True).option("sep", "\\001") \\
        .option("checkpointLocation", "hdfs://node1:8020/structuredStreaming/chk") \\
        .start("hdfs://node1:8020/structuredStreaming/output1").awaitTermination()

可以通过设置触发器, 调整每一批次产生间隔时间

from pyspark.sql import SparkSession

if __name__ == '__main__':
    print("spark streaming trigger")

    spark = SparkSession.builder.appName("spark streaming trigger").master("local[*]") \\
        .config('spark.sql.shuffle.partitions', 4) \\
        .getOrCreate()

    df = spark.readStream \\
        .format('rate') \\
        .option('rowsPerSecond', 1) \\
        .option('rampUpTime', 0) \\
        .option('numPartitions', 1) \\
        .load()

    df.writeStream.format("csv") \\
        .outputMode("append") \\
        .option("header", True).option("sep", "\\001") \\
        .option("checkpointLocation", "hdfs://node1:8020/structuredStreaming/chk") \\
        .trigger(processingTime="5 seconds") \\
        .start("hdfs://node1:8020/structuredStreaming/output1")

    df.writeStream.format("console") \\
        .outputMode("append") \\
        .trigger(processingTime="5 seconds") \\
        .start().awaitTermination()

2.Foreach Sink

foreach sink: 对输出的数据, 一个个进行处理操作
方法一:通过process_row函数方式

from pyspark.sql import SparkSession

def process_row(row):
    print(f"row.name,row.address")


if __name__ == '__main__':
    print("spark streaming foreach")

    spark = SparkSession.builder.appName("spark streaming foreach").master("local[*]") \\
        .config('spark.sql.shuffle.partitions', 4) \\
        .getOrCreate()

    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df.createTempView("t1")

    df_res = spark.sql("""
        select
            split(value,"-")[0] as name,
            split(value,"-")[1] as address
        from t1
    """)

    df_res.writeStream.foreach(process_row).outputMode("append").trigger(
        processingTime="10 seconds").start().awaitTermination()

方法二:通过自定义ForeachWriter类的方式

from pyspark.sql import SparkSession

class ForeachWriter(object):
    def open(self, partition_id, epoch_id):
        print(f'open方法正在执行, 正在初始化; partition_id,epoch_id')
        return True

    def process(self, row):
        print(f"row.name,row.address")

    def stop(self):
        print('释放资源')


if __name__ == '__main__':
    print("spark streaming foreach")

    spark = SparkSession.builder.appName("spark streaming foreach").master("local[1]") \\
        .config('spark.sql.shuffle.partitions', 4) \\
        .getOrCreate()

    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df.createTempView("t1")

    df_res = spark.sql("""
        select
            split(value,"-")[0] as name,
            split(value,"-")[1] as address
        from t1
    """)

    df_res.writeStream.foreach(ForeachWriter()).outputMode("append").trigger(
        processingTime="10 seconds").start().awaitTermination()

3.ForeachBatch Sink

对输出的数据, 进行一批一批的处理操作

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

def foreachBatch(df, batch_id):
    print(f"第batch_id批数据")
    df = df.withColumn("new_address", F.concat("name", "address"))
    df.show()

if __name__ == '__main__':
    print("spark streaming foreachBatch")

    spark = SparkSession.builder.appName("spark streaming foreachBatch").master("local[*]") \\
        .config('spark.sql.shuffle.partitions', 4) \\
        .getOrCreate()

    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df.createTempView("t1")

    df_res = spark.sql("""
        select
            split(value,"-")[0] as name,
            split(value,"-")[1] as address
        from t1
    """)

    df_res.writeStream.foreachBatch(foreachBatch).outputMode("append").trigger(
        processingTime="10 seconds").start().awaitTermination()

4.Memory Sink

内存输出 基于内存的数据进行二次迭代计算

import time
from pyspark.sql import SparkSession

if __name__ == '__main__':
    print("spark streaming memory sink")

    # 1- 创建SparkSession镀锡
    spark = SparkSession.builder.appName("spark streaming memory sink").master("local[*]") \\
        .config('spark.sql.shuffle.partitions', 4) \\
        .getOrCreate()

    # 2- 读取外部数据源: 监听某一个端口号
    df = spark.readStream.format("socket").option("host", "node1").option("port", "44444").load()

    df.createTempView('t1')

    # 3- 处理数据
    df_res = spark.sql("""
        select
            split(value,'-')[0] as name,
            split(value,'-')[1] as address
        from t1
    """)

    df_res.writeStream.format("memory").queryName("t2").outputMode("append").trigger(
        processingTime="10 seconds").start()

    while True:
        spark.sql("""
            select * from t2
        """).show()
        time.sleep(5)


总结

今天主要和大家分享了输出操作,包括输出模式和输出位置。

在结构化流 API (pyspark) 中使用 redshift 作为 readStream 的 JDBC 源

【中文标题】在结构化流 API (pyspark) 中使用 redshift 作为 readStream 的 JDBC 源【英文标题】:Using redshift as a JDBC source for readStream in the Structured Streaming API (pyspark) 【发布时间】:2020-11-17 18:26:40 【问题描述】:

我正在寻找一个包,或者以前使用 redshift 作为结构化流数据帧源的实现。

spark.readStream \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .option('url', redshift_url) \
    .option('forward_spark_s3_credentials', 'true') \
    .load()

使用下面的格式会在读取时出错。如:

Data source io.github.spark_redshift_community.spark.redshift does not support streamed reading

如果您从 Spark 3 降级并使用:com.databricks.spark.redshift,则会出现同样的错误

是否有已知的解决方法或方法/模式我可以用来实现(在 pyspark 中)redshift 作为 readStream 数据源

【问题讨论】:

【参考方案1】:

正如错误所说,这个库不支持对 Redshift 进行流式读取/写入。

可以从项目源at link 确认。该格式不扩展或实现 Micro/Continuous 流读取器和写入器。

没有true streaming 简单的方法可以做到这一点。您可以探索以下途径,

    探索第 3 方库。搜索JDBC streaming spark。免责声明:我没有使用过这些库,因此不认可这些库。 在自定义检查点机制上创建微批处理策略。

扩展说明: AFAIK,Spark JDBC 接口不支持结构化流。

【讨论】:

以上是关于Pyspark_结构化流2的主要内容,如果未能解决你的问题,请参考以下文章

使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问

我可以使用spark 2.3.0和pyspark从Kafka进行流处理吗?

基于Python Spark的大数据分析_pyspark实战项目课程

kafka 到 pyspark 结构化流,将 json 解析为数据帧

大数据(8s)Spark结构化流

在结构化流 API (pyspark) 中使用 redshift 作为 readStream 的 JDBC 源