如何使用火花流处理实时流数据/日志?
Posted
技术标签:
【中文标题】如何使用火花流处理实时流数据/日志?【英文标题】:How to work with real time streaming data/logs using spark streaming? 【发布时间】:2016-04-19 06:28:34 【问题描述】:我是 Spark 和 Scala 的新手。
我想实现一个实时 Spark Consumer,它可以从 Kafka Publisher 每分钟读取网络日志 [获取大约 1GB 的 JSON 日志行/分钟],最后将聚合值存储在 ElasticSearch 中。
聚合基于几个值 [如 bytes_in、bytes_out 等],使用复合键 [如:客户端 MAC、客户端 IP、服务器 MAC、服务器 IP 等]。
我写的 Spark Consumer 是:
object LogsAnalyzerScalaCS
def main(args : Array[String])
val sparkConf = new SparkConf().setAppName("LOGS-AGGREGATION")
sparkConf.set("es.nodes", "my ip address")
sparkConf.set("es.port", "9200")
sparkConf.set("es.index.auto.create", "true")
sparkConf.set("es.nodes.discovery", "false")
val elasticResource = "conrec_1min/1minute"
val ssc = new StreamingContext(sparkConf, Seconds(30))
val zkQuorum = "my zk quorum IPs:2181"
val consumerGroupId = "LogsConsumer"
val topics = "Logs"
val topicMap = topics.split(",").map((_,3)).toMap
val json = KafkaUtils.createStream(ssc, zkQuorum, consumerGroupId, topicMap)
val logJSON = json.map(_._2)
try
logJSON.foreachRDD( rdd =>
if(!rdd.isEmpty())
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val df = sqlContext.read.json(rdd)
val groupedData =
((df.groupBy("id","start_time_formated","l2_c","l3_c",
"l4_c","l2_s","l3_s","l4_s")).agg(count("f_id") as "total_f", sum("p_out") as "total_p_out",sum("p_in") as "total_p_in",sum("b_out") as "total_b_out",sum("b_in") as "total_b_in", sum("duration") as "total_duration"))
val dataForES = groupedData.withColumnRenamed("start_time_formated", "start_time")
dataForES.saveToEs(elasticResource)
dataForES.show();
)
catch
case e: Exception => print("Exception has occurred : "+e.getMessage)
ssc.start()
ssc.awaitTermination()
object SQLContextSingleton
@transient private var instance: org.apache.spark.sql.SQLContext = _
def getInstance(sparkContext: SparkContext): org.apache.spark.sql.SQLContext =
if (instance == null)
instance = new org.apache.spark.sql.SQLContext(sparkContext)
instance
首先我想知道我的方法是否正确[考虑到我需要 1 分钟的日志聚合]?
使用此代码似乎有问题:
-
此消费者将每 30 秒从 Kafka 代理拉取数据
并将 30 的最终聚合保存到 Elasticsearch
sec 数据,因此增加了 Elasticsearch 中的行数
唯一键 [每分钟至少 2 个条目]。用户界面工具 [
假设 Kibana] 需要做进一步的聚合。如果我增加
轮询时间从 30 秒到 60 秒,然后需要很长时间
聚合,因此根本不会保持实时。
我想以这样一种方式实现它,在 ElasticSearch 中只有一个
每个键的行应该被保存。因此我想执行聚合
直到我没有在我的数据集中获得新的键值
从 Kafka 代理中撤出 [每分钟]。做完之后
一些谷歌搜索我发现这可以通过使用来实现
groupByKey() 和 updateStateByKey() 函数,但我不能
弄清楚我如何在我的情况下使用它[我应该转换 JSON
将行记录成一串具有平坦值的日志行,然后使用
这些功能有]?如果我将使用这些功能,那么什么时候
我应该将最终聚合值保存到 ElasticSearch 中吗?
还有其他方法可以实现吗?
您的快速帮助将不胜感激。
问候, 布佩什
【问题讨论】:
方法看起来不错。为了提高吞吐量,是否可以向 spark 添加更多执行器? 感谢您回复 maasg。您能否确认我列出的观点。 【参考方案1】:import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.streaming.Seconds, StreamingContext
object Main
def main(args: Array[String]): Unit =
val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(15))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group1",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)//,localhost:9094,localhost:9095"
val topics = Array("test")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val out = stream.map(record =>
record.value
)
val words = out.flatMap(_.split(" "))
val count = words.map(word => (word, 1))
val wdc = count.reduceByKey(_+_)
val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
wdc.foreachRDDrdd=>
val es = sqlContext.createDataFrame(rdd).toDF("word","count")
import org.elasticsearch.spark.sql._
es.saveToEs("wordcount/testing")
es.show()
ssc.start()
ssc.awaitTermination()
To see full example and sbt
apache-sparkscalahadoopkafkaapache-spark-sqlspark-streamingapache-spark-2.0elastic
【讨论】:
以上是关于如何使用火花流处理实时流数据/日志?的主要内容,如果未能解决你的问题,请参考以下文章