以反应方式从流中删除已经在数据库中的对象

Posted

技术标签:

【中文标题】以反应方式从流中删除已经在数据库中的对象【英文标题】:Remove objects from a stream which are already in a database in a reactive manner 【发布时间】:2019-06-21 02:17:42 【问题描述】:

我正在扫描目录中的文件,然后处理结果。我想在进一步处理之前从数据存储中已有的扫描结果中删除文件。

尝试使用响应式 mongodb 以响应式方式执行此操作。我只是不确定如何以使用数据库查询结果的方式实现过滤器。

@Override
public Flux<File> findFiles(Directory directory) 

    // Only get these file types as we can't process anything else
    final Predicate<Path> extensions = path ->
            path.toString().endsWith(".txt") ||
                    path.toString().endsWith(".doc") ||
                    path.toString().endsWith(".pdf");

    final Set<File> files = fileService.findAll(Paths.get(directory.getPath()), extensions);

    final Stream<Video> fileStream = files
            .stream()
            .map(this::convertFileToDocument)

            // This is wrong (doesn't compile for a start), but how do I do something similar or of this nature? 
            .filter(file -> fileRepository.findById(file.getId()));

    return Flux.fromStream(fileStream);

convertFileToDocument 只是将文件映射到 POJO,那里没有什么好玩的。

如何根据findById 的结果添加过滤器,或者有更好的方法来实现这一点?

【问题讨论】:

【参考方案1】:

如果fileRepository.findById 返回单声道,我建议您将流转换为通量,然后使用filterWhen 进行过滤;检查 Mono 是否有元素。类似的东西

final Stream<Video> fileStream = files
        .stream()
        .map(this::convertFileToDocument);
return Flux.fromStream(fileStream).filterWhen(file -> fileRepository.findById(file.getId()).hasElement().map(b -> !b));

这将过滤掉所有为findById 返回非空 Mono 或存在于数据库中的文件。如果我误解了什么,请告诉我。

【讨论】:

我认为这几乎是正确的,我只需要反转它...我想从数据库中的fileStream 中删除所有元素。据我所知,这仍然是处理数据库中已经存在的元素。除非我执行不正确。 我已更改我的答案,将 hasElement 中的 Mono 反转,因此它会过滤掉数据库中已有的所有文件 冠军得主。谢谢大家!

以上是关于以反应方式从流中删除已经在数据库中的对象的主要内容,如果未能解决你的问题,请参考以下文章

连续从流中读取?

从流中删除 AtomicInteger 计数器

WPF中的MediaElement从流中播放视频?

Jackson ObjectWriter 仅从流中写入第一个条目

NAudio在改变音高而不是文件时寻找一种从流中读取的方法

cin作为判断条件时