Multiple Debezium source connectors not working at the same time

Posted: 2021-05-14 04:50:00

Problem description:

Docker Compose

kafka:
image: confluentinc/cp-enterprise-kafka:6.0.0
container_name: kafka
depends_on:
  - zookeeper
ports:
  - 9092:9092
environment:
  KAFKA_BROKER_ID: 1
  KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
  KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
  KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
  KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
  KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
  KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
  KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
  CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
  CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
  CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
  CONFLUENT_METRICS_ENABLE: 'true'
  CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  KAFKA_LOG_RETENTION_MS: 100000000 
  KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 5000

Connector 1: Debezium source connector (I need 8 connectors like this, one for each of 8 tables)

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
        "name": "mysql5-mdmembers",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "10",
            "database.hostname": "13.232.63.40",
            "database.port": "3307",
            "database.user": "root",
            "database.password": "secret",
            "database.server.id": "11",
            "database.server.name": "dbserver",
            "database.whitelist": "indianmo_imc_new",
            "table.whitelist": "indianmo_imc_new.md_members_cdc",
            "database.history.kafka.bootstrap.servers": "kafka:29092",
            "database.history.kafka.topic": "mysql5_md_members",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schema-registry:8081",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "transforms": "unwrap,dropTopicPrefix,selectFields,renameFields,addTopicPrefix,convertTS",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.dropTopicPrefix.regex":"dbserver.indianmo_imc_new.(.*)",
            "transforms.dropTopicPrefix.replacement":"$1",
            "transforms.selectFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
            "transforms.selectFields.whitelist": "mem_id,mem_mobile,mem_fname,mem_email,mem_gender,mem_dob,mem_marital_status,mem_state,mem_city,mem_zip,mem_primary_lang,mem_created_on,mem_updated_on,mem_tot_ttt,transfer_count,first_transfer_date,last_transfer_date,dnd_status",
            "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
            "transforms.renameFields.renames": "mem_fname:mem_name,mem_primary_lang:mem_primary_language,mem_tot_ttt:mem_total_talktime,transfer_count:mem_transfer_count,first_transfer_date:mem_first_transferred_on,last_transfer_date:mem_last_transferred_on,dnd_status:mem_dnd_status",
            "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.addTopicPrefix.regex":"(.*)",
            "transforms.addTopicPrefix.replacement":"mdt_$1",
            "transforms.convertTS.type"       : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
            "transforms.convertTS.field"      : "mem_created_on,first_transfer_date,last_transfer_date",
            "transforms.convertTS.format"     : "YYYY-MM-dd H:mm:ss",
            "transforms.convertTS.target.type": "unix"
        
    '

Connector 2: Debezium source connector for the same DB

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
        "name": "mysql5-connector2",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "2",
            "database.hostname": "13.232.63.40",
            "database.port": "3307",
            "database.user": "root",
            "database.password": "secret",
            "database.server.id": "11",
            "database.server.name": "dbserver",
            "database.whitelist": "indianmo_imc_new",
            "table.whitelist": "indianmo_imc_new.associate_leads_cdc",
            "database.history.kafka.bootstrap.servers": "kafka:29092",
            "database.history.kafka.topic": "mysql5table",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schema-registry:8081",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "transforms": "unwrap,dropTopicPrefix,selectFields,renameFields,addTopicPrefix,convertTS",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.dropTopicPrefix.regex":"dbserver.indianmo_imc_new.(.*)",
            "transforms.dropTopicPrefix.replacement":"$1",
            "transforms.selectFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
            "transforms.selectFields.whitelist": "AL_Id,A_Id,L_Id,cityID,rtitle,lead_price,selling_price,lead_trans_type,R_Id,rank,bought_by,bought_ip,leadSentBy,pushed_date,pushed_on,tbSendDate,tbSendOn",
            "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
            "transforms.renameFields.renames": "AL_Id:al_id,A_Id:associate_id,L_Id:mem_id,cityID:city_id,rtitle:product_id,lead_price:lead_actual_price,selling_price:lead_selling_price,lead_trans_type:lead_type_msql5,R_Id:requirement_id,rank:lead_rating,bought_by:lead_bought_by,bought_ip:lead_bought_ip,leadSentBy:lead_sent_by,pushed_date:lead_pushed_date,pushed_on:lead_pushed_on,tbSendDate:lead_sold_date,tbSendOn:lead_sold_on",
            "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.addTopicPrefix.regex":"(.*)",
            "transforms.addTopicPrefix.replacement":"ldt_lm_$1",
            "transforms.convertTS.type"       : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
            "transforms.convertTS.field"      : "pushed_on,tbSendOn",
            "transforms.convertTS.format"     : "YYYY-MM-dd H:mm:ss",
            "transforms.convertTS.target.type": "unix"
        
    '

Sink connector (I need to sink into 8 tables like this)

curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
    -H "Content-Type: application/json" -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://65.0.213.250:3306/demo",
        "topics": "ldt_lm_associate_leads_cdc",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "connection.user": "root",
        "connection.password": "secret",
        "auto.create": true,
        "auto.evolve": true,
        "insert.mode": "upsert",
        "delete.enabled": true,
        "pk.mode": "record_key",
        "pk.fields": "al_id",
        "transforms": "RenameKey",
        "transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
        "transforms.RenameKey.renames": "AL_Id:al_id"
    '

The error I get from Kafka Connect is:

[2021-02-10 13:14:45,326] INFO [mysql5-connector2|task-0] Connector task finished all work and is now shutdown (io.debezium.connector.mysql.MySqlConnectorTask:496)

I have a single broker here. Would my problem be solved by using multiple brokers, and what would the Docker Compose config for multiple brokers look like? I have multiple tables in the source database and I want to sink all of them, which is why I need multiple connectors. But whenever I run a second source connector, it stops the previous one (the problem already shows up with just two source connectors, and I need at least 8 source connectors and 8 sink connectors). What should I do? Please help, and thanks in advance!
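For reference, a second broker could be added to the same Compose file roughly as below (an illustrative sketch; the service name, broker ID, and ports are assumptions). Note, however, that broker count does not control how Kafka Connect schedules connectors, so adding brokers by itself would not stop one connector from displacing another.

    kafka2:
      image: confluentinc/cp-enterprise-kafka:6.0.0
      container_name: kafka2
      depends_on:
        - zookeeper
      ports:
        - 9093:9093
      environment:
        KAFKA_BROKER_ID: 2                 # must be unique per broker
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29093,PLAINTEXT_HOST://kafka2:9093
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2   # replication factors can be raised once there are 2+ brokers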


Answer 1:

tasks.max should be set to 1, not higher: the Debezium MySQL connector always reads the binlog with a single task, so extra tasks are never started.

https://debezium.io/documentation/reference/connectors/mysql.html#mysql-connector-properties
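Beyond that, the linked properties reference also requires database.server.id and database.server.name to be unique per connector: Debezium joins MySQL as a replica under that server ID, and MySQL drops an existing replica connection when a second client registers with the same ID, which would match the "stops the previous connector" symptom. Both connectors in the question reuse "11" and "dbserver". A corrected registration for the second connector might start like this (a sketch reusing the question's values; "12" and "dbserver2" are illustrative, and the dropTopicPrefix regex would then need to match the new dbserver2 topic prefix):

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
        "name": "mysql5-connector2",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.hostname": "13.232.63.40",
            "database.port": "3307",
            "database.user": "root",
            "database.password": "secret",
            "database.server.id": "12",
            "database.server.name": "dbserver2",
            "database.whitelist": "indianmo_imc_new",
            "table.whitelist": "indianmo_imc_new.associate_leads_cdc",
            "database.history.kafka.bootstrap.servers": "kafka:29092",
            "database.history.kafka.topic": "mysql5table"
        }
    }'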

