如何使用火花流处理实时流数据/日志?

Posted

技术标签:

【中文标题】如何使用火花流处理实时流数据/日志?【英文标题】:How to work with real time streaming data/logs using spark streaming? 【发布时间】:2016-04-19 06:28:34 【问题描述】:

我是 Spark 和 Scala 的新手。

我想实现一个实时 Spark Consumer,它可以从 Kafka Publisher 每分钟读取网络日志 [获取大约 1GB 的 JSON 日志行/分钟],最后将聚合值存储在 ElasticSearch 中。

聚合基于几个值 [如 bytes_in、bytes_out 等],使用复合键 [如:客户端 MAC、客户端 IP、服务器 MAC、服务器 IP 等]。

我写的 Spark Consumer 是:

object LogsAnalyzerScalaCS
    def main(args : Array[String]) 
          val sparkConf = new SparkConf().setAppName("LOGS-AGGREGATION")
          sparkConf.set("es.nodes", "my ip address")
          sparkConf.set("es.port", "9200")
          sparkConf.set("es.index.auto.create", "true")
          sparkConf.set("es.nodes.discovery", "false")

          val elasticResource = "conrec_1min/1minute"
          val ssc = new StreamingContext(sparkConf, Seconds(30))
          val zkQuorum = "my zk quorum IPs:2181"
          val consumerGroupId = "LogsConsumer"
          val topics = "Logs"
          val topicMap = topics.split(",").map((_,3)).toMap
          val json = KafkaUtils.createStream(ssc, zkQuorum, consumerGroupId, topicMap)
          val logJSON = json.map(_._2)
          try
            logJSON.foreachRDD( rdd =>
              if(!rdd.isEmpty())
                  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
                  import sqlContext.implicits._
                  val df = sqlContext.read.json(rdd)
                  val groupedData = 
((df.groupBy("id","start_time_formated","l2_c","l3_c",
"l4_c","l2_s","l3_s","l4_s")).agg(count("f_id") as "total_f", sum("p_out") as "total_p_out",sum("p_in") as "total_p_in",sum("b_out") as "total_b_out",sum("b_in") as "total_b_in", sum("duration") as "total_duration"))
                  val dataForES = groupedData.withColumnRenamed("start_time_formated", "start_time")
                  dataForES.saveToEs(elasticResource)
                  dataForES.show();
                
              )
             
          catch
            case e: Exception => print("Exception has occurred : "+e.getMessage)
          
          ssc.start()
          ssc.awaitTermination()
        

object SQLContextSingleton 
    @transient  private var instance: org.apache.spark.sql.SQLContext = _
    def getInstance(sparkContext: SparkContext): org.apache.spark.sql.SQLContext = 
      if (instance == null) 
        instance = new org.apache.spark.sql.SQLContext(sparkContext)
      
      instance
    
  

首先我想知道我的方法是否正确[考虑到我需要 1 分钟的日志聚合]?

使用此代码似乎有问题:

    此消费者将每 30 秒从 Kafka 代理拉取数据 并将 30 的最终聚合保存到 Elasticsearch sec 数据,因此增加了 Elasticsearch 中的行数 唯一键 [每分钟至少 2 个条目]。用户界面工具 [ 假设 Kibana] 需要做进一步的聚合。如果我增加 轮询时间从 30 秒到 60 秒,然后需要很长时间 聚合,因此根本不会保持实时。 我想以这样一种方式实现它,在 ElasticSearch 中只有一个 每个键的行应该被保存。因此我想执行聚合 直到我没有在我的数据集中获得新的键值 从 Kafka 代理中撤出 [每分钟]。做完之后 一些谷歌搜索我发现这可以通过使用来实现 groupByKey() 和 updateStateByKey() 函数,但我不能 弄清楚我如何在我的情况下使用它[我应该转换 JSON 将行记录成一串具有平坦值的日志行,然后使用 这些功能有]?如果我将使用这些功能,那么什么时候 我应该将最终聚合值保存到 ElasticSearch 中吗? 还有其他方法可以实现吗?

您的快速帮助将不胜感激。

问候, 布佩什

【问题讨论】:

方法看起来不错。为了提高吞吐量,是否可以向 spark 添加更多执行器? 感谢您回复 maasg。您能否确认我列出的观点。 【参考方案1】:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.streaming.Seconds, StreamingContext

object Main 
def main(args: Array[String]): Unit = 


val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(15))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "group1",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)//,localhost:9094,localhost:9095"

val topics = Array("test")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

val out = stream.map(record =>
  record.value
)

val words = out.flatMap(_.split(" "))
val count = words.map(word => (word, 1))
val wdc = count.reduceByKey(_+_)

val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())

wdc.foreachRDDrdd=>
        val es = sqlContext.createDataFrame(rdd).toDF("word","count")
        import org.elasticsearch.spark.sql._
        es.saveToEs("wordcount/testing")
  es.show()


ssc.start()
ssc.awaitTermination()

 

To see full example and sbt

apache-sparkscalahadoopkafkaapache-spark-sqlspark-streamingapache-spark-2.0elastic

【讨论】:

以上是关于如何使用火花流处理实时流数据/日志?的主要内容,如果未能解决你的问题,请参考以下文章

如何在火花流中刷新加载的数据帧内容?

如何使用火花流检查 rdd 是不是为空?

有人可以建议使用火花流进行日志分析的最佳方法吗

如何将火花流 DF 写入 Kafka 主题

如何使用火花流计算流中的新元素

如何更新火花流中的广播变量?