使用 postgresql 字段类型文本过滤 kafka 消息时出错

Posted

技术标签:

【中文标题】使用 postgresql 字段类型文本过滤 kafka 消息时出错【英文标题】:getting error whille filtering kafka messages with postgresql field type text 【发布时间】:2020-10-16 20:25:46 【问题描述】:

错误 || WorkerSourceTaskid=ptl_connector-0 任务抛出未捕获且不可恢复的异常 [org.apache.kafka.connect.runtime.WorkerTask] org.apache.kafka.connect.errors.ConnectException:错误处理程序中超出容限 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104) 在 org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:320) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245) 在 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184) 在 org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234) 在 java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 在 java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 在 java.base/java.lang.Thread.run(Thread.java:834) 引起:io.debezium.DebeziumException:为记录“SourceRecordsourcePartition=server=testdev_ptl005”评估表达式“value.after.brandid == BrandA”时出错,sourceOffset=last_snapshot_record=false, lsn=27649944, txId=707 , ts_usec=1594357573069000, 快照=true 时间戳=null, headers=ConnectHeaders(headers=)' 在 io.debezium.transforms.scripting.Jsr223Engine.eval(Jsr223Engine.java:116) 在 io.debezium.transforms.Filter.doApply(Filter.java:33) 在 io.debezium.transforms.ScriptingTransformation.apply(ScriptingTransformation.java:189) 在 org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) ... 11 更多 引起:javax.script.ScriptException:groovy.lang.MissingPropertyException:没有这样的属性:BrandA for class:Script1 在 org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:320) 在 org.codehaus.groovy.jsr223.GroovyCompiledScript.eval(GroovyCompiledScript.java:71) 在 java.scripting/javax.script.CompiledScript.eval(CompiledScript.java:89) 在 io.debezium.transforms.scripting.Jsr223Engine.eval(Jsr223Engine.java:107) ... 16 更多 引起:groovy.lang.MissingPropertyException:没有这样的属性:BrandA for class:Script1 在 org.codehaus.groovy.runtime.ScriptBytecodeAdapter.unwrap(ScriptBytecodeAdapter.java:65) 在 org.codehaus.groovy.runtime.callsite.PogoGetPropertySite.getProperty(PogoGetPropertySite.java:51) 在 org.codehaus.groovy.runtime.callsite.AbstractCallSite.callGroovyObjectGetProperty(AbstractCallSite.java:341) 在 Script1.run(Script1.groovy:1) 在 org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:317) ... 19 更多

【问题讨论】:

您好,请提供错误的 FULL 堆栈跟踪。没有它,就无法诊断问题。作为初学者,您是否在类路径中添加了脚本语言 JAR? 【参考方案1】:

请将 JAR 文件放到连接器 (so debezium-connector-*) 目录中

groovy-3.0.4.jar groovy-jsr223-3.0.4.jar groovy-json-3.0.4.jar

【讨论】:

将jar文件“groovy-3.0.4.jar”放入kafka/libs/后出现同样的错误 对不起,我编辑了我的回复,因为我描述了与旧 Groovy 版本一起使用的过程。上面列举了这些文件,但请将它们放在连接器而不是全局库目录中 我的连接器配置:“connector.class”:“io.debezium.connector.postgresql.PostgresConnector”,“plugin.name”:“wal2json”,“database.hostname”:“ ..*.*”、“database.port”:“5432”、“database.user”:“postgres”、“database.password”:“*******”、“database. dbname”:“ptl”,“database.whitelist”:“ptl”,“transforms”:“filter,changetopic”,“transforms.filter.type”:“io.debezium.transforms.Filter”,“transforms.filter。语言”:“jsr223.groovy”,“transforms.filter.condition”:“value.after.brandid == BrandA”, 我将三个jar文件放在/kafka/connect/debezium-connector-postgres中,但得到了同样的错误,上面的消息是我的连接器配置。 我的配置:: "transforms": "filter", "transforms.filter.type": "io.debezium.transforms.Filter", "transforms.filter.language": "jsr223.groovy ", "transforms.filter.condition": "value.after.brandid == 'BrandA'" 并出现这样的错误原因:io.debezium.DebeziumException: 评估表达式'value.after.brandid == BrandA'时出错【参考方案2】:

您是否考虑过为此使用 kafka 连接器? 也许这会有所帮助: https://docs.confluent.io/current/connect/transforms/filter.html

【讨论】:

我正在使用 Docker Debezium Kafka 连接器,但我不了解如何在现有环境中进行配置。

以上是关于使用 postgresql 字段类型文本过滤 kafka 消息时出错的主要内容,如果未能解决你的问题,请参考以下文章

使用postgresql TEXT类型按字段排序时如何删除重复项?

postgresql copy from 字符串转换为时间类型

PostgreSQL。按数组字段值的交集过滤

如何使用 Postgresql 将文本拆分为多个字段?

postgresql - 替换文本字段中字符串的所有实例

如何强制 PostgreSQL 使用某个索引?