kafka Cassandra接收器连接器中的ClassCastException

Posted

技术标签:

【中文标题】kafka Cassandra接收器连接器中的ClassCastException【英文标题】:ClassCastException in kafka Cassandra sink connector 【发布时间】:2021-02-26 16:44:59 【问题描述】:

我将源用作oracle,将接收器用作cassandra。

    来源 replenis_lead_time(类型编号 (3,0))。在源连接器中,我将字段更改为喜欢 kafka 的 to_char(replenis_lead_time)。 接收replenis_lead_time(输入文本)。为了修复以下异常,我尝试将接收器 Cassandra 字段类型更改为 double、int、text,但我仍然看到演员表异常。

非常感谢任何帮助。

java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.String at
    com.datastax.dsbulk.commons.codecs.string.StringToStringCodec.externalToInternal(StringToStringCodec.java:14) at
    com.datastax.dsbulk.commons.codecs.ConvertingCodec.encode(ConvertingCodec.java:63) at
    com.datastax.kafkaconnector.RecordMapper.bindColumn(RecordMapper.java:239) at
    com.datastax.kafkaconnector.RecordMapper.map(RecordMapper.java:140) at
    com.datastax.kafkaconnector.DseSinkTask.mapAndQueueRecord(DseSinkTask.java:226) at
    com.datastax.kafkaconnector.DseSinkTask.lambda$put$1(DseSinkTask.java:132) at
    java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) ... 3 more 

[2020-11-12 13:03:34,022] ERROR WorkerSinkTaskid=dse-gcp-connector-mm-0 Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179) org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at
    org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561) at
    org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322) at
    org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224) at
    org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192) at
    org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177) at
    org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) at
    java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
    java.util.concurrent.FutureTask.run(FutureTask.java:266) at
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at
    java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.CompletionException: java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.String at
    java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at
    java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at
    java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1629) ... 3 more Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.String at
    com.datastax.dsbulk.commons.codecs.string.StringToStringCodec.externalToInternal(StringToStringCodec.java:14) at
    com.datastax.dsbulk.commons.codecs.ConvertingCodec.encode(ConvertingCodec.java:63) at
    com.datastax.kafkaconnector.RecordMapper.bindColumn(RecordMapper.java:239) at
    com.datastax.kafkaconnector.RecordMapper.map(RecordMapper.java:140) at
    com.datastax.kafkaconnector.DseSinkTask.mapAndQueueRecord(DseSinkTask.java:226) at
    com.datastax.kafkaconnector.DseSinkTask.lambda$put$1(DseSinkTask.java:132) at
    java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) ... 3 more 
[2020-11-12 13:03:34,023] ERROR WorkerSinkTaskid=dse-gcp-connector-mm-0 Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)

沉:


  "name": "dse-gcp-connector-mm",
  "config": 
    "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
    "queryExecutionTimeout": "300",
    "loadBalancing.localDc": "GCE-UC1A",
    "auth.password": "*****",
    "tasks.max": "3",
    "topics": "mm_topic5",
    "contactPoints": "uc1a-ecomdev-cas31.**.com,uc1a-ecomdev-cas30.**.com",
    "auth.provider": "None",
    "ignoreErrors": "false",
    "topic.mm_topic5.middle_layer.mm_temp.consistencyLevel": "LOCAL_QUORUM",
    "topic.mm_topic5.masterdata_middle_layer.material_master.consistencyLevel": "LOCAL_QUORUM",
    "topic.mm_topic5.middle_layer.mm_temp.ttl": "864000",
    "connectionPoolLocalSize": "12",
    "topic.mm_topic5.codec.date": "yyyy-MM-dd",
    "topic.mm_topic5.codec.timestamp": "yyyy-MM-dd HH:mm:ss",
    "topic.mm_topic5.middle_layer.mm_temp.mapping": "datebucket=value.datebucket,country=value.country, brand=value.brand, mat_num=value.mat_num, sales_org=value.sales_org,plant=value.plant,cas_num=value.cas_num,mat_desc=value.mat_desc,mat_grp=value.mat_grp,mat_type=value.mat_type,old_mat_num=value.old_mat_num,prod_hier=value.prod_hier,sales_unit=value.sales_unit,strg_cond=value.strg_cond,updt_ts=value.updt_ts2,act_flg=value.act_flg,apo_rlv=value.apo_rlv,base_unit_of_msr=value.base_unit_of_msr,changed_by=value.changed_by,created_by=value.created_by,created_on=value.created_on,env_rlv=value.env_rlv,lab_mat_grp=value.lab_mat_grp,language=value.language,last_change=value.last_change,mat_ctg=value.mat_ctg, plant_desc=value.plant_desc,price_brand_ctg=value.price_brand_ctg,sys_id=value.sys_id,xplant_mat_sts=value.xplant_mat_sts ,division=value.division,prod_grp_sbu=value.prod_grp_sbu,glb_prod_hier=value.glb_prod_hier,portfolio_brand=value.portfolio_brand,xdchain_sts=value.xdchain_sts,dchain_sts=value.dchain_sts,mara_prod_hier=value.mara_prod_hier,tarif=value.tarif,brand_desc=value.brand_desc,tptm_znoinetsls=value.tptm_znoinetsls,plant_mat_sts=value.plant_mat_sts,item_ctg_grp=value.item_ctg_grp,price_ctg_grp=value.price_ctg_grp,prod_attr2=value.prod_attr2,special_proc_key=value.special_proc_key,replenis_lead_time=value.replenis_lead_time,purchase_grp=value.purchase_grp,gr_process_time=value.gr_process_time,mat_grp4=value.mat_grp4",
    "name": "dse-gcp-connector-mm",
    "topic.mm_topic5.masterdata_middle_layer.material_master.mapping": "country=value.country, brand=value.brand, mat_num=value.mat_num, sales_org=value.sales_org,plant=value.plant,cas_num=value.cas_num,mat_desc=value.mat_desc,mat_grp=value.mat_grp,mat_type=value.mat_type,old_mat_num=value.old_mat_num,prod_hier=value.prod_hier,sales_unit=value.sales_unit,strg_cond=value.strg_cond,updt_ts=value.updt_ts2,act_flg=value.act_flg,apo_rlv=value.apo_rlv,base_unit_of_msr=value.base_unit_of_msr,changed_by=value.changed_by,created_by=value.created_by,created_on=value.created_on,env_rlv=value.env_rlv,lab_mat_grp=value.lab_mat_grp,language=value.language,last_change=value.last_change,mat_ctg=value.mat_ctg, plant_desc=value.plant_desc,price_brand_ctg=value.price_brand_ctg,sys_id=value.sys_id,xplant_mat_sts=value.xplant_mat_sts ,division=value.division,prod_grp_sbu=value.prod_grp_sbu,glb_prod_hier=value.glb_prod_hier,portfolio_brand=value.portfolio_brand,xdchain_sts=value.xdchain_sts,dchain_sts=value.dchain_sts,mara_prod_hier=value.mara_prod_hier,tarif=value.tarif,brand_desc=value.brand_desc,tptm_znoinetsls=value.tptm_znoinetsls,plant_mat_sts=value.plant_mat_sts,item_ctg_grp=value.item_ctg_grp,price_ctg_grp=value.price_ctg_grp,prod_attr2=value.prod_attr2,special_proc_key=value.special_proc_key,replenis_lead_time=value.replenis_lead_time,purchase_grp=value.purchase_grp,gr_process_time=value.gr_process_time,mat_grp4=value.mat_grp4",
    "auth.username": "sam"
  

