使用面临问题 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL synt
Posted
技术标签:
【中文标题】使用面临问题 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; 无法接收对 mysql 的更改;【英文标题】:unable to sink changes to mysql using facing an issue com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; 【发布时间】:2020-10-10 16:19:52 【问题描述】:我使用debezium 获取数据事件并使用 kafka-connect-jdbc 接收更改
这是我正在使用的 Dockerfile mysql-connector-java-5.1.39.jar
FROM debezium/connect:1.1
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
ENV MYSQL_DRIVER_VERSION 5.1.39
ARG KAFKA_JDBC_VERSION=5.3.1
RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-$MYSQL_DRIVER_VERSION.tar.gz" \
| tar -xzf - -C /kafka/libs --strip-components=1 mysql-connector-java-5.1.39/mysql-connector-java-$MYSQL_DRIVER_VERSION-bin.jar
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
这是我的源 mysql 数据库:
docker run -it --rm --name mysqltes -p 3308:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw
debezium/example-mysql:1.1
这是我的第二个 mysql 数据库,必须接受更改:
docker run -it --rm --name mysql -p 3307:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw \
debezium/example-mysql:1.1
这是我的源mysql配置:
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" 192.168.99.102:8083/connectors/ \
-d '
"name": "inventory-connector",
"config":
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysqltes",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory"
'
这是我的接收器连接器配置
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" 192.168.99.102:8083/connectors/ \
-d '
"name": "inventory-connector-sink",
"config":
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://mysql:3306/inventory?useSSL=false",
"connection.user": "debezium",
"connection.password": "dbz",
"topics": "dbserver1.inventory.customers",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
'
我正面临这个问题,我不知道为什么要帮我解决这个问题
2020-06-20 13:36:58,299 INFO || Attempting to open connection #1 to MySql [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2020-06-20 13:36:58,352 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter]
2020-06-20 13:36:58,415 INFO || Checking MySql dialect for existence of table "dbserver1"."inventory"."customers" [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
2020-06-20 13:36:58,423 INFO || Using MySql dialect table "dbserver1"."inventory"."customers" absent [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
2020-06-20 13:36:58,428 INFO || Creating table with sql: CREATE TABLE `dbserver1`.`inventory`.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name` VARCHAR(256) NOT NULL,
`email` VARCHAR(256) NOT NULL,
PRIMARY KEY(`id`)) [io.confluent.connect.jdbc.sink.DbStructure]
2020-06-20 13:36:58,443 WARN || Create failed, will attempt amend if table already exists [io.confluent.connect.jdbc.sink.DbStructure]
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
at com.mysql.jdbc.Util.getInstance(Util.java:387)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:942)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3966)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3902)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2526)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545)
at com.mysql.jdbc.StatementImpl.executeUpdateInternal(StatementImpl.java:1540)
at com.mysql.jdbc.StatementImpl.executeLargeUpdate(StatementImpl.java:2595)
at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1468)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1076)
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:93)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:121)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2020-06-20 13:36:58,508 WARN || Write of 4 records failed, remainingRetries=7 [io.confluent.connect.jdbc.sink.JdbcSinkTask]
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
at com.mysql.jdbc.Util.getInstance(Util.java:387)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:942)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3966)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3902)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2526)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545)
at com.mysql.jdbc.StatementImpl.executeUpdateInternal(StatementImpl.java:1540)
at com.mysql.jdbc.StatementImpl.executeLargeUpdate(StatementImpl.java:2595)
at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1468)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1076)
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:93)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:121)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2020-06-20 13:36:58,562 INFO || Closing connection #1 to MySql [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2020-06-20 13:36:58,582 INFO || Initializing writer using SQL dialect: MySqlDatabaseDialect [io.confluent.connect.jdbc.sink.JdbcSinkTask]
2020-06-20 13:36:58,584 ERROR || WorkerSinkTaskid=inventory-connector-sink-0 RetriableException from SinkTask: [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1
... 12 more
【问题讨论】:
我认为你应该删除'last_name'、'id'等字段中的' 【参考方案1】:尝试将"quote.sql.identifiers": "never"
属性添加到接收器连接器配置中。在这种情况下,CREATE TABLE
语句应该是:
CREATE TABLE dbserver1.inventory.customers (
last_name VARCHAR(256) NOT NULL,
id INT NOT NULL,
first_name VARCHAR(256) NOT NULL,
email VARCHAR(256) NOT NULL,
PRIMARY KEY(id)
)
附:它看起来像是一种解决方法,但 JDBC 连接器中可能存在错误。
【讨论】:
以上是关于使用面临问题 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL synt的主要内容,如果未能解决你的问题,请参考以下文章
使用 appium 在 Eclipse 中进行移动测试期间面临的问题
在使用 pyserial 和 Arduino 时面临 TkInter 的问题