Docker Confluent Kafka HDFS Sink Running but Task Failed

Posted 2019-01-25 12:14:29

I'm setting up Kafka on a DigitalOcean droplet using the Confluent Kafka all-in-one docker image. I'm able to run Kafka successfully and add the HDFS connector through the Kafka Connect REST API. I substitute HOST_IP with the IP of my Cloudera CDH droplet.

curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "hdfs-sink",
    "config": {
      "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
      "tasks.max": "1",
      "topics": "test_hdfs",
      "hdfs.url": "hdfs://HOST_IP:8020",
      "flush.size": "3",
      "name": "hdfs-sink"
    }
  }' \
  http://HOST_IP:8083/connectors
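
For reference, the status check is the standard Connect REST status endpoint (a sketch; piping to jq is optional and assumes jq is installed):

curl -s http://HOST_IP:8083/connectors/hdfs-sink/status | jq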

Then when I curl Kafka Connect for the hdfs-sink status, I get the following error in the JSON response under tasks (the connector state is RUNNING but the task has FAILED):

java.lang.RuntimeException: io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance of io.confluent.kafka.serializers.subject.SubjectNameStrategy

Update

So I managed to get past this error by using 5.0.0 instead of the beta, as cricket_007 recommended (silly me).

However, I'm now getting a different error when I actually try to publish data to my HDFS instance. I'm using ksql-datagen to generate fake data:

docker-compose exec ksql-datagen ksql-datagen quickstart=users format=json topic=test_hdfs maxInterval=1000 propertiesFile=/etc/ksql/datagen.properties bootstrap-server=broker:9092
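
As a sanity check, you can peek at what datagen actually wrote. A sketch, assuming the broker container from the cp-all-in-one Compose file ships the Kafka CLI tools:

docker-compose exec broker kafka-console-consumer \
  --bootstrap-server broker:9092 \
  --topic test_hdfs \
  --from-beginning \
  --max-messages 5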


    "name": "hdfs-sink",
    "connector": 
        "state": "RUNNING",
        "worker_id": "connect:8083"
    ,
    "tasks": [
        "state": "FAILED",
        "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: test_hdfs\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n",
        "id": 0,
        "worker_id": "connect:8083"
    ],
    "type": "sink"

Edit 2

Stack trace from the failed Avro ksql-datagen run:

Outputting 1000000 to test_hdfs
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing row to topic test_hdfs using Converter API
Caused by: org.apache.kafka.connect.errors.DataException: test_hdfs
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:77)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:44)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:27)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:854)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:816)
    at io.confluent.ksql.datagen.DataGenProducer.populateTopic(DataGenProducer.java:94)
    at io.confluent.ksql.datagen.DataGen.main(DataGen.java:100)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1334)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1309)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:172)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:320)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:312)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:114)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:153)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:116)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:44)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:27)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:854)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:816)
    at io.confluent.ksql.datagen.DataGenProducer.populateTopic(DataGenProducer.java:94)
    at io.confluent.ksql.datagen.DataGen.main(DataGen.java:100)

Edit 3

OK, for some reason I'm still getting a JSON deserialization error on Kafka Connect even though I'm generating Avro data with ksql-datagen.

docker-compose exec ksql-datagen ksql-datagen schema=/impressions.avro format=avro schemaRegistryUrl=http://schema-registry:8081 key=impressionid topic=test_hdfs maxInterval=1000 propertiesFile=/etc/ksql/datagen.properties bootstrap-server=broker:9092
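
To confirm the registry was reachable and a value schema actually got registered, I also checked the standard Schema Registry subjects endpoint (a sketch; the external hostname and port are an assumption based on my setup):

curl http://kafka.nuvo.app:8081/subjects
# should list "test_hdfs-value" once Avro values have been registered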

curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "hdfs-sink",
    "config": {
      "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
      "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
      "tasks.max": "1",
      "schema.compatibility": "FULL",
      "topics": "test_hdfs",
      "hdfs.url": "hdfs://cdh.nuvo.app:8020",
      "flush.size": "3",
      "name": "hdfs-sink"
    }
  }' \
  http://kafka.nuvo.app:8083/connectors

Schema Registry configuration:

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
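
Note: in the Confluent Docker images the Connect worker is configured through environment variables rather than a properties file like this one. The convention, as I understand it, is the property name upper-cased with dots replaced by underscores and a CONNECT_ prefix. A sketch of the equivalent Compose entries:

environment:
  CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
  CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
  CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
  CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'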

Kafka Connect logs:

  org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'impression_816': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"impression_816"; line: 1, column: 29]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'impression_816': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"impression_816"; line: 1, column: 29]

Edit 4

[2018-08-22 02:05:51,140] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: test_hdfs1
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2018-08-22 02:05:51,141] ERROR WorkerSinkTask{id=hdfs-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2018-08-22 02:05:51,243] INFO Publish thread interrupted for client_id=consumer-8 client_type=CONSUMER session= cluster=lUWD_PR0RsiTkaunoUrUfA group=connect-hdfs-sink (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)

Comments:

Can you post the full stack trace? You'll probably have to get it from the Connect logs rather than the REST API.
Sounds like you have conflicting versions of this class: github.com/confluentinc/schema-registry/blob/master/…
Those linked Compose files haven't been updated for the 5.0.0 release yet and are still on the beta. Please use the release version.
@dawsaw Will post the full stack trace tomorrow.
@cricket_007 Will try the supported release version tomorrow.

Answer 1:

You set ksql-datagen ... format=json

But the error indicates that you have configured the AvroConverter in Kafka Connect:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
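
A quick way to confirm what's actually in the topic is to consume it once with each console consumer. A sketch assuming the cp-all-in-one service names; if the Avro consumer throws "Unknown magic byte!", the records are not Avro:

docker-compose exec schema-registry kafka-avro-console-consumer \
  --bootstrap-server broker:9092 \
  --topic test_hdfs --from-beginning --max-messages 5 \
  --property schema.registry.url=http://schema-registry:8081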

Looking at your Compose file...

  CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
  CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
  CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
  CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

If you want to generate Avro data instead, refer to the ksql-datagen docs.

And while you are producing JSON at the moment, that is not what your configuration will put on HDFS. Avro is the default output format for HDFS Connect; see the configuration documentation:

format.class: The format class to use when writing data to the store. The format class implements the io.confluent.connect.storage.format.Format interface.

Type: class
Default: io.confluent.connect.hdfs.avro.AvroFormat
Importance: high

These classes are available by default:

io.confluent.connect.hdfs.avro.AvroFormat
io.confluent.connect.hdfs.json.JsonFormat
io.confluent.connect.hdfs.parquet.ParquetFormat
io.confluent.connect.hdfs.string.StringFormat
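
If you did want JSON end to end instead, a hedged sketch of the connector overrides that should pair together (assuming plain, schemaless JSON in the topic):

"format.class": "io.confluent.connect.hdfs.json.JsonFormat",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"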

If you don't use JsonFormat, then I believe that in order to output Avro from JSON you need a JSON record that looks like this:


  "schema": ...
  "payload": ...

Otherwise, there is no way to infer an Avro schema from a JSON record.
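
For illustration only, a sketch of such an envelope record with a made-up single-field schema (the field names here are hypothetical):

{
  "schema": {
    "type": "struct",
    "name": "user",
    "optional": false,
    "fields": [
      { "type": "string", "optional": false, "field": "userid" }
    ]
  },
  "payload": { "userid": "user_1" }
}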


Following your series of edits, I think you moved to producing Avro but were consuming it with the JsonConverter, per what I mentioned above, which is not what I suggested. Basically, the Converter class type must match the producer's data, and it defines the consumer's deserializer.

As for the serialization error for id -1, it is basically saying that the data in the key or the value is not Avro. Now, KSQL can't quite use Avro keys, so I would bet it's the key deserializer that is failing. To work around that, set:

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
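
Putting it together, a sketch of the full connector config with that key-converter override; the per-connector converter lines are my assumption layered onto the configs shown above:

{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
    "tasks.max": "1",
    "topics": "test_hdfs",
    "hdfs.url": "hdfs://cdh.nuvo.app:8020",
    "flush.size": "3",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}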

Comments:

@cricket_007 I made some progress on this by using ksql-datagen with Avro-formatted messages, but I've hit an issue: docker-compose exec ksql-datagen ksql-datagen schema=/impressions.avro format=avro key=impressionid topic=test_hdfs maxInterval=1000 propertiesFile=/etc/ksql/datagen.properties bootstrap-server=broker:9092 prints Outputting 1000000 to test_hdfs and then Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing row to topic test_hdfs using Converter API. See Edit 2.
java.net.ConnectException: Connection refused... Your Schema Registry isn't working, or you never started it.
Oh, and for ksql-datagen: if you pass format=avro, then you also need to pass schemaRegistryUrl=http://schema-registry:8081
@cricket_007 I'm still getting JSON parsing errors even though I've specified Avro everywhere. The only thing I can think of is something to do with the internal converters ("The internal converter used for offsets and config data is configurable and must be specified"). See Edit 3 above.
Well, 1) I see JsonConverter in your logs, and that shouldn't be part of your Compose file. 2) byte[] data is not a JSON type. Ultimately, I don't know what data was in your topic to begin with, but Connect is reading bytes from the Kafka topic and then trying to parse them one way or another. If you produced Avro data, then you must use the AvroConverter. What you can change is the format.class in the HDFS Connect config.
