Pyspark 结构化流处理

Posted

技术标签:

【中文标题】Pyspark 结构化流处理【英文标题】:Pyspark Structured streaming processing 【发布时间】:2019-07-17 14:35:29 【问题描述】:

我正在尝试使用 spark 制作结构化流应用程序,主要思想是从 kafka 源读取,处理输入,写回另一个主题。我已经成功地使火花读写卡夫卡,但我的问题是处理部分。我已经尝试使用 foreach 函数来捕获每一行并在写回 kafka 之前对其进行处理,但是它总是只执行 foreach 部分并且从不写回 kafka。但是,如果我从写入流中删除 foreach 部分,它将继续写入,但现在我失去了处理。

如果有人能给我一个例子来说明如何做到这一点,我将非常感激。

这是我的代码

spark = SparkSession \
.builder \
.appName("StructuredStreamingTrial") \
.getOrCreate()
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "KafkaStreamingSource") \
  .load()

ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
  .writeStream \
  .outputMode("update") \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("topic", "StreamSink") \
  .option("checkpointLocation", "./testdir")\
  .foreach(foreach_function)
  .start().awaitTermination()

foreach_function 就是

def foreach_function(df):
    try:
        print(df)
    except:
        print('fail')
    pass 

【问题讨论】:

【参考方案1】:

在基于 Pyspark 的结构化流 API 中写入 Kafka 接收器之前处理数据,我们可以轻松地使用 UDF 函数处理任何类型的复杂转换。

示例代码如下。此代码尝试读取 JSON 格式的消息 Kafka 主题并解析消息以将消息从 JSON 转换为 CSV 格式并重写为另一个主题。您可以处理任何处理转换来代替 'json_formatted' function 。

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.functions import col, struct
from pyspark.sql.functions import udf
import json
import csv
import time
import os

#  Spark Streaming context :

spark = SparkSession.builder.appName('pda_inst_monitor_status_update').getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 20)


#  Creating  readstream DataFrame :

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "KafkaStreamingSource") \
  .load()

df1 = df.selectExpr( "CAST(value AS STRING)")

df1.registerTempTable("test")


def json_formatted(s):
    val_dict = json.loads(s)
    return str([
                    val_dict["after"]["ID"]
                ,   val_dict["after"]["INST_NAME"]
                ,   val_dict["after"]["DB_UNIQUE_NAME"]
                ,   val_dict["after"]["DBNAME"]
                ,   val_dict["after"]["MON_START_TIME"]
                ,   val_dict["after"]["MON_END_TIME"]
                ]).strip('[]').replace("'","").replace('"','')

spark.udf.register("JsonformatterWithPython", json_formatted)

squared_udf = udf(json_formatted)
df1 = spark.table("test")
df2 = df1.select(squared_udf("value"))



#  Declaring the Readstream Schema DataFrame :

df2.coalesce(1).writeStream \
   .writeStream \
   .outputMode("update") \
   .format("kafka") \
   .option("kafka.bootstrap.servers", "localhost:9092") \
   .option("topic", "StreamSink") \
   .option("checkpointLocation", "./testdir")\
   .start()

ssc.awaitTermination()

【讨论】:

以上是关于Pyspark 结构化流处理的主要内容,如果未能解决你的问题,请参考以下文章

我可以使用spark 2.3.0和pyspark从Kafka进行流处理吗?

PySpark 结构化流式处理:将查询的输出传递到 API 端点

PySpark 结构化流将 udf 应用于窗口

kafka 到 pyspark 结构化流,将 json 解析为数据帧

pyspark 结构数据处理

如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?