使用 Spring Data 中的 CassandraRepository 为 Cassandra 实现分页的正确方法

Posted

技术标签:

【中文标题】使用 Spring Data 中的 CassandraRepository 为 Cassandra 实现分页的正确方法【英文标题】:Correct way to implement paging for Cassandra with CassandraRepository from Spring Data 【发布时间】:2019-03-02 07:14:29 【问题描述】:

我正在寻找一种解决方案,以使用 Cassandra(版本 3.11.3)数据库为基于 Spring Boot 的 REST 服务实现分页。我们使用 Spring Boot 2.0.5.RELEASEspring-boot-starter-data-cassandra 作为依赖项。

由于 Spring Data 的 CassandraRepository<T, ID> 接口没有扩展 PagingAndSortingRepository,我们无法获得像 JPA 那样的完整分页功能。

我阅读了 Spring Data Cassandra 文档,并且可以找到一种可能的方法来使用 Cassandra 和 Spring Data 实现分页,因为CassandraRepository 接口具有以下可用方法Slice<T> findAll(Pageable pageable);。我知道 Cassandra 无法获取特定页面 adhoc 并且总是需要页面零来遍历所有页面,因为它记录在 CassandraPageRequest:

Cassandra 特定的 @link PageRequest 实现提供对 @link PagingState 的访问。此类允许创建第一个页面请求,并表示通过 Cassandra 分页基于获取页面的进度并允许仅向前导航。 访问特定页面需要获取所有页面,直到到达所需页面。

在我的用例中,我们有 > 1.000.000 个数据库条目,并希望在我们的单页应用程序中分页显示它们。

我目前的方法如下所示:

@RestController
@RequestMapping("/users")
public class UsersResource 

  @Autowired
  UserRepository    userRepository;

  @GetMapping
  public ResponseEntity<List<User>> getAllTests(
            @RequestParam(defaultValue = "0", name = "page") @Positive int requiredPage, 
            @RequestParam(defaultValue = "500", name = "size") int size) 

    Slice<User> resultList = userRepository.findAll(CassandraPageRequest.first(size));

    int currentPage = 0;

    while (resultList.hasNext() && currentPage <= requiredPage) 
      System.out.println("Current Page Number: " + currentPage);
      resultList = userRepository.findAll(resultList.nextPageable());
      currentPage++;
    

    return ResponseEntity.ok(resultList.getContent());
  

但是 使用这种方法,我必须找到请求的页面,同时将所有数据库条目提取到内存并迭代,直到找到正确的页面。是否有其他方法可以找到正确的页面,还是我必须使用当前的解决方案?

我的 Cassandra 表定义如下所示:

CREATE TABLE user (
  id int, firstname varchar, 
  lastname varchar, 
  code varchar, 
  PRIMARY KEY(id)
);

【问题讨论】:

【参考方案1】:

我所做的是创建一个包含内容和 pagingState 哈希的页面对象。

在初始页面中,我们有简单的分页

Pageable pageRequest = CassandraPageRequest.of(0,5);

一旦执行查找,我们就会得到切片

Slice&lt;Group&gt; slice = groupRepository.findAll(pageRequest);

通过切片可以得到分页状态

page.setPageHash(getPageHash((CassandraPageRequest) slice.getPageable()));

在哪里

private String getPageHash(CassandraPageRequest pageRequest) return Base64.toBase64String(pageRequest.getPagingState().toBytes());

最终返回一个 Page 对象,其 List 内容和 pagingState 为 pageHash

【讨论】:

【参考方案2】:

请参阅下面的代码。它可能会有所帮助。

    @GetMapping("/loadData")
    public Mono<DataTable> loadData(@RequestParam boolean reset, @RequestParam(required = false) String tag, WebSession session) 
        final String sessionId = session.getId();
        IMap<String, String> map = Context.get(HazelcastInstance.class).getMap("companygrouping-pageable-map");
        int pageSize = Context.get(EnvProperties.class).getPageSize();
        Pageable pageRequest;

        if (reset)
            map.remove(sessionId);

        String serializedPagingState = map.compute(sessionId, (k, v) -> (v == null) ? null : map.get(session.getId()));

        pageRequest = StringUtils.isBlank(serializedPagingState) ? CassandraPageRequest.of(0, pageSize)
                : CassandraPageRequest.of(PageRequest.of(0, pageSize), PagingState.fromString(serializedPagingState)).next();

        Mono<Slice<TagMerge>> sliceMono = StringUtils.isNotBlank(tag)
                ? Context.get(TagMergeRepository.class).findByKeyStatusAndKeyTag(Status.NEW, tag, pageRequest)
                : Context.get(TagMergeRepository.class).findByKeyStatus(Status.NEW, pageRequest);

        Flux<TagMerge> flux = sliceMono.map(t -> convert(t, map, sessionId)).flatMapMany(Flux::fromIterable);
        Mono<DataTable> dataTabelMono = createTableFrom(flux).doOnError(e -> log.error("", e));
        if (reset) 
            Mono<Long> countMono = Mono.empty();
            if (StringUtils.isNotBlank(tag))
                countMono = Context.get(TagMergeRepository.class).countByKeyStatusAndKeyTag(Status.NEW, tag);
            else
                countMono = Context.get(TagMergeRepository.class).countByKeyStatus(Status.NEW);
            dataTabelMono = dataTabelMono.zipWith(countMono, (t, k) -> 
                t.setTotalRows(k);
                return t;
            );
        
        return dataTabelMono;
    

private List<TagMerge> convert(Slice<TagMerge> slice, IMap<String, String> map, String id) 
        PagingState pagingState = ((CassandraPageRequest) slice.getPageable()).getPagingState();
        if (pagingState != null)
            map.put(id, pagingState.toString());
        return slice.getContent();
    


【讨论】:

【参考方案3】:

Cassandra 支持前向分页,这意味着您可以获取前 n 行,然后可以获取 n+1 和 2n 之间的行,依此类推,直到数据结束,但您不能直接获取 n+1 和 2n 之间的行。

【讨论】:

以上是关于使用 Spring Data 中的 CassandraRepository 为 Cassandra 实现分页的正确方法的主要内容,如果未能解决你的问题,请参考以下文章

如何忽略 spring-boot-cassandra 默认配置来加载 cassandra 连接实例

基于spring boot的项目中的spring data mongodb配置

使用 Eclipse Scala IDE 中的 spring-data 注入测试 playframework 2.4

使用Spring Data防止MongoDB中的重复(Spring Roo)

使用 Spring Data JPA 的服务层中的 Crud 方法

使用 Spring Data 保存在 Redis 中的值具有奇怪的前缀