尝试理解 Kafka Scala 语法
Posted
技术标签:
【中文标题】尝试理解 Kafka Scala 语法【英文标题】:Try to understand Kafka Scala Syntax 【发布时间】:2020-05-19 07:38:04 【问题描述】:我一直在 Kafka 项目的核心模块中浏览 Log
类的 Kafka 源代码,但我对 scala 仍然很陌生。
我遇到了一种很难理解的语法。
这是sn-ps的代码:
sn-p 1:
// Now do a second pass and load all the log and index files.
// We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
// this happens, restart loading segment files from scratch.
retryOnOffsetOverflow
// In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
// loading of segments. In that case, we also need to close all segments that could have been left open in previous
// call to loadSegmentFiles().
logSegments.foreach(_.close())
segments.clear()
loadSegmentFiles()
sn-p2:
private[log] def retryOnOffsetOverflow[T](fn: => T): T =
while (true)
try
return fn// what is fn here in context of code snippet 1?
catch
case e: LogSegmentOffsetOverflowException =>
info(s"Caught segment overflow error: $e.getMessage. Split segment and retry.")
splitOverflowedSegment(e.segment)//##!!!1.return a List[Segement], but where does this return goes?
throw new IllegalStateException()
我发现很难理解的是,方法retryOnOffsetOverflow
在 sn-p 1 中是如何被调用的,以及传递给它的参数是什么? 我知道retryOnOffsetOverflow
的参数是一个函数,但是在这个sn-p 中传递给这个函数的参数是什么?
我也不清楚retryOnOffsetOverflow
在这里的返回值是什么?返回是T
,这是通用的吗?我不确定这里retryOnOffsetOverflow
的返回是什么,根据它是否捕获异常会有所不同吗?如果是这样,具体的回报分别是多少?
非常感谢您的解释,如果我错过了回答问题所需的代码,请告诉我。
更新:我会纠正我自己,retryOnOffsetOverflow
的参数是一个别名参数,除非并且直到它在方法主体中的某个位置被引用,否则它不会被评估。
【问题讨论】:
【参考方案1】:更新:最后部分略有更改,看起来它会在下一次循环迭代中加载“拆分”文件。
retryOnOffsetOverflow
的参数在这里是花括号内的所有内容 - 基本上,这三行代码 - 这是它接受的函数 (fn: => T)
。
retryOnOffsetOverflow
正在尝试执行已通过的函数,如果执行成功,则返回它的答案。有一点难以理解——当出现异常时,splitOverflowedSegment
被调用不是因为它的返回类型,而是因为它在replaceSegments
函数中“改变状态”这一事实。该段将在下一次循环迭代重新启动时读取,在 loadSegmentFiles
函数中。
【讨论】:
是的,非常感谢,这是一个清晰的解释。我看到在下一个循环中loadSegmentFiles ()
重新加载了拆分器文件。但还有一个问题是retryOnOffsetOverflow ()
中的循环何时结束?直到在方法的最后一行遇到异常?
直到fn
函数成功执行。如果它将完成,return
将被执行,从而打破循环。
好吧,我终于看到retryOnOffsetOverflow
方法的逻辑了,所以它一直在无限循环中执行fn
,并且每次fn
不能成功执行,它都会捕获异常并为下一轮循环更新段的状态,最后fn
将毫无例外地运行,这就是retryOnOffsetOverflow
结束的时候。我说的对吗?
是的,我认为你是正确的(至少我以同样的方式理解这段代码)。
另外,不确定最后一个“IllegalState”异常代表什么 - 我看不到可以引发此异常的路径。【参考方案2】:
我发现很难理解的是,如何在 sn-p 1 中调用 retryOnsetOverflow 方法,以及将什么作为其参数的参数传递给它?我知道 retryOnOffsetOverflow 的参数是一个函数,但是在这个 sn-p 中传递给这个函数的参数是什么?
考虑以下简化示例
def printTillSuccess[T](fn: => T): T =
while (true)
val result = fn
println(result)
return result
throw new Exception("dead end")
printTillSuccess("a")
printTillSuccess( "a" )
printTillSuccess "a"
fn: => T
不是函数,而是by-name 参数。它将在每个引用上进行评估,即在val result = fn
行。 Scala 中的函数有一个apply
方法,但这里不是这种情况。
您可以通过()
将值传递给方法,这在示例printTillSuccess("a")
中完成。
Scala 允许使用 包装任何代码块,最后一条语句将用作该块的结果。因此,
"a"
与 "a"
相同。
所以,现在您可以将"a"
传递给方法,因此printTillSuccess( "a" )
是一个有效的调用。
最后,Scala 允许在方法中用块定义 替换
()
,这会打开语法 printTillSuccess "a"
。
我也不清楚这里 retryOnsetOverflow 的返回是什么? 返回是 T ,这是一种泛型?我不确定是什么 这里retryOnOffsetOverflow的返回,会不会根据不同 事实上它是否捕获了异常?如果是这样,究竟会是什么 分别是回报?
返回类型是按名称参数的类型,它是T
。而声明return fn
是唯一定义T
的地方。
在LogSegmentOffsetOverflowException
的情况下,将调用catch 并执行splitOverflowedSegment
。这将改变一些内部状态,while(true)
的下一次迭代将再次评估 by-name 参数。因此,异常不会改变返回类型,但允许下一次迭代发生。
while
循环只有在fn
计算成功或抛出不同的异常时才能退出。无论如何,如果发生,返回将是T
。
【讨论】:
非常感谢您提供如此详尽且易于理解的教程,哈哈。我明白了很多。这真的很有帮助。好吧,我必须说 scala 是如此灵活。叹息。【参考方案3】:def retryOnOffsetOverflow[T](fn: => T): T
retryOnOffsetOverflow()
是一种采用单个“按名称”参数的方法,这意味着除非并且直到在方法主体的某处引用该参数,否则该参数不会被计算。
所以fn
可能是一个值,也可能是多行代码,并且直到这里try return fn
之前它都将保持未评估(未执行),在那里它被包裹在try
中执行。
fn
评估将产生某种类型的值。该类型是什么并不重要(我们称之为T
),但retryOnOffsetOverflow()
需要返回相同的类型。
【讨论】:
感谢您的回答。所以你的意思是retryOnOffsetOverflow()
需要返回与其参数相同的类型,即返回大括号中的所有语句? ` logSegments.foreach(_.close()) segments.clear() loadSegmentFiles()`
基本上是的。如果fn
返回一个值(或计算为一个值),则该T
类型的值将从retryOnOffsetOverflow()
返回。如果fn
抛出并被捕获(所以它一定是LogSegmentOffsetOverflowException
),那么splitOverflowedSegment()
必须返回与fn
应该返回的T
相同类型的值。以上是关于尝试理解 Kafka Scala 语法的主要内容,如果未能解决你的问题,请参考以下文章