UDF 导致警告:CachedKafkaConsumer 未在 UninterruptibleThread 中运行 (KAFKA-1894)

Posted

技术标签:

【中文标题】UDF 导致警告:CachedKafkaConsumer 未在 UninterruptibleThread 中运行 (KAFKA-1894)【英文标题】:UDF cause warning: CachedKafkaConsumer is not running in UninterruptibleThread (KAFKA-1894) 【发布时间】:2018-06-26 00:16:37 【问题描述】:

在通常的structured_kafka_wordcount.py 代码中,

当我通过udf 将行拆分为单词时,如下所示,

my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))

words = lines.select(
    explode(
        my_split(lines.value)
    )
)

警告将继续显示:

WARN CachedKafkaConsumer: CachedKafkaConsumer 没有运行 不间断线程。当 CachedKafkaConsumer 的方法时它可能会挂起 因 KAFKA-1894 而中断

另一方面,当我通过pyspark.sql.functions.split 将行拆分为单词时,一切正常。

words = lines.select(
    explode(
        split(lines.value, ' ') 
    ) 
)

为什么会发生这种情况以及如何解决警告?

这是我在实践中尝试执行的代码:

pattern = "(.+) message repeated (\\d) times: \\[ (.+)\\]"
prog = re.compile(pattern)


def _unfold(x):
    ret = []
    result = prog.match(x)
    if result:
        log = " ".join((result.group(1), result.group(3)))
        times = result.group(2)
        for _ in range(int(times)):
            ret.append(log)
    else:
        ret.append(x)

    return ret

_udf = udf(lambda x: _unfold(x), ArrayType(StringType()))
lines = lines.withColumn('value', explode(_udf(lines['value'])))

【问题讨论】:

【参考方案1】:

除了拒绝 Python UDF *,您对代码中的这个问题无能为力。正如您在异常消息中看到的那样,UninterruptibleThread 是 Kafka 错误 (KAFKA-1894) 的解决方法,旨在防止在中断 KafkaConsumer 时出现无限循环。

它不与PythonUDFRunner 一起使用(在此处引入特殊情况可能没有意义)。

除非您遇到一些相关问题,否则我个人不会担心。你的 Python 代码永远不会直接与 KafkaConsumer 交互。如果您遇到任何问题,应该在上游修复 - 在这种情况下,我建议创建一个 JIRA ticket。


* 你的 unfold 函数可以用 SQL 函数重写,但这将是一个 hack。将消息计数添加为整数:

from pyspark.sql.functions import concat_ws, col, expr, coalesce, lit, regexp_extract, when

p = "(.+) message repeated (\\d) times: \\[ (.+)\\]"

lines = spark.createDataFrame(
    ["asd message repeated 3 times: [ 12]", "some other message"], "string"
)

lines_with_count = lines.withColumn(
   "message_count", coalesce(regexp_extract("value", p, 2).cast("int"), lit(1)))

用它来explode

exploded = lines_with_count.withColumn(
     "i", 
      expr("explode(split(repeat('1', message_count - 1),''))")
).drop("message_count", "i")

并提取:

exploded.withColumn(
    "value",
    when(
        col("value").rlike(p),
         concat_ws(" ", regexp_extract("value", p, 1), regexp_extract("value", p, 3))
    ).otherwise(col("value"))).show(4, False)


# +------------------+
# |value             |
# +------------------+
# |asd 12            |
# |asd 12            |
# |asd 12            |
# |some other message|
# +------------------+

【讨论】:

以上是关于UDF 导致警告:CachedKafkaConsumer 未在 UninterruptibleThread 中运行 (KAFKA-1894)的主要内容,如果未能解决你的问题,请参考以下文章

Pig 的 UDF 中存在“in”会导致问题

SQL Server 2008 R2 - 标量 UDF 导致无限循环

UDF 元组包导致错误“Long 不能转换为元组”

鉴于我将 DataBag 溢出到磁盘,为啥此 Pig UDF 会导致“错误:Java 堆空间”?

SPARK 3.1.2 Driver端下载UDF jar包导致磁盘爆满

在表值 UDF 中断言参数