ClassCastException in Kafka Cassandra sink connector
Posted: 2021-02-26 16:44:59

Question: I am using Oracle as the source and Cassandra as the sink.
Source: replenis_lead_time (type NUMBER(3,0)). In the source connector I changed the field to to_char(replenis_lead_time) so it reaches Kafka as a string.
Sink: replenis_lead_time (type text). To fix the exception below I tried changing the sink Cassandra column type to double, int, and text, but I still see the cast exception.
Any help is much appreciated.
java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.String at
com.datastax.dsbulk.commons.codecs.string.StringToStringCodec.externalToInternal(StringToStringCodec.java:14) at
com.datastax.dsbulk.commons.codecs.ConvertingCodec.encode(ConvertingCodec.java:63) at
com.datastax.kafkaconnector.RecordMapper.bindColumn(RecordMapper.java:239) at
com.datastax.kafkaconnector.RecordMapper.map(RecordMapper.java:140) at
com.datastax.kafkaconnector.DseSinkTask.mapAndQueueRecord(DseSinkTask.java:226) at
com.datastax.kafkaconnector.DseSinkTask.lambda$put$1(DseSinkTask.java:132) at
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) ... 3 more
[2020-11-12 13:03:34,022] ERROR WorkerSinkTask{id=dse-gcp-connector-mm-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179) org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561) at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322) at
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224) at
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192) at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177) at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at
java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.CompletionException: java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.String at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1629) ... 3 more Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.String at
com.datastax.dsbulk.commons.codecs.string.StringToStringCodec.externalToInternal(StringToStringCodec.java:14) at
com.datastax.dsbulk.commons.codecs.ConvertingCodec.encode(ConvertingCodec.java:63) at
com.datastax.kafkaconnector.RecordMapper.bindColumn(RecordMapper.java:239) at
com.datastax.kafkaconnector.RecordMapper.map(RecordMapper.java:140) at
com.datastax.kafkaconnector.DseSinkTask.mapAndQueueRecord(DseSinkTask.java:226) at
com.datastax.kafkaconnector.DseSinkTask.lambda$put$1(DseSinkTask.java:132) at
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) ... 3 more
[2020-11-12 13:03:34,023] ERROR WorkerSinkTask{id=dse-gcp-connector-mm-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
Sink connector config:
"name": "dse-gcp-connector-mm",
"config":
"connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
"queryExecutionTimeout": "300",
"loadBalancing.localDc": "GCE-UC1A",
"auth.password": "*****",
"tasks.max": "3",
"topics": "mm_topic5",
"contactPoints": "uc1a-ecomdev-cas31.**.com,uc1a-ecomdev-cas30.**.com",
"auth.provider": "None",
"ignoreErrors": "false",
"topic.mm_topic5.middle_layer.mm_temp.consistencyLevel": "LOCAL_QUORUM",
"topic.mm_topic5.masterdata_middle_layer.material_master.consistencyLevel": "LOCAL_QUORUM",
"topic.mm_topic5.middle_layer.mm_temp.ttl": "864000",
"connectionPoolLocalSize": "12",
"topic.mm_topic5.codec.date": "yyyy-MM-dd",
"topic.mm_topic5.codec.timestamp": "yyyy-MM-dd HH:mm:ss",
"topic.mm_topic5.middle_layer.mm_temp.mapping": "datebucket=value.datebucket,country=value.country, brand=value.brand, mat_num=value.mat_num, sales_org=value.sales_org,plant=value.plant,cas_num=value.cas_num,mat_desc=value.mat_desc,mat_grp=value.mat_grp,mat_type=value.mat_type,old_mat_num=value.old_mat_num,prod_hier=value.prod_hier,sales_unit=value.sales_unit,strg_cond=value.strg_cond,updt_ts=value.updt_ts2,act_flg=value.act_flg,apo_rlv=value.apo_rlv,base_unit_of_msr=value.base_unit_of_msr,changed_by=value.changed_by,created_by=value.created_by,created_on=value.created_on,env_rlv=value.env_rlv,lab_mat_grp=value.lab_mat_grp,language=value.language,last_change=value.last_change,mat_ctg=value.mat_ctg, plant_desc=value.plant_desc,price_brand_ctg=value.price_brand_ctg,sys_id=value.sys_id,xplant_mat_sts=value.xplant_mat_sts ,division=value.division,prod_grp_sbu=value.prod_grp_sbu,glb_prod_hier=value.glb_prod_hier,portfolio_brand=value.portfolio_brand,xdchain_sts=value.xdchain_sts,dchain_sts=value.dchain_sts,mara_prod_hier=value.mara_prod_hier,tarif=value.tarif,brand_desc=value.brand_desc,tptm_znoinetsls=value.tptm_znoinetsls,plant_mat_sts=value.plant_mat_sts,item_ctg_grp=value.item_ctg_grp,price_ctg_grp=value.price_ctg_grp,prod_attr2=value.prod_attr2,special_proc_key=value.special_proc_key,replenis_lead_time=value.replenis_lead_time,purchase_grp=value.purchase_grp,gr_process_time=value.gr_process_time,mat_grp4=value.mat_grp4",
"name": "dse-gcp-connector-mm",
"topic.mm_topic5.masterdata_middle_layer.material_master.mapping": "country=value.country, brand=value.brand, mat_num=value.mat_num, sales_org=value.sales_org,plant=value.plant,cas_num=value.cas_num,mat_desc=value.mat_desc,mat_grp=value.mat_grp,mat_type=value.mat_type,old_mat_num=value.old_mat_num,prod_hier=value.prod_hier,sales_unit=value.sales_unit,strg_cond=value.strg_cond,updt_ts=value.updt_ts2,act_flg=value.act_flg,apo_rlv=value.apo_rlv,base_unit_of_msr=value.base_unit_of_msr,changed_by=value.changed_by,created_by=value.created_by,created_on=value.created_on,env_rlv=value.env_rlv,lab_mat_grp=value.lab_mat_grp,language=value.language,last_change=value.last_change,mat_ctg=value.mat_ctg, plant_desc=value.plant_desc,price_brand_ctg=value.price_brand_ctg,sys_id=value.sys_id,xplant_mat_sts=value.xplant_mat_sts ,division=value.division,prod_grp_sbu=value.prod_grp_sbu,glb_prod_hier=value.glb_prod_hier,portfolio_brand=value.portfolio_brand,xdchain_sts=value.xdchain_sts,dchain_sts=value.dchain_sts,mara_prod_hier=value.mara_prod_hier,tarif=value.tarif,brand_desc=value.brand_desc,tptm_znoinetsls=value.tptm_znoinetsls,plant_mat_sts=value.plant_mat_sts,item_ctg_grp=value.item_ctg_grp,price_ctg_grp=value.price_ctg_grp,prod_attr2=value.prod_attr2,special_proc_key=value.special_proc_key,replenis_lead_time=value.replenis_lead_time,purchase_grp=value.purchase_grp,gr_process_time=value.gr_process_time,mat_grp4=value.mat_grp4",
"auth.username": "sam"
Source connector config:
"name": "jdbc_mm_test",
"config":
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "UPDT_TS",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"connection.password": "****",
"tasks.max": "1",
"query": "select country||'-'||mat_num||'-'||sales_org||'-'||plant||'-'||sys_id as \"id\",act_flg as \"act_flg\",apo_rlv as \"apo_rlv\",base_unit_of_msr as \"base_unit_of_msr\",brand as \"brand\",cas_num as \"cas_num\",to_char(created_on,'YYYY-MM-DD HH24:MI:SS') as \"created_on\",country as \"country\",created_by as \"created_by\",changed_by as \"changed_by\",env_rlv as \"env_rlv\",lab_mat_grp as \"lab_mat_grp\",language as \"language\",to_char(last_change, 'YYYY-MM-DD HH24:MI:SS') as \"last_change\",mat_num as \"mat_num\",mat_ctg as \"mat_ctg\",mat_desc as \"mat_desc\",mat_grp as \"mat_grp\",mat_type as \"mat_type\",old_mat_num as \"old_mat_num\",plant as \"plant\",plant_desc as \"plant_desc\",price_brand_ctg as \"price_brand_ctg\",prod_hier as \"prod_hier\",sales_org as \"sales_org\",sales_unit as \"sales_unit\",strg_cond as \"strg_cond\",to_char(updt_ts, 'YYYY-MM-DD HH24:MI:SS') as \"updt_ts2\",updt_ts,to_char(updt_ts, 'YYYY-MM-DD') as \"datebucket\", sys_id as \"sys_id\",xplant_mat_sts as \"xplant_mat_sts\",division as \"division\",prod_grp_sbu as \"prod_grp_sbu\",glb_prod_hier as \"glb_prod_hier\",portfolio_brand as \"portfolio_brand\",xdchain_sts as \"xdchain_sts\", dchain_sts as\"dchain_sts\", mara_prod_hier as \"mara_prod_hier\",tarif as \"tarif\",brand_desc as \"brand_desc\",tptm_znoinetsls as \"tptm_znoinetsls\",plant_mat_sts as \"plant_mat_sts\" , item_ctg_grp as \"item_ctg_grp\", price_ctg_grp as \"price_ctg_grp\",prod_attr2 as \"prod_attr2\" , special_proc_key as \"special_proc_key\" , to_char(replenis_lead_time) as \"replenis_lead_time\" , purchase_grp as \"purchase_grp\" , to_char(gr_process_time) as \"gr_process_time\" , mat_grp4 as \"mat_grp4\" from material_master_enriched2",
"connection.attempts": "1000",
"transforms": "createKey,extractInt",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"batch.max.rows": "3000",
"table.types": "query",
"mode": "timestamp",
"topic.prefix": "mm_topic5",
"transforms.extractInt.field": "id",
"connection.user": "***",
"schema.pattern": "",
"transforms.createKey.fields": "id",
"poll.interval.ms": "10000",
"name": "jdbc_mm_test",
"connection.url": "jdbc:oracle:thin:@oragen3dev.***.com:1545:gen3webd"
Comments:
Please provide your Kafka Connect configuration (source and sink). — Updated the question with the source and sink configs.

Answer 1: I used a transform to cast the field to string type, and then it worked fine.
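The answer is terse; the cast it describes can be done with Kafka Connect's built-in Cast SMT on the source connector. A hedged sketch — the transform class and spec syntax are standard Kafka Connect, but the alias castRlt and the chaining onto this post's existing createKey,extractInt transforms are illustrative, and casting Connect's Decimal logical type to string requires a reasonably recent Kafka version:

```json
{
  "transforms": "createKey,extractInt,castRlt",
  "transforms.castRlt.type": "org.apache.kafka.connect.transforms.Cast$Value",
  "transforms.castRlt.spec": "replenis_lead_time:string,gr_process_time:string"
}
```

The spec lists field:type pairs; each named field in the record value is cast before the record is written to the topic, so the sink then sees plain strings for the text-mapped columns.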