How to Start Debugging the Kafka Source Code
Posted by 背风衣人
Reference Links
Download Links
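Installing the Scala Environment
Kafka's broker code is written in Scala, so a Scala environment is needed to build and run it.
Pick a release on GitHub, then follow the link near the bottom of the release page to the official installer download page. I plan to debug on Windows, so I downloaded the scala-2.13.10.msi package; once the download finishes, double-click it to start the installation.
Note:
Barring surprises, the default Scala installation directory on Windows 10 is:
C:\Program Files (x86)\scala
After installation, the system environment variable Path should contain a new entry pointing to the bin directory under the Scala directory, which indicates that the installation succeeded.
Verify Scala in a cmd terminal:
C:\Users\XXXX>scala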
Welcome to Scala 2.13.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251).
Type in expressions for evaluation. Or try :help.
scala>
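Installation is now complete.
IDEA Configuration
Configure Scala in IDEA.
Install the Scala plugin in IDEA. If the installation fails, consider downloading and installing it manually: IDEA Scala plugin manual-installation download page
Note:
If you install the Scala plugin in IDEA, the system-level installation and environment-variable steps above are not actually required. However, the IDEA Scala plugin only works inside IDEA and cannot be used from the command line.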
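Installing the Gradle Environment
- Binary distribution: gradle-8.0-bin.zip
- Source distribution: gradle-8.0-src.zip
- Distribution containing all of the above: gradle-8.0-all.zip
Unless you are interested in the Gradle source code, gradle-8.0-bin.zip is all you need.
Unzip it to a directory (for example: ``)
Add a system variable: GRADLE_HOME=C:\Program Files (x86)\gradle\gradle-8.0
Add to the Path variable: %GRADLE_HOME%\bin
After confirming, open cmd to verify:
C:\Users\XXXX>gradle -v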
------------------------------------------------------------
Gradle 8.0
------------------------------------------------------------
Build time: 2023-02-13 13:15:21 UTC
Revision: 62ab9b7c7f884426cf79fbedcf07658b2dbe9e97
Kotlin: 1.8.10
Groovy: 3.0.13
Ant: Apache Ant(TM) version 1.10.11 compiled on July 10 2021
JVM: 1.8.0_251 (Oracle Corporation 25.251-b08)
OS: Windows 10 10.0 amd64
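The environment variables can also be double-checked from inside the JVM. The following is a minimal Java sketch, not part of the original setup: the class and method names (EnvCheck, pathHasEntryEndingWith) are made up for this example, and the suffixes it looks for assume the default install locations mentioned above.

```java
import java.io.File;
import java.util.regex.Pattern;

class EnvCheck {
    // Returns true if any entry of a Path-style value ends with the given suffix
    // (case-insensitive, ignoring trailing slashes), e.g. "scala\\bin".
    static boolean pathHasEntryEndingWith(String pathValue, String separator, String suffix) {
        if (pathValue == null) {
            return false;
        }
        String wanted = suffix.toLowerCase();
        for (String entry : pathValue.split(Pattern.quote(separator))) {
            String e = entry.trim().toLowerCase();
            while (e.endsWith("\\") || e.endsWith("/")) {
                e = e.substring(0, e.length() - 1);
            }
            if (e.endsWith(wanted)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String path = System.getenv("PATH");
        System.out.println("GRADLE_HOME = " + System.getenv("GRADLE_HOME"));
        // The suffixes below are assumptions based on the default directories used in this article.
        System.out.println("scala bin on Path:  "
                + pathHasEntryEndingWith(path, File.pathSeparator, "scala\\bin"));
        System.out.println("gradle bin on Path: "
                + pathHasEntryEndingWith(path, File.pathSeparator, "gradle-8.0\\bin"));
    }
}
```

If either check prints false, reopen the terminal or IDE first: a process started before the variables were changed still sees the old values.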
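Starting ZooKeeper
Clone the source from the ZooKeeper GitHub repository and run it in IDEA.
Tip:
This article does not touch the ZooKeeper source code, so you can simply download a binary package and run it instead of cloning the git repository.
I cloned the ZooKeeper source only because of my own learning needs.
Note:
ZooKeeper must run under the same domain name as the Kafka service (for testing, deploying both on the same host and accessing them through localhost is the simplest option). Otherwise you may run into unexpected data-exchange problems that eventually surface as all kinds of baffling timeout errors.
Compile
- maven -> Profiles: switch to full-build
- maven -> Apache ZooKeeper -> Lifecycle -> compile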
Modify the configuration
- Copy conf/zoo_sample.cfg into the same directory and rename the copy to zoo.cfg
- Edit zoo.cfg:
  - dataDir, as an absolute path on Windows: dataDir=F:\\workbox\\github\\zookeeper\\tmp\\zookeeper
  - admin.serverPort=8083
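The backslashes in dataDir are doubled because, to the best of my knowledge, ZooKeeper reads zoo.cfg with java.util.Properties, where a single backslash is an escape character. The sketch below only illustrates that parsing rule; ZooCfgEscapeDemo and dataDirOf are hypothetical names, and the short path F:\workbox\zk is a stand-in for the real one.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class ZooCfgEscapeDemo {
    // Parses zoo.cfg-style text with java.util.Properties and returns the dataDir value.
    static String dataDirOf(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("dataDir");
    }

    public static void main(String[] args) {
        // In Java source, "\\" is one literal backslash in the file content.
        String single = "dataDir=F:\\workbox\\zk\n";      // file line: dataDir=F:\workbox\zk
        String doubled = "dataDir=F:\\\\workbox\\\\zk\n"; // file line: dataDir=F:\\workbox\\zk

        // Single backslashes are silently dropped: prints F:workboxzk
        System.out.println("single  -> " + dataDirOf(single));
        // Doubled backslashes survive as one each: prints F:\workbox\zk
        System.out.println("doubled -> " + dataDirOf(doubled));
    }
}
```

Under the same rule, sequences such as \t would even be turned into a tab, so a path like ...\tmp\... is damaged in more than one way when the backslashes are not doubled. Forward slashes are usually an acceptable alternative on Windows.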
Start
The main class is org.apache.zookeeper.server.quorum.QuorumPeerMain
Additional run configuration: set Program arguments to F:\workbox\github\zookeeper\conf\zoo.cfg
Startup error: java.lang.NoClassDefFoundError
java.lang.NoClassDefFoundError: org/xerial/snappy/SnappyInputStream
at org.apache.zookeeper.server.persistence.Util.makeSnapshotName(Util.java:97)
at org.apache.zookeeper.server.persistence.FileTxnSnapLog.save(FileTxnSnapLog.java:478)
at org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:300)
at org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:285)
at org.apache.zookeeper.server.ZooKeeperServer.loadData(ZooKeeperServer.java:531)
at org.apache.zookeeper.server.ZooKeeperServer.startdata(ZooKeeperServer.java:704)
at org.apache.zookeeper.server.NIOServerCnxnFactory.startup(NIOServerCnxnFactory.java:744)
at org.apache.zookeeper.server.ServerCnxnFactory.startup(ServerCnxnFactory.java:130)
at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:161)
at org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:113)
at org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:68)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:141)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:91)
Caused by: java.lang.ClassNotFoundException: org.xerial.snappy.SnappyInputStream
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 13 common frames omitted
Other examples include:
java.lang.NoClassDefFoundError: com/codahale/metrics/Reservoir
java.lang.NoClassDefFoundError: org/eclipse/jetty/server/HttpConfiguration$Customizer
java.lang.NoClassDefFoundError: org/xerial/snappy/SnappyInputStream
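Solution: these classes come from dependencies declared with <scope>provided</scope>, which are available at compile time but are not placed on the runtime classpath when the server is launched from the IDE. When you hit this kind of error, edit pom.xml and comment out the <scope>provided</scope> line of the corresponding dependency.
I won't go through them one by one; in total I commented out the following <scope>provided</scope> entries: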
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<!-- <scope>provided</scope>-->
</dependency>
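To see in advance which of these classes would be missing at runtime, a small check like the following can be run with the same run configuration (and therefore the same classpath) as QuorumPeerMain. This is a minimal sketch: ClasspathCheck and isOnClasspath are hypothetical names, and the probed class names are simply the ones from the errors above.

```java
class ClasspathCheck {
    // Returns true if the named class can be located by this class's class loader.
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.xerial.snappy.SnappyInputStream",
            "com.codahale.metrics.Reservoir",
            // Nested classes use '$' in their binary name
            "org.eclipse.jetty.server.HttpConfiguration$Customizer"
        };
        for (String name : names) {
            System.out.println(name + " -> " + (isOnClasspath(name) ? "found" : "MISSING"));
        }
    }
}
```

Any class reported as MISSING points to a dependency whose scope still keeps it off the runtime classpath.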
Startup error: Failed to bind to /0.0.0.0:8080
java.io.IOException: Failed to bind to /0.0.0.0:8080
at org.eclipse.jetty.server.ServerConnector.openAcceptChannel(ServerConnector.java:349)
at org.eclipse.jetty.server.ServerConnector.open(ServerConnector.java:310)
at org.eclipse.jetty.server.AbstractNetworkConnector.doStart(AbstractNetworkConnector.java:80)
at org.eclipse.jetty.server.ServerConnector.doStart(ServerConnector.java:234)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:73)
at org.eclipse.jetty.server.Server.doStart(Server.java:401)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:73)
at org.apache.zookeeper.server.admin.JettyAdminServer.start(JettyAdminServer.java:185)
at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:155)
at org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:113)
at org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:68)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:141)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:91)
Caused by: java.net.BindException: Address already in use: bind
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at org.eclipse.jetty.server.ServerConnector.openAcceptChannel(ServerConnector.java:344)
... 12 common frames omitted
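Solution: ZooKeeper's embedded Jetty AdminServer listens on port 8080 by default, and that port was already in use here. Add admin.serverPort=8083 to the zoo.cfg configuration file; any port that does not conflict will do.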
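A quick way to pick a port that does not conflict is to try binding to it, which is the same operation that failed in the stack trace. The following Java sketch is an illustration only: PortCheck, isPortFree and detectsBusyPort are invented names, and the port list in main is just the set of ports that appear in this article plus ZooKeeper's usual client port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

class PortCheck {
    // Returns true if a listening socket can currently be bound to the port
    // on the wildcard address.
    static boolean isPortFree(int port) {
        try (ServerSocket probe = new ServerSocket()) {
            probe.setReuseAddress(false);
            probe.bind(new InetSocketAddress(port));
            return true;
        } catch (IOException e) {
            return false; // typically java.net.BindException: address already in use
        }
    }

    // Self-check: occupy an OS-chosen port and confirm it is reported as busy.
    static boolean detectsBusyPort() {
        try (ServerSocket holder = new ServerSocket(0)) {
            return !isPortFree(holder.getLocalPort());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ports = {2181, 8080, 8083, 9092};
        for (int port : ports) {
            System.out.println(port + (isPortFree(port) ? " is free" : " is IN USE"));
        }
    }
}
```

Run it before starting ZooKeeper; once ZooKeeper and Kafka are up, their own ports will naturally show as in use.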
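Starting Kafka
Clone the source from the Kafka GitHub repository and run it in IDEA.
At first, Gradle downloads all the dependencies, so be patient. When it finishes, a gradle directory appears in the project.
Switch the git branch to the version you need. For example, Tencent Cloud currently uses Kafka 1.1.1, so to investigate problems in that version, switch the source to the corresponding branch:
git checkout 1.1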
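Adjust the configuration
config/server.properties
Change zookeeper.connect to the host name and port of the local ZooKeeper.
build.gradle
Under project(':core') -> dependencies, change testCompile libs.slf4jlog4j to compile libs.slf4jlog4j, so that the SLF4J log4j binding is also on the runtime classpath and the broker's log output is visible when it is started from the IDE.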
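Before starting the broker, it can help to confirm that the address written into zookeeper.connect is actually reachable. The sketch below is an illustration only: ZkConnectProbe and its methods are invented names, it assumes the usual comma-separated host:port list with an optional chroot path suffix, falls back to port 2181 when no port is given, and does not handle IPv6 literals.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

class ZkConnectProbe {
    // Splits "host1:2181,host2:2181/chroot" into its host[:port] entries, dropping the chroot.
    static List<String> hostPorts(String connect) {
        int slash = connect.indexOf('/');
        String hosts = slash >= 0 ? connect.substring(0, slash) : connect;
        List<String> result = new ArrayList<>();
        for (String part : hosts.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    static String hostOf(String hostPort) {
        int colon = hostPort.lastIndexOf(':');
        return colon < 0 ? hostPort : hostPort.substring(0, colon);
    }

    // Assumption: 2181 is used when the entry carries no explicit port.
    static int portOf(String hostPort) {
        int colon = hostPort.lastIndexOf(':');
        return colon < 0 ? 2181 : Integer.parseInt(hostPort.substring(colon + 1));
    }

    // Tries a plain TCP connect; this checks reachability only, not ZooKeeper health.
    static boolean reachable(String hostPort, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(hostOf(hostPort), portOf(hostPort)), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String connect = args.length > 0 ? args[0] : "localhost:2181";
        for (String hostPort : hostPorts(connect)) {
            System.out.println(hostPort + (reachable(hostPort, 2000) ? " reachable" : " NOT reachable"));
        }
    }
}
```

Pass the value of zookeeper.connect as the first argument; without arguments the sketch probes localhost:2181, the address that appears in the startup log.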
Start
The Kafka server's main class is core/src/main/scala/kafka/Kafka.scala.
The run configuration is shown in the figure:
The startup log is as follows:
21:00:35: Executing ':core:Kafka.main()'...
Starting Gradle Daemon...
Connected to the target VM, address: '127.0.0.1:29046', transport: 'socket'
Gradle Daemon started in 831 ms
> Configure project :
Building project 'core' with Scala version 2.11.12
> Task :clients:compileJava UP-TO-DATE
> Task :clients:processResources NO-SOURCE
> Task :clients:classes UP-TO-DATE
> Task :clients:determineCommitId UP-TO-DATE
> Task :clients:createVersionFile
> Task :clients:jar UP-TO-DATE
> Task :core:compileJava NO-SOURCE
Disconnected from the target VM, address: '127.0.0.1:29046', transport: 'socket'
> Task :core:compileScala UP-TO-DATE
> Task :core:processResources UP-TO-DATE
> Task :core:classes UP-TO-DATE
Connected to the target VM, address: 'localhost:29059', transport: 'socket'
> Task :core:Kafka.main()
[2023-02-15 21:00:40,858] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2023-02-15 21:00:41,061] INFO starting (kafka.server.KafkaServer)
[2023-02-15 21:00:41,061] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2023-02-15 21:00:41,071] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2023-02-15 21:00:41,081] INFO Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,081] INFO Client environment:host.name=KNX-DP-A0010 (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,081] INFO Client environment:java.version=1.8.0_251 (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,085] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,085] INFO Client environment:java.home=C:\Program Files\Java\jdk1.8.0_251\jre (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,085] INFO Client environment:java.class.path=F:\workbox\github\kafka\core\build\classes\java\main;F:\workbox\github\kafka\core\build\classes\scala\main;F:\workbox\github\kafka\core\build\resources\main;F:\workbox\github\kafka\clients\build\libs\kafka-clients-1.1.2-SNAPSHOT.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\com.fasterxml.jackson.core\jackson-databind\2.9.7\e6faad47abd3179666e89068485a1b88a195ceb7\jackson-databind-2.9.7.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\net.sf.jopt-simple\jopt-simple\5.0.4\4fdac2fbe92dfad86aa6e9301736f6b4342a3f5c\jopt-simple-5.0.4.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\com.yammer.metrics\metrics-core\2.2.0\f82c035cfa786d3cbec362c38c22a5f5b1bc8724\metrics-core-2.2.0.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\com.typesafe.scala-logging\scala-logging_2.11\3.8.0\24b01e596b348c5caa195e44f6e22c32dfdd0c84\scala-logging_2.11-3.8.0.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\org.scala-lang\scala-reflect\2.11.12\2bb23c13c527566d9828107ca4108be2a2c06f01\scala-reflect-2.11.12.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\org.scala-lang\scala-library\2.11.12\bf5534e6fec3d665bd6419c952a929a8bdd4b591\scala-library-2.11.12.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\com.101tec\zkclient\0.10\c54d4b5a5e89af75a80b6d5857400165ce5188d0\zkclient-0.10.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\org.apache.zookeeper\zookeeper\3.4.10\8eebdbb7a9df83e02eaa42d0e5da0b57bf2e4da\zookeeper-3.4.10.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\org.slf4j\slf4j-log4j12\1.7.25\110cefe2df103412849d72ef7a67e4e91e4266b4\slf4j-log4j12-1.7.25.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\org.slf4j\slf4j-api\1.7.25\da76ca59f6a57ee3102f8f9bd9cee742973efa8a\slf4j-api-1.7.25.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\org.lz4\lz4-java\1.4.1\ad89b11ac280a2992d65e078af06f6709f1fe2fc\lz4-java-1.4.1.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\org.xerial.snappy\snappy-java\1.1.7.1\d5190b41f3de61e3b83d692322d58630252bc8c3\snappy-java-1.1.7.1.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\com.fasterxml.jackson.core\jackson-annotations\2.9.0\7c10d545325e3a6e72e06381afe469fd40eb701\jackson-annotations-2.9.0.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\com.fasterxml.jackson.core\jackson-core\2.9.7\4b7f0e0dc527fab032e9800ed231080fdc3ac015\jackson-core-2.9.7.jar;C:\Users\KitmanLee\.gradle\caches\modules-2\files-2.1\log4j\log4j\1.2.17\5af35056b4d257e4b64b9e8069c0746e8b08629f\log4j-1.2.17.jar (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,085] INFO Client environment:java.library.path=C:\Program Files\Java\jdk1.8.0_251\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:\Program Files\Python310\Scripts\;C:\Program Files\Python310\;C:\Program Files\Java\jdk1.8.0_251\bin\;C:\Program Files\Java\jdk1.8.0_251\jre\bin\;D:\Program Files (x86)\NetSarang\Xshell 6\;D:\Program Files (x86)\NetSarang\Xftp 6\;C:\Program Files (x86)\VMware\VMware Workstation\bin\;C:\Program Files (x86)\Common Files\Oracle\Java\javapath;D:\workSoft\apache-maven-3.6.3-bin\apache-maven-3.6.3\bin\;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Windows\System32\OpenSSH\;C:\Program Files\Git\cmd;C:\ProgramData\chocolatey\bin;C:\Program Files\PuTTY\;C:\Program Files\IDM Computer Solutions\UltraEdit;C:\android;C:\Windows\System32;D:\Program Files (x86)\Namp\;C:\Program Files (x86)\scala\bin;C:\Program Files (x86)\gradle\gradle-8.0\bin;C:\Users\KitmanLee\AppData\Local\Microsoft\WindowsApps;;C:\Users\KitmanLee\AppData\Local\Programs\Microsoft VS Code\bin;C:\Program Files (x86)\Nmap;. (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,085] INFO Client environment:java.io.tmpdir=C:\Users\KITMAN~1\AppData\Local\Temp\ (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,086] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,086] INFO Client environment:os.name=Windows 10 (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,086] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,086] INFO Client environment:os.version=10.0 (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,086] INFO Client environment:user.name=KitmanLee (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,086] INFO Client environment:user.home=C:\Users\KitmanLee (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,086] INFO Client environment:user.dir=F:\workbox\github\kafka (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,087] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@4e5ed836 (org.apache.zookeeper.ZooKeeper)
[2023-02-15 21:00:41,206] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2023-02-15 21:00:41,206] INFO Opening socket connection to server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2023-02-15 21:00:41,207] INFO Socket connection established to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2023-02-15 21:00:41,260] INFO Session establishment complete on server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181, sessionid = 0x1000287cb550000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2023-02-15 21:00:41,262] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
[2023-02-15 21:00:41,776] INFO Cluster ID = 7PKCJB4_SFaNfpSIb4VPJg (kafka.server.KafkaServer)
[2023-02-15 21:00:41,805] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = null
advertised.port = null
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
compression.type = producer
connections.max.idle.ms = 600000
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 1
delegation.token.expiry.check.interval.ms = 3600000
delegation.token.expiry.time.ms = 86400000
delegation.token.master.key = null
delegation.token.max.lifetime.ms = 604800000
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 0
group.max.session.timeout.ms = 300000
group.min.session.timeout.ms = 6000
host.name =
inter.broker.listener.name = null
inter.broker.protocol.version = 1.1-IV0
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = null
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /tmp/kafka-logs
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 60000
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.format.version = 1.1-IV0
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
max.incremental.fetch.session.cache.slots = 1000
message.max.bytes = 1000012
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
min.insync.replicas = 1
num.io.threads = 8
num.network.threads = 3
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.alter.log.dirs.threads = null
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 1440
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 1
offsets.topic.segment.bytes = 104857600
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
password.encoder.iterations = 4096
password.encoder.key.length = 128
password.encoder.keyfactory.algorithm = null
password.encoder.old.secret = null
password.encoder.secret = null
port = 9092
principal.builder.class = null
producer.purgatory.purge.interval.requests = 1000
queued.max.request.bytes = -1
queued.max.requests = 500
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 10000
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 30000
reserved.broker.max.id = 1000
sasl.enabled.mechanisms = [GSSAPI]
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism.inter.broker.protocol = GSSAPI
security.inter.broker.protocol = PLAINTEXT
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = []
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
transaction.max.timeout.ms = 900000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 1
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 1
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
zookeeper.connect = localhost:2181
zookeeper.connection.timeout.ms = 6000
zookeeper.max.in.flight.requests = 10
zookeeper.session.timeout.ms = 6000
zookeeper.set.acl = false
zookeeper.sync.time.ms = 2000
(kafka.server.KafkaConfig)
[2023-02-15 21:00:41,810] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = null
advertised.port = null
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
compression.type = producer
connections.max.idle.ms = 600000
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 1
delegation.token.expiry.check.interval.ms = 3600000
delegation.token.expiry.time.ms = 86400000
delegation.token.master.key = null
delegation.token.max.lifetime.ms = 604800000
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 0
group.max.session.timeout.ms = 300000
group.min.session.timeout.ms = 6000
host.name =
inter.broker.listener.name = null
inter.broker.protocol.version = 1.1-IV0
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = null
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /tmp/kafka-logs
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 60000
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.format.version = 1.1-IV0
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
max.incremental.fetch.session.cache.slots = 1000
message.max.bytes = 1000012
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
min.insync.replicas = 1
num.io.threads = 8
num.network.threads = 3
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.alter.log.dirs.threads = null
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 1440
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 1
offsets.topic.segment.bytes = 104857600
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
password.encoder.iterations = 4096
password.encoder.key.length = 128
password.encoder.keyfactory.algorithm = null
password.encoder.old.secret = null
password.encoder.secret = null
port = 9092
principal.builder.class = null
producer.purgatory.purge.interval.requests = 1000
queued.max.request.bytes = -1
queued.max.requests = 500
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 10000
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 30000
reserved.broker.max.id = 1000
sasl.enabled.mechanisms = [GSSAPI]
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism.inter.broker.protocol = GSSAPI
security.inter.broker.protocol = PLAINTEXT
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = []
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
transaction.max.timeout.ms = 900000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 1
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 1
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
zookeeper.connect = localhost:2181
zookeeper.connection.timeout.ms = 6000
zookeeper.max.in.flight.requests = 10
zookeeper.session.timeout.ms = 6000
zookeeper.set.acl = false
zookeeper.sync.time.ms = 2000
(kafka.server.KafkaConfig)
[2023-02-15 21:00:41,824] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2023-02-15 21:00:41,824] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2023-02-15 21:00:41,824] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2023-02-15 21:00:41,853] INFO Loading logs. (kafka.log.LogManager)
[2023-02-15 21:00:48,155] INFO Logs loading complete in 6301 ms. (kafka.log.LogManager)
[2023-02-15 21:00:48,160] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2023-02-15 21:00:48,160] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2023-02-15 21:00:48,271] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2023-02-15 21:00:48,288] INFO [SocketServer brokerId=0] Started 1 acceptor threads (kafka.network.SocketServer)
[2023-02-15 21:00:48,299] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2023-02-15 21:00:48,300] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2023-02-15 21:00:48,300] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2023-02-15 21:00:48,306] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2023-02-15 21:00:48,341] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.zk.KafkaZkClient)
[2023-02-15 21:00:48,408] INFO Result of znode creation at /brokers/ids/0 is: OK (kafka.zk.KafkaZkClient)
[2023-02-15 21:00:48,409] INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArrayBuffer(EndPoint(KNX-DP-A0010,9092,ListenerName(PLAINTEXT),PLAINTEXT)) (kafka.zk.KafkaZkClient)
[2023-02-15 21:00:48,426] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2023-02-15 21:00:48,428] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2023-02-15 21:00:48,428] INFO [ExpirationReaper-0-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2023-02-15 21:00:48,430] INFO Creating /controller (is it secure? false) (kafka.zk.KafkaZkClient)
[2023-02-15 21:00:48,468] INFO Result of znode creation at /controller is: OK (kafka.zk.KafkaZkClient)
[2023-02-15 21:00:48,474] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2023-02-15 21:00:48,475] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2023-02-15 21:00:48,476] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2023-02-15 21:00:48,574] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:1000,blockEndProducerId:1999) by writing to Zk with path version 2 (kafka.coordinator.transaction.ProducerIdManager)
[2023-02-15 21:00:48,591] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2023-02-15 21:00:48,592] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2023-02-15 21:00:48,592] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2023-02-15 21:00:48,610] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2023-02-15 21:00:48,648] INFO [SocketServer brokerId=0] Started processors for 1 acceptors (kafka.network.SocketServer)
[2023-02-15 21:00:48,649] INFO Kafka version : 1.1.2-SNAPSHOT (org.apache.kafka.common.utils.AppInfoParser)
[2023-02-15 21:00:48,649] INFO Kafka commitId : cd15202179f339c4 (org.apache.kafka.common.utils.AppInfoParser)
[2023-02-15 21:00:48,650] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
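Trying to Debug a Kafka Transactional Send
A good place for a breakpoint is the kafka.server.KafkaApis#handle method. It is the top-level method that handles all requests and multiplexes them to the right API: every request from a client to the server passes through it and is then dispatched, by a pattern match on the request's API key, to the corresponding handler method.
Better still, when debugging, set the breakpoint on the relevant case. For example, to debug a transactional send, you can set breakpoints at the following places: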
case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
case ApiKeys.END_TXN => handleEndTxnRequest(request)
case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
The complete code of this method is as follows:
  /**
   * Top-level method that handles all requests and multiplexes to the right api
   */
  def handle(request: RequestChannel.Request) {
    try {
      trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
        s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
      request.header.apiKey match {
        case ApiKeys.PRODUCE => handleProduceRequest(request)
        case ApiKeys.FETCH => handleFetchRequest(request)
        case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
        case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
        case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
        case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
        case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
        case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
        case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
        case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
        case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
        case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
        case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
        case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
        case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
        case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
        case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
        case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
        case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
        case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
        case ApiKeys.END_TXN => handleEndTxnRequest(request)
        case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
        case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
        case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
        case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
        case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
        case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
        case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
        case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
        case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
        case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
        case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
        case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
        case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
        case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
        case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
        case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
      }
    } catch {
      case e: FatalExitError => throw e
      case e: Throwable => handleError(request, e)
    } finally {
      request.apiLocalCompleteTimeNanos = time.nanoseconds
    }
  }
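To get a feel for which breakpoints should fire, and roughly in what order, during a transactional send, here is a toy model in Java. It is a simplified sketch, not Kafka code: TxnDispatchSketch, its reduced ApiKey enum and transactionalSend are invented for illustration, the handler names are copied from the method above, and the request order is my understanding of the transactional protocol (find the coordinator, obtain a producer id, register partitions, produce, optionally commit offsets, end the transaction, after which the coordinator writes the markers). Real traffic also contains metadata and retry requests that are left out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TxnDispatchSketch {
    // Reduced set of request types; names mirror the ApiKeys cases in KafkaApis#handle.
    enum ApiKey {
        FIND_COORDINATOR, INIT_PRODUCER_ID, ADD_PARTITIONS_TO_TXN, PRODUCE,
        ADD_OFFSETS_TO_TXN, TXN_OFFSET_COMMIT, END_TXN, WRITE_TXN_MARKERS
    }

    // Toy stand-in for the dispatch: returns the name of the handler a request would reach.
    static String handle(ApiKey key) {
        switch (key) {
            case FIND_COORDINATOR:      return "handleFindCoordinatorRequest";
            case INIT_PRODUCER_ID:      return "handleInitProducerIdRequest";
            case ADD_PARTITIONS_TO_TXN: return "handleAddPartitionToTxnRequest";
            case PRODUCE:               return "handleProduceRequest";
            case ADD_OFFSETS_TO_TXN:    return "handleAddOffsetsToTxnRequest";
            case TXN_OFFSET_COMMIT:     return "handleTxnOffsetCommitRequest";
            case END_TXN:               return "handleEndTxnRequest";
            case WRITE_TXN_MARKERS:     return "handleWriteTxnMarkersRequest";
            default: throw new IllegalArgumentException("unhandled key: " + key);
        }
    }

    // Assumed, simplified request order for one transaction.
    static List<ApiKey> transactionalSend(boolean commitsConsumerOffsets) {
        List<ApiKey> sequence = new ArrayList<>(Arrays.asList(
                ApiKey.FIND_COORDINATOR, ApiKey.INIT_PRODUCER_ID,
                ApiKey.ADD_PARTITIONS_TO_TXN, ApiKey.PRODUCE));
        if (commitsConsumerOffsets) {
            sequence.add(ApiKey.ADD_OFFSETS_TO_TXN);
            sequence.add(ApiKey.TXN_OFFSET_COMMIT);
        }
        sequence.add(ApiKey.END_TXN);
        // Sent by the transaction coordinator to partition leaders, not by the client.
        sequence.add(ApiKey.WRITE_TXN_MARKERS);
        return sequence;
    }

    public static void main(String[] args) {
        for (ApiKey key : transactionalSend(false)) {
            System.out.println(key + " -> " + handle(key));
        }
    }
}
```

If a breakpoint on one of these cases never fires, the request before it in the sequence is a good place to start looking.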
Writing a Test Service to Send Kafka Transactional Messages
Controller
@GetMapping("/tx-two")
@Transactional(rollbackFor = Exception.class, transactionManager = "kafkaTransactionManager")
public String sendTransactionTwo(@RequestParam("message") String message) throws InterruptedException {
    // Log the incoming message, then hand it to the service inside the Kafka transaction
    log.info("Sending message: {}", message);
    senderService.sendTransactionTwo(message);
    return "send transaction-two doing...";
}
Service
import com.leekitman.pangea.evolution.kafka.consumer.group.TransactionEventOne;
import com.leekitman.pangea.evolution.kafka.consumer.group.TransactionEventTwo;
import com.leekitman.pangea.evolution.kafka.controller.callback.TransactionOneCallback;
import com.leekitman.pangea.evolution.kafka.dao.ProcessEventRepository;
import com.leekitman.pangea.evolution.kafka