使用 json 文件触发流式传输

Posted

技术标签:

【中文标题】使用 json 文件触发流式传输【英文标题】:spark streaming with json file 【发布时间】:2017-07-22 07:19:02 【问题描述】:

我想通过火花流从文件夹位置读取 json 数据。

我假设我的 json 数据是

"transactionId":111,"customerId":1,"itemId": 1,"amountPaid": 100

我希望 Spark SQL 表中的输出为:--

transactionId   customerId  itemId  amountPaid
    111              1         1       100

我的代码是:

package org.training.spark.streaming
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds, StreamingContext
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration
import org.apache.spark.sql.functions.udf
import org.training.spark.streaming.sqlstreaming.Persons

object jsonread 
  def main(args: Array[String]) 
    val sparkConf = new SparkConf().setMaster("local").setAppName("jsonstreaming")

    val sc = new SparkContext(sparkConf)

    // Create the context
    val ssc = new StreamingContext(sc, Seconds(40))
    val lines = ssc.textFileStream("src/main/resources/fileStreaming")
    lines.foreachRDD(rdd=>rdd.foreach(println))

    val words = lines.flatMap(_. split(","))

    words.foreachRDD(rdd=>rdd.foreach(println))
    val sqc = new SQLContext(sc);
    import sqc.implicits._

    words.foreachRDD  rdd =>
      val persons = rdd.map(_.split(":")).map(p => (p(0), p(1))).toDF()

      persons.registerTempTable("data")

      val jsontable = sqc.sql("SELECT * from data")
      jsontable.show
    
    ssc.start()
    ssc.awaitTermination()
  

【问题讨论】:

什么不起作用? println 和 jsontable.show 的结果是什么? println 结果:--- "transactionId":111 "customerId":1 "itemId": 1 "amountPaid": 100 jsontable.show 结果:-- +----------------+-----+ | _1| _2| +----------------+-----+ |"transactionId"| 111| | "客户ID"| 1| | "项目ID"| 1| | “支付金额”| 100| +----------------+-----+ 您的评论说,您能看到控制台中打印的输出吗?对?您希望它以某种格式打印吗? 【参考方案1】:

Json 数据:

"transactionId":"111","customerId":"1","itemId": "1","amountPaid": "100"

从上面的 json 数据中读取的 Pyspark 代码:

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.types import IntegerType, LongType, DecimalType,StructType, StructField, StringType
from pyspark.sql import Row
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql import Window


sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
ssc = StreamingContext(sc, 5)

stream_data = ssc.textFileStream("/filepath/")


def readMyStream(rdd):
  if not rdd.isEmpty():
    df = spark.read.json(rdd)
    print('Started the Process')
    print('Selection of Columns')
    df = df.select('transactionId','customerId','itemId','amountPaid').where(col("transactionId").isNotNull())
    df.show()


stream_data.foreachRDD( lambda rdd: readMyStream(rdd) )
ssc.start()
ssc.stop()

【讨论】:

以上是关于使用 json 文件触发流式传输的主要内容,如果未能解决你的问题,请参考以下文章

如何使用预定义的 GitLab CI 变量和流式传输到 GitLab Pipeline 日志的 Tekton 日志直接从 GitLab CI 触发 Tekton Pipeline

等待所有流完成 - 流式传输文件目录

gRPC 服务器端流式传输:如何无限期地继续流式传输?

Spring-MVC:如何从控制器流式传输 mp3 文件

如何从文件中流式传输 JSON?

使用 GSON 的 JsonReader 流式传输 Json 文件时,您可以将对象转储为字符串吗?