Dapr-状态管理
Posted One To One
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dapr-状态管理相关的知识,希望对你有一定的参考价值。
前言:
前一篇对Dapr的服务调用方式进行了解,本篇继续对状态管理进行了解。
一、状态管理-解决的问题
在分布式应用程序中跟踪状态存在一下问题:
- 应用程序可能需要不同类型的数据存储。
- 访问和更新数据时可能需要不同的一致性级别。
- 多个用户可以同时更新数据,需要冲突解决。
- 在与数据存储交互时,服务必须重试发生的任何短暂的暂时性错误 。
Dapr 状态管理构建块解决了这些难题。 它简化了跟踪状态,而无需依赖关系或第三方存储 Sdk 上的学习曲线。
Dapr 状态管理提供 密钥/值 API。 此功能不支持关系数据存储或图形数据存储。
二、状态管理-工作原理
应用程序可以使用Dapr的状态管理API,使用状态存储组件保存和读取键/值对。通过 HTTP 或 gRPC 调用 API。
例如,通过使用HTTP POST可以保存键/值对,通过使用HTTP GET可以读取一个键并返回它的值。
三、状态管理-特点
-
可插拔状态存储
Dapr数据存储被建模为组件,可以在不修改你的服务代码的情况下进行替换。
-
一致性
CAP 定理是一组适用于存储状态的分布式系统的原则。下图展示了CAP定理的三个属性。
定理指出,分布式数据系统提供一致性、可用性和分区容差之间的权衡。 而且,任何数据存储只能 保证三个属性中的两个:
-
-
一致性 (C) 。 群集中的每个节点都将使用最新的数据做出响应,即使在所有副本更新之前,系统都必须阻止请求。 如果查询当前正在更新的项的 "一致性系统",则在所有副本都成功更新之前,将不会收到响应。 但是,您将始终接收最新的数据。
-
可用性 () 。 即使该响应不是最新的数据,每个节点都将返回立即响应。 如果您在 "可用系统" 中查询正在更新的项,则您将获得该服务在此时可以提供的最佳可能的答案。
-
分区容差 (P) 。 即使复制的数据节点发生故障或失去与其他复制的数据节点的连接,保证系统仍可继续运行。
-
分布式应用程序必须处理 P 属性。 随着服务彼此间的网络调用通信,会发生网络中断 (P) 。 考虑到这一点,分布式应用程序必须是 AP 或 CP。
AP 应用程序选择 "可用性一致性"。 Dapr 通过其 最终一致性 策略支持此选择。 请考虑使用基础数据存储(例如 Azure CosmosDB)将冗余数据存储在多个副本上。 对于最终一致性,状态存储会将更新写入一个副本并完成与客户端的写入请求。 此时间过后,存储将以异步方式更新其副本。 读取请求可以返回任何副本的数据,包括尚未收到最新更新的副本。
CP 应用程序选择一致性和可用性。 Dapr 通过其 强一致性 策略支持此选择。 在此方案中,状态存储将同步更新 所有 (或在某些情况下,在完成写入请求 之前) 必需副本的 仲裁。 读取操作将跨副本持续返回最新数据。
-
并发
在多用户应用程序中,有可能多个用户同时更新同一时间) (相同的数据。 Dapr 支持乐观并发控制 (OCC) 来管理冲突。 OCC 基于一个假设,因为用户处理数据的不同部分,所以更新冲突很少见。 更有效的方法是将更新成功,如果不成功,则重试。 实现悲观锁定的替代方法可能会影响长时间运行的锁定,导致数据争用。
Dapr 支持使用 Etag) (OCC 的乐观并发控制。 ETag 是与存储的键/值对的特定版本相关联的值。 键/值对的每次更新时,ETag 值也会更新。 当客户端检索键/值对时,响应包括当前 ETag 值。 当客户端更新或删除键/值对时,它必须在请求正文中发送回该 ETag 值。 如果其他客户端同时更新了数据,则 Etag 不会匹配,请求将失败。 此时,客户端必须检索更新的数据,重新进行更改,然后重新提交更新。 此策略称为 第一次写入-wins。
Dapr 还支持 最后写入 wins 策略。 使用此方法时,客户端不会将 ETag 附加到写入请求。 状态存储组件将始终允许更新,即使基础值在会话期间已更改也是如此。 最后写入-wins 对于数据争用较少的高吞吐量写入方案非常有用。 同样,可以容忍偶尔的用户更新。
-
事务
Dapr 可以将 多项更改 作为一个作为事务实现的操作写入数据存储区。 此功能仅适用于支持 ACID 事务的数据存储。 在撰写本文时,这些存储包括 Redis、MongoDB、PostgreSQL、SQL Server和 Azure CosmosDB。
在下面的示例中,多项操作将发送到单个事务中的状态存储。 所有操作都必须成功,事务才能提交。 如果一个或多个操作失败,则回滚整个事务。
四、.Net Core中应用
1、在项目【DaprFrontEnd】中添加控制器-DaprStateController 用于展示状态的各种操作
[Route("[controller]")] [ApiController] public class StateController : ControllerBase { private readonly ILogger<DaprStateController> _logger; private readonly DaprClient _daprClient; public StateController(ILogger<StateController> logger, DaprClient daprClient) { _logger = logger; _daprClient = daprClient; } /// <summary> /// 获取值 /// </summary> /// <returns></returns> [HttpGet] public async Task<ActionResult> GetAsync() { var result = await _daprClient.GetStateAsync<string>("statestore", "guid"); return Ok(result); } /// <summary> /// 保存值 /// </summary> /// <returns></returns> [HttpPost] public async Task<ActionResult> PostAsync() { await _daprClient.SaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), new StateOptions() { Consistency = ConsistencyMode.Strong }); return Ok("done"); } /// <summary> /// 删除值 /// </summary> /// <returns></returns> [HttpDelete] public async Task<ActionResult> DeleteAsync() { await _daprClient.DeleteStateAsync("statestore", "guid"); return Ok("done"); } /// <summary> /// 通过tag防止并发冲突,保存值 /// </summary> /// <returns></returns> [HttpPost("withtag")] public async Task<ActionResult> PostWithTagAsync() { var (value, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid"); await _daprClient.TrySaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), etag); return Ok("done"); } /// <summary> /// 通过tag防止并发冲突,删除值 /// </summary> /// <returns></returns> [HttpDelete("withtag")] public async Task<ActionResult> DeleteWithTagAsync() { var (value, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid"); return Ok(await _daprClient.TryDeleteStateAsync("statestore", "guid", etag)); } /// <summary> /// 从绑定获取值,健值name从路由模板获取 /// </summary> /// <param name="state"></param> /// <returns></returns> [HttpGet("frombinding/{name}")] public async Task<ActionResult> GetFromBindingAsync([FromState("statestore", "name")] StateEntry<string> state) { return await Task.FromResult<ActionResult>(Ok(state.Value)); } /// <summary> /// 根据绑定获取并修改值,健值name从路由模板获取 /// </summary> /// <param name="state"></param> /// <returns></returns> [HttpPost("withbinding/{name}")] public async Task<ActionResult> PostWithBindingAsync([FromState("statestore", "name")] StateEntry<string> state) { state.Value = Guid.NewGuid().ToString(); return Ok(await state.TrySaveAsync()); } /// <summary> /// 获取多个值 /// </summary> /// <returns></returns> [HttpGet("list")] public async Task<ActionResult> GetListAsync() { var result = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10); return Ok(result); } /// <summary> /// 删除多个值 /// </summary> /// <returns></returns> [HttpDelete("list")] public async Task<ActionResult> DeleteListAsync() { var data = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10); var removeList = new List<BulkDeleteStateItem>(); foreach (var item in data) { removeList.Add(new BulkDeleteStateItem(item.Key, item.ETag)); } await _daprClient.DeleteBulkStateAsync("statestore", removeList); return Ok("done"); } }
2、启动程序
dapr run --dapr-http-port 3501 --app-port 8230 --app-id frontend dotnet .\\DaprFrontEnd.dll
3、调用过程:
五、总结:
Dapr 状态管理构建块提供了一个 API,用于在各种数据存储区中存储键/值数据。 API 为以下内容提供支持:
- 批量操作
- 强一致性和最终一致性
- 乐观并发控制
- 多项事务
Dapr在Java中的实践 之 状态管理
文章目录
状态管理
状态管理(State Management)使用键值对作为存储机制,可以轻松的使长时运行、高可用的有状态服务和无状态服务共同运行在我们的服务中。
我们的服务可以利用Dapr的状态管理API在状态存储组件中保存、读取和查询键值对。
状态存储组件是可插拔的,目前支持使用Azure CosmosDB、 Azure SQL Server、 PostgreSQL,、AWS DynamoDB、Redis 作为状态存储介质。
编写示例代码
创建一个SpringBoot项目,命名为:state-management
,该项目的状态管理调用过程如下图:
在state-management
该项目的pom.xml
文件中添加如下依赖:
<dependency>
<groupId>io.dapr</groupId>
<artifactId>dapr-sdk-springboot</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.3</version>
</dependency>
注入一个DaprClient
的bean:
@Configuration
public class DaprConfig
private static final DaprClientBuilder BUILDER = new DaprClientBuilder();
@Bean
public DaprClient buildDaprClient()
return BUILDER.build();
state-management
项目中一共有3个接口:
save
:保存状态get
:读取状态delete
:删除状态
具体源码如下:
package one.more.society.state.management;
import io.dapr.client.DaprClient;
import io.dapr.client.domain.State;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class StateManagementController
@Autowired
private DaprClient client;
private static final String STATE_STORE_NAME = "statestore";
private static final String STATE_STORE_KEY = "one.more.society";
/**
* 保存状态
*
* @param value value
* @return
*/
@RequestMapping(value = "/save", method = RequestMethod.GET)
public StateResponse save(String value)
log.info("save - value:", value);
client.saveState(STATE_STORE_NAME, STATE_STORE_KEY, value).block();
StateResponse response = new StateResponse();
response.setCode(1);
response.setStatus("save");
response.setValue(value);
return response;
/**
* 读取状态
*
* @return StateResponse
*/
@RequestMapping(value = "/get", method = RequestMethod.GET)
public StateResponse get()
log.info("get");
State<String> value = client.getState(STATE_STORE_NAME, STATE_STORE_KEY, String.class).block();
log.info("value: ", value.getValue());
StateResponse response = new StateResponse();
response.setCode(1);
response.setStatus("get");
response.setValue(value.getValue());
return response;
/**
* 删除状态
*
* @return
*/
@RequestMapping(value = "/delete", method = RequestMethod.GET)
public StateResponse delete()
log.info("delete");
client.deleteState(STATE_STORE_NAME, STATE_STORE_KEY).block();
StateResponse response = new StateResponse();
response.setCode(1);
response.setStatus("delete");
return response;
另外,在application.properties
中配置:
server.port=30003
启动服务
在启动之前先用mvn
命令打包:
mvn clean package
在state-management
项目的目录中执行以下命令,启动state-management
服务:
dapr run --app-id state-management --app-port 30003 --dapr-http-port 31003 -- java -jar target/state-management-0.0.1-SNAPSHOT.jar
在Dapr Dashboard中看到:
服务都已经启动成功。
先访问http://localhost:30003/get,可以看到:
读取状态返回为null,接下来访问http://localhost:30003/save?value=万猫学社,可以看到:
状态已经保存了,再访问http://localhost:30003/get验证一下:
状态被正确读取,再访问http://localhost:30003/delete,可以看到:
状态已经被删除了,再访问http://localhost:30003/get验证一下:
读取状态返回为null。
状态储存组件
初始化Dapr后,默认为我们指定的状态储存组件是Redis,在用户目录下的.dapr
文件夹中的components
文件夹中,可以找到statestore.yaml
文件:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true"
下面让我们来尝试一下,使用MySQL作为状态储存组件,把statestore.yaml
文件修改为:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.mysql
version: v1
metadata:
- name: connectionString
value: "root:one.more.society@tcp(127.0.0.1:3306)/?allowNativePasswords=true"
重新启动服务,可以看到在日志中看到使用MySQL作为状态储存组件:
time="09:57:35.5632633+08:00" level=info msg="Creating MySql schema 'dapr_state_store'" app_id=state-management instance=JT-243137 scope=dapr.contrib type=log ver=1.7.3
time="09:57:35.5862126+08:00" level=info msg="Creating MySql state table 'state'" app_id=state-management instance=JT-243137 scope=dapr.contrib type=log ver=1.7.3
time="09:57:35.6563599+08:00" level=info msg="component loaded. name: statestore, type: state.mysql/v1" app_id=state-management instance=JT-243137 scope=dapr.runtime type=log ver=1.7.3
如果在MySQL中没有对应的库和表,Dapr默认为我们自动创建一个名为dapr_state_store
的库,还有一个名为state
的表,如下图:
其中,state
的表结构为:
CREATE TABLE `state` (
`id` varchar(255) NOT NULL,
`value` json NOT NULL,
`isbinary` tinyint(1) NOT NULL,
`insertDate` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updateDate` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`eTag` varchar(36) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
再访问一下http://localhost:30003/save?value=万猫学社,就可以在数据库中看到对应的数据:
值得注意的是:MySQL状态储存组件目前还处于Alpha状态,最好不要在生产环境使用。
更详细的配置说明见下表:
配置项 | 是否必填 | 说明 | 示例 |
---|---|---|---|
connectionString | Y | 用于连接到 MySQL 的连接字符串。 请不要将schema添加到连接字符串中。 | 非SSL连接: "<user>:<password>@tcp(<server>:3306)/?allowNativePasswords=true" Enforced SSL 连接: "<user>:<password>@tcp(<server>:3306)/?allowNativePasswords=true&tls=custom" |
schemaName | N | 要使用的schema名称。 如果指定的schema不存在,将会自动创建。默认值为"dapr_state_store" | "one_more_state_store" |
tableName | N | 要使用的表名。如果对应的表不存在,将被自动创建。默认值为 “state” | "one_more_state" |
pemPath | N | 使用 Enforced SSL 连接 时,指定要使用的 PEM 文件完整路径。 | “/one/more/society/file.pem” |
pemContents | N | 如果没有提供pemPath,用于Enforced SSL连接的PEM文件的内容。可以在K8s环境下使用。 | “pem value” |
配置示例:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.mysql
version: v1
metadata:
- name: connectionString
value: "root:one.more.society@tcp(127.0.0.1:3306)/?allowNativePasswords=true&tls=custom"
- name: schemaName
value: "one_more_state_store"
- name: tableName
value: "one_more_state"
- name: pemPath
value: "/one/more/society/file.pem"
CSDN 社区图书馆,开张营业! 深读计划,写书评领图书福利~最后,感谢你这么帅,还给我点赞。
以上是关于Dapr-状态管理的主要内容,如果未能解决你的问题,请参考以下文章