Kafka Connect Sink to Cassandra :: java.lang.VerifyError: Bad return type
Posted: 2020-04-14 23:01:27

I'm trying to set up a Kafka Connect sink to collect data from a topic into a Cassandra table, using the DataStax connector: https://downloads.datastax.com/#akc
I'm running a standalone worker directly on the broker, with Kafka 0.10.2.2-1:
name=dse-sink
connector.class=com.datastax.kafkaconnector.DseSinkConnector
tasks.max=1
datastax-java-driver.advanced.protocol.version = V4
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
plugin.path=/usr/share/java/kafka-connect-dse/kafka-connect-dse-1.2.1.jar
topics=connect-test
contactPoints=172.16.0.48
loadBalancing.localDc=datacenter1
port=9042
ignoreErrors=true
topic.connect-test.cdrs.test.mapping= kafkakey=key, value=value
topic.connect-test.cdrs.test.consistencyLevel=LOCAL_QUORUM
But I get the following error:
[2019-12-23 16:58:43,165] ERROR Task dse-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.VerifyError: Bad return type
Exception Details:
Location:
com/fasterxml/jackson/databind/cfg/MapperBuilder.streamFactory()Lcom/fasterxml/jackson/core/TokenStreamFactory; @7: areturn
Reason:
Type 'com/fasterxml/jackson/core/JsonFactory' (current frame, stack[0]) is not assignable to 'com/fasterxml/jackson/core/TokenStreamFactory' (from method signature)
Current Frame:
bci: @7
flags:
locals: 'com/fasterxml/jackson/databind/cfg/MapperBuilder'
stack: 'com/fasterxml/jackson/core/JsonFactory'
Bytecode:
0x0000000: 2ab4 0002 b600 08b0
at com.fasterxml.jackson.databind.json.JsonMapper.builder(JsonMapper.java:114)
at com.datastax.dsbulk.commons.codecs.json.JsonCodecUtils.getObjectMapper(JsonCodecUtils.java:36)
at com.datastax.kafkaconnector.codecs.CodecSettings.init(CodecSettings.java:131)
at com.datastax.kafkaconnector.state.LifeCycleManager.lambda$buildInstanceState$9(LifeCycleManager.java:423)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1625)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at com.datastax.kafkaconnector.state.LifeCycleManager.buildInstanceState(LifeCycleManager.java:457)
at com.datastax.kafkaconnector.state.LifeCycleManager.lambda$startTask$0(LifeCycleManager.java:106)
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
at com.datastax.kafkaconnector.state.LifeCycleManager.startTask(LifeCycleManager.java:101)
at com.datastax.kafkaconnector.DseSinkTask.start(DseSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:244)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
There are no other errors on the Cassandra or Kafka side. I can see active connections on the Cassandra node, but nothing arrives in the keyspace.
Any idea why?
Comments:
What is the table structure, and what is the schema of the data in the topic?
I'm extracting some fields from a JSON payload. But even when I try a simple string payload in the topic, I get the same error.

Answer 1:

IMHO this is a problem caused by using the JSON internal converter with BigDecimal data (see related SO question). As described in the following blog post, internal.key.converter and internal.value.converter have been deprecated since Kafka 2.0 and should not be set explicitly. Could you comment out all internal. properties and retry?
P.S. See also: JSON + Decimal has changed in Kafka 2.4
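Not part of the original answer, but one way to narrow down a VerifyError like this is to check which jar each of the conflicting Jackson classes is actually loaded from: the trace fails because the jackson-databind on the classpath expects a jackson-core where JsonFactory extends TokenStreamFactory (2.10+), and an older jackson-core is answering instead. A minimal, self-contained sketch (the Jackson class names are real; the diagnostic class itself is illustrative):

```java
import java.security.CodeSource;

// Diagnostic sketch: print where each Jackson class is loaded from.
// Two different jars answering for jackson-core and jackson-databind
// is the classic cause of this VerifyError.
public class JacksonCheck {

    // Returns the jar (or classpath entry) a class is loaded from.
    static String locate(String className) {
        try {
            CodeSource src = Class.forName(className)
                    .getProtectionDomain().getCodeSource();
            return (src == null || src.getLocation() == null)
                    ? "bootstrap classpath"
                    : src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        System.out.println("jackson-core:     "
                + locate("com.fasterxml.jackson.core.JsonFactory"));
        System.out.println("jackson-databind: "
                + locate("com.fasterxml.jackson.databind.ObjectMapper"));
    }
}
```

Run it with the same classpath the Connect worker uses (including the jars under plugin.path); if the two classes resolve to jars from mismatched Jackson releases, aligning those versions, or moving to a Kafka version whose bundled Jackson matches the connector's, is the usual fix.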
Comments:
If I do that, I get the following error: "Missing required configuration "internal.key.converter" which has no default value." (I'm using Kafka 0.10.2.2-1, which apparently requires a default value.)