如何在将记录异步放入运动流中时确保排序?

Posted

技术标签:

【中文标题】如何在将记录异步放入运动流中时确保排序?【英文标题】:How to ensure ordering while putting records in kinesis stream asynchronously? 【发布时间】:2018-04-18 17:28:17 【问题描述】:

我正在编写一个应用程序,它读取 mysql bin 日志并将更改推送到 Kinesis 流中。我的用例需要在我使用 putrecord 操作而不是 putrecordskinesis 流中对 mysql 事件进行完美排序,并且还包括 ' SequenceNumberForOrdering' 键。但是仍然存在一个故障点,即重试逻辑。作为一个 async 函数(使用 aws 的 js sdk),我如何确保在对 kinesis 的写入操作过程中出现故障时的顺序。

阻塞写入(阻塞事件循环直到接收到 put 记录的回调)是一个太糟糕的解决方案吗?还是有更好的办法?

【问题讨论】:

【参考方案1】:

在向流中添加记录时不要尝试强制排序,而是在读取记录时对其进行排序。在您的用例中,每个 binlog 条目都有唯一的文件序列、起始位置和结束位置。因此,对它们进行排序并找出任何差距是微不足道的。

如果您在阅读时确实发现了空白,消费者将不得不等到它们被填满。但是,假设没有灾难性故障,流中的所有记录都应该彼此靠近,因此缓冲量应该是最小的。

通过在生产者端强制执行排序,您将整体吞吐量限制为写入单个记录的速度。如果您可以跟上实际的数据库更改,那就没问题了。但是,如果你跟不上,即使消费者的负载可能很轻,你也会在管道中遇到越来越大的延迟。

此外,您只能在单个分片内强制执行订单,因此如果您的生产者需要摄取超过 1 MB/秒(或 > 1,000 条记录/秒)的数据,那么您就不走运了(根据我的经验,唯一的达到 1,000 条记录/秒的方法是通过 PutRecords;如果您一次写入一条记录,您将获得大约 20-30 条请求/秒)。

【讨论】:

谢谢,是的,这会起作用。基本上,我必须在生产者端维护一个不断增加的标识符,我需要将其附加到数据记录中,因为如果我只依赖 bin 日志文件序列和开始 pos 来检测数据记录之间的差距,然后它不会做,因为 mysql bin 日志偏移量之间已经有差距。【参考方案2】:

如果你想要完美的排序,那么你需要确保在插入下一个之前插入每个事件,所以是的,你必须等到一个 put 请求完成后再执行下一个。问题是您是否真的需要对所有事件进行完美排序,或者是否需要在某个子集中进行完美排序?因为您使用的是关系数据库,所以同一个表中的行之间不太可能存在关系。您更有可能在表之间的行之间存在关系,因此您可以使用一些技巧来利用批量放置请求。

批量放置请求的问题在于它在请求中是无序的。因为 bin 日志为您提供更改后行的完整图像,您实际上只关心 bin 日志中每个主键的最新条目,因此您可以做的是从bin log,应该是按时间排序的,按主键分组,然后只取binlog记录中的after_values图像作为每个主键组的最新记录。然后,您可以安全地对这些记录中的每一个使用批量放置请求,并确保您不会在该密钥的最新记录之前意外地将给定密钥的陈旧记录放入流中。

