Multiple Debezium Source connectors not working at a time
Posted: 2021-05-14 04:50:00

Docker Compose:
kafka:
  image: confluentinc/cp-enterprise-kafka:6.0.0
  container_name: kafka
  depends_on:
    - zookeeper
  ports:
    - 9092:9092
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
    CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
    CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
    CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    CONFLUENT_METRICS_ENABLE: 'true'
    CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    KAFKA_LOG_RETENTION_MS: 100000000
    KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 5000
Connector 1: Debezium source connector (I need 8 connectors like this, one for each of 8 tables)
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "mysql5-mdmembers",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "10",
"database.hostname": "13.232.63.40",
"database.port": "3307",
"database.user": "root",
"database.password": "secret",
"database.server.id": "11",
"database.server.name": "dbserver",
"database.whitelist": "indianmo_imc_new",
"table.whitelist": "indianmo_imc_new.md_members_cdc",
"database.history.kafka.bootstrap.servers": "kafka:29092",
"database.history.kafka.topic": "mysql5_md_members",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "unwrap,dropTopicPrefix,selectFields,renameFields,addTopicPrefix,convertTS",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex":"dbserver.indianmo_imc_new.(.*)",
"transforms.dropTopicPrefix.replacement":"$1",
"transforms.selectFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.selectFields.whitelist": "mem_id,mem_mobile,mem_fname,mem_email,mem_gender,mem_dob,mem_marital_status,mem_state,mem_city,mem_zip,mem_primary_lang,mem_created_on,mem_updated_on,mem_tot_ttt,transfer_count,first_transfer_date,last_transfer_date,dnd_status",
"transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameFields.renames": "mem_fname:mem_name,mem_primary_lang:mem_primary_language,mem_tot_ttt:mem_total_talktime,transfer_count:mem_transfer_count,first_transfer_date:mem_first_transferred_on,last_transfer_date:mem_last_transferred_on,dnd_status:mem_dnd_status",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"mdt_$1",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "mem_created_on,first_transfer_date,last_transfer_date",
"transforms.convertTS.format" : "YYYY-MM-dd H:mm:ss",
"transforms.convertTS.target.type": "unix"
}
}'
Connector 2: Debezium source connector for the same database
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "mysql5-connector2",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "2",
"database.hostname": "13.232.63.40",
"database.port": "3307",
"database.user": "root",
"database.password": "secret",
"database.server.id": "11",
"database.server.name": "dbserver",
"database.whitelist": "indianmo_imc_new",
"table.whitelist": "indianmo_imc_new.associate_leads_cdc",
"database.history.kafka.bootstrap.servers": "kafka:29092",
"database.history.kafka.topic": "mysql5table",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "unwrap,dropTopicPrefix,selectFields,renameFields,addTopicPrefix,convertTS",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex":"dbserver.indianmo_imc_new.(.*)",
"transforms.dropTopicPrefix.replacement":"$1",
"transforms.selectFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.selectFields.whitelist": "AL_Id,A_Id,L_Id,cityID,rtitle,lead_price,selling_price,lead_trans_type,R_Id,rank,bought_by,bought_ip,leadSentBy,pushed_date,pushed_on,tbSendDate,tbSendOn",
"transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameFields.renames": "AL_Id:al_id,A_Id:associate_id,L_Id:mem_id,cityID:city_id,rtitle:product_id,lead_price:lead_actual_price,selling_price:lead_selling_price,lead_trans_type:lead_type_msql5,R_Id:requirement_id,rank:lead_rating,bought_by:lead_bought_by,bought_ip:lead_bought_ip,leadSentBy:lead_sent_by,pushed_date:lead_pushed_date,pushed_on:lead_pushed_on,tbSendDate:lead_sold_date,tbSendOn:lead_sold_on",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"ldt_lm_$1",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "pushed_on,tbSendOn",
"transforms.convertTS.format" : "YYYY-MM-dd H:mm:ss",
"transforms.convertTS.target.type": "unix"
}
}'
Sink connector (I need 8 of these as well, one per target table)
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
-H "Content-Type: application/json" -d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://65.0.213.250:3306/demo",
"topics": "ldt_lm_associate_leads_cdc",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"connection.user": "root",
"connection.password": "secret",
"auto.create": true,
"auto.evolve": true,
"insert.mode": "upsert",
"delete.enabled": true,
"pk.mode": "record_key",
"pk.fields": "al_id",
"transforms": "RenameKey",
"transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
"transforms.RenameKey.renames": "AL_Id:al_id"
}'
The message I get from Kafka Connect is:
[2021-02-10 13:14:45,326] INFO [mysql5-connector2|task-0] Connector task finished all work and is now shutdown (io.debezium.connector.mysql.MySqlConnectorTask:496)
I have a single broker here. Would my problem be solved if I used multiple brokers? What would the Docker Compose configuration for multiple brokers look like? I have multiple tables in the source database and want to sink all of them, which is why I need multiple connectors. But whenever I run more than one source connector, the previously running one stops (this already happens with just two source connectors, and I need at least 8 source connectors and 8 sink connectors). What should I do? Please help. Thanks in advance!
Answer 1: tasks.max should be set to 1, not higher.
https://debezium.io/documentation/reference/connectors/mysql.html#mysql-connector-properties
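Beyond tasks.max, note that both source connectors in the question reuse database.server.id 11 and the same database.server.name. MySQL disconnects an existing replication client when a new one registers with the same server id, which matches the symptom of the earlier connector stopping; Debezium's documentation requires both values to be unique per connector. Below is a minimal sketch of a helper that stamps each per-table config with tasks.max=1 and a unique server id and logical name — the naming scheme and the reduced set of base settings are illustrative assumptions, not the asker's exact configs:

```python
# Generate one Debezium MySQL source-connector config per table.
# tasks.max is pinned to "1": the Debezium MySQL connector reads a single
# binlog stream and never runs more than one task regardless of this value.

BASE = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "13.232.63.40",
    "database.port": "3307",
    "database.user": "root",
    "database.password": "secret",
    "database.whitelist": "indianmo_imc_new",
    "database.history.kafka.bootstrap.servers": "kafka:29092",
}

def connector_config(name: str, table: str, server_id: int) -> dict:
    """Build one connector payload with a unique server id, logical
    server name, and history topic (names here are hypothetical)."""
    cfg = dict(BASE)
    cfg.update({
        "database.server.id": str(server_id),            # unique per connector
        "database.server.name": f"dbserver{server_id}",  # unique topic prefix
        "table.whitelist": f"indianmo_imc_new.{table}",
        "database.history.kafka.topic": f"history_{table}",
    })
    return {"name": name, "config": cfg}

# Eight tables -> eight payloads with server ids 11, 12, ... (two shown).
tables = ["md_members_cdc", "associate_leads_cdc"]  # plus six more in practice
configs = [connector_config(f"mysql5-{t}", t, 11 + i)
           for i, t in enumerate(tables)]
```

Each element of configs would then be POSTed to localhost:8083/connectors/ exactly as in the question; remember that a per-connector database.server.name also changes the topic prefix, so the dropTopicPrefix regex would need to match the new prefix.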