@Tailable(spring-data-reactive-mongodb) 等效于 spring-data-r2dbc

Posted

技术标签:

【中文标题】@Tailable(spring-data-reactive-mongodb) 等效于 spring-data-r2dbc【英文标题】:@Tailable(spring-data-reactive-mongodb) equivalent in spring-data-r2dbc 【发布时间】:2020-07-04 07:03:54 【问题描述】:

我正在尝试使用 spring-data-r2dbc。我在 Postgresql 上试试这个。我之前尝试过 spring-data-mongodb-reactive 。我忍不住比较了两者。

我发现查询派生还不支持。但我想知道@Tailable 是否有等价物。这样我就可以实时收到数据库更改的通知。任何人都可以分享与此相关的任何代码示例。

我了解底层数据库应该支持这一点。我相信 Postgresql 确实支持这种使用逻辑解码的东西(如果我在这里错了,请纠正我)。

spring-data-r2dbc 中是否有 @Tailable 等效项?

【问题讨论】:

【参考方案1】:

我遇到了同样的问题,不确定您是否找到了解决方案,但我能够通过执行以下操作来完成类似的事情。首先,我在表中添加了触发器

CREATE TRIGGER trigger_name
    AFTER INSERT OR DELETE OR UPDATE 
    ON table_name
    FOR EACH ROW
    EXECUTE PROCEDURE trigger_function_name;

每当更新、删除或插入行时,这都会在表上设置触发器。然后它会调用我设置的触发函数,看起来像这样:

CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS 
$BODY$
DECLARE
    payload JSON;
BEGIN
    payload = row_to_json(NEW);
    PERFORM pg_notify('notification_name', payload::text);
    RETURN NULL;
END;
$BODY$;

这将允许我从我的 Spring Boot 项目中“收听”任何这些更新,并将整个行作为有效负载发送。 接下来,在我的 Spring Boot 项目中,我配置了与我的数据库的连接。

@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration 
    @Override
    @Bean
    public ConnectionFactory connectionFactory() 
        return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
                .host("host")
                .database("db")
                .port(port)
                .username("username")
                .password("password")
                .schema("schema")
                .connectTimeout(Duration.ofMinutes(2))
                .build());
    

我将其自动装配(依赖注入)到我的服务类的构造函数中,并将其转换为 r2dbc PostgressqlConnection 类,如下所示:

this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();

现在我们想“监听”我们的表并在对我们的表执行一些更新时得到通知。为此,我们使用 @PostContruct 注释设置了一个在依赖注入后执行的初始化方法

@PostConstruct
private void postConstruct() 
    postgresqlConnection.createStatement("LISTEN notification_name").execute()
            .flatMap(PostgresqlResult::getRowsUpdated).subscribe();

请注意,我们会监听我们在 pg_notify 方法中放置的任何名称。我们还想设置一个方法来在 bean 即将被扔掉时关闭连接,如下所示:

@PreDestroy
private void preDestroy() 
    postgresqlConnection.close().subscribe();

现在我只需创建一个方法,该方法返回当前表中任何内容的 Flux,并将它与通知合并,正如我在通知以 json 形式出现之前所说的那样,所以我必须反序列化它,然后我决定使用 ObjectMapper。所以,它看起来像这样:

private Flux<YourClass> getUpdatedRows() 
    return postgresqlConnection.getNotifications().map(notification -> 
        try 
            //deserialize json
            return objectMapper.readValue(notification.getParameter(), YourClass.class);
         catch (IOException e) 
            //handle exception
        
    );


public Flux<YourClass> getDocuments() 
    return documentRepository.findAll().share().concatWith(getUpdatedRows());

希望这会有所帮助。 干杯!

【讨论】:

以上是关于@Tailable(spring-data-reactive-mongodb) 等效于 spring-data-r2dbc的主要内容,如果未能解决你的问题,请参考以下文章

pymongo bugfix后记