无法使用镜头 kudu sink 连接器将数据从 kafka 主题插入或更新到 kudu 表

Posted

技术标签:

【中文标题】无法使用镜头 kudu sink 连接器将数据从 kafka 主题插入或更新到 kudu 表【英文标题】:unable to insert or upsert data from kafka topic to kudu table using lenses kudu sink connector 【发布时间】:2020-07-07 05:31:59 【问题描述】:

lenses kudu sink 连接器版本 = kafka-connect-kudu-1.2.3-2.1.0

kudu 表架构

CREATE TABLE IF NOT EXISTS table_name(
su_id bigint not null,
su_tenant_id int null,
su_bu_id int null,
su_user_type string null,
su_acpd_id int null,
su_user_code string null,
su_user_title string null,
su_first_name string not null,
su_middle_name string null,
su_last_name string null,
su_dob timestamp null,
su_doj timestamp null,
su_primary_position_id bigint null,
su_role_id int null,
su_masterdataref string null,
su_primary_address bigint null,
su_mobile_no string null,
su_email_id string null,
su_photo string null,
su_isactive boolean not null,
su_created_by bigint not null,
su_created_timestamp timestamp not null,
su_modified_by bigint null,
su_modified_timestamp timestamp null,
su_status string null,
flex_1 string null,
flex_2 string null,
flex_3 string null,
flex_4 string null,
flex_5 string null,
flex_6 string null,
flex_7 string null,
flex_8 string null,
flex_9 string null,
su_gender string null,
su_theme_id int null,
su_activated_timestamp timestamp not null,
su_deactivated_timestamp timestamp null,
su_level_id smallint null,
su_hierarchy_type string null,
su_user_type_id int null,
su_adh_id int null,
su_user_classification int null,
su_credit_limit decimal(18, 4) null,
su_culture_alov_id int null,
su_culture_al_id smallint null,
su_profile_image_file string null,
su_terms_isagree boolean not null,
su_terms_agreed_timestamp timestamp null,
primary key(su_id)
)
PARTITION BY HASH (su_id) PARTITIONS 3
STORED AS KUDU;

带有 key.converter.schemas.enable = false,value.converter.schemas.enable = false 的 Kafka 主题数据,


  "su_id": 1,
  "su_tenant_id": 0,
  "su_bu_id": 0,
  "su_user_type": "A",
  "su_acpd_id": null,
  "su_user_code": "sampletest",
  "su_user_title": null,
  "su_first_name": "test_data",
  "su_middle_name": null,
  "su_last_name": "",
  "su_dob": null,
  "su_doj": null,
  "su_primary_position_id": null,
  "su_role_id": 1,
  "su_masterdataref": "0",
  "su_primary_address": null,
  "su_mobile_no": null,
  "su_email_id": null,
  "su_photo": null,
  "su_isactive": true,
  "su_created_by": 1,
  "su_created_date": 1526324248760,
  "su_modified_by": 1,
  "su_modified_date": 1547137351267,
  "su_status": "I",
  "flex_1": null,
  "flex_2": null,
  "flex_3": null,
  "flex_4": null,
  "flex_5": null,
  "flex_6": null,
  "flex_7": null,
  "flex_8": null,
  "flex_9": null,
  "su_gender": null,
  "su_theme_id": 406,
  "su_activated_date": 1526324248760,
  "su_deactivated_date": null,
  "su_level_id": null,
  "su_hierarchy_type": null,
  "su_user_type_id": null,
  "su_adh_id": null,
  "su_user_classification": null,
  "su_credit_limit": null,
  "su_culture_alov_id": null,
  "su_culture_al_id": null,
  "su_profile_image_file": null,
  "su_terms_isagree": false,
  "su_terms_agreed_date": null

kudu sink 连接器配置:

配置:1


  "name": "snk_test",
    "config": 
    "connector.class": "com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector",
    "topics": "mssql.dbo.table_name",
    "connect.kudu.schema.registry.url": "http://localhost:8081",
    "connect.kudu.master": "*.*.*.*:7051",
    "connect.kudu.kcql": "upsert into impala::test_db.table_name select * from mssql.dbo.table_name AUTOCREATE DISTRIBUTEBY su_id INTO 3 BUCKETS AUTOEVOLVE"

