Docker,Debezium 不会将数据从 mssql 流式传输到 elasticsearch

Posted

技术标签:

【中文标题】Docker,Debezium 不会将数据从 mssql 流式传输到 elasticsearch【英文标题】:Docker, Debezium not streaming data from mssql to elasticsearch 【发布时间】:2021-09-02 07:20:39 【问题描述】:

我按照这个例子将数据从 mysql 流式传输到 elasticsearch https://github.com/debezium/debezium-examples/tree/master/unwrap-smt#elasticsearch-sink 该示例本身在我的本地机器上运行良好。

但就我而言,我想将数据从 mssql(位于另一台服务器,而不是 docker)流式传输到 elasticsearch。

所以在“docker-compose-es.yaml”文件中,我删除了“mysql”部分并删除了 mysql 链接。 并为 elastic 和 mssql 创建了我自己的连接器/接收器:


    "name": "Test-connector", 
    "config": 
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", 
        "database.hostname": "192.168.1.234", 
        "database.port": "1433", 
        "database.user": "user", 
        "database.password": "pass", 
        "database.dbname": "Test", 
        "database.server.name": "MyServer",
        "table.include.list": "dbo.TEST_A",
        "database.history.kafka.bootstrap.servers": "kafka:9092", 
        "database.history.kafka.topic": "dbhistory.testA"
    


    "name": "elastic-sink-test",
    "config": 
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "TEST_A",
        "connection.url": "http://localhost:9200/",
        "transforms": "unwrap,key",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",    
        "transforms.unwrap.drop.tombstones": "false",    
        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "SQ",                                                 
        "key.ignore": "false",                                                        
        "type.name": "TEST_A",
        "behavior.on.null.values": "delete"                                                     
    

添加这些时,kafka 连接 I/O 正在努力工作并且有超过 40GB 的输入,请参见下图:

在 kafka 日志中,它看起来像是遍历了所有表。这是表日志之一:

2021-06-17 10:20:10,414 - INFO [data-plane-kafka-request-handler-5:Logging@66] - [Partition MyServer.dbo.TemplateGroup-0 broker=1] Log loaded for partition MyServer.dbo.TemplateGroup-0 with initial high watermark 0
2021-06-17 10:20:10,509 - INFO [data-plane-kafka-request-handler-3:Logging@66] - Creating topic MyServer.dbo.TemplateMeter with configuration  and initial partition assignment Map(0 -> ArrayBuffer(1))
2021-06-17 10:20:10,516 - INFO [data-plane-kafka-request-handler-3:Logging@66] - [KafkaApi-1] Auto creation of topic MyServer.dbo.TemplateMeter with 1 partitions and replication factor 1 is successful
2021-06-17 10:20:10,526 - INFO [data-plane-kafka-request-handler-7:Logging@66] - [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(MyServer.dbo.TemplateMeter-0)
2021-06-17 10:20:10,528 - INFO [data-plane-kafka-request-handler-7:Logging@66] - [Log partition=MyServer.dbo.TemplateMeter-0, dir=/kafka/data/1] Loading producer state till offset 0 with message format version 2

数据库只有 2GB。我不知道为什么它有这么高的输入。

运行此命令时,在 elasticsearch 中未创建 test_a 索引: curl http://localhost:9200/_aliases?pretty=true

有人知道我如何从这里排除故障或指出正确的方向吗?

提前致谢!

【问题讨论】:

【参考方案1】:

我如何从这里进行故障排除

docker compose logs?

修改 Kafka Connect 和/或 Elasitcsearch 进程的 log4j.properties 以获得更多日志?

使用普通的 Kafka 消费者来查看数据是否真的被读入了 TEST_A 主题?

在“docker-compose-es.yaml”中......

如果 Debezium 在容器中运行,则 Elasticsearch 在 localhost:9200 不可​​用

将该值更改为http://elastic:9200、like shown in the es-sink.json

【讨论】:

太棒了!将 url 更改为elastic:9200,elasticsearch 创建了索引 test_a。但它没有用文档填充索引。运行此命令 curl "localhost:9092/topics" 不返回任何主题是正确的还是我应该创建一个主题? 9092端口上的Kafka不是HTTP服务器,所以我不明白你的问题。 默认情况下,源连接器应该创建一个主题,但当然,欢迎您在创建连接器之前创建一个主题。但是,您不会使用 curl 来执行此操作或检查它 我设法通过以下方式列出了主题: docker exec -it unwrap-smt_kafka_1 bash ./bin/kafka-topics.sh --list --zookeeper zookeeper:2181 我试图检查主题是否包含任何消息: ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_a --from-beginning 但收到此错误:Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) 任何想法,我如何修复代理? 我发现它应该是./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test_a --from-beginning 从那里我看到了来自我的测试数据库中所有表的很多主题,即使我有"table.include.list": "dbo.TEST_A" 我想知道为什么?除此之外,似乎“elastic-sink-test”创建了“test_a”主题,而“Test-connector”创建了“MyServer.dbo.TEST_A”,所以我将弹性连接器主题更改为“MyServer.dbo.TEST_A”并更新了我的 ES 数据库!

以上是关于Docker,Debezium 不会将数据从 mssql 流式传输到 elasticsearch的主要内容,如果未能解决你的问题,请参考以下文章

如何使用Debezium从MS SQL中将250张表导入Kafka

Kafka 将 Debezium 与 ms sql 服务器连接起来。密钥提取配置问题

如何在 Docker 中将 Debezium 连接到 MongoDB?

CDC 与 docker 中的 debezium

Debezium MS SQL Server 连接器问题

如何从 Debezium 创建的 avro 消息中获取字段?