Kafka S3 sink connector crashes when it gets NULL data

Posted: 2020-09-11

I had a working S3 sink connector until the source connector sent a NULL value; then the S3 connector crashed. The problem started when I deleted a record from the MS SQL database: the source connector sent the delete event to the topic, and the S3 sink connector crashed. I deleted and recreated the S3 connector under a different name, but nothing changed.
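A CDC source connector (such as Debezium) emits a "tombstone" after a row is deleted: a record with a valid key and a NULL value. A minimal sketch of the decision the S3 sink makes for such records (illustrative only, not the actual connector code; the function name is made up):

```python
# Illustrative sketch of the S3 sink's null-value handling (hypothetical
# helper, not the real Confluent source). A deleted row arrives as a
# "tombstone": the key is set, the value is None.

def handle_record(key, value, behavior_on_null_values="fail"):
    """Mimics the behavior.on.null.values decision the S3 sink makes."""
    if value is None:
        if behavior_on_null_values == "ignore":
            return None  # tombstone is skipped, the task keeps running
        # default ("fail"): the task dies, as in the stack trace below
        raise RuntimeError(
            "Null valued records are not writeable with current "
            "behavior.on.null.values settings."
        )
    return value  # non-tombstone records are written as usual

# A normal record is written:
assert handle_record("file-42", {"id": 42}) == {"id": 42}

# With behavior.on.null.values=ignore, tombstones are silently dropped:
assert handle_record("file-42", None, behavior_on_null_values="ignore") is None
```

With the default setting, the tombstone raises the exception, which is exactly the `ConnectException` in the trace below.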

    org.apache.kafka.connect.errors.ConnectException: Null valued records are not writeable with current behavior.on.null.values 'settings.
        at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:91)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
        at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
        at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020-05-24 10:10:50,577 WARN WorkerSinkTask{id=minio-connector1-0} Ignoring invalid task provided offset filesql1.dbo.Files-0/OffsetAndMetadata{offset=16, leaderEpoch=null, metadata=''} -- not yet consumed, taskOffset=16 currentOffset=0 (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-minio-connector1-0]
2020-05-24 10:10:50,577 ERROR WorkerSinkTask{id=minio-connector1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-minio-connector1-0]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Null valued records are not writeable with current behavior.on.null.values 'settings.
        at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:91)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
        at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
        at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
        ... 10 more
2020-05-24 10:10:50,577 ERROR WorkerSinkTask{id=minio-connector1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-minio-connector1-0]

...and here is my S3 connector configuration:

    apiVersion: kafka.strimzi.io/v1alpha1
    kind: KafkaConnector
    metadata:
      name: "minio-connector1"
      labels:
        strimzi.io/cluster: mssql-minio-connect-cluster
    spec:
      class: io.confluent.connect.s3.S3SinkConnector
      config:
        storage.class: io.confluent.connect.s3.storage.S3Storage
        partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
        tasks.max: '1'
        topics: filesql1.dbo.Files
        s3.bucket.name: dosyalar
        s3.part.size: '5242880'
        flush.size: '2'
        format: binary
        schema.compatibility: NONE
        max.request.size: "536870912"
        store.url: http://minio.dev-kik.io
        format.class: io.confluent.connect.s3.format.avro.AvroFormat
        key.converter: io.confluent.connect.avro.AvroConverter
        key.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
        value.converter: io.confluent.connect.avro.AvroConverter
        value.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
        internal.key.converter: org.apache.kafka.connect.json.JsonConverter
        internal.value.converter: org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable: true
        value.converter.schemas.enable: true
        schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator

I have two questions:

1) How can I get the S3 connector running again?

2) I cannot expect records never to be deleted from the source database. How can I prevent the S3 connector from crashing again?


Answer 1:

Please check the connector documentation and look for behavior.on.null.values. You can set it to ignore.
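Applied to the Strimzi KafkaConnector resource from the question, this is one extra line under `config` (a sketch showing only the relevant part of the spec; everything else stays as posted):

```yaml
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    # Skip tombstone (NULL-value) records instead of failing the task.
    # Supported values: fail (default), ignore.
    behavior.on.null.values: ignore
```

After re-applying the resource, Strimzi reconciles the connector; if the task remains FAILED, it still has to be restarted manually (for example via the Kafka Connect REST API's task restart endpoint).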

Comments:

Unfortunately, the S3 sink connector does not support null values. That severely limits what it can transfer when backing up a compacted topic. In fact, Confluent recommends using the S3 connector to back up Schema Registry's _schema topic. This means that delete (tombstone) values in the compacted topic are not saved to S3, so you lose the ability to support Schema Registry hard deletes: when you restore, the hard deletes are gone, because the S3 connector ignored the null values. This key also applies to the io.confluent.connect.gcs.GcsSinkConnector class.
