这两个并发事务之间的数据如何泄漏?

Posted

技术标签:

【中文标题】这两个并发事务之间的数据如何泄漏?【英文标题】:How can data be leaking between these 2 concurrent transactions? 【发布时间】:2018-01-05 23:44:32 【问题描述】:

我有一个项目在 java 8、spring boot 1.5.9、hsqldb 2.4.0、jdbi3-core 3.0.0 上运行。我正在编写测试来验证数据库行为。一组测试检查并发行为。

我最基本的测试是在 2 个事务之间以某种方式泄漏数据,我不知道为什么。我不知道我的测试是否有缺陷,jdbi 是否存在错误,或者 hqsldb 是否从根本上损坏。

连接地址jdbc:hsqldb:$...;create=true;hsqldb.tx=mvlocks;hsqldb.tx_level=serializable

注意 tx=mvlocks 和 tx_level=serializable。设置 tx=mvcc 使 tx_level 成为快照隔离不会改变结果,这更奇怪,因为 snapshots 在任何更改之前肯定不可能互相影响。

测试说明:

    创建一个包含 1 条记录的表 启动 2 个线程 在每个事务中,同时启动一个新事务(由 CyclicBarrier 同步确保,通过日志输出验证),这样两个 tx 都不能看到对方的更改 在线程 1 中,插入新行(共 2 行)并提交 在线程 2 中,统计行数

我的期望是线程 2 的事务在表中应该只有 1 条记录,因为线程 2 的事务是在线程 1 的事务提交之前启动的,并且隔离级别设置为可序列化。

    CyclicBarrier syncLock = new CyclicBarrier(2);
    Runnable sync = Unchecked.runnable(() -> syncLock.await(1, TimeUnit.SECONDS));

    jdbi.useTransaction(tx -> 
        Queries queries = tx.attach(Queries.class);
        queries.create();
        // inserts a row with id 1
        queries.insert(1, 1);
    );

    CompletableFuture<Void> first = CompletableFuture.runAsync(Unchecked.runnable(() -> 
        jdbi.useTransaction(tx -> 
            assertThat(tx.isInTransaction()).isTrue();
            assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);

            LOGGER.info("first tx started");
            sync.run();

            Queries queries = tx.attach(Queries.class);
            queries.insert(2, 2);
        );
        LOGGER.info("first tx committed");

        Thread.sleep(100);

        sync.run();
    ));

    CompletableFuture<Integer> subject = CompletableFuture.supplyAsync(Unchecked.supplier(() -> 
        int out = jdbi.inTransaction(tx -> 
            assertThat(tx.isInTransaction()).isTrue();
            assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);

            LOGGER.info("second tx started");
            sync.run();
            sync.run();

            Queries queries = tx.attach(Queries.class);
            // counts the number of rows
            return queries.count();
        );
        LOGGER.info("second tx committed");

        return out;
    ));

    // capture exceptions from either thread
    CompletableFuture.allOf(first, subject).get();

    assertThat(subject.get()).isEqualTo(1);

输出:

01:28:16.255 INFO  (SQL) [net.leaumar.samstock.integration.db.Queries] main: took <1ms to execute query create table test(id int primary key, foo int) with arguments  positional:, named:, finder:[]
01:28:16.257 INFO  (SQL) [net.leaumar.samstock.integration.db.Queries] main: took <1ms to execute query insert into test(id, foo) values(:id, :foo) with arguments  positional:0:1,1:1, named:foo:1,id:1, finder:[]
01:28:16.313 INFO  () [net.leaumar.samstock.integration.db.ConcurrencyTest] ForkJoinPool.commonPool-worker-2: second tx started
01:28:16.313 INFO  () [net.leaumar.samstock.integration.db.ConcurrencyTest] ForkJoinPool.commonPool-worker-1: first tx started
01:28:16.315 INFO  (SQL) [net.leaumar.samstock.integration.db.Queries] ForkJoinPool.commonPool-worker-1: took <1ms to execute query insert into test(id, foo) values(:id, :foo) with arguments  positional:0:2,1:2, named:foo:2,id:2, finder:[]
01:28:16.315 INFO  () [net.leaumar.samstock.integration.db.ConcurrencyTest] ForkJoinPool.commonPool-worker-1: first tx committed
01:28:16.317 INFO  (SQL) [net.leaumar.samstock.integration.db.Queries] ForkJoinPool.commonPool-worker-2: took <1ms to execute query select count(*) from test with arguments  positional:, named:, finder:[]
01:28:16.318 INFO  () [net.leaumar.samstock.integration.db.ConcurrencyTest] ForkJoinPool.commonPool-worker-2: second tx committed

org.junit.ComparisonFailure: 
Expected :1
Actual   :2

这个基本的 jdbc 测试没有显示这种“已提交”的行为。测试为绿色,证明同时启动的可序列化事务即使在提交后也看不到彼此的变化:

