这两个并发事务之间的数据如何泄漏?
Posted
技术标签:
【中文标题】这两个并发事务之间的数据如何泄漏?【英文标题】:How can data be leaking between these 2 concurrent transactions? 【发布时间】:2018-01-05 23:44:32 【问题描述】:我有一个项目在 java 8、spring boot 1.5.9、hsqldb 2.4.0、jdbi3-core 3.0.0 上运行。我正在编写测试来验证数据库行为。一组测试检查并发行为。
我最基本的测试是在 2 个事务之间以某种方式泄漏数据,我不知道为什么。我不知道我的测试是否有缺陷,jdbi 是否存在错误,或者 hqsldb 是否从根本上损坏。
连接地址jdbc:hsqldb:$...;create=true;hsqldb.tx=mvlocks;hsqldb.tx_level=serializable
注意 tx=mvlocks 和 tx_level=serializable。设置 tx=mvcc 使 tx_level 成为快照隔离不会改变结果,这更奇怪,因为 snapshots 在任何更改之前肯定不可能互相影响。
测试说明:
-
创建一个包含 1 条记录的表
启动 2 个线程
在每个事务中,同时启动一个新事务(由 CyclicBarrier 同步确保,通过日志输出验证),这样两个 tx 都不能看到对方的更改
在线程 1 中,插入新行(共 2 行)并提交
在线程 2 中,统计行数
我的期望是线程 2 的事务在表中应该只有 1 条记录,因为线程 2 的事务是在线程 1 的事务提交之前启动的,并且隔离级别设置为可序列化。
CyclicBarrier syncLock = new CyclicBarrier(2);
Runnable sync = Unchecked.runnable(() -> syncLock.await(1, TimeUnit.SECONDS));
jdbi.useTransaction(tx ->
Queries queries = tx.attach(Queries.class);
queries.create();
// inserts a row with id 1
queries.insert(1, 1);
);
CompletableFuture<Void> first = CompletableFuture.runAsync(Unchecked.runnable(() ->
jdbi.useTransaction(tx ->
assertThat(tx.isInTransaction()).isTrue();
assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);
LOGGER.info("first tx started");
sync.run();
Queries queries = tx.attach(Queries.class);
queries.insert(2, 2);
);
LOGGER.info("first tx committed");
Thread.sleep(100);
sync.run();
));
CompletableFuture<Integer> subject = CompletableFuture.supplyAsync(Unchecked.supplier(() ->
int out = jdbi.inTransaction(tx ->
assertThat(tx.isInTransaction()).isTrue();
assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);
LOGGER.info("second tx started");
sync.run();
sync.run();
Queries queries = tx.attach(Queries.class);
// counts the number of rows
return queries.count();
);
LOGGER.info("second tx committed");
return out;
));
// capture exceptions from either thread
CompletableFuture.allOf(first, subject).get();
assertThat(subject.get()).isEqualTo(1);
输出:
01:28:16.255 INFO (SQL) [net.leaumar.samstock.integration.db.Queries] main: took <1ms to execute query create table test(id int primary key, foo int) with arguments positional:, named:, finder:[]
01:28:16.257 INFO (SQL) [net.leaumar.samstock.integration.db.Queries] main: took <1ms to execute query insert into test(id, foo) values(:id, :foo) with arguments positional:0:1,1:1, named:foo:1,id:1, finder:[]
01:28:16.313 INFO () [net.leaumar.samstock.integration.db.ConcurrencyTest] ForkJoinPool.commonPool-worker-2: second tx started
01:28:16.313 INFO () [net.leaumar.samstock.integration.db.ConcurrencyTest] ForkJoinPool.commonPool-worker-1: first tx started
01:28:16.315 INFO (SQL) [net.leaumar.samstock.integration.db.Queries] ForkJoinPool.commonPool-worker-1: took <1ms to execute query insert into test(id, foo) values(:id, :foo) with arguments positional:0:2,1:2, named:foo:2,id:2, finder:[]
01:28:16.315 INFO () [net.leaumar.samstock.integration.db.ConcurrencyTest] ForkJoinPool.commonPool-worker-1: first tx committed
01:28:16.317 INFO (SQL) [net.leaumar.samstock.integration.db.Queries] ForkJoinPool.commonPool-worker-2: took <1ms to execute query select count(*) from test with arguments positional:, named:, finder:[]
01:28:16.318 INFO () [net.leaumar.samstock.integration.db.ConcurrencyTest] ForkJoinPool.commonPool-worker-2: second tx committed
org.junit.ComparisonFailure:
Expected :1
Actual :2
这个基本的 jdbc 测试没有显示这种“已提交”的行为。测试为绿色,证明同时启动的可序列化事务即使在提交后也看不到彼此的变化:
@Test
public void transactionsAreIsolated() throws SQLException
@Cleanup
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement();
statement.execute(CREATE_TABLE);
@Cleanup
Connection c1 = dataSource.getConnection();
@Cleanup
Connection c2 = dataSource.getConnection();
c1.setAutoCommit(false);
c2.setAutoCommit(false);
startTransaction(c1);
startTransaction(c2);
assertThat(count(c1)).isEqualTo(0);
assertThat(count(c2)).isEqualTo(0);
insert(c1, 1);
assertThat(count(c1)).isEqualTo(1);
assertThat(count(c2))
.describedAs("read uncommitted")
.isEqualTo(0);
c1.commit();
assertThat(count(c1)).isEqualTo(1);
assertThat(count(c2))
.describedAs("read committed")
.isEqualTo(0);
【问题讨论】:
在等待屏障之前尝试在“主题”中运行选择 (sync().run()) 【参考方案1】:我在 Jdbi 中复制了您的 JDBC 测试,它可以工作:
@Test
public void transactionsAreIsolated()
try (Handle h1 = jdbi.open();
Handle h2 = jdbi.open())
h1.begin();
h2.begin();
assertThat(count(h1)).isEqualTo(0);
assertThat(count(h2)).isEqualTo(0); // locks h2's txn to the current snapshot
insert(h1, 1, 1);
assertThat(count(h1)).isEqualTo(1);
assertThat(count(h2)).describedAs("read uncommitted").isEqualTo(0);
h1.commit();
assertThat(count(h1)).isEqualTo(1);
assertThat(count(h2)).describedAs("read committed").isEqualTo(0);
h2.rollback();
从测试看来,在您实际与事务中的数据库进行交互之前,事务实际上并未锁定到数据库快照。
在上面的测试中,我们通过h2
观察行数,然后通过h1
插入行。此交互设置了事务快照,这就是您的 JDBC 测试有效的原因。
但是,如果我们修改上述测试以在观察h2
的计数之前结束h1
事务:
@Test
public void transactionsLockToStateWhenObserved()
try (Handle h1 = jdbi.open();
Handle h2 = jdbi.open())
h1.begin();
h2.begin();
insert(h1, 1, 1);
assertThat(count(h1)).isEqualTo(1);
h1.commit();
assertThat(count(h2))
.describedAs("_now_ we're locked to a snapshot")
.isEqualTo(1);
h2.rollback();
您的原始测试有两个同步点(事务已启动,事务 1 已提交),但它需要四个才能完全测试您的场景:
@Test
public void concurrentTransactionsAreIsolated() throws Exception
CyclicBarrier barrier = new CyclicBarrier(2);
Runnable sync = uncheckedRunnable(() -> barrier.await(1, TimeUnit.SECONDS));
jdbi.useTransaction(handle -> insert(handle, 1, 1));
CompletableFuture<Void> first = CompletableFuture.runAsync(uncheckedRunnable(() ->
jdbi.useTransaction(tx ->
assertThat(tx.isInTransaction()).isTrue();
assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);
log.info("first tx started");
sync.run(); // wait for both transactions to start
insert(tx, 2, 2);
log.info("first tx inserted row");
sync.run(); // let the second txn check uncommitted reads
sync.run(); // wait for second txn to check the uncommitted reads
);
log.info("first tx committed");
sync.run(); // transaction closed, let second transaction check committed reads
));
CompletableFuture<Integer> subject = CompletableFuture.supplyAsync(uncheckedSupplier(() ->
int out = jdbi.inTransaction(tx ->
assertThat(tx.isInTransaction()).isTrue();
assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);
log.info("second tx started");
sync.run(); // wait for both transactions to start
sync.run(); // wait for first txn to insert
log.info("second tx checking uncommitted read");
assertThat(count(tx)).isEqualTo(1);
sync.run(); // let the first txn commit
sync.run(); // wait for first txn to commit
log.info("second tx checking committed read");
return count(tx);
);
log.info("second tx committed");
return out;
));
// capture exceptions from either thread
CompletableFuture.allOf(first, subject).get();
assertThat(subject.get()).isEqualTo(1);
【讨论】:
需要一个初始查询来实际开始隔离 - 我从来没有想过它:) 谢谢以上是关于这两个并发事务之间的数据如何泄漏?的主要内容,如果未能解决你的问题,请参考以下文章