具有响应式Cassandra 的Spring Data

Posted SpringForAll社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了具有响应式Cassandra 的Spring Data相关的知识,希望对你有一定的参考价值。

原文链接:https://www.baeldung.com/spring-data-cassandra-reactive

译者:xuli

1.简介

在本教程中,我们将学习如何使用Spring Data Cassandra的响应式数据访问功能。

特别地,这是Spring Data Cassandra文章系列的第三篇文章。在本文中,我们将使用REST API暴露Cassandra数据库。

我们可以在本系列的第一篇和第二篇文章中阅读有关Spring Data Cassandra的更多信息 。

2.Maven依赖

事实上,Spring Data Cassandra支持Project Reactor和RxJava反应类型。为了演示,我们将在本教程中使用Project reactor的反应类型Flux和Mono 。

首先,让我们添加教程所需的依赖项:

 
   
   
 
  1. <dependency>

  2.    <groupId>org.springframework.data</groupId>

  3.    <artifactId>spring-data-cassandra</artifactId>

  4.    <version>2.1.2.RELEASE</version>

  5. </dependency>

  6. <dependency>

  7.    <groupId>io.projectreactor</groupId>

  8.    <artifactId>reactor-core</artifactId>

  9. </dependency>

最新版本的 spring-data-cassandra 可以在这里找到。

现在,我们将通过REST API从数据库中暴露SELECT操作。所以,我们也为RestController添加依赖项:

 
   
   
 
  1. <dependency>

  2.    <groupId>org.springframework.boot</groupId>

  3.    <artifactId>spring-boot-starter-web</artifactId>

  4. </dependency>

3.实施我们的应用程序

由于我们将持久化数据,让我们首先定义我们的实体对象:

 
   
   
 
  1. @Table

  2. public class Employee {

  3.    @PrimaryKey

  4.    private int id;

  5.    private String name;

  6.    private String address;

  7.    private String email;

  8.    private int age;

  9. }

接下来,是时候创建一个从ReactiveCassandraRepository扩展的EmployeeRepository 。重要的是要注意此接口支持响应类型:

 
   
   
 
  1. public interface EmployeeRepository extends ReactiveCassandraRepository<Employee, Integer> {

  2.    @AllowFiltering

  3.    Flux<Employee> findByAgeGreaterThan(int age);

  4. }

3.1.用于CRUD操作的Rest控制器

为了便于说明,我们将使用一个简单的Rest Controller 公开一些基本的 SELECT操作:

 
   
   
 
  1. @RestController

  2. @RequestMapping("employee")

  3. public class EmployeeController {


  4.    @Autowired

  5.    EmployeeService employeeService;


  6.    @PostConstruct

  7.    public void saveEmployees() {

  8.        List<Employee> employees = new ArrayList<>();

  9.        employees.add(new Employee(123, "John Doe", "Delaware", "jdoe@xyz.com", 31));

  10.        employees.add(new Employee(324, "Adam Smith", "North Carolina", "asmith@xyz.com", 43));

  11.        employees.add(new Employee(355, "Kevin Dunner", "Virginia", "kdunner@xyz.com", 24));

  12.        employees.add(new Employee(643, "Mike Lauren", "New York", "mlauren@xyz.com", 41));

  13.        employeeService.initializeEmployees(employees);

  14.    }


  15.    @GetMapping("/list")

  16.    public Flux<Employee> getAllEmployees() {

  17.        Flux<Employee> employees = employeeService.getAllEmployees();

  18.        return employees;

  19.    }


  20.    @GetMapping("/{id}")

  21.    public Mono<Employee> getEmployeeById(@PathVariable int id) {

  22.        return employeeService.getEmployeeById(id);

  23.    }


  24.    @GetMapping("/filterByAge/{age}")

  25.    public Flux<Employee> getEmployeesFilterByAge(@PathVariable int age) {

  26.        return employeeService.getEmployeesFilterByAge(age);

  27.    }

  28. }

最后,让我们添加一个简单的EmployeeService:

 
   
   
 
  1. @Service

  2. public class EmployeeService {


  3.    @Autowired

  4.    EmployeeRepository employeeRepository;


  5.    public void initializeEmployees(List<Employee> employees) {

  6.        Flux<Employee> savedEmployees = employeeRepository.saveAll(employees);

  7.        savedEmployees.subscribe();

  8.    }


  9.    public Flux<Employee> getAllEmployees() {

  10.        Flux<Employee> employees =  employeeRepository.findAll();

  11.        return employees;

  12.    }


  13.    public Flux<Employee> getEmployeesFilterByAge(int age) {

  14.        return employeeRepository.findByAgeGreaterThan(age);

  15.    }


  16.    public Mono<Employee> getEmployeeById(int id) {

  17.        return employeeRepository.findById(id);

  18.    }

  19. }

3.2.数据库配置

然后,让我们在application.properties中指定用于连接Cassandra的密钥空间和端口:

 
   
   
 
  1. spring.data.cassandra.keyspace-name=practice

  2. spring.data.cassandra.port=9042

4.测试端点

最后,是时候测试我们的API端点了。

