KafkaOffsetMonitor源码及存储浅析
Posted wingooom
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了KafkaOffsetMonitor源码及存储浅析相关的知识,希望对你有一定的参考价值。
KafkaOffsetMonitor简述
KafkaOffsetMonitor(下文简称KOM)是有由Kafka开源社区提供的一款Web管理界面,这个应用程序用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,可以浏览当前的消费者组,查看每个Topic的所有Partition的当前消费情况,浏览查阅Topic的历史消费信息等
KafkaOffsetMonitor 数据采集展现
数据采集源
Kafka源码中有定义对象ZkUtils(kafka-master\\core\\src\\main\\scala\\kafka\\utils):
而KOM本质上就是对ZkUtils中的这些属性的读取操作。
web实现
KOM是使用jetty作为web容器的,通过angular.js来实现类似MVC功能的。
getGroups具体流程分析:
KOM中一些流程主要体现在app.js和controller.js中。
- 首先需要定义app.js文件,在KOM中的app.js文件为:
var app = angular.module('offsetapp',
["offsetapp.controllers", "offsetapp.directives", "ngRoute"],
function($routeProvider)
$routeProvider
.when("/",
templateUrl: "views/grouplist.html",
controller: "GroupListCtrl"
)
.when("/group/:group",
templateUrl: "views/group.html",
controller: "GroupCtrl"
)
.when("/group/:group/:topic",
templateUrl: "views/topic.html",
controller: "TopicCtrl"
)
......
;;
);
angular.module("offsetapp.services", ["ngResource"])
.factory("offsetinfo", ["$resource", "$http", function($resource, $http)
function groupPartitions(cb)
return function(data)
var groups = _(data.offsets).groupBy(function(p)
var t = p.timestamp;
if(!t) t = 0;
return p.group+p.topic+t.toString();
);
groups = groups.values().map(function(partitions)
return
group: partitions[0].group,
topic: partitions[0].topic,
partitions: partitions,
logSize: _(partitions).pluck("logSize").reduce(function(sum, num)
return sum + num;
),
offset: _(partitions).pluck("offset").reduce(function(sum, num)
return sum + num;
),
timestamp: partitions[0].timestamp
;
).value();
data.offsets = groups;
cb(data);
;
return
getGroup: function(group, cb)
return $resource("./group/:group").get(group:group, groupPartitions(cb));
,
......
;
]);
- 下面是controller.js文件:
angular.module('offsetapp.controllers',["offsetapp.services"])
.controller("GroupCtrl", ["$scope", "$interval", "$routeParams", "offsetinfo",
function($scope, $interval, $routeParams, offsetinfo)
offsetinfo.getGroup($routeParams.group, function(d)
$scope.info = d;
$scope.loading=false;
);
$scope.loading=true;
$scope.group = $routeParams.group;
])
.controller("GroupListCtrl", ["$scope", "offsetinfo",
function($scope, offsetinfo)
$scope.loading = true;
offsetinfo.listGroup().success(function(d)
$scope.loading=false;
$scope.groups = d;
);
])
......
;
- index.html部分代码块
<!-- Collect the nav links, forms, and other content for toggling -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li><a href="#">Consumer Groups</a></li>
<li><a href="/#/topics">Topic List</a></li>
<li class="dropdown">
<a href="javascript:void(0)" class="dropdown-toggle" data-toggle="dropdown">Visualizations <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/#/activetopicsviz">Active Topic Consumers</a></li>
<li><a href="/#/clusterviz">Cluster Overview</a></li>
</ul>
</li>
</ul>
</div><!-- /.navbar-collapse -->
其中”#”表示访问项目根目录:对比app.js文件的
.when("/", templateUrl: "views/grouplist.html",controller: "GroupListCtrl")
表示当访问项目根目录时使用的模板文件是grouplist.html,
<div class="page-header">
<h1>Please select the group you would like to monitor</h1>
</div>
<div class="alert alert-info" ng-show="loading">
Loading ...
</div>
<ul class="list-group">
<li ng-repeat="g in groups" class="list-group-item"><a href="./#/group/g">g</a></li>
</ul>
使用的controller是GroupListCtrl,继续看controller.js中的GroupListCtrl定义:
.controller("GroupListCtrl", ["$scope", "offsetinfo", function($scope, offsetinfo)
$scope.loading = true;
offsetinfo.listGroup().success(function(d)
$scope.loading=false;
$scope.groups = d;
);
])
会调用offsetinfo.listGroup()方法,再到app.js文件中查看listGroup方法定义:
listGroup: function() return $http.get("./group");
这个时候会使用http模块映射到group这个path上,到这里就要看scala的代码了,进到OffsetGetterWeb.scala中,该类继承了UnfilteredWebApp类,在UnfilteredWebApp中定义了启动方法.
继续看group这个path的定义:
case GET(Path(Seg("group" :: Nil))) =>
JsonContent ~> ResponseString(write(getGroups(args)))
调用getGroups方法首先会初始化zkClient和使用zkClient构造OffsetGetter类,接着调用OffsetGetter的getGroups方法:
def getGroups: Seq[String] =
try
ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath)
catch
case NonFatal(t) =>
error(s"could not get groups because of $t.getMessage", t)
Seq()
也就是说getGroups就是读取zookeeper中的/consumers目录的数据,读取完成之后通过$scope.groups = d;代码将结果赋给$scope.groups,这样grouplist.html中就可以通过遍历groups来得到每个group了:
<li ng-repeat="g in groups" class="list-group-item"><a href="./#/group/g">g</a></li>
注:ng-repeat 指令用于循环输出指定次数的 HTML 元素
得到所有的groups之后,通过./#/group/链接可以访问每个group的具体信息。
数据采集周期
kafka监控的采集周期,也就是刷新时间refresh,还有保留时间retain,是在启动时指定的,默认是10s刷新一次,数据保留2天
注:KOM的运行需要通过sbt assembly进行编译打包
OffsetGetterWeb中定时任务方法:
def schedule(args: OWArgs)
def retryTask[T](fn: => T)
try
retry(3)
fn
catch
case NonFatal(e) =>
error("Failed to run scheduled task", e)
timer.scheduleAtFixedRate(new TimerTask()
override def run()
retryTask(writeToDb(args))
, 0, args.refresh.toMillis)
timer.scheduleAtFixedRate(new TimerTask()
override def run()
retryTask(args.db.emptyOld(System.currentTimeMillis - args.retain.toMillis))
, args.retain.toMillis, args.retain.toMillis)
def writeToDb(args: OWArgs)
val groups = getGroups(args)
groups.foreach
g =>
val inf = getInfo(g, args).offsets.toIndexedSeq
info(s"inserting $inf.size")
args.db.insertAll(inf)
DB写操作:每执行一次(采集)刷新,就会执行一次写操作(insertAll),一次清除旧数据的操作(emptyOld).
DB读操作:监控的读操作只有在查询历史信息(offsetHistory)时才查询DB,其他的数据都是实时的数据。
def offsetHistory(group: String, topic: String): OffsetHistory = database.withSession
implicit s =>
val o = offsets
.where(off => off.group === group && off.topic === topic)
.sortBy(_.timestamp)
.map(_.forHistory)
.list()
OffsetHistory(group, topic, o)
数据库存储
数据库sqlite
KOM的数据库采用sqlite
val database = Database.forURL(s"jdbc:sqlite:$dbfile.db",
driver = "org.sqlite.JDBC")
默认数据库文件位置:…/KafkaOffsetMonitor-master/offsetapp.db
数据库字段
表名:OFFSETS
存储字段:
字段名 | 字段类型 | 说明 | 是否可空 |
---|---|---|---|
id | INTEGER | PRIMARY KEY、AUTOINCREMEN | NOT NULL |
group | VARCHAR(254) | 分组 | NOT NULL |
topic | VARCHAR(254) | 话题 | NOT NULL |
partition | INTEGER | 分区编号 | NOT NULL |
offset | BIGINT | 偏移量 | NOT NULL |
log_size | BIGINT | 分区内已接收消息总量 | NOT NULL |
owner | VARCHAR(254) | 所属者 | 可为null |
timestamp | BIGINT | 时间戳 | NOT NULL |
creation | BIGINT | 创建时间 | NOT NULL |
modified | BIGINT | 最新更新时间 | NOT NULL |
数据库索引:
def idx = index("idx_search", (group, topic))
def tidx = index("idx_time", (timestamp))
def uidx = index("idx_unique", (group, topic, partition, timestamp), unique = true)
存储性能及改造分析
因为KOM可以配置sqlite数据保留时间,定期清除过期数据,具体的存储性能跟存储时间和存储量有关,需要根据需求测试评估。
但基于sqlite本身特性:主打轻便,基于文件
因此对于大规模存储(>100W)性能欠佳,会导致页面加载较慢,且不支持分布式,没有用户管理。
mysql的功能完全能覆盖Sqlite,若改造则需要将源码中的OffsetDB.scala文件中对数据库操作的函数(insert,insertAll,emptyOld,offsetHistory,maybeCreate)进行改写。
难点分析:要求熟悉scala语言,熟悉scala对mysql的操作。
以上是关于KafkaOffsetMonitor源码及存储浅析的主要内容,如果未能解决你的问题,请参考以下文章
Kafka系列之-Kafka监控工具KafkaOffsetMonitor配置及使用