在 Cassandra 中使用轻量级事务 (CAS) 时,我们如何避免丢失写入?
Posted
技术标签:
【中文标题】在 Cassandra 中使用轻量级事务 (CAS) 时,我们如何避免丢失写入?【英文标题】:How can we avoid losing writes when using lightweight transactions (CAS) in Cassandra? 【发布时间】:2014-12-05 10:05:56 【问题描述】:我正在对 Cassandra 进行一些测试,看看我们是否可以将它用于支持乐观并发的可扩展键值存储。
由于键值对存储只需要一个表,并且每一项都是通过键来访问的,看来lightweight transactions可以很容易地为我们的问题提供技术基础。
但是,在运行 a test which does a number of concurrent updates 时(只要检测到并发就会重试),我们看到我们丢失了写入。
测试创建表:
CREATE TABLE objects (key text, version int, PRIMARY KEY(key));
并使用以下方法插入多个键:
INSERT INTO objects (key, version) VALUES (?, 0) IF NOT EXISTS;
然后使用 CAS 操作将这些项目的版本递增多次:
-- client retrieves the current version
SELECT version FROM objects WHERE key = ?;
-- and updates the item using the retrieved version as version check
UPDATE objects SET version = ? WHERE key = ? IF version = ?;
更新后的客户端代码实际上是这样的:
private async Task<bool> CompareAndSet(string key, int currrentCount, PreparedStatement updateStatement)
// increment the version
IStatement statement = updateStatement.Bind(currrentCount + 1, key, currrentCount);
// execute the statement
RowSet result = await Session.ExecuteAsync(statement);
// check the result
Row row = result.GetRows().SingleOrDefault();
if (row == null)
throw new Exception("No row in update result.");
// check if the CAS operation was applied or not
return row.GetValue<bool>("[applied]");
如您所见,CAS 操作因并发而无法应用。因此,将重试此操作,直到成功。还处理写超时异常。 The rationale behind handling the write timeout exceptions is explained here.
private async Task Update(string key, PreparedStatement selectStatement, PreparedStatement updateStatement)
bool done = false;
// try update (increase version) until it succeeds
while (!done)
// get current version
TestItem item = null;
while (item == null)
item = await GetItem(key, selectStatement);
try
// update version using lightweight transaction
done = await CompareAndSet(key, item.Version, updateStatement);
// lightweight transaction (CAS) failed, because compare failed --> simply not updated
if (!done)
Interlocked.Increment(ref abortedUpdates);
catch (WriteTimeoutException wte)
// partial write timeout (some have been updated, so all must be eventually updated, because it is a CAS operation)
if (wte.ReceivedAcknowledgements > 0)
Interlocked.Increment(ref partialWriteTimeouts);
done = true;
else
// complete write timeout --> unsure about this one...
Interlocked.Increment(ref totalWriteTimeouts);
这是使用 100 个项目并更新每个项目 10 次的测试的输出:
Running test with 100 items and 10 updates per item.
Number of updates: 1000
Number of aborted updates due to concurrency: 3485
Number of total write timeouts: 18
Number of partial write timeouts: 162
LOST WRITES: 94 (or 9,40%)
Results:
Updates | Item count
10 | 35
9 | 43
8 | 17
7 | 3
6 | 2
Xunit.Sdk.EqualExceptionAssert.Equal() Failure
Expected: 0
Actual: 94
如您所见,这是一个高度并发的测试(查看必须重试更新的中止操作的数量)。 但是,坏消息是我们正在丢失写入。客户端认为应该执行 1000 次更新,但在这种情况下丢失了 94 次写入。
丢失的写入次数是写入超时次数的数量级。所以,它们似乎是联系在一起的。问题是:
我们是否需要以更好的方式处理超时异常? 在 Cassandra 上执行 CAS 操作时,有没有办法避免丢失写入?【问题讨论】:
这看起来像是一张出色的 JIRA 票证 - Cassandra 版本、使用的 JVM 和 DEBUG system.log 也可能会有所帮助 :) issues.apache.org/jira/browse/CASSANDRA 好的,首先我们将使用最新的 Cassandra 版本进行测试(到目前为止我们使用的是 2.0.9)。 使用 2.1.2 版本的问题是一样的。 我为此创建了一个 JIRA 票证:issues.apache.org/jira/browse/CASSANDRA-8446 【参考方案1】:WriteTimeoutException 表示 Cassandra 无法及时执行操作。通过您的测试,您将 Cassandra 置于繁重的负载下,任何操作都可能因超时异常而失败。因此,您需要做的是重做您的操作并通过反复尝试从问题中恢复。它类似于 SQLTimeoutException。你也需要防御它。
【讨论】:
无法从 WriteTimeoutException 判断 LWT 是否真的会成功?这是issues.apache.org/jira/browse/CASSANDRA-9328 吗? 这个问题是另一回事,如果您的公司的成功是基于数据完整性的,那么这个问题应该会阻止您使用 Cassandra,并且是一些人在某些情况下仍然使用关系数据库的原因,即使这些情况不规模。这个错误甚至使我的建议无效。无论如何,您都不应该重做交易。 所以基本上是的,就超时而言,你永远不会知道。在这个 bug 发生之前,发生超时但调用成功的可能性很小。窗口被超时击中,但有未完成的 ACK。这就是为什么我们过去将所有分片都放在本地并且超时时间合理的高。所以基本上除了争用和系统损坏之外没有机会超时,如果你重试就会发现。 问题还在于,如果您有写入超时,则其他副本可能会赶上那些已经接受该值的副本。重复写入值就像根据需要将其播种到尽可能多的分片中。这就是为什么我们将 Cassandra 用于媒体数据和日志,但不用于我们需要 100% 正确信任的数据。 您能否执行串行读取以找出实际发生的情况,或者 LWT 本质上是否已损坏?以上是关于在 Cassandra 中使用轻量级事务 (CAS) 时,我们如何避免丢失写入?的主要内容,如果未能解决你的问题,请参考以下文章
com.datastax.oss -> java-driver-core 和 com.datastax.cassandra -> cassandra-driver-core 之间的 Cas
[数据库事务与锁]详解八:底理解数据库事务乐观锁的一种实现方式——CAS