带有旧数据库的JDBC Kafka连接器

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了带有旧数据库的JDBC Kafka连接器相关的知识,希望对你有一定的参考价值。

我正在尝试使用kafka-jdbc-connector(源和接收器)和一个非常古老的数据库(cloudscape)。我有这个数据库的JDBC驱动程序。我将驱动程序放在Confluent(版本5)的“/ share / java / kafka / connect / jdbc”文件夹中,并创建了属性文件。

name=test-source-cloud-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB
mode=incrementing
incrementing.column.name=id
topic.prefix=test-cloud-jdbc-

当我启动连接器时,日志是:

[2019-01-31 11:23:36,582] DEBUG Finding best dialect for JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:119)
[2019-01-31 11:23:36,582] DEBUG Dialect Db2DatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect DerbyDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect GenericDatabaseDialect scored 10 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect mysqlDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect OracleDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect PostgreSqlDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect SapHanaDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect SqlServerDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect SqliteDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect SybaseDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect VerticaDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Using dialect GenericDatabaseDialect with score 10 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:132)
[2019-01-31 11:23:36,587] ERROR Failed to create job for ./etc/kafka-connect-jdbc/CloudscapeProperties.properties (org.apache.kafka.connect.cli.ConnectStandalone:102)
[2019-01-31 11:23:36,588] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value java.sql.SQLException: No suitable driver found for jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB for configuration Couldn't open connection to jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB
Invalid value java.sql.SQLException: No suitable driver found for jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB for configuration Couldn't open connection to jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
    at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:110)

我想有一个问题是JDBC驱动程序很老(它使用JAVA 1.3)。驱动程序使用RMI协议进行通信。如果我运行一个非常简单的JAVA RMI客户端来使用RmiJdbc.jar和cloudscape.jar(驱动程序)查询数据库,我只能查询数据库并获得结果。

您认为是与Java版本相关的问题吗?并且,它是否有意义/是否可以实现自定义Kafka驱动程序来读取此数据库中的数据?有关此问题的任何建议或如何为旧数据库实现自定义Kafka驱动程序?

答案

我不会立即得出结论,驱动程序/数据库是旧的...

您不能将plugin.path直接指向JDBC驱动程序。

plugin.path更新为顶级/path/to/confluent-x.y.z/usr/share/java的绝对路径,并将驱动程序复制到kafka-connect-jdbc文件夹中。

重新启动Connect,然后找到驱动程序。


您可以通过查找kafka/connect-log4j.properties和set来进一步调试

log4j.rootLogger=DEBUG, stdout

然后你会看到这样的日志(MySQL的例子)

[2019-01-15 09:45:26,949] DEBUG Registered java.sql.Driver: com.mysql.cj.jdbc.Driver@37303f12 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:277)

以上是关于带有旧数据库的JDBC Kafka连接器的主要内容,如果未能解决你的问题,请参考以下文章

Kafka JDBC连接器加载所有数据,然后增量

Kafka JDBC Sink 句柄数组数据类型

部分代码片段

使用 Kafka Connect API JDBC Sink 连接器示例到 Oracle 数据库的 Kafka 主题

需要分布式模式的 jdbc Kafka 连接配置设置作为 docker 容器的参考文档或代码

提取和转换 jdbc sink 连接器的 kafka 消息特定字段