使用 apache spark 流式处理实时日志

Posted

技术标签:

【中文标题】使用 apache spark 流式处理实时日志【英文标题】:real time log processing using apache spark streaming 【发布时间】:2015-04-23 16:36:03 【问题描述】:

我想创建一个可以实时读取日志的系统,并使用 apache spark 来处理它。如果我应该使用 kafka 或 flume 之类的东西将日志传递给 spark 流,或者我应该使用套接字传递日志,我感到很困惑。我已经浏览了 spark 流文档中的示例程序 - Spark stream example。但如果有人能指导我以更好的方式将日志传递给火花流,我将不胜感激。对我来说,这是一片新地盘。

【问题讨论】:

【参考方案1】:

虽然这是一个老问题,但发布一个来自 Databricks 的链接,该链接有一篇很棒的分步文章,用于考虑许多领域的 Spark 进行日志分析。

https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/index.html

希望这会有所帮助。

【讨论】:

请从链接中添加一些内容 嗨罗伯特,恐怕这是一本 GitBook 并且有很好的章节来解释带有日志的火花流的底层用例。如果您正在寻找特定的东西,请告诉我,因为本书涵盖了大部分内容。【参考方案2】:

Apache Flume 可能有助于实时读取日志。 Flume 提供日志收集和传输到使用 Spark Streaming 分析所需信息的应用程序。

1. 从official site 下载Apache Flume 或按照here 的说明进行操作

2. 设置并运行 Flume 修改 Flume 安装目录(FLUME_INSTALLATION_PATH\conf)下的flume-conf.properties.template,这里需要提供日志source、channel和sinks(输出)。有关设置的更多详细信息here

有一个启动flume的例子,它从windows主机上运行的ping命令收集日志信息并将其写入文件:

flume-conf.properties

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.seqGenSrc.type = exec
agent.sources.seqGenSrc.shell = powershell -Command

agent.sources.seqGenSrc.command = for()  ping google.com 

agent.sources.seqGenSrc.channels = memoryChannel

agent.sinks.loggerSink.type = file_roll

agent.sinks.loggerSink.channel = memoryChannel
agent.sinks.loggerSink.sink.directory = D:\\TMP\\flu\\
agent.sinks.loggerSink.serializer = text
agent.sinks.loggerSink.appendNewline = false
agent.sinks.loggerSink.rollInterval = 0

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100

要运行示例,请转到 FLUME_INSTALLATION_PATH 并执行

java -Xmx20m -Dlog4j.configuration=file:///%CD%\conf\log4j.properties -cp .\lib\* org.apache.flume.node.Application -f conf\flume-conf.properties -n agent

或者,您可以创建在类路径中具有水槽库的 Java 应用程序,并从传递相应参数的应用程序调用 org.apache.flume.node.Application 实例。

如何设置 Flume 来收集和传输日志?

您可以使用一些脚本从指定位置收集日志

agent.sources.seqGenSrc.shell = powershell -Command
agent.sources.seqGenSrc.command = your script here

您也可以启动提供智能日志收集的 java 应用程序(在字段中输入“java path_to_main_class arguments”),而不是 windows 脚本。例如,如果文件被实时修改,您可以使用来自 Apache Commons IO 的Tailer。 要配置 Flume 以传输日志信息,请阅读此article

3. 从您的源代码中获取 Flume 流并使用 Spark 对其进行分析。 看看来自 github 的代码示例https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java

【讨论】:

【参考方案3】:

您可以使用 Apache Kafka 作为日志的队列系统。生成日志的系统(例如 websever)会将日志发送到 Apache KAFKA。然后就可以使用apachestorm或者spark流库来实时读取KAFKA topic并处理日志了。

您需要创建日志流,您可以使用 Apache Kakfa 创建。 kafka 与storm 和apache spark 有集成。两者各有利弊。

对于 Storm Kafka 集成,请查看 here

对于 Apache Spark Kafka 集成,请查看 here

【讨论】:

以上是关于使用 apache spark 流式处理实时日志的主要内容,如果未能解决你的问题,请参考以下文章

在 Apache Spark 中将批处理 RDD 的结果与流式 RDD 相结合

专业的力量 海量日志监控平台(Spark Streaming,实时流式处理系统)

Kafka流处理平台

sparkstreaming+flume+kafka实时流式处理完整流程

流式大数据处理的三种框架:Storm,Spark和Samza

聊聊批计算、流计算、Hadoop、Spark、Storm、Flink等等