配置:2


  "name": "snk_test",
    "config": 
    "connector.class": "com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector",
    "topics": "mssql.dbo.table_name",
    "connect.kudu.schema.registry.url": "http://localhost:8081",
    "connect.kudu.master": "*.*.*.*:7051",
    "connect.kudu.kcql": "upsert into impala::test_db.table_name select * from mssql.dbo.table_name "

使用这两个配置我收到以下错误

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat

org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n原因: org.apache.kafka.connect.errors.DataException:JsonConverter 与 schemas.enable 需要 \"schema\" 和 \"payload\" 字段,并且可能不需要 包含附加字段。如果您尝试反序列化纯 JSON 数据,在您的转换器配置中设置 schemas.enable=false。\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:348)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:484)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 还有 13 个\n

具有 key.converter.schemas.enable = true,value.converter.schemas.enable = true 的 Kafka 主题,


  "schema": 
    "type": "struct",
    "fields": [
      
        "type": "int64",
        "optional": false,
        "field": "su_id"
      ,
      
        "type": "int32",
        "optional": true,
        "field": "su_tenant_id"
      ,
      
        "type": "int32",
        "optional": true,
        "field": "su_bu_id"
      ,
      
        "type": "string",
        "optional": true,
        "field": "su_user_type"
      ,
      
        "type": "int32",
        "optional": true,
        "field": "su_acpd_id"
      ,
      
        "type": "string",
        "optional": true,
        "field": "su_user_code"
      ,
      
        "type": "string",
        "optional": true,
        "field": "su_user_title"
      ,
      
        "type": "string",
        "optional": false,
        "field": "su_first_name"
      ,
      
        "type": "string",
        "optional": true,
        "field": "su_middle_name"
      ,
      
        "type": "string",
        "optional": true,
        "field": "su_last_name"
      ,
      
        "type": "int32",
        "optional": true,
        "name": "io.debezium.time.Date",
        "version": 1,
        "field": "su_dob"
      ,
      
        "type": "int32",
        "optional": true,
        "name": "io.debezium.time.Date",
        "version": 1,
        "field": "su_doj"
      ,
      
        "type": "int64",
        "optional": true,
        "field": "su_primary_position_id"
      ,
      
        "type": "int32",
        "optional": true,
        "field": "su_role_id"
      ,
      
        "type": "string",
        "optional": true,
        "field": "su_masterdataref"
      ,
      
        "type": "int64",
        "optional": true,
        "field": "su_primary_address"
      ,
      
        "type": "string",
        "optional": true,
        "field": "su_mobile_no"
      ,
      
        "type": "string",
        "optional": true,
        "field": "su_email_id"
      ,
      
        "type": "string",
        "optional": true,
        "field": "su_photo"
      ,
      
        "type": "boolean",
        "optional": false,
        "field": "su_isactive"
      ,
      
        "type": "int64",
        "optional": false,
        "field": "su_created_by"
      ,
      
        "type": "int64",
        "optional": false,
        "name": "io.debezium.time.Timestamp",
        "version": 1,
        "field": "su_created_date"
      ,
      
        "type": "int64",
        "optional": true,
        "field": "su_modified_by"
      ,
      
        "type": "int64",
        "optional": true,
        "name": "io.debezium.time.Timestamp",
        "version": 1,
        "field": "su_modified_date"
      ,
      
        "type": "string",
        "optional": true,
        "field": "su_status"
      ,
      
        "type": "string",
        "optional": true,
        "field": "flex_1"
      ,
      
        "type": "string",
        "optional": true,
        "field": "flex_2"
      ,
      
        "type": "string",
        "optional": true,
        "field": "flex_3"
      ,
      
        "type": "string",
        "optional": true,
        "field": "flex_4"
      ,
      
        "type": "string",
        "optional": true,
        "field": "flex_5"
      ,
      
        "type": "string",
        "optional": true,
        "field": "flex_6"
      ,
      
        "type": "string",
        "optional": true,
        "field": "flex_7"
      ,
      
        "type": "string",
        "optional": true,
        "field": "flex_8"
      ,
      
        "type": "string",
        "optional": true,
        "field": "flex_9"
      ,
      
        "type": "string",
        "optional": true,
        "field": "su_gender"
      ,
      
        "type": "int32",
        "optional": true,
        "field": "su_theme_id"
      ,
      
        "type": "int64",
        "optional": false,
        "name": "io.debezium.time.Timestamp",
        "version": 1,
        "field": "su_activated_date"
      ,
      
        "type": "int64",
        "optional": true,
        "name": "io.debezium.time.Timestamp",
        "version": 1,
        "field": "su_deactivated_date"
      ,
      
        "type": "int16",
        "optional": true,
        "field": "su_level_id"
      ,
      
        "type": "string",
        "optional": true,
        "field": "su_hierarchy_type"
      ,
      
        "type": "int32",
        "optional": true,
        "field": "su_user_type_id"
      ,
      
        "type": "int32",
        "optional": true,
        "field": "su_adh_id"
      ,
      
        "type": "int32",
        "optional": true,
        "field": "su_user_classification"
      ,
      
        "type": "bytes",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": 
          "scale": "4",
          "connect.decimal.precision": "18"
        ,
        "field": "su_credit_limit"
      ,
      
        "type": "int32",
        "optional": true,
        "field": "su_culture_alov_id"
      ,
      
        "type": "int16",
        "optional": true,
        "field": "su_culture_al_id"
      ,
      
        "type": "string",
        "optional": true,
        "field": "su_profile_image_file"
      ,
      
        "type": "boolean",
        "optional": false,
        "field": "su_terms_isagree"
      ,
      
        "type": "int64",
        "optional": true,
        "name": "io.debezium.time.Timestamp",
        "version": 1,
        "field": "su_terms_agreed_date"
      
    ],
    "optional": true,
    "name": "mssql.dbo.table_name.Value"
  ,
  "payload": 
    "su_id": 1,
    "su_tenant_id": 0,
    "su_bu_id": 0,
    "su_user_type": "A",
    "su_acpd_id": null,
    "su_user_code": "sampletest1",
    "su_user_title": null,
    "su_first_name": "test_data",
    "su_middle_name": null,
    "su_last_name": "",
    "su_dob": null,
    "su_doj": null,
    "su_primary_position_id": null,
    "su_role_id": 1,
    "su_masterdataref": "0",
    "su_primary_address": null,
    "su_mobile_no": null,
    "su_email_id": null,
    "su_photo": null,
    "su_isactive": true,
    "su_created_by": 1,
    "su_created_date": 1526324248760,
    "su_modified_by": 1,
    "su_modified_date": 1547137351267,
    "su_status": "I",
    "flex_1": null,
    "flex_2": null,
    "flex_3": null,
    "flex_4": null,
    "flex_5": null,
    "flex_6": null,
    "flex_7": null,
    "flex_8": null,
    "flex_9": null,
    "su_gender": null,
    "su_theme_id": 406,
    "su_activated_date": 1526324248760,
    "su_deactivated_date": null,
    "su_level_id": null,
    "su_hierarchy_type": null,
    "su_user_type_id": null,
    "su_adh_id": null,
    "su_user_classification": null,
    "su_credit_limit": null,
    "su_culture_alov_id": null,
    "su_culture_al_id": null,
    "su_profile_image_file": null,
    "su_terms_isagree": false,
    "su_terms_agreed_date": null
  

