自动化基于Spark streaming的SQL服务实时自动化运维
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自动化基于Spark streaming的SQL服务实时自动化运维相关的知识,希望对你有一定的参考价值。
设计背景
spark thriftserver目前线上有10个实例,以往通过监控端口存活的方式很不准确,当出故障时进程不退出情况很多,而手动去查看日志再重启处理服务这个过程很低效,故设计利用Spark streaming去实时获取spark thriftserver的log,通过log判断服务是否停止服务,从而进行对应的自动重启处理,该方案能达到秒级 7 * 24h不间断监控及维护服务。
设计架构
- 在需要检测的spark thriftserver服务节点上部署flume agent来监控日志流 (flume使用interceptor给日志加host信息)
- flume收集的日志流打入kafka
- spark streaming接收kafka的日志流,根据自定义关键词检测日志内容,如果命中关键字则认为服务不可用,把该日志对应的host信息打入mysql
- 写一个shell脚本从mysql读取host信息,执行重启服务操作
软件版本及配置
spark 2.0.1, kafka 0.10, flume 1.7
1)flume配置及命令:
修改flume-conf.properties
agent.sources = sparkTS070 agent.channels = c agent.sinks = kafkaSink # For each one of the sources, the type is defined agent.sources.sparkTS070.type = TAILDIR agent.sources.sparkTS070.interceptors = i1 agent.sources.sparkTS070.interceptors.i1.type = host agent.sources.sparkTS070.interceptors.i1.useIP = false agent.sources.sparkTS070.interceptors.i1.hostHeader = agentHost # The channel can be defined as follows. agent.sources.sparkTS070.channels = c agent.sources.sparkTS070.positionFile = /home/hadoop/xu.wenchun/apache-flume-1.7.0-bin/taildir_position.json agent.sources.sparkTS070.filegroups = f1 agent.sources.sparkTS070.filegroups.f1 = /data1/spark/logs/spark-hadoop-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-hadoop070.dx.com.out # Each sink‘s type must be defined agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafkaSink.kafka.topic = mytest-topic1 agent.sinks.kafkaSink.kafka.bootstrap.servers = 10.87.202.51:9092 agent.sinks.kafkaSink.useFlumeEventFormat = true #Specify the channel the sink should use agent.sinks.kafkaSink.channel = c # Each channel‘s type is defined. agent.channels.c.type = memory
运行命令:
nohup bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties -Dflume.root.logger=INFO,LOGFILE &
2)kafka配置及执行命令:
修改config/server.properties
broker.id=1 listeners=PLAINTEXT://10.87.202.51:9092 log.dirs=/home/hadoop/xu.wenchun/kafka_2.11-0.10.0.1/kafka.log zookeeper.connect=10.87.202.44:2181,10.87.202.51:2181,10.87.202.52:2181 1 2 3 4
运行命令
nohup bin/kafka-server-start.sh config/server.properties &
spark streaming执行命令 :
/opt/spark-2.0.1-bin-2.6.0/bin/spark-submit --master yarn-cluster --num-executors 3 --class SparkTSLogMonito
3)shell脚本
写一个shell脚本从mysql读取host信息,执行重启服务操作
spark streaming监控job的核心代码
这类分享spark streaming代码,以下代码经过一些坑摸索出来验证可用。
stream.foreachRDD { rdd => rdd.foreachPartition { rddOfPartition => val conn = ConnectPool.getConnection println(" conn:" + conn) conn.setAutoCommit(false) //设为手动提交 val stmt = conn.createStatement() rddOfPartition.foreach { event => val body = event.value().get() val decoder = DecoderFactory.get().binaryDecoder(body, null) val result = new SpecificDatumReader[AvroFlumeEvent](classOf[AvroFlumeEvent]).read(null, decoder) val hostname = result.getHeaders.get(new Utf8("agentHost")) val text = new String(result.getBody.array()) if (text.contains("Broken pipe") || text.contains("No active SparkContext")) { val dateFormat:SimpleDateFormat = new SimpleDateFormat("yyyyMMddhhmmssSSS") val id = dateFormat.format(new Date()) + "_" + (new util.Random).nextInt(999) stmt.addBatch("insert into monitor(id,hostname) values (‘" + id + "‘,‘" + hostname + "‘)") println("insert into monitor(id,hostname) values (‘" + id + "‘,‘" + hostname + "‘)") } } stmt.executeBatch() conn.commit() conn.close() } }
以上是一个实时处理的典型入门应用,刚好遇到这类监控运维问题,于是采用该方案进行处理,效果不错。
以上是关于自动化基于Spark streaming的SQL服务实时自动化运维的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL + Window + Streaming 问题 - 使用 Spark Streaming 运行时,Spark SQL 查询需要很长时间才能执行
Spark SQL - 在 Spark Streams 上部署 SQL 查询的选项