Spark Streaming 实时处理
Posted
技术标签:
【中文标题】Spark Streaming 实时处理【英文标题】:Spark Streaming Real Time Processing 【发布时间】:2015-07-06 15:50:42 【问题描述】:我需要一个能够在工作人员之间进行通信的应用程序。假设工人 1 正在处理工作 1,它将生成其他工作所依赖的数据输出。此外,这个过程要重复很多次,这意味着每次工人 1 生成一个新的数据集,其他工人应该开始输入这个数据集并开始工作。火花能做到这一点吗?到目前为止,我已经看到了火花流实时处理,但流通信似乎没有发生在工作人员之间?任何方向或建议将不胜感激。
【问题讨论】:
听起来你正在寻找类似 Akka 的东西 【参考方案1】:您必须在 1 个 Spark Streaming Job 中一个接一个地定义所需的操作。
虽然我没有尝试过,但您也可以尝试使用一些工作流组件(例如 Oozie)来配置您的标准 Spark 批处理作业(非流式处理)。
最近 Spring XD 还引入了与 Spark Jobs 的集成。这也可能有效 - http://www.slideshare.net/mark_fisher/spark-meets-spring。
【讨论】:
【参考方案2】:import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
import org.json4s.jackson.Json;
public class lotWeather
public static void main(String[] args) throws StreamingQueryException
System.setProperty("hadoop.home.dir", "C:\\hadoop-common-2.2.0-bin-master");
SparkSession sparkSession = SparkSession.builder().appName("SparkStreamingMessageListener").master("local").getOrCreate();
enter code here
StructType weatherType= new StructType().add("quarter","String").add("heatType", "string").add("heat","integer")
.add("windType","string").add("wind","integer");
Dataset<Row> rawData = sparkSession.readStream().schema(weatherType).option("sep", ",")
.csv("C:\\Users\\sorun\\OneDrive\\Masaüstü\\bigdata\\sparkstreaming\\*");
Dataset<Row> heatData = rawData.select("quarter", "heat").where("heat>29");
StreamingQuery start = heatData.writeStream().outputMode("append").format("console").start();
start.awaitTermination();
【讨论】:
一些解释和代码通常对 OP 有帮助 link 错误链接以上是关于Spark Streaming 实时处理的主要内容,如果未能解决你的问题,请参考以下文章
流式计算助力实时数据处理spark-streaming入门实战
Spark Streaming实时流处理项目实战Spark Streaming整合Flume实战二