@Test
public void transactionsAreIsolated() throws SQLException 
    @Cleanup
    Connection connection = dataSource.getConnection();
    Statement statement = connection.createStatement();
    statement.execute(CREATE_TABLE);

    @Cleanup
    Connection c1 = dataSource.getConnection();
    @Cleanup
    Connection c2 = dataSource.getConnection();

    c1.setAutoCommit(false);
    c2.setAutoCommit(false);

    startTransaction(c1);
    startTransaction(c2);

    assertThat(count(c1)).isEqualTo(0);
    assertThat(count(c2)).isEqualTo(0);

    insert(c1, 1);
    assertThat(count(c1)).isEqualTo(1);
    assertThat(count(c2))
        .describedAs("read uncommitted")
        .isEqualTo(0);

    c1.commit();
    assertThat(count(c1)).isEqualTo(1);
    assertThat(count(c2))
        .describedAs("read committed")
        .isEqualTo(0);

【问题讨论】:

在等待屏障之前尝试在“主题”中运行选择 (sync().run()) 【参考方案1】:

我在 Jdbi 中复制了您的 JDBC 测试,它可以工作:

@Test
public void transactionsAreIsolated() 
  try (Handle h1 = jdbi.open();
       Handle h2 = jdbi.open()) 
    h1.begin();
    h2.begin();

    assertThat(count(h1)).isEqualTo(0); 
    assertThat(count(h2)).isEqualTo(0); // locks h2's txn to the current snapshot

    insert(h1, 1, 1);

    assertThat(count(h1)).isEqualTo(1);
    assertThat(count(h2)).describedAs("read uncommitted").isEqualTo(0);

    h1.commit();

    assertThat(count(h1)).isEqualTo(1);
    assertThat(count(h2)).describedAs("read committed").isEqualTo(0);

    h2.rollback();
  

从测试看来,在您实际与事务中的数据库进行交互之前,事务实际上并未锁定到数据库快照。

在上面的测试中,我们通过h2观察行数,然后通过h1插入行。此交互设置了事务快照,这就是您的 JDBC 测试有效的原因。

但是,如果我们修改上述测试以在观察h2 的计数之前结束h1 事务:

@Test
public void transactionsLockToStateWhenObserved() 
  try (Handle h1 = jdbi.open();
       Handle h2 = jdbi.open()) 
    h1.begin();
    h2.begin();

    insert(h1, 1, 1);

    assertThat(count(h1)).isEqualTo(1);

    h1.commit();

    assertThat(count(h2))
        .describedAs("_now_ we're locked to a snapshot")
        .isEqualTo(1);

    h2.rollback();
  

您的原始测试有两个同步点(事务已启动,事务 1 已提交),但它需要四个才能完全测试您的场景:

@Test
public void concurrentTransactionsAreIsolated() throws Exception 
  CyclicBarrier barrier = new CyclicBarrier(2);
  Runnable sync = uncheckedRunnable(() -> barrier.await(1, TimeUnit.SECONDS));

  jdbi.useTransaction(handle -> insert(handle, 1, 1));

  CompletableFuture<Void> first = CompletableFuture.runAsync(uncheckedRunnable(() -> 
    jdbi.useTransaction(tx -> 
      assertThat(tx.isInTransaction()).isTrue();
      assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);

      log.info("first tx started");
      sync.run(); // wait for both transactions to start

      insert(tx, 2, 2);

      log.info("first tx inserted row");
      sync.run(); // let the second txn check uncommitted reads
      sync.run(); // wait for second txn to check the uncommitted reads
    );

    log.info("first tx committed");
    sync.run(); // transaction closed, let second transaction check committed reads
  ));

  CompletableFuture<Integer> subject = CompletableFuture.supplyAsync(uncheckedSupplier(() -> 
    int out = jdbi.inTransaction(tx -> 
      assertThat(tx.isInTransaction()).isTrue();
      assertThat(tx.getTransactionIsolationLevel()).isEqualTo(TransactionIsolationLevel.SERIALIZABLE);

      log.info("second tx started");
      sync.run(); // wait for both transactions to start
      sync.run(); // wait for first txn to insert

      log.info("second tx checking uncommitted read");
      assertThat(count(tx)).isEqualTo(1);

      sync.run(); // let the first txn commit
      sync.run(); // wait for first txn to commit

      log.info("second tx checking committed read");
      return count(tx);
    );
    log.info("second tx committed");

    return out;
  ));

  // capture exceptions from either thread
  CompletableFuture.allOf(first, subject).get();

  assertThat(subject.get()).isEqualTo(1);

【讨论】:

需要一个初始查询来实际开始隔离 - 我从来没有想过它:) 谢谢

以上是关于这两个并发事务之间的数据如何泄漏?的主要内容,如果未能解决你的问题,请参考以下文章

分布式事务实践: 事务的介绍

DataBase事务

弱隔离级别 & 事务并发问题

分布式事务简介

Mysql事务,并发问题,锁机制

数据库的四大特性,以及事务的隔离级别