MySQL query fails in Kafka Connect

Posted 2020-05-30 05:21:04

【Question】:

I am using Kafka Connect (confluentinc/cp-kafka-connect:5.4.0) with a MySQL JDBC driver installed in it, basically the following Dockerfile:

FROM confluentinc/cp-kafka-connect:5.4.0

RUN echo "===> Installing MySQL connector" \
    && curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.39.tar.gz" | tar -xzf - -C /usr/share/java/kafka-connect-jdbc/ --strip-components=1 mysql-connector-java-5.1.39/mysql-connector-java-5.1.39-bin.jar
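
Not from the original post, but a quick way to confirm the driver jar ends up where the JDBC connector plugin looks for it is to list that directory inside the built image. The image tag below is a made-up example, and the entrypoint override is needed because the image normally starts Connect right away:

docker build -t cp-kafka-connect-mysql:5.4.0 .
# The mysql-connector jar should appear next to the kafka-connect-jdbc jars.
docker run --rm --entrypoint ls cp-kafka-connect-mysql:5.4.0 /usr/share/java/kafka-connect-jdbc/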

I created a connector with the following configuration:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "test-connector",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url":"jdbc:mysql://10.xx.yy.z:3306/demo_db?user=user&password=pass",
                "topic.prefix": "test-connector-",
                "mode":"incrementing",
                "query":"$QUERY",
                "incrementing.column.name": "eventId",
                "validate.non.null": false
        }
}'
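
Not part of the original post, but after the POST the Connect REST API is the quickest way to see whether the connector and its task actually started or failed (the jq filter is optional and only used for readability):

# Overall connector and task state (RUNNING / FAILED, plus a stack trace on failure).
curl -s http://localhost:8083/connectors/test-connector/status | jq
# List all connectors registered on this worker.
curl -s http://localhost:8083/connectors | jq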

I tried replacing the "$QUERY" placeholder above with the following queries (pasted here in formatted form for readability):

SELECT * FROM ( 
    select DISTINCT t1.id,
            t1.name,
            t1.email,
            t1.department,
            t1.modified
        from
            test as t1
            LEFT OUTER JOIN test as t2 ON t1.id > -1
        WHERE
            t1.id > -1) something

The connector above is created and works as expected. However, the connector created with the following query fails:

SELECT * FROM(
        SELECT
            HSC.id eventId,
            HSC.actions,
            HSC.cause,
            HSC.metaData,
            CRED.uid uidCard,
            CRED.is_primary isPrimary,
            CRED.type cardType,
            PR.keys_ prin,
            HOUSE.uid orgUid,
            CHILD.uid childUid,
            USER.first_name fName,
            USER.last_name lName,
            CHILD.biz_phone bizPh,
            CHILD.cell_phone cellPh,
            AOF.state aof_state,
            RECORD.uid replacedCardUid
        FROM
            house_of_cards as HSC
            INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id
            INNER JOIN prin PR ON CRED.prin_id = PR.id
            INNER JOIN child CHILD ON CRED.employee_id = CHILD.id
            INNER JOIN user USER ON USER.id = CHILD.user_id
            INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id
            LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id
            LEFT JOIN address AOF ON CRED.address_on_file = AOF.id
            LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id
        WHERE
        HSC.type = 'cardActionKamen' ) playcards
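
For context (the behaviour is visible in the log further down): in incrementing mode the connector does not run the configured query verbatim, it appends its own predicate and ordering on the incrementing column, so the statement MySQL actually receives looks roughly like this sketch:

-- Rough shape of the statement built by the JDBC source task in incrementing mode;
-- compare the "Begin using SQL query" line in the log below.
SELECT * FROM (
    SELECT HSC.id eventId /* , ... the remaining columns from the query above ... */
    FROM house_of_cards AS HSC
    /* ... the same INNER/LEFT JOINs as above ... */
    WHERE HSC.type = 'cardActionKamen'
) playcards
WHERE `eventId` > ? ORDER BY `eventId` ASC   -- appended by the connector; ? is bound to the last stored offset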

Edit 1: Posting the complete error log (with sensitive data redacted, of course):


[2020-02-16 20:31:46,463] INFO AbstractConfig values:                                                                                                                                                                           
        batch.max.rows = 100                                                                                                                                                                                                    
        catalog.pattern = null                                                                                                                                                                                                  
        connection.attempts = 3                                                                                                                                                                                                 
        connection.backoff.ms = 10000                                                                                                                                                                                           
        connection.password = null                                                                                                                                                                                              
        connection.url = jdbc:mysql://10.xx.yy.z:3306/demo_db?user=user&password=pass                                                                                        
        connection.user = null
        db.timezone = UTC
        dialect.name =
        incrementing.column.name = eventId
        mode = incrementing
        numeric.mapping = null
        numeric.precision.mapping = false
        poll.interval.ms = 5000
        query = SELECT * FROM( SELECT HSC.id eventId, HSC.actions, HSC.cause, HSC.metaData, CRED.uid uidCard, CRED.is_primary isPrimary, CRED.type cardType, PR.keys_ prin, HOUSE.uid orgUid, CHILD.uid childUid, USER.first_name fName, USER.last_name lName, CHILD.biz_phone bizPh, CHILD.cell_phone cellPh, AOF.state aof_state, RECORD.uid replacedCardUid FROM house_of_cards as HSC INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id INNER JOIN prin PR ON CRED.prin_id = PR.id INNER JOIN child CHILD ON CRED.employee_id = CHILD.id INNER JOIN user USER ON USER.id = CHILD.user_id INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id LEFT JOIN address AOF ON CRED.address_on_file = AOF.id LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id WHERE HSC.type = 'cardActionKamen' ) playcards
        quote.sql.identifiers = ALWAYS
        schema.pattern = null
        table.blacklist = []
        table.poll.interval.ms = 60000
        table.types = [TABLE]
        table.whitelist = []
        timestamp.column.name = []
        timestamp.delay.interval.ms = 0
        topic.prefix = card-view-test
        validate.non.null = false
 (org.apache.kafka.common.config.AbstractConfig)
[2020-02-16 20:31:46,595] INFO AbstractConfig values:
 (org.apache.kafka.common.config.AbstractConfig)
[2020-02-16 20:31:46,601] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Connector card-view-test config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:46,602] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
[2020-02-16 20:31:46,602] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-02-16 20:31:46,609] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Successfully joined group with generation 295 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-02-16 20:31:46,609] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Joined group at generation 295 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-d82cc98e-5619-4972-ad78-3b0a53c3b5bb', leaderUrl='http://10.xx.yy.36:8083/', offset=433, connectorIds=[card-view-test], taskIds=[card-view-test-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:46,609] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Starting connectors and tasks using config offset 433 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:46,609] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Starting connector card-view-test (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:46,609] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Starting task card-view-test-0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:46,610] INFO Creating task card-view-test-0 (org.apache.kafka.connect.runtime.Worker)
[2020-02-16 20:31:46,610] INFO ConnectorConfig values:
        config.action.reload = restart                                                                                                                                                                                          
        connector.class = io.confluent.connect.jdbc.JdbcSourceConnector                                                                                                                                                         
        errors.log.enable = false
        errors.log.include.messages = false
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = card-view-test
        tasks.max = 1
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig)
[2020-02-16 20:31:46,610] INFO EnrichedConnectorConfig values:
        config.action.reload = restart
        connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
        errors.log.enable = false
        errors.log.include.messages = false
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = card-view-test
        tasks.max = 1
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2020-02-16 20:31:46,610] INFO Creating connector card-view-test of type io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker)
[2020-02-16 20:31:46,610] INFO ConnectorConfig values:
        config.action.reload = restart
        connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
        errors.log.enable = false
        errors.log.include.messages = false
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = card-view-test
        tasks.max = 1
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig)
[2020-02-16 20:31:46,610] INFO EnrichedConnectorConfig values:
        config.action.reload = restart
        connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
        errors.log.enable = false
        errors.log.include.messages = false
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = card-view-test
        tasks.max = 1
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig)
[2020-02-16 20:31:46,610] INFO EnrichedConnectorConfig values:
        config.action.reload = restart
        connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
        errors.log.enable = false
        errors.log.include.messages = false
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = card-view-test
        tasks.max = 1
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2020-02-16 20:31:46,610] INFO TaskConfig values:
        task.class = class io.confluent.connect.jdbc.source.JdbcSourceTask
 (org.apache.kafka.connect.runtime.TaskConfig)
[2020-02-16 20:31:46,610] INFO Instantiated task card-view-test-0 with version 5.4.0 of type io.confluent.connect.jdbc.source.JdbcSourceTask (org.apache.kafka.connect.runtime.Worker)
[2020-02-16 20:31:46,610] INFO Instantiated connector card-view-test with version 5.4.0 of type class io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker)
[2020-02-16 20:31:46,611] INFO Starting JDBC Source Connector (io.confluent.connect.jdbc.JdbcSourceConnector)
[2020-02-16 20:31:46,611] INFO JdbcSourceConnectorConfig values:
        batch.max.rows = 100
        catalog.pattern = null
        connection.attempts = 3
        connection.backoff.ms = 10000
        connection.password = null
        connection.url = jdbc:mysql://10.xx.yy.z:3306/demo_db?user=user&password=pass
        connection.user = null
        db.timezone = UTC
        dialect.name =
        incrementing.column.name = eventId
        mode = incrementing
        numeric.mapping = null
        numeric.precision.mapping = false
        poll.interval.ms = 5000
        query = SELECT * FROM( SELECT HSC.id eventId, HSC.actions, HSC.cause, HSC.metaData, CRED.uid uidCard, CRED.is_primary isPrimary, CRED.type cardType, PR.keys_ prin, HOUSE.uid orgUid, CHILD.uid childUid, USER.first_name fName, USER.last_name lName, CHILD.biz_phone bizPh, CHILD.cell_phone cellPh, AOF.state aof_state, RECORD.uid replacedCardUid FROM house_of_cards as HSC INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id INNER JOIN prin PR ON CRED.prin_id = PR.id INNER JOIN child CHILD ON CRED.employee_id = CHILD.id INNER JOIN user USER ON USER.id = CHILD.user_id INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id LEFT JOIN address AOF ON CRED.address_on_file = AOF.id LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id WHERE HSC.type = 'cardActionKamen' ) playcards
        quote.sql.identifiers = ALWAYS                                                                                                                                                                                          
        schema.pattern = null                                                                                                                                                                                                   
        table.blacklist = []                                                                                                                                                                                                    
        table.poll.interval.ms = 60000                                                                                                                                                                                          
        table.types = [TABLE]                                                                                                                                                                                                   
        table.whitelist = []                                                                                                                                                                                                    
        tables = []                                                                                                                                                                                                             
        timestamp.column.name = []                                                                                                                                                                                              
        timestamp.delay.interval.ms = 0                                                                                                                                                                                         
        topic.prefix = card-view-test-25                                                                                                                                                                                        
        validate.non.null = false                                                                                                                                                                                               
 (io.confluent.connect.jdbc.source.JdbcSourceTaskConfig)                                                                                                                                                                        
[2020-02-16 20:31:46,621] INFO Using JDBC dialect MySql (io.confluent.connect.jdbc.source.JdbcSourceTask)                                                                                                                       
[2020-02-16 20:31:46,681] INFO Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask)                                                                                                                       
[2020-02-16 20:31:46,681] INFO WorkerSourceTask{id=card-view-test-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-02-16 20:31:46,681] INFO Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider)                                                                                              
[2020-02-16 20:31:46,723] INFO [Producer clientId=connector-producer-card-view-test-0] Cluster ID: XXOJHDDIEHID-YYGDIEDi82eeh (org.apache.kafka.clients.Metadata)                                                                   
[2020-02-16 20:31:46,732] INFO Starting thread to monitor tables. (io.confluent.connect.jdbc.source.TableMonitorThread)                                                                                                         
[2020-02-16 20:31:46,735] INFO Finished creating connector card-view-test (org.apache.kafka.connect.runtime.Worker)                                                                                                             
[2020-02-16 20:31:46,736] INFO SourceConnectorConfig values:                                                                                                                                                                    
        config.action.reload = restart                                                                                                                                                                                          
        connector.class = io.confluent.connect.jdbc.JdbcSourceConnector                                                                                                                                                         
        errors.log.enable = false                                                                                                                                                                                               
        errors.log.include.messages = false                                                                                                                                                                                     
        errors.retry.delay.max.ms = 60000                                                                                                                                                                                       
        errors.retry.timeout = 0                                                                                                                                                                                                
        errors.tolerance = none                                                                                                                                                                                                 
        header.converter = null
        key.converter = null
        name = card-view-test
        tasks.max = 1
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig)
[2020-02-16 20:31:46,737] INFO EnrichedConnectorConfig values:
        config.action.reload = restart
        connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
        errors.log.enable = false
        errors.log.include.messages = false
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = card-view-test
        tasks.max = 1
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2020-02-16 20:31:46,949] INFO Begin using SQL query: SELECT * FROM( SELECT HSC.id eventId, HSC.actions, HSC.cause, HSC.metaData, CRED.uid uidCard, 
 CRED.is_primary isPrimary, CRED.type cardType, PR.keys_ prin, HOUSE.uid orgUid, CHILD.uid childUid, USER.first_name fName, 
 USER.last_name lName, CHILD.biz_phone bizPh, CHILD.cell_phone cellPh, AOF.state aof_state, RECORD.uid replacedCardUid 
 FROM house_of_cards as HSC INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id INNER JOIN prin PR ON CRED.prin_id = PR.id 
 INNER JOIN child CHILD ON CRED.employee_id = CHILD.id INNER JOIN user USER ON USER.id = CHILD.user_id INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id 
 LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id LEFT JOIN address AOF ON CRED.address_on_file = AOF.id 
 LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id WHERE HSC.type = 'cardActionKamen' ) playcards events WHERE `eventId` > ? ORDER BY `eventId` ASC (io.confluent.connect.jdbc.source.TableQuerier)

[2020-02-16 20:31:46,954] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='SELECT * FROM( SELECT HSC.id eventId, HSC.actions, HSC.cause, HSC.metaData, CRED.uid uidCard, 
 CRED.is_primary isPrimary, CRED.type cardType, PR.keys_ prin, HOUSE.uid orgUid, CHILD.uid childUid, USER.first_name fName, 
 USER.last_name lName, CHILD.biz_phone bizPh, CHILD.cell_phone cellPh, AOF.state aof_state, RECORD.uid replacedCardUid 
 FROM house_of_cards as HSC INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id INNER JOIN prin PR ON CRED.prin_id = PR.id 
 INNER JOIN child CHILD ON CRED.employee_id = CHILD.id INNER JOIN user USER ON USER.id = CHILD.user_id INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id 
 LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id LEFT JOIN address AOF ON CRED.address_on_file = AOF.id 
 LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id WHERE HSC.type = "cardActionKamen" )', topicPrefix='card-view-test-25', incrementingColumn='eventId', timestampColumns=[]
}:  (io.confluent.connect.jdbc.source.JdbcSourceTask)

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'demo_db.HSC' doesn't exist
        at sun.reflect.GeneratedConstructorAccessor48.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
        at com.mysql.jdbc.Util.getInstance(Util.java:387)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:942)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3966)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3902)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2526)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2503)
        at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1369)
        at com.mysql.jdbc.Field.getCollation(Field.java:448)
        at com.mysql.jdbc.ResultSetMetaData.isCaseSensitive(ResultSetMetaData.java:552)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumn(GenericDatabaseDialect.java:713)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:655)
        at io.confluent.connect.jdbc.source.SchemaMapping.create(SchemaMapping.java:63)
        at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:94)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:61)
        at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:315)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2020-02-16 20:31:47,109] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Tasks [card-view-test-0] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:47,611] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:47,613] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Handling task config update by restarting tasks [card-view-test-0] (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:47,613] INFO Stopping task card-view-test-0 (org.apache.kafka.connect.runtime.Worker)
[2020-02-16 20:31:47,614] INFO Stopping JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask)
[2020-02-16 20:31:47,656] INFO Closing resources for JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask)
[2020-02-16 20:31:47,656] INFO Closing connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider)
[2020-02-16 20:31:47,657] INFO WorkerSourceTask{id=card-view-test-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-02-16 20:31:47,657] INFO WorkerSourceTask{id=card-view-test-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-02-16 20:31:47,657] INFO [Producer clientId=connector-producer-card-view-test-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
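
The stack trace is worth a second look: the failure happens inside the driver's result-set metadata handling (Field.getCollation / ResultSetMetaData.isCaseSensitive), not while MySQL parses the statement. A rough way to separate the two, not from the original post and using the placeholder host/credentials from above, is to run a trimmed version of the wrapped statement manually:

# If this succeeds, the SQL itself is fine and the problem sits on the driver side.
mysql -h 10.xx.yy.z -u user -p demo_db -e "
SELECT * FROM (
    SELECT HSC.id eventId
    FROM house_of_cards AS HSC
    INNER JOIN play_card AS CRED ON HSC.debit_card_id = CRED.id
    WHERE HSC.type = 'cardActionKamen'
) playcards
WHERE eventId > -1 ORDER BY eventId ASC;"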

Edit 2: Adding the connector configuration:


    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "mode": "incrementing",
    "incrementing.column.name": "eventId",
    "topic.prefix": "test-connector",
    "validate.non.null": "false",
    "query": "SELECT * FROM( SELECT HSC.id eventId, HSC.actions, HSC.cause, HSC.metaData, CRED.uid uidCard, CRED.is_primary isPrimary, CRED.type cardType, PR.keys_ prin, HOUSE.uid orgUid, CHILD.uid childUid, USER.first_name fName, USER.last_name lName, CHILD.biz_phone bizPh, CHILD.cell_phone cellPh, AOF.state aof_state, RECORD.uid replacedCardUid FROM house_of_cards as HSC INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id INNER JOIN prin PR ON CRED.prin_id = PR.id INNER JOIN child CHILD ON CRED.employee_id = CHILD.id INNER JOIN user USER ON USER.id = CHILD.user_id INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id LEFT JOIN address AOF ON CRED.address_on_file = AOF.id LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id WHERE HSC.type = 'cardActionKamen' ) playcards",
    "name": "test-connector-",
    "connection.url": "dbc:mysql://10.xx.yy.z:3306/demo_db?user=user&password=pass"

Both queries have WHERE clauses, use aliases, contain JOINs, and so on. To me they look functionally equivalent, so why does one work while the other fails? Is there a limit on query length? Does it depend on the MySQL version (I am using 5.7 as provided by GCP) or on the driver version (I downloaded 5.1.39, as shown in the Dockerfile above)?

I also tried creating MySQL views for the two subqueries (the inner SELECTs) mentioned above, and that works for both queries. Any idea what the problem could be here?

【Comments】:

"However, the following query fails": where is the error message (in full!)? And what is the reason for wrapping the query in a redundant SELECT *?
Because there is a WHERE clause that fails without the outer SELECT. Have a look at this, this and this.
"there is a WHERE clause that fails without the outer SELECT"?? Is that a Kafka connector feature, no WHERE clause allowed in the outer query??? The JDBC connector handles queries without the SELECT * FROM (SELECT ...) alias wrapping just fine.
Please provide the error log.
@AmitYadav, OK, please show the output of curl -X GET http://localhost:8083/connectors/test-connecot/config. It looks like you made a typo in the connector config (evenId instead of eventId for incrementing.column.name).

【Answer 1】:

I tried to reproduce your situation.

1. Created the tables in demo_db:
mysql> show tables;
+-------------------+
| Tables_in_demo_db |
+-------------------+
| address           |
| child             |
| house             |
| house_of_cards    |
| play_card         |
| prin              |
| user              |
+-------------------+
7 rows in set (0.00 sec)
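
(The answer does not show the table definitions; a minimal stub schema along the lines of the sketch below, with the column names taken from the query and made-up types, is enough to stand the reproduction up.)

-- Hypothetical stub tables; only the columns referenced by the query matter here.
CREATE TABLE house_of_cards (id INT PRIMARY KEY AUTO_INCREMENT, debit_card_id INT, type VARCHAR(64), actions VARCHAR(255), cause VARCHAR(255), metaData VARCHAR(255));
CREATE TABLE play_card (id INT PRIMARY KEY AUTO_INCREMENT, uid VARCHAR(64), is_primary TINYINT(1), type VARCHAR(64), prin_id INT, employee_id INT, shipping_address INT, address_on_file INT, replaced_card_id INT);
-- prin, child, user, house and address follow the same pattern, carrying only the
-- id/uid/name/phone columns that the SELECT list refers to.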
2. Started the connector:
curl --location --request PUT 'http://myhost:10900/connectors/test-connector/config' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "mode": "incrementing",
    "incrementing.column.name": "eventId",
    "topic.prefix": "test-connector",
    "validate.non.null": "false",
    "query": "SELECT * FROM( SELECT HSC.id eventId, HSC.actions, HSC.cause, HSC.metaData, CRED.uid uidCard, CRED.is_primary isPrimary, CRED.type cardType, PR.keys_ prin, HOUSE.uid orgUid, CHILD.uid childUid, USER.first_name fName, USER.last_name lName, CHILD.biz_phone bizPh, CHILD.cell_phone cellPh, AOF.state aof_state, RECORD.uid replacedCardUid FROM house_of_cards as HSC INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id INNER JOIN prin PR ON CRED.prin_id = PR.id INNER JOIN child CHILD ON CRED.employee_id = CHILD.id INNER JOIN user USER ON USER.id = CHILD.user_id INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id LEFT JOIN address AOF ON CRED.address_on_file = AOF.id LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id WHERE HSC.type = '\''cardActionKamen'\'' ) playcards",
    "connection.url": "jdbc:mysql://myhost:3306/demo_db?user=isk&password=123"
}'
3. Connector log:
[2020-02-17 02:17:17,589] INFO [test-connector|worker] Creating connector test-connector of type io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker:251)
[2020-02-17 02:17:17,592] INFO [test-connector|worker] Instantiated connector test-connector with version 5.3.1 of type class io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker:254)
[2020-02-17 02:17:17,592] INFO [test-connector|worker] Starting JDBC Source Connector (io.confluent.connect.jdbc.JdbcSourceConnector:69)
[2020-02-17 02:17:17,592] INFO [test-connector|worker] JdbcSourceConnectorConfig values: 
        batch.max.rows = 100
        catalog.pattern = null
        connection.attempts = 3
        connection.backoff.ms = 10000
        connection.password = null
        connection.url = jdbc:mysql://myhost:3306/demo_db?user=isk&password=123
        connection.user = null
        db.timezone = UTC
        dialect.name = 
        incrementing.column.name = eventId
        mode = incrementing
        numeric.mapping = null
        numeric.precision.mapping = false
        poll.interval.ms = 5000
        query = SELECT * FROM( SELECT HSC.id eventId, HSC.actions, HSC.cause, HSC.metaData, CRED.uid uidCard, CRED.is_primary isPrimary, CRED.type cardType, PR.keys_ prin, HOUSE.uid orgUid, CHILD.uid childUid, USER.first_name fName, USER.last_name lName, CHILD.biz_phone bizPh, CHILD.cell_phone cellPh, AOF.state aof_state, RECORD.uid replacedCardUid FROM house_of_cards as HSC INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id INNER JOIN prin PR ON CRED.prin_id = PR.id INNER JOIN child CHILD ON CRED.employee_id = CHILD.id INNER JOIN user USER ON USER.id = CHILD.user_id INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id LEFT JOIN address AOF ON CRED.address_on_file = AOF.id LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id WHERE HSC.type = 'cardActionKamen' ) playcards
        quote.sql.identifiers = ALWAYS
        schema.pattern = null
        table.blacklist = []
        table.poll.interval.ms = 60000
        table.types = [TABLE]
        table.whitelist = []
        timestamp.column.name = []
        timestamp.delay.interval.ms = 0
        topic.prefix = test-connector
        validate.non.null = false
 (io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig:347)
[2020-02-17 02:17:17,594] INFO [test-connector|worker] Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:87)
[2020-02-17 02:17:17,640] INFO [test-connector|worker] Starting thread to monitor tables. (io.confluent.connect.jdbc.source.TableMonitorThread:73)
[2020-02-17 02:17:17,641] INFO [test-connector|worker] Finished creating connector test-connector (org.apache.kafka.connect.runtime.Worker:273)

So there are no errors; everything works as expected.
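
As an extra check that is not part of the original answer, records can be consumed from the output topic; with a query-based source the topic name equals topic.prefix, here test-connector (the broker address below is an assumption):

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic test-connector --from-beginning --max-messages 5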


Versions used:

Kafka Connect:
curl -X GET http://localhost:10900 | jq '.version'

"2.4.0"
JDBC source connector:
curl -X GET http://localhost:10900/connector-plugins | jq -c '.[] | select( .class == "io.confluent.connect.jdbc.JdbcSourceConnector") | .version'

"5.3.1"
MySQL JDBC driver:
8.0.19

【Discussion】:

So the problem was mysql-connector-java-5.1.39. I used JDBC driver 8.0.19 and all of the queries ran fine. This should be part of some documentation or a warning, not just a suggestion :|
@AmitYadav, I did not pay attention to your version at first, which is why I used the latest driver. I hope this helped! All the best!
Your answer directly or indirectly helped solve the problem here! Thanks, I will be able to award the bounty in the next 11 hours and will do so for this answer. Cheers mate!
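
Following up on the resolution above: on the question side the fix amounts to building the image with a current Connector/J instead of 5.1.39. A sketch of the adjusted Dockerfile follows; the download URL and the layout of the 8.0.19 archive are assumptions and should be checked against the MySQL download page:

FROM confluentinc/cp-kafka-connect:5.4.0

# Sketch: install MySQL Connector/J 8.0.19 in place of the old 5.1.39 driver.
# The 8.x tarball ships the jar as mysql-connector-java-8.0.19.jar (no -bin suffix);
# verify the path inside the archive you actually download.
RUN echo "===> Installing MySQL connector" \
    && curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.19.tar.gz" \
       | tar -xzf - -C /usr/share/java/kafka-connect-jdbc/ --strip-components=1 \
         mysql-connector-java-8.0.19/mysql-connector-java-8.0.19.jar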
