Structured Streaming
First, a plain batch example for reference: read two TPC-DS tables from HDFS as pipe-delimited CSV and join them with the standard DataSource API.

import org.apache.spark.sql.types._

val pathA = "hdfs:/tpc-ds/data/store_sales"
val pathB = "hdfs:/tpc-ds/data/store/"

// For Spark 2.x use -> val df = spark.read.option("header", true).csv(path)
val A_df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("delimiter", "|")
  .load(pathA)

// Assign column names to the Store Sales dataframe
val storeSalesDF = A_df.select(
  A_df("_c0").cast(IntegerType).as("SS_SOLD_DATE_SK"),
  A_df("_c1").cast(IntegerType).as("SS_SOLD_TIME_SK"),
  A_df("_c2").cast(IntegerType).as("SS_ITEM_SK"),
  A_df("_c7").cast(IntegerType).as("SS_STORE_SK")
)

val B_df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("delimiter", "|")
  .load(pathB)

// Assign column names to the Store dataframe
val storeDF = B_df.select(
  B_df("_c0").cast(IntegerType).as("S_STORE_SK"),
  B_df("_c1").cast(StringType).as("S_STORE_ID"),
  B_df("_c5").cast(StringType).as("S_STORE_NAME")
)

val joinedDF = storeSalesDF.join(storeDF,
  storeSalesDF("SS_STORE_SK") === storeDF("S_STORE_SK")
)

joinedDF.take(5)
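On Spark 2.x the same read can use the built-in CSV reader, as the comment above hints. A minimal sketch for the store table, assuming the same pipe-delimited layout and column picks as above (B2 and storeDF2 are names made up for this sketch):

// Spark 2.x: csv() is built in, no external package needed.
val B2 = spark.read
  .option("header", "false")
  .option("delimiter", "|")
  .csv(pathB)

val storeDF2 = B2.select(
  B2("_c0").cast(IntegerType).as("S_STORE_SK"),
  B2("_c1").cast(StringType).as("S_STORE_ID"),
  B2("_c5").cast(StringType).as("S_STORE_NAME")
)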
What is the full routine of Structured Streaming?
Let’s look at the code (the example is from the Spark source code and I made some edits):
import scala.concurrent.duration._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.types._

val spark = SparkSession.builder
  .master("local[2]")
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

val schemaExp = StructType(
  StructField("name", StringType, false) ::
  StructField("city", StringType, true) :: Nil
)

// Standard DataSource API; only `read` changes to `readStream`.
val words = spark.readStream.format("json").schema(schemaExp)
  .load("file:///tmp/dir")

// Ordinary DataFrame operations.
val wordCounts = words.groupBy("name").count()

// Standard DataSource writing API; only `write` changes to `writeStream`.
val query = wordCounts.writeStream
  // Output modes: complete, append, update. Currently
  // only the first two are supported.
  .outputMode("complete")
  // Sink types: console, parquet, memory, foreach.
  .format("console")
  .trigger(ProcessingTime(5.seconds)) // This sets the trigger interval (the "timer").
  .start()

query.awaitTermination()
This is the complete routine of Structured Streaming.
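As an aside on the memory sink listed above: it materializes the streaming result as an in-memory table that you can query with ordinary SQL. A minimal sketch, reusing wordCounts from the example (the table name memory_counts is made up here):

// Write the running counts to an in-memory table instead of the console.
val memQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("memory_counts") // hypothetical table name for this sketch
  .start()

// The result is then queryable like any registered view:
spark.sql("SELECT * FROM memory_counts").show()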
Structured Streaming currently supports only the File and Socket sources. For output it supports the four sink types mentioned above, and the foreach sink can be extended arbitrarily. For example:
import java.io.{File, FileWriter}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ForeachWriter, Row}

val query = wordCounts.writeStream
  .trigger(ProcessingTime(5.seconds))
  .outputMode("complete")
  .foreach(new ForeachWriter[Row] {
    var fileWriter: FileWriter = _

    override def open(partitionId: Long, version: Long): Boolean = {
      // One output directory per partition.
      FileUtils.forceMkdir(new File(s"/tmp/example/${partitionId}"))
      fileWriter = new FileWriter(new File(s"/tmp/example/${partitionId}/temp"))
      true
    }

    override def process(value: Row): Unit = {
      // Write each row as one comma-separated line.
      fileWriter.append(value.toSeq.mkString(",")).append("\n")
    }

    override def close(errorOrNull: Throwable): Unit = {
      fileWriter.close()
    }
  })
  .start()
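For the other supported source, here is a minimal socket-source sketch (the classic network word count; localhost:9999 is a placeholder you would feed with e.g. nc -lk 9999, and the spark session from above is reused):

// Read lines from a TCP socket (a test-only source; it is not fault tolerant).
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split each line into words and count occurrences.
import spark.implicits._
val counts = lines.as[String]
  .flatMap(_.split(" "))
  .groupBy("value")
  .count()

val socketQuery = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()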