通过将单个消息发布到 kafka 来更新 mysql 中的多个字段

Posted

技术标签:

【中文标题】通过将单个消息发布到 kafka 来更新 mysql 中的多个字段【英文标题】:Update multiple fields in mysql by publishing single message to kafka 【发布时间】:2021-04-06 19:13:19 【问题描述】:

我需要通过 kafka connect 将过去 10 天记录的 mysql 'Table1' 中的状态更新为 De Active。 我将如何实现将一条记录发布到 kafka 主题,因为 mysql 提供了在单个查询中执行选择和更新。

【问题讨论】:

提供 = 防止? 在 mysql 中选择和更新都可以在单个查询中进行吗? 在 MySQL 中,不可能在单个查询中执行 SELECT 和 UPDATE。 在 MySQL 中,您可以创建执行任意数量语句的存储过程 - 从客户端应用程序端您将只执行一个 CALL procname(parameters) 查询。 在更新查询里面我们可以指定选择查询对吗? 【参考方案1】:

演示示例。

-- create stored procedure (once)

CREATE PROCEDURE execute_many_queries (queries_text TEXT)
BEGIN
REPEAT
    SET @sql := SUBSTRING_INDEX(queries_text, ';', 1);
    SET queries_text := TRIM(LEADING ';' FROM TRIM(LEADING @sql FROM queries_text));
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DROP PREPARE stmt;
UNTIL queries_text = '' END REPEAT;
END
-- create testing table

CREATE TABLE test (id INT, val INT);
-- execute 3 queries by 1 statement

CALL execute_many_queries ('INSERT INTO test VALUES (1,11), (2,22); UPDATE test SET val = 222 WHERE id = 2; SELECT * FROM test;');
编号 |值 -: | --: 1 | 11 2 | 222
-- execute more 2 queries by 1 statement

CALL execute_many_queries ('UPDATE test SET val = 111 WHERE id = 1; SELECT * FROM test;');
编号 |值 -: | --: 1 | 111 2 | 222

db小提琴here

谨慎使用! SP 中没有检查 - 查询必须没有错误。并且可以注射。

【讨论】:

感谢您的努力。在这里您调用了执行 mysql 查询的过程,而不是我需要通过 kafka 连接器执行此查询 Kafka 连接器自己执行查询,而不是按原样发送到 MySQL?对不起... 我只想知道是否可以通过kafka实现

以上是关于通过将单个消息发布到 kafka 来更新 mysql 中的多个字段的主要内容,如果未能解决你的问题,请参考以下文章

如何将业务数据实时集成到Kafka?

将多个 oracle 表发送到单个 kafka 主题中

kafka 消息的 Thrift 序列化 - 每个结构的单个主题

kafka3-核心概念

《Kafka权威指南》摘抄

如何将 Kafka 消息通过管道传输到 docker 容器中?