kudu sink 连接器配置:

配置:1


  "name": "snk_test",
    "config": 
    "connector.class": "com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector",
    "topics": "mssql.dbo.table_name",
    "connect.kudu.schema.registry.url": "http://localhost:8081",
    "connect.kudu.master": "*.*.*.*:7051",
    "connect.kudu.kcql": "upsert into impala::test_db.table_name select * from mssql.dbo.table_name AUTOCREATE DISTRIBUTEBY su_id INTO 3 BUCKETS AUTOEVOLVE"

配置:2


  "name": "snk_test",
    "config": 
    "connector.class": "com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector",
    "topics": "mssql.dbo.table_name",
    "connect.kudu.schema.registry.url": "http://localhost:8081",
    "connect.kudu.master": "*.*.*.*:7051",
    "connect.kudu.kcql": "upsert into impala::test_db.table_name select * from mssql.dbo.table_name "

使用这两个配置我收到以下错误

org.apache.kafka.connect.errors.ConnectException:退出 WorkerSinkTask 由于不可恢复的异常。\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n原因: java.lang.RuntimeException: scala.MatchError: null\n\tat com.datamountaineer.streamreactor.connect.errors.ThrowErrorPolicy.handle(ErrorPolicy.scala:58)\n\tat com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleError(ErrorHandler.scala:83)\n\tat com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleTry(ErrorHandler.scala:64)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.handleTry(KuduWriter.scala:50)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.applyInsert(KuduWriter.scala:143)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.write(KuduWriter.scala:100)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask$$anonfun$put$2.apply(KuduSinkTask.scala:68)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask$$anonfun$put$2.apply(KuduSinkTask.scala:68)\n\tat scala.Option.foreach(Option.scala:257)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask.put(KuduSinkTask.scala:68)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)\n\t... 10 个以上\n原因:scala.MatchError: null\n\tat com.datamountaineer.streamreactor.connect.kudu.KuduConverter$class.com$datamountaineer$streamreactor$connect$kudu$KuduConverter$$addFieldToRow(KuduConverter.scala:106)\n\tat com.datamountaineer.streamreactor.connect.kudu.KuduConverter$$anonfun$convertToKuduUpsert$2.apply(KuduConverter.scala:48)\n\tat com.datamountaineer.streamreactor.connect.kudu.KuduConverter$$anonfun$convertToKuduUpsert$2.apply(KuduConverter.scala:48)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat scala.collection.TraversableLike$class.map(TraversableLike.scala:234)\n\tat scala.collection.AbstractTraversable.map(Traversable.scala:104)\n\tat com.datamountaineer.streamreactor.connect.kudu.KuduConverter$class.convertToKuduUpsert(KuduConverter.scala:48)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.convertToKuduUpsert(KuduWriter.scala:50)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.com$datamountaineer$streamreactor$connect$kudu$sink$KuduWriter$$handleSinkRecord$1(KuduWriter.scala:130)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(KuduWriter.scala:138)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(KuduWriter.scala:138)\n\tat scala.collection.Iterator$$anon$11.next(Iterator.scala:410)\n\tat scala.collection.Iterator$$anon$11.next(Iterator.scala:410)\n\tat scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1074)\n\tat scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089)\n\tat scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)\n\tat scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)\n\tat scala.collection.Iterator$class.foreach(Ite​​rator.scala:891)\n\tat scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1334)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter$$anonfun$1.apply$mcV$sp(KuduWriter.scala:141)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter$$anonfun$1.apply(KuduWriter.scala:141)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter$$anonfun$1.apply(KuduWriter.scala:141)\n\tat scala.util.Try$.apply(Try.scala:192)\n\tat com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.applyInsert(KuduWriter.scala:136)\n\t... 还有 16 个\n"

【问题讨论】:

【参考方案1】:

转换错误,

use key.converter=...AvroConverter

value.converter=...AvroConverter

设置

 key.converter.schema.registry.url=http://registry-host:8081
 value.converter.schema.registry.url=http://registry-host:8081

【讨论】:

以上是关于无法使用镜头 kudu sink 连接器将数据从 kafka 主题插入或更新到 kudu 表的主要内容,如果未能解决你的问题,请参考以下文章

无法将 Kafka 与 InfluxDB Sink Connector 连接

Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?

Kafka JDBC Sink 句柄数组数据类型

Confluent Kafka Sink Connector 未将数据加载到 Postgres 表

Azure Web App 在 Kudu Env 中未显示连接字符串

使用 SQL Server 上的 JDBC Sink 连接器自动创建适当的 DATETIME 类型字段