来源:


  "name": "jdbc_mm_test",
  "config": 
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "timestamp.column.name": "UPDT_TS",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "connection.password": "****",
    "tasks.max": "1",
    "query": "select country||'-'||mat_num||'-'||sales_org||'-'||plant||'-'||sys_id as \"id\",act_flg as \"act_flg\",apo_rlv as \"apo_rlv\",base_unit_of_msr as \"base_unit_of_msr\",brand as \"brand\",cas_num as \"cas_num\",to_char(created_on,'YYYY-MM-DD HH24:MI:SS') as \"created_on\",country as \"country\",created_by as \"created_by\",changed_by as \"changed_by\",env_rlv as \"env_rlv\",lab_mat_grp as \"lab_mat_grp\",language as \"language\",to_char(last_change, 'YYYY-MM-DD HH24:MI:SS') as \"last_change\",mat_num as \"mat_num\",mat_ctg as \"mat_ctg\",mat_desc as \"mat_desc\",mat_grp as \"mat_grp\",mat_type as \"mat_type\",old_mat_num as \"old_mat_num\",plant as \"plant\",plant_desc as \"plant_desc\",price_brand_ctg as \"price_brand_ctg\",prod_hier as \"prod_hier\",sales_org as \"sales_org\",sales_unit as \"sales_unit\",strg_cond as \"strg_cond\",to_char(updt_ts, 'YYYY-MM-DD HH24:MI:SS') as \"updt_ts2\",updt_ts,to_char(updt_ts, 'YYYY-MM-DD') as \"datebucket\", sys_id as \"sys_id\",xplant_mat_sts as \"xplant_mat_sts\",division as \"division\",prod_grp_sbu as \"prod_grp_sbu\",glb_prod_hier as \"glb_prod_hier\",portfolio_brand as \"portfolio_brand\",xdchain_sts as \"xdchain_sts\", dchain_sts as\"dchain_sts\", mara_prod_hier as \"mara_prod_hier\",tarif as \"tarif\",brand_desc as \"brand_desc\",tptm_znoinetsls as \"tptm_znoinetsls\",plant_mat_sts as \"plant_mat_sts\" , item_ctg_grp as \"item_ctg_grp\", price_ctg_grp as \"price_ctg_grp\",prod_attr2 as \"prod_attr2\" , special_proc_key as \"special_proc_key\" , to_char(replenis_lead_time) as \"replenis_lead_time\" , purchase_grp as \"purchase_grp\" , to_char(gr_process_time) as \"gr_process_time\" , mat_grp4 as \"mat_grp4\" from material_master_enriched2",
    "connection.attempts": "1000",
    "transforms": "createKey,extractInt",
    "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "batch.max.rows": "3000",
    "table.types": "query",
    "mode": "timestamp",
    "topic.prefix": "mm_topic5",
    "transforms.extractInt.field": "id",
    "connection.user": "***",
    "schema.pattern": "",
    "transforms.createKey.fields": "id",
    "poll.interval.ms": "10000",
    "name": "jdbc_mm_test",
    "connection.url": "jdbc:oracle:thin:@oragen3dev.***.com:1545:gen3webd"
  

【问题讨论】:

请提供您的 kafka 连接配置(源和接收器) 更新源和接收器 【参考方案1】:

我使用转换为字符串类型然后它工作正常

【讨论】:

以上是关于kafka Cassandra接收器连接器中的ClassCastException的主要内容,如果未能解决你的问题,请参考以下文章

Kafka-Connect Cassandra Sink 连接器不将数据推送到 Cassandra

使用 kafka 和 cassandra 进行事件溯源的类别预测

Kafka Cassandra 连接器实际上并未写入数据库

Kafka Connect Sink 到 Cassandra :: java.lang.VerifyError: Bad return type

是否有开源 Kafka Cassandra 连接器配置的示例示例?

如何在没有 Confluent 的情况下使用 Kafka Connect for Cassandra