Spark streaming with JSON file
Posted: 2017-07-22 07:19:02

Question:

I want to read JSON data from a folder location with Spark Streaming.
Assume my JSON data is:
"transactionId":111,"customerId":1,"itemId": 1,"amountPaid": 100
I want the output in the Spark SQL table to be:
transactionId customerId itemId amountPaid
111 1 1 100
My code is:
package org.training.spark.streaming

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration
import org.apache.spark.sql.functions.udf
import org.training.spark.streaming.sqlstreaming.Persons

object jsonread {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local").setAppName("jsonstreaming")
    val sc = new SparkContext(sparkConf)

    // Create the streaming context with a 40-second batch interval
    val ssc = new StreamingContext(sc, Seconds(40))

    // Watch the folder for newly created text files
    val lines = ssc.textFileStream("src/main/resources/fileStreaming")
    lines.foreachRDD(rdd => rdd.foreach(println))

    // Split each line on "," so every key/value pair becomes one record
    val words = lines.flatMap(_.split(","))
    words.foreachRDD(rdd => rdd.foreach(println))

    val sqc = new SQLContext(sc)
    import sqc.implicits._

    // Split each pair on ":" into a two-column DataFrame and query it
    words.foreachRDD { rdd =>
      val persons = rdd.map(_.split(":")).map(p => (p(0), p(1))).toDF()
      persons.registerTempTable("data")
      val jsontable = sqc.sql("SELECT * from data")
      jsontable.show
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
Comments:

What isn't working? What do println and jsontable.show output?

println output:
"transactionId":111 "customerId":1 "itemId": 1 "amountPaid": 100

jsontable.show output:
+----------------+-----+
|              _1|   _2|
+----------------+-----+
| "transactionId"|  111|
|    "customerId"|    1|
|        "itemId"|    1|
|    "amountPaid"|  100|
+----------------+-----+

Your comment says you can see the output printed in the console, right? Do you want it printed in a particular format?

Answer 1:

JSON data:
"transactionId":"111","customerId":"1","itemId": "1","amountPaid": "100"
PySpark code that reads the JSON data above:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import col

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

# Streaming context with a 5-second batch interval
ssc = StreamingContext(sc, 5)

# Watch the folder for newly created files
stream_data = ssc.textFileStream("/filepath/")

def readMyStream(rdd):
    if not rdd.isEmpty():
        # Parse the batch of JSON strings into a DataFrame
        df = spark.read.json(rdd)
        print('Started the Process')
        print('Selection of Columns')
        df = df.select('transactionId', 'customerId', 'itemId', 'amountPaid') \
               .where(col("transactionId").isNotNull())
        df.show()

stream_data.foreachRDD(lambda rdd: readMyStream(rdd))

ssc.start()
ssc.awaitTermination()  # keep the application running while batches arrive
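To trigger a batch while this runs, drop a new file into the watched folder after the stream has started; textFileStream only picks up files created in the directory once the stream is running. A minimal sketch for generating test input (the /filepath/ directory and the record contents are assumed from the answer above; the file name is arbitrary):

import json
import os
import uuid

# Write to a hidden temp name first, then rename within the same directory
# so the file appears atomically; Spark's file stream ignores dot-files and
# could otherwise pick up a half-written file.
record = {"transactionId": "111", "customerId": "1", "itemId": "1", "amountPaid": "100"}
name = "{}.json".format(uuid.uuid4())
tmp = os.path.join("/filepath/", "." + name)
with open(tmp, "w") as f:
    f.write(json.dumps(record))
os.rename(tmp, os.path.join("/filepath/", name))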
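As a side note, on Spark 2.x and later this kind of job is usually written with Structured Streaming, which reads a folder of JSON files directly into a streaming DataFrame. A sketch under the same assumptions (the /filepath/ folder and the four string-typed columns from the sample record above; note the file source requires an explicit schema):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("jsonstreaming").getOrCreate()

# The file source needs the schema up front; all four fields are strings
# in the sample record above.
schema = StructType([
    StructField("transactionId", StringType()),
    StructField("customerId", StringType()),
    StructField("itemId", StringType()),
    StructField("amountPaid", StringType()),
])

stream_df = spark.readStream.schema(schema).json("/filepath/")

# Print each micro-batch to the console, analogous to df.show() above
query = (stream_df.writeStream
         .format("console")
         .outputMode("append")
         .start())
query.awaitTermination()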