这并不适用于所有情况,但在许多 CDC (https://en.wikipedia.org/wiki/Change_data_capture) 设置中,这足以将数据准确地复制到其他系统中。

假设你的 bin 日志中有以下记录(格式取自 https://aws.amazon.com/blogs/database/streaming-changes-in-a-database-with-amazon-kinesis/):

"table": "Users", "row": "values": "id": 1, "Name": "Foo User", "idUsers": 123, "type": "WriteRowsEvent", "schema": "kinesistest"
"table": "Users", "row": "before_values": "id": 1", "Name": "Foo User", "idUsers": 123, "after_values": "id": 1, "Name": "Bar User", "idUsers": 123, "type": "UpdateRowsEvent", "schema": "kinesistest"
"table": "Users", "row": "values": "id": 2, "Name": "User A", "idUsers": 123, "type": "WriteRowsEvent", "schema": "kinesistest"
"table": "Users", "row": "before_values": "id": 1", "Name": "Bar User", "idUsers": 123, "after_values": "id": 1, "Name": "Baz User", "idUsers": 123, "type": "UpdateRowsEvent", "schema": "kinesistest"
"table": "Users", "row": "values": "id": 3, "Name": "User C", "idUsers": 123, "type": "WriteRowsEvent", "schema": "kinesistest"

在此示例中,主键 id 标识了三行。 id=1 的行被插入然后更新两次,id=2 的行被插入,id=3 的行被插入。您需要分别处理每种类型的事件(写入、更新、删除),并且只收集每个 id 的最新状态。因此,对于写入,您将使用values 作为该行,对于更新,您将使用after_values 作为该行,对于deletes,您将该行放入一批删除中。在此示例中,唯一重要的三个条目是:

"table": "Users", "row": "values": "id": 2, "Name": "User A", "idUsers": 123, "type": "WriteRowsEvent", "schema": "kinesistest"
"table": "Users", "row": "before_values": "id": 1", "Name": "Bar User", "idUsers": 123, "after_values": "id": 1, "Name": "Baz User", "idUsers": 123, "type": "UpdateRowsEvent", "schema": "kinesistest"
"table": "Users", "row": "values": "id": 3, "Name": "User B", "idUsers": 123, "type": "WriteRowsEvent", "schema": "kinesistest"

这是因为它们是每个 id 的最新版本。您可以对包含这三个写入的批处理使用批量放置,而不必担心它们在大多数情况下会出现故障,除非您在单个表中的条目之间存在相互依赖关系或其他一些非常具体的要求。

如果您有删除,您只需将它们放在单独的批量删除中,然后在批量放置记录之后执行。在过去,我通过执行此压缩和批处理过程看到了非常好的吞吐量改进。但同样,如果您确实需要阅读 每个 事件,而不仅仅是将最新数据复制到其他各种商店,那么这可能行不通。

【讨论】:

感谢 Brian 的回答,但正如您所说,这在我们的用例中不起作用,因为我们需要捕获 mysql 的每个事件,因为我们也在执行其他处理任务,例如。我们正在维护 mysql 表中唯一电话号码的哈希,因此我们需要删除更新事件的“之前”键中的手机号码并添加“之后”键。 另外,在您的解决方案中,如果一个批次的批量放置请求失败(包含 id 1 的数据),然后另一个请求(包含 id 1 的新数据)成功,如何我的重试逻辑能否确保只有 id 1 的最新数据被推送到 kinesis 流中?【参考方案3】:

我能够通过使用内部 FIFO 队列实现完美排序。我将每个事件推送到 FIFO 队列中,该队列正在被一个递归函数读取,该函数将事件推送到 Kinesis 流中(一次一个)。我还在每次成功的 putRecord 操作时将 bin 日志偏移量存储在外部存储器(在我的情况下为 redis)中,如果对 kinesis 的任何写入失败,我可以重新启动服务器并从最后一次成功的偏移量值开始再次读取。

对此解决方案或其他解决方案的任何建议将不胜感激。

这是我从 fifo 队列中读取的递归函数的代码 sn-p。

const fetchAndPutEvent = () => 
let currentEvent = eventQueue.shift(); // dequeue from the fifo queue

if (currentEvent) 
    currentEvent = JSON.parse(currentEvent);
    // put in the kinesis stream with sequence number of last putRecord operation to achieve ordering of events
    return kinesis.putRecord(currentEvent, sequenceNumber, (err, result) => 
        if (err) 
            // in case of error while putting in kinesis stream kill the server and replay from the last successful offset
            logger.fatal('Error in putting kinesis record', err);
            return setTimeout(() => 
                process.exit(0);
            , 10000);
        
        try 
            //store the binlog offset and kinesis sequence number in an external memory
            sequenceNumber = result.SequenceNumber;
            let offsetObject = 
                binlogName: currentEvent.currentBinlogName,
                binlogPos: currentEvent.currentBinlogPos,
                sequenceNumber: sequenceNumber
            ;
            redisClient.hmset(redisKey, offsetObject);
        
        catch (ex) 
            logger.fatal('Exception in putting kinesis record', ex);
            setTimeout(function() 
                process.exit(0);
            , 10000);
        
        return setImmediate(function() 
            return fetchAndPutEvent();
        );
    );

else 
    // in case of empty queue just recursively call the function again
    return setImmediate(function() 
        return fetchAndPutEvent();
    );

;

【讨论】:

以上是关于如何在将记录异步放入运动流中时确保排序?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用运动检测触发相机?

如何在将无限的文本流写入文件之前对其进行预处理?

Packery 在将项目垂直拖放到可排序网格中时,垂直对齐中断

Python 中的 Bigquery:如何将查询结果放入表中?

如何扇出 AWS 运动流?

如何在将JSON文件加载到BigQuery表中时管理/处理架构更改