使用 python 构建 Spark 结构化流

Posted

技术标签:

【中文标题】使用 python 构建 Spark 结构化流【英文标题】:Spark structured streaming with python 【发布时间】:2017-04-07 17:55:57 【问题描述】:

我正在尝试使用 Kafka 和 Python 激发结构化流式传输。 需求:我需要在 Spark 中处理来自 Kafka(JSON 格式)的流数据(执行转换),然后将其存储在数据库中。

我有 JSON 格式的数据,例如, "a": 120.56, "b": 143.6865998138807, "name": "niks", "time": "2012-12-01 00:00:09"

我打算使用spark.readStream 来阅读 Kafka 之类的内容,

data = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe","test").load()

我参考了this link 以供参考,但不知道如何解析 JSON 数据。我试过了,

data = data.selectExpr("CAST(a AS FLOAT)","CAST(b as FLOAT)", "CAST(name as STRING)", "CAST(time as STRING)").as[(Float, Float, String, String)]

但看起来它不起作用。

任何使用 Python 处理 Spark 结构化流的人都可以指导我继续使用示例示例或链接吗?

使用,

schema = StructType([
    StructField("a", DoubleType()),
    StructField("b", DoubleType()),
    StructField("name", StringType()),
    StructField("time", TimestampType())])

inData = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe","test").load()
data = inData.select(from_json(col("value").cast("string"), schema))
query = data.writeStream.outputMode("Append").format("console").start()

程序运行,但我在控制台上获得值,

+-----------------------------------+
|jsontostruct(CAST(value AS STRING))|
+-----------------------------------+
|               [null,null,null,2...|
|               [null,null,null,2...|
+-----------------------------------+

17/04/07 19:23:15 INFO StreamExecution: Streaming query made progress: 
  "id" : "8e2355cb-0fd3-4233-89d8-34a855256b1e",
  "runId" : "9fc462e0-385a-4b05-97ed-8093dc6ef37b",
  "name" : null,
  "timestamp" : "2017-04-07T19:23:15.013Z",
  "numInputRows" : 2,
  "inputRowsPerSecond" : 125.0,
  "processedRowsPerSecond" : 12.269938650306749,
  "durationMs" : 
    "addBatch" : 112,
    "getBatch" : 8,
    "getOffset" : 2,
    "queryPlanning" : 4,
    "triggerExecution" : 163,
    "walCommit" : 26
  ,
  "eventTime" : 
    "watermark" : "1970-01-01T00:00:00.000Z"
  ,
  "stateOperators" : [ ],
  "sources" : [ 
    "description" : "KafkaSource[Subscribe[test]]",
    "startOffset" : 
      "test" : 
        "0" : 366
      
    ,
    "endOffset" : 
      "test" : 
        "0" : 368
      
    ,
    "numInputRows" : 2,
    "inputRowsPerSecond" : 125.0,
    "processedRowsPerSecond" : 12.269938650306749
   ],
  "sink" : 
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6aa91aa2"
  

我在这里错过了什么吗?

【问题讨论】:

【参考方案1】:

您可以将from_json 与架构一起使用:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

schema = StructType([
    StructField("a", DoubleType()),
    StructField("b", DoubleType()), 
    StructField("name", StringType()), 
    StructField("time", TimestampType())])

data.select(from_json(col("value").cast("string"), schema))

或者使用get_json_object获取单个字段作为字符串:

from pyspark.sql.functions import get_json_object

data.select([
    get_json_object(col("value").cast("string"), "$.".format(c)).alias(c)
    for c in ["a", "b", "name", "time"]])

cast他们稍后根据您的需要。

【讨论】:

以上是关于使用 python 构建 Spark 结构化流的主要内容,如果未能解决你的问题,请参考以下文章

[Spark]-结构化流之初始篇

地铁译:Spark for python developers ---构建Spark批处理和流处理应用前的数据准备

使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问

带有自定义接收器的 Spark 结构化流中的输入行数

使用 spark 结构化流从 s3 读取 avro 文件

将 Spark 结构化流与 Confluent Schema Registry 集成