Spring Data (数据)MongoDB
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Data (数据)MongoDB相关的知识,希望对你有一定的参考价值。
10.21. 更改流
从MongoDB 3.6开始,Change Streams允许应用程序获得有关更改的通知,而无需跟踪oplog。
更改流支持仅适用于副本集或分片集群。 |
更改流可以与命令式和反应式MongoDB Java驱动程序一起使用。强烈建议使用反应式变体,因为它的资源密集度较低。但是,如果您无法使用反应式 API,您仍然可以使用 Spring 生态系统中已经流行的消息传递概念来获取更改事件。
可以在集合和数据库级别同时监视,而数据库级别变体发布 数据库中所有集合的更改。订阅数据库更改流时,请确保使用 事件类型的合适类型,因为转换可能无法正确应用于不同的实体类型。 有疑问,请使用。Document
10.21.1. 使用 更改流MessageListener
使用同步驱动程序侦听更改流会创建一个长时间运行的阻止任务,需要将其委派给单独的组件。 在这种情况下,我们需要首先创建一个,这将是运行特定任务的主要入口点。 Spring Data MongoDB已经附带了一个默认的实现,该实现可以在其上运行,并且能够为论坛创建和运行实例。MessageListenerContainer
SubscriptionRequest
MongoTemplate
Task
ChangeStreamRequest
以下示例演示如何将更改流与实例一起使用:MessageListener
例 116.使用实例更改流MessageListener
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println;
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty());
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class);
// ...
container.stop();
启动容器将初始化资源并启动已注册实例的实例。启动后添加的请求将立即运行。 |
定义收到 ais 时调用的侦听器。将转换为请求的域类型。用于接收未经转换的原始结果。 |
设置要收听的集合并提供其他选项。 |
注册请求。返回的可用于检查当前状态并取消它以释放资源。 |
一旦确定不再需要容器,请不要忘记停止它。这样做会停止容器内所有正在运行的实例。 |
处理时的错误将传递给 。如果未另行说明,则默认应用日志追加。 |
10.21.2. 反应性变更流
使用反应式 API 订阅更改流是处理流的更自然的方法。尽管如此,诸如此类的基本构建块保持不变。以下示例演示如何使用更改流发射器:ChangeStreamOptions
ChangeStreamEvent
例 117.更改流发射ChangeStreamEvent
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class)
.watchCollection("people")
.filter(where("age").gte(38))
.listen();
基础文档应转换为的事件目标类型。省略这一点以接收原始结果而无需转换。 |
使用聚合管道或仅使用查询来筛选事件。 |
Obtain a of change stream events. The is converted to the requested domain type from (2). |
10.21.3. Resuming Change Streams
Change Streams can be resumed and resume emitting events where you left. To resume the stream, you need to supply either a resume token or the last known server time (in UTC). Use to set the value accordingly.ChangeStreamOptions
The following example shows how to set the resume offset using server time:
Example 118. Resume a Change Stream
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
.watchCollection("people")
.resumeAt(Instant.now().minusSeconds(1))
.listen();
You may obtain the server time of an through the method or use the exposed through . |
在某些情况下,在恢复更改流时,可能不是一个足够精确的度量。为此,请使用MongoDB原生的BsonTimestamp。 |
10.22. 时间序列
MongoDB 5.0引入了时间序列集合,这些集合经过优化,可以有效地存储一段时间内的文档,例如测量或事件。 在插入任何数据之前,需要这样创建这些集合。 可以通过运行命令、定义时序集合选项或从注释中提取选项来创建集合,如以下示例所示。createCollection
@TimeSeries
例 119.创建时序集合
通过MongoDB驱动程序创建时间序列
template.execute(db ->
com.mongodb.client.model.CreateCollectionOptions options = new CreateCollectionOptions();
options.timeSeriesOptions(new TimeSeriesOptions("timestamp"));
db.createCollection("weather", options);
return "OK";
);
创建时序集合CollectionOptions
template.createCollection("weather", CollectionOptions.timeSeries("timestamp"));
创建从批注派生的时间序列集合
@TimeSeries(collection="weather", timeField = "timestamp")
public class Measurement
String id;
Instant timestamp;
// ...
template.createCollection(Measurement.class);
上面的代码片段可以很容易地转移到提供相同方法的反应式 API。 请确保正确订阅返回的发布者。
10.23. 可观测性
Spring Data MongoDB目前拥有最新的代码来支持MongoDB应用程序中的可观测性。 然而,Spring Boot(尚未)接受这些更改。 在应用这些更改之前,如果您希望使用Spring Data MongoDB的可观测性风格,则必须执行以下步骤。
- 首先,您必须通过自定义您的类或您的配置类之一来选择加入Spring Data MongoDB的配置设置。
MongoClientSettings
@SpringBootApplication
例 120。注册 MongoDB 千分尺定制器设置
@Bean
MongoClientSettingsBuilderCustomizer mongoMetricsSynchronousContextProvider(ObservationRegistry registry)
return (clientSettingsBuilder) ->
clientSettingsBuilder.contextProvider(ContextProviderFactory.create(registry))
.addCommandListener(new MongoObservationCommandListener(registry));
;
- 您的项目必须包括弹簧启动执行器。
- 禁用 Spring Boot 自动配置的 MongoDB 命令侦听器,并通过将以下属性添加到您的
application.properties
例 121.要应用的自定义设置
# Disable Spring Boots autoconfigured tracing
management.metrics.mongo.command.enabled=false
# Enable it manually
management.tracing.enabled=true
请务必根据千分尺的参考文档添加配置正在使用的示踪剂所需的任何其他相关设置。
这应该可以!您现在正在使用Spring Data MongoDB对Spring Observability的API的使用。Observation
reference/observability.adoc 中未解析的指令 - include::../../../../target/_conventions.adoc[]
reference/observability.adoc 中未解析的指令 - include::../../../../target/_metrics.adoc[]
reference/observability.adoc 中未解析的指令 - include::../../../../target/_spans.adoc[]
另请参阅开放遥测语义约定以获取进一步参考。
11. MongoDB 会话
从3.6版本开始,MongoDB支持会话的概念。会话的使用启用了MongoDB的因果一致性模型,该模型保证以尊重其因果关系的顺序运行操作。这些被拆分为实例和实例。在本节中,当我们谈到会话时,我们指的是。ServerSession
ClientSession
ClientSession
客户端会话中的操作不与会话外部的操作隔离。 |
Bothand提供了网关方法,用于绑定a到操作.并使用实现MongoDB的集合和数据库接口的会话代理对象,因此您无需在每次调用时添加会话。这意味着潜在的调用 to 被委派给。MongoOperations
ReactiveMongoOperations
ClientSession
MongoCollection
MongoDatabase
MongoCollection#find()
MongoCollection#find(ClientSession)
诸如返回本机MongoDB Java驱动程序网关对象之类的方法,这些对象本身提供了专用方法。这些方法不是会话代理的。在直接与 aorand 交互时,您应该提供所需的位置,而不是通过其中一个回调。 |
11.1. 同步支持。ClientSession
以下示例显示了会话的使用情况:
例 122.withClientSession
MongoOperations
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
ClientSession session = client.startSession(sessionOptions);
template.withSession(() -> session)
.execute(action ->
Query query = query(where("name").is("Durzo Blint"));
Person durzo = action.findOne(query, Person.class);
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
action.insert(azoth);
return azoth;
);
session.close()
从服务器获取新会话。 | |
像以前一样使用方法。自动应用。 | |
确保关闭。 | |
关闭会话。 |
在处理实例时,尤其是延迟加载的实例时,在加载所有数据之前不要关闭。否则,延迟提取将失败。 |
11.2. 反应式支持ClientSession
响应式对应项使用与命令式相同的构建块,如以下示例所示:
例 123.客户端会话ReactiveMongoOperations
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
Publisher<ClientSession> session = client.startSession(sessionOptions);
template.withSession(session)
.execute(action ->
Query query = query(where("name").is("Durzo Blint"));
return action.findOne(query, Person.class)
.flatMap(durzo ->
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
return action.insert(azoth);
);
, ClientSession::close)
.subscribe();
获取用于新会话检索的 a。 |
像以前一样使用方法。这是自动获得和应用的。 |
确保关闭。 |
在您订阅之前,什么都不会发生。有关详细信息,请参阅项目反应器参考指南。 |
通过使用提供实际会话的 a,您可以将会话获取推迟到实际订阅点。 尽管如此,您仍然需要在完成后关闭会话,以免使用陈旧的会话污染服务器。当您不再需要会话时,使用钩子到呼叫。 如果您希望对会话本身有更多的控制,则可以通过驱动程序获取并通过 a 提供它。Publisher
doFinally
execute
ClientSession#close()
ClientSession
Supplier
反应性使用仅限于模板 API 使用。目前没有与反应式存储库的会话集成。 |
12. 蒙戈数据库事务
从版本4开始,MongoDB支持事务。事务建立在会话之上,因此需要活动。ClientSession
除非您在应用程序上下文中指定 a,否则事务支持将被禁用。您可以使用参与正在进行的非本机MongoDB事务。 |
若要获得对事务的完全编程控制,可能需要启用会话回调。MongoOperations
以下示例显示 a 中的编程事务控制:SessionCallback
例 124.程序化交易
ClientSession session = client.startSession(options);
template.withSession(session)
.execute(action ->
session.startTransaction();
try
Step step = // ...;
action.insert(step);
process(step);
action.update(Step.class).apply(Update.set("state", // ...
session.commitTransaction();
catch (RuntimeException e)
session.abortTransaction();
, ClientSession::close)
获取新的。 |
开始事务。 |
如果一切按预期进行,请提交更改。 |
有些东西坏了,所以回滚一切。 |
完成后不要忘记关闭会话。 |
前面的示例允许您在回调中使用会话作用域实例时完全控制事务行为,以确保会话传递到每个服务器调用。 为了避免此方法带来的一些开销,您可以使用 ato 消除手动事务流的一些噪音。MongoOperations
TransactionTemplate
12.1. 交易TransactionTemplate
Spring Data MongoDB 事务支持 a。以下示例演示如何创建和使用:TransactionTemplate
TransactionTemplate
例 125。交易TransactionTemplate
template.setSessionSynchronization(ALWAYS);
// ...
TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager);
txTemplate.execute(new TransactionCallbackWithoutResult()
@Override
protected void doInTransactionWithoutResult(TransactionStatus status)
Step step = // ...;
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
;
);
在模板 API 配置期间启用事务同步。 |
创建使用提供的。 |
在回调中,和事务已经注册。 |
更改运行时的状态(如前面清单的第 1 项中可能认为的那样)可能会导致线程和可见性问题。 |
12.2. 交易MongoTransactionManager
MongoTransactionManager
是通往众所周知的 Spring 事务支持的门户。它允许应用程序使用Spring的托管事务功能。 绑定到线程。检测会话并相应地对这些与事务关联的资源进行操作。还可以参与其他正在进行的事务。以下示例演示如何使用 创建和使用事务:MongoTransactionManager
ClientSession
MongoTemplate
MongoTemplate
MongoTransactionManager
例 126。交易MongoTransactionManager
@Configuration
static class Config extends AbstractMongoClientConfiguration
@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory)
return new MongoTransactionManager(dbFactory);
// ...
@Component
public class StateService
@Transactional
void someBusinessFunction(Step step)
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
;
);
在应用程序上下文中注册。 |
将方法标记为事务性。 |
|
12.3. 反应式交易
与反应性支持相同,提供专用的操作方法 在事务中,无需担心根据操作结果提交或停止操作。ClientSession
ReactiveMongoTemplate
在事务流中使用普通的MongoDB反应式驱动程序API可能如下所示。delete
例 127.本机驱动程序支持
Mono<DeleteResult> result = Mono
.from(client.startSession())
.flatMap(session ->
session.startTransaction();
return Mono.from(collection.deleteMany(session, ...))
.onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e)))
.flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val)))
.doFinally(signal -> session.close());
);
首先,我们显然需要启动会话。 |
一旦我们有了手头,就开始交易。 |
通过传递给操作在事务中操作。 |
如果操作异常完成,我们需要停止事务并保留错误。 |
或者,当然,在成功的情况下提交更改。仍保留操作结果。 |
最后,我们需要确保关闭会话。 |
上述操作的罪魁祸首是保持主流而不是交易结果 通过 Bothor 发布,这会导致设置相当复杂。DeleteResult
commitTransaction()
abortTransaction()
12.4. 交易TransactionalOperator
Spring Data MongoDB 事务支持 a。以下示例演示如何创建和使用:TransactionalOperator
TransactionalOperator
例 128。交易TransactionalOperator
template.setSessionSynchronization(ALWAYS);
// ...
TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
new DefaultTransactionDefinition());
Step step = // ...;
template.insert(step);
Mono<Void> process(step)
.then(template.update(Step.class).apply(Update.set("state", …))
.as(rxtx::transactional)
.then();
为事务参与启用事务同步。 |
创建使用提供的。 |
|
12.5. 交易ReactiveMongoTransactionManager
ReactiveMongoTransactionManager
是通往众所周知的 Spring 事务支持的门户。 它允许应用程序利用Spring的托管事务功能。 绑定到订阅者。检测会话并相应地对这些与事务关联的资源进行操作。还可以参与其他正在进行的事务。 以下示例演示如何使用 创建和使用事务:ReactiveMongoTransactionManager
ClientSession
Context
ReactiveMongoTemplate
ReactiveMongoTemplate
ReactiveMongoTransactionManager
例 129。交易ReactiveMongoTransactionManager
@Configuration
public class Config extends AbstractReactiveMongoConfiguration
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory)
return new ReactiveMongoTransactionManager(factory);
// ...
@Service
public class StateService
@Transactional
Mono<UpdateResult> someBusinessFunction(Step step)
return template.insert(step)
.then(process(step))
.then(template.update(Step.class).apply(Update.set("state", …));
;
);
在应用程序上下文中注册。 |
将方法标记为事务性。 |
|
12.6. 交易内的特殊行为
在事务内部,MongoDB服务器的行为略有不同。
连接设置
MongoDB 驱动程序提供专用的副本集名称配置选项,将驱动程序纳入自动检测 模式。此选项有助于在事务期间识别主副本集节点和命令路由。
确保添加到MongoDB URI。有关更多详细信息,请参阅连接字符串选项。 |
收集操作
MongoDB不支持事务中的集合操作,例如集合创建。这也 影响首次使用时发生的动态集合创建。因此,请确保具备所有必需的内容 结构到位。
暂时性错误
MongoDB可以为事务操作期间引发的错误添加特殊标签。这些可能表示暂时性故障 仅重试操作即可消失。 我们强烈建议出于这些目的进行Spring 重试。不过 可以覆盖以实现 MongoDB 参考手册中概述的重试提交操作行为。如何在 Spring-data 中更改/定义 Mongodb 的默认数据库?
spring-data-mongodb 在一个 Mongo 实例中连接多个数据库
Spring Boot 反应式和 mongodb '命令插入需要身份验证'