4.1.手动测试

首先,让我们从数据库中获取员工记录:

 
   
   
 
  1. curl localhost:8080/employee/list

结果,我们得到了所有员工:

 
   
   
 
  1. [

  2.    {

  3.        "id": 324,

  4.        "name": "Adam Smith",

  5.        "address": "North Carolina",

  6.        "email": "asmith@xyz.com",

  7.        "age": 43

  8.    },

  9.    {

  10.        "id": 123,

  11.        "name": "John Doe",

  12.        "address": "Delaware",

  13.        "email": "jdoe@xyz.com",

  14.        "age": 31

  15.    },

  16.    {

  17.        "id": 355,

  18.        "name": "Kevin Dunner",

  19.        "address": "Virginia",

  20.        "email": "kdunner@xyz.com",

  21.        "age": 24

  22.    },

  23.    {

  24.        "id": 643,

  25.        "name": "Mike Lauren",

  26.        "address": "New York",

  27.        "email": "mlauren@xyz.com",

  28.       "age": 41

  29.    }

  30. ]

继续,让我们尝试通过他的id找到一个特定的员工:

 
   
   
 
  1. curl localhost:8080/employee/643

结果,我们让Mike Lauren先生回来了:

 
   
   
 
  1. {

  2.    "id": 643,

  3.    "name": "Mike Lauren",

  4.    "address": "New York",

  5.    "email": "mlauren@xyz.com",

  6.    "age": 41

  7. }

最后,让我们看看我们的年龄过滤器是否有效:

 
   
   
 
  1. curl localhost:8080/employee/filterByAge/35

正如预期的那样,我们得到所有年龄超过35岁的员工:

 
   
   
 
  1. [

  2.    {

  3.        "id": 324,

  4.        "name": "Adam Smith",

  5.        "address": "North Carolina",

  6.        "email": "asmith@xyz.com",

  7.        "age": 43

  8.    },

  9.    {

  10.        "id": 643,

  11.        "name": "Mike Lauren",

  12.        "address": "New York",

  13.        "email": "mlauren@xyz.com",

  14.        "age": 41

  15.    }

  16. ]

4.2.集成测试

另外,让我们通过编写测试用例来测试相同的功能:

 
   
   
 
  1. @RunWith(SpringRunner.class)

  2. @SpringBootTest

  3. public class ReactiveEmployeeRepositoryIntegrationTest {


  4.    @Autowired

  5.    EmployeeRepository repository;


  6.    @Before

  7.    public void setUp() {

  8.        Flux<Employee> deleteAndInsert = repository.deleteAll()

  9.          .thenMany(repository.saveAll(Flux.just(

  10.            new Employee(111, "John Doe", "Delaware", "jdoe@xyz.com", 31),

  11.            new Employee(222, "Adam Smith", "North Carolina", "asmith@xyz.com", 43),

  12.            new Employee(333, "Kevin Dunner", "Virginia", "kdunner@xyz.com", 24),

  13.            new Employee(444, "Mike Lauren", "New York", "mlauren@xyz.com", 41))));


  14.        StepVerifier

  15.          .create(deleteAndInsert)

  16.          .expectNextCount(4)

  17.          .verifyComplete();

  18.    }


  19.    @Test

  20.    public void givenRecordsAreInserted_whenDbIsQueried_thenShouldIncludeNewRecords() {

  21.        Mono<Long> saveAndCount = repository.count()

  22.          .doOnNext(System.out::println)

  23.          .thenMany(repository

  24.            .saveAll(Flux.just(

  25.            new Employee(325, "Kim Jones", "Florida", "kjones@xyz.com", 42),

  26.            new Employee(654, "Tom Moody", "New Hampshire", "tmoody@xyz.com", 44))))

  27.          .last()

  28.          .flatMap(v -> repository.count())

  29.          .doOnNext(System.out::println);


  30.        StepVerifier

  31.          .create(saveAndCount)

  32.          .expectNext(6L)

  33.          .verifyComplete();

  34.    }


  35.    @Test

  36.    public void givenAgeForFilter_whenDbIsQueried_thenShouldReturnFilteredRecords() {

  37.        StepVerifier

  38.          .create(repository.findByAgeGreaterThan(35))

  39.          .expectNextCount(2)

  40.          .verifyComplete();

  41.    }

  42. }

5.结论

概括一下,我们学习了如何使用Spring Data Cassandra来构建非阻塞应用程序。与往常一样,可在GitHub上查看本教程的源代码。


推荐: 

上一篇:

关注公众号

点击原文阅读更多



以上是关于具有响应式Cassandra 的Spring Data的主要内容,如果未能解决你的问题,请参考以下文章

响应式流——响应式Spring的道法术器

使用 Spring-data-cassandra 查询具有复合主键的表

使用Spring-data-cassandra查询具有复合主键的表

@PrimaryKeyColumn 注释必须具有 PARTITIONED 类型,用于 scala Cassandra Spring Data 应用程序

如何为 cassandra 设置读取请求超时

Spring Boot:ReactiveCrudRepository 没有被任何 bean 实现