Spark Streaming: read from Kafka and apply Spark SQL aggregations in Java
Posted: 2017-05-11 11:06:05

Question: I have a Spark job that reads data from a database and applies Spark SQL aggregations. The code looks like this (with only the conf options omitted):
SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
sqlContext = new SQLContext(sc);
Dataset df = MongoSpark.read(sqlContext).options(readOptions).load();
df.registerTempTable("data");
df.cache();
aggregators = sqlContext.sql(myQuery);
Now I want to create a second job that reads the messages from Kafka through Spark Streaming and then applies the same aggregations through Spark SQL. My code so far:
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "192.168.99.100:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", KafkaStatisticsPayloadDeserializer.class);
kafkaParams.put("group.id", "Group1");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList(topic);
SparkConf conf = new SparkConf().setAppName(topic).setMaster("local");
/*
* Spark streaming context
*/
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(2));
/*
* Create an input DStream receiving data from Kafka
*/
JavaInputDStream<ConsumerRecord<String, StatisticsRecord>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, StatisticsRecord>Subscribe(topics, kafkaParams)
);
So far I have managed to read and deserialize the messages successfully. My question is how I can actually apply the Spark SQL aggregations on them. I tried the following, but it does not work. I think I first need to isolate the "value" field that contains the actual message.
SQLContext sqlContext = new SQLContext(streamingContext.sparkContext());
stream.foreachRDD(rdd -> {
    Dataset<Row> df = sqlContext.createDataFrame(rdd.rdd(), StatisticsRecord.class);
    df.createOrReplaceTempView("data");
    df.cache();
    Dataset<Row> aggregators = sqlContext.sql(SQLContextAggregations.ORDER_TYPE_DB);
    aggregators.show();
});
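For reference, a minimal sketch of what isolating the value field could look like, assuming StatisticsRecord is a plain Java bean that Spark can map to a schema (this is my reading of the question, not code from the post; it also assumes the org.apache.spark.api.java.JavaRDD import):

stream.foreachRDD(rdd -> {
    // Keep only the payload of each ConsumerRecord before building the DataFrame
    JavaRDD<StatisticsRecord> records = rdd.map(ConsumerRecord::value);
    Dataset<Row> df = sqlContext.createDataFrame(records, StatisticsRecord.class);
    df.createOrReplaceTempView("data");
    sqlContext.sql(SQLContextAggregations.ORDER_TYPE_DB).show();
});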
Comments:
Have you seen this? spark.apache.org/docs/2.1.0/… or databricks.com/blog/2017/04/26/…

Answer 1: I have solved this with the code below. Note that I now store the messages in JSON format rather than as the actual objects.
SparkConf conf = new SparkConf().setAppName(topic).setMaster("local");
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(2));
SparkSession spark = SparkSession.builder().appName(topic).getOrCreate();
/*
* Kafka conf
*/
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", dbUri);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "Group4");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("Statistics");
/*
* Create an input DStream receiving data from Kafka
*/
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
/*
* Keep only the actual message in JSON format
*/
JavaDStream<String> recordStream = stream.flatMap(record -> Arrays.asList(record.value()).iterator());
/*
* Extract RDDs from stream and apply aggregation in each one
*/
recordStream.foreachRDD(rdd -> {
    if (rdd.count() > 0) {
        Dataset<Row> df = spark.read().json(rdd.rdd());
        df.createOrReplaceTempView("data");
        df.cache();
        Dataset<Row> aggregators = spark.sql(SQLContextAggregations.ORDER_TYPE_DB);
        aggregators.show();
    }
});
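A side note rather than part of the original answer: in later Spark releases (2.2 and above, as far as I know) DataFrameReader.json(RDD<String>) is deprecated, so an equivalent sketch would wrap the raw strings in a Dataset<String> first (this assumes the org.apache.spark.sql.Encoders import):

recordStream.foreachRDD(rdd -> {
    if (rdd.count() > 0) {
        // Wrap the raw JSON strings in a Dataset<String> before parsing them
        Dataset<String> jsonDs = spark.createDataset(rdd.rdd(), Encoders.STRING());
        Dataset<Row> df = spark.read().json(jsonDs);
        df.createOrReplaceTempView("data");
        spark.sql(SQLContextAggregations.ORDER_TYPE_DB).show();
    }
});

Either way, the streaming context still has to be started with streamingContext.start() and streamingContext.awaitTermination(), which is presumably done elsewhere in the job.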
Answer 2: You should call the context inside the function that you apply to your stream.
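A minimal sketch of my interpretation of this answer (not code from it): obtain the session inside foreachRDD instead of capturing one created outside, reusing the names from the question:

stream.foreachRDD(rdd -> {
    // Resolve the session from inside the function applied to the stream
    SparkSession spark = SparkSession.builder()
            .config(rdd.context().getConf())
            .getOrCreate();
    Dataset<Row> df = spark.createDataFrame(rdd.map(ConsumerRecord::value), StatisticsRecord.class);
    df.createOrReplaceTempView("data");
    spark.sql(SQLContextAggregations.ORDER_TYPE_DB).show();
});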