使用面临问题 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL synt

Posted

技术标签:

【中文标题】使用面临问题 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; 无法接收对 mysql 的更改;【英文标题】:unable to sink changes to mysql using facing an issue com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; 【发布时间】:2020-10-10 16:19:52 【问题描述】:

我使用debezium 获取数据事件并使用 kafka-connect-jdbc 接收更改

这是我正在使用的 Dockerfile mysql-connector-java-5.1.39.jar

FROM debezium/connect:1.1
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc

ENV MYSQL_DRIVER_VERSION 5.1.39

ARG KAFKA_JDBC_VERSION=5.3.1

RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-$MYSQL_DRIVER_VERSION.tar.gz" \
    | tar -xzf - -C /kafka/libs --strip-components=1 mysql-connector-java-5.1.39/mysql-connector-java-$MYSQL_DRIVER_VERSION-bin.jar

RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
    curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar 

这是我的源 mysql 数据库:

docker run -it --rm --name mysqltes -p 3308:3306 \
  -e MYSQL_ROOT_PASSWORD=debezium \
  -e MYSQL_USER=mysqluser \
  -e MYSQL_PASSWORD=mysqlpw 
debezium/example-mysql:1.1

这是我的第二个 mysql 数据库,必须接受更改:

docker run -it --rm --name mysql -p 3307:3306 \
  -e MYSQL_ROOT_PASSWORD=debezium \
  -e MYSQL_USER=mysqluser \
  -e MYSQL_PASSWORD=mysqlpw \
  debezium/example-mysql:1.1

这是我的源mysql配置:

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ \
  -d '
  "name": "inventory-connector",
  "config": 
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysqltes",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  
'

这是我的接收器连接器配置

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ \
  -d '
  "name": "inventory-connector-sink",
  "config": 
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://mysql:3306/inventory?useSSL=false",
    "connection.user": "debezium",
    "connection.password": "dbz",
    "topics": "dbserver1.inventory.customers",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
  
'

我正面临这个问题,我不知道为什么要帮我解决这个问题

2020-06-20 13:36:58,299 INFO   ||  Attempting to open connection #1 to MySql   [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2020-06-20 13:36:58,352 INFO   ||  JdbcDbWriter Connected   [io.confluent.connect.jdbc.sink.JdbcDbWriter]
2020-06-20 13:36:58,415 INFO   ||  Checking MySql dialect for existence of table "dbserver1"."inventory"."customers"   [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
2020-06-20 13:36:58,423 INFO   ||  Using MySql dialect table "dbserver1"."inventory"."customers" absent   [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
2020-06-20 13:36:58,428 INFO   ||  Creating table with sql: CREATE TABLE `dbserver1`.`inventory`.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name` VARCHAR(256) NOT NULL,
`email` VARCHAR(256) NOT NULL,
PRIMARY KEY(`id`))   [io.confluent.connect.jdbc.sink.DbStructure]
2020-06-20 13:36:58,443 WARN   ||  Create failed, will attempt amend if table already exists   [io.confluent.connect.jdbc.sink.DbStructure]
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
        at com.mysql.jdbc.Util.getInstance(Util.java:387)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:942)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3966)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3902)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2526)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545)
        at com.mysql.jdbc.StatementImpl.executeUpdateInternal(StatementImpl.java:1540)
        at com.mysql.jdbc.StatementImpl.executeLargeUpdate(StatementImpl.java:2595)
        at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1468)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1076)
        at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:93)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:121)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
2020-06-20 13:36:58,508 WARN   ||  Write of 4 records failed, remainingRetries=7   [io.confluent.connect.jdbc.sink.JdbcSinkTask]
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
        at com.mysql.jdbc.Util.getInstance(Util.java:387)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:942)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3966)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3902)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2526)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545)
        at com.mysql.jdbc.StatementImpl.executeUpdateInternal(StatementImpl.java:1540)
        at com.mysql.jdbc.StatementImpl.executeLargeUpdate(StatementImpl.java:2595)
        at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1468)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1076)
        at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:93)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:121)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
2020-06-20 13:36:58,562 INFO   ||  Closing connection #1 to MySql   [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2020-06-20 13:36:58,582 INFO   ||  Initializing writer using SQL dialect: MySqlDatabaseDialect   [io.confluent.connect.jdbc.sink.JdbcSinkTask]
2020-06-20 13:36:58,584 ERROR  ||  WorkerSinkTaskid=inventory-connector-sink-0 RetriableException from SinkTask:   [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1

        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1

        ... 12 more

【问题讨论】:

我认为你应该删除'last_name'、'id'等字段中的' 【参考方案1】:

尝试将"quote.sql.identifiers": "never" 属性添加到接收器连接器配置中。在这种情况下,CREATE TABLE 语句应该是:

CREATE TABLE dbserver1.inventory.customers (
  last_name VARCHAR(256) NOT NULL,
  id INT NOT NULL,
  first_name VARCHAR(256) NOT NULL,
  email VARCHAR(256) NOT NULL,
  PRIMARY KEY(id)
)

附:它看起来像是一种解决方法,但 JDBC 连接器中可能存在错误。

【讨论】:

以上是关于使用面临问题 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL synt的主要内容,如果未能解决你的问题,请参考以下文章

在 android 上使用 smack 面临问题

使用 appium 在 Eclipse 中进行移动测试期间面临的问题

在使用 pyserial 和 Arduino 时面临 TkInter 的问题

面临启用导航按钮和使用 vis.js 在时间线图中获取数据的问题

使用面临未找到反向问题的类基视图

我正在尝试使用 safec lib 函数但面临链接问题