反应式编程:Spring WebFlux:如何构建微服务调用链?
Posted
技术标签:
【中文标题】反应式编程:Spring WebFlux:如何构建微服务调用链?【英文标题】:Reactive Programming: Spring WebFlux: How to build a chain of micro-service calls? 【发布时间】:2020-07-06 13:57:45 【问题描述】:Spring Boot
申请:
@RestController
接收以下有效负载:
"cartoon": "The Little Mermaid",
"characterNames": ["Ariel", "Prince Eric", "Sebastian", "Flounder"]
我需要按如下方式处理:
-
获取每个角色名称的唯一 ID:对“cartoon-characters”微服务进行 HTTP 调用,该微服务按名称返回 ID
控制器接收到的转换数据:
将角色名称替换为上一步从“卡通角色”微服务接收到的适当 id。
"cartoon": "The Little Mermaid",
"characterIds": [1, 2, 3, 4]
使用转换后的数据向“cartoon-db”微服务发送 HTTP POST 请求。
将来自“cartoon-db”的响应映射到作为控制器返回值的内部表示。我遇到的问题:
我需要使用Spring WebFlux
(Mono
|Flux
) 和Spring Reactive WebClient
的范式Reactive Programming
(非阻塞\异步处理)来实现所有这些步骤 - 但我对此的经验为零堆栈,尽可能多地阅读它,加上谷歌搜索很多,但仍然有一堆未回答的问题,例如:
第一季度。我已经配置了响应式 webClient,它向“cartoon-characters”微服务发送请求:
public Mono<Integer> getCartoonCharacterIdbyName(String characterName)
return WebClient.builder().baseUrl("http://cartoon-characters").build()
.get()
.uri("/character/characterName", characterName)
.retrieve()
.bodyToMono(Integer.class);
如您所见,我有一个卡通人物名称列表,我需要为每个人调用 getCartoonCharacterIdbyName(String name)
方法,我不确定串联调用它的正确选项,相信正确的选项:并行执行。
写了如下方法:
public List<Integer> getCartoonCharacterIds(List<String> names)
Flux<Integer> flux = Flux.fromStream(names.stream())
.flatMap(this::getCartoonCharacterIdbyName);
return StreamSupport.stream(flux.toIterable().spliterator(), false)
.collect(Collectors.toList());
但我怀疑此代码是否并行执行 WebClient
,并且代码调用 flux.toIterable()
会阻塞线程,因此使用此实现我失去了非阻塞机制。
我的假设正确吗?
如何将其重写为具有并行性和非阻塞性?
第二季度。
技术上是否可以将控制器接收到的输入数据(我的意思是用 id 替换名称)转换为反应式:当我们使用 Flux<Integer>
characterIds 操作时,而不是使用 characterIds 的 List<Integer>
操作时?
Q3. 是否有可能在 步骤 2 之后不仅获得转换后的 Data 对象,而且获得 Mono 可以由 步骤中的另一个 WebClient 使用3?
【问题讨论】:
你应该使用非阻塞的collectList操作符而不是toIterable,它给你一个Mono。 【参考方案1】:实际上,这是一个很好的问题,因为了解 WebFlux 或项目反应器框架,在链接微服务时需要几个步骤。
首先要意识到WebClient
应该接受发布者并返回发布者。将此推断为 4 种不同的方法签名以帮助思考。
当然,在所有情况下,它都只是 Publisher->Publisher,但在您更好地理解事情之前不要这样做。前两个很明显,你只是使用.map(...)
来处理流中的对象,但是你需要学习如何处理后两个。如上所述,从 Flux->Mono 可以使用.collectList()
或.reduce(...)
完成。从 Mono->Flux 开始似乎通常使用 .flatMapMany
或 .flatMapIterable
或它的一些变体来完成。可能还有其他技术。你不应该在任何 WebFlux 代码中使用.block()
,如果你尝试这样做,通常你会得到一个运行时错误。
在你的例子中你想去
(Mono->Flux)->(Flux->Flux)->(Flux->Flux)正如你所说,你想要
单声道->通量->通量第二部分是了解链接流。你可以这样做
p3(p2(p1(object)));这会链接 p1->p2->p3,但我总是发现改为创建“服务层”更容易理解。
o2 = p1(对象); o3 = p2(o2); 结果 = p3(o3);这段代码更容易阅读和维护,而且随着时间的推移,你会理解这句话的价值。
我对您的示例的唯一问题是使用WebClient
作为@RequestBody
进行Flux<String>
。不工作。见WebClient bodyToFlux(String.class) for string list doesn't separate individual values。除此之外,它是一个非常简单的应用程序。当您调试它时,您会发现它在到达Flux<Integer> ids = mapNamesToIds(fn)
行之前到达.subscribe(System.out::println)
行。这是因为 Flow 是在执行之前设置的。理解这一点需要一些时间,但这是项目反应器框架的重点。
@SpringBootApplication
@RestController
@RequestMapping("/demo")
public class DemoApplication implements ApplicationRunner
public static void main(String[] args)
SpringApplication.run(DemoApplication.class, args);
Map<Integer, CartoonCharacter> characters;
@Override
public void run(ApplicationArguments args) throws Exception
String[] names = new String[] "Ariel", "Prince Eric", "Sebastian", "Flounder";
characters = Arrays.asList( new CartoonCharacter[]
new CartoonCharacter(names[0].hashCode(), names[0], "Mermaid"),
new CartoonCharacter(names[1].hashCode(), names[1], "Human"),
new CartoonCharacter(names[2].hashCode(), names[2], "Crustacean"),
new CartoonCharacter(names[3].hashCode(), names[3], "Fish")
)
.stream().collect(Collectors.toMap(CartoonCharacter::getId, Function.identity()));
// TODO Auto-generated method stub
CartoonRequest cr = CartoonRequest.builder()
.cartoon("The Little Mermaid")
.characterNames(Arrays.asList(names))
.build();
thisLocalClient
.post()
.uri("cartoonDetails")
.body(Mono.just(cr), CartoonRequest.class)
.retrieve()
.bodyToFlux(CartoonCharacter.class)
.subscribe(System.out::println);
@Bean
WebClient localClient()
return WebClient.create("http://localhost:8080/demo/");
@Autowired
WebClient thisLocalClient;
@PostMapping("cartoonDetails")
Flux<CartoonCharacter> getDetails(@RequestBody Mono<CartoonRequest> cartoonRequest)
Flux<StringWrapper> fn = cartoonRequest.flatMapIterable(cr->cr.getCharacterNames().stream().map(StringWrapper::new).collect(Collectors.toList()));
Flux<Integer> ids = mapNamesToIds(fn);
Flux<CartoonCharacter> details = mapIdsToDetails(ids);
return details;
// Service Layer Methods
private Flux<Integer> mapNamesToIds(Flux<StringWrapper> names)
return thisLocalClient
.post()
.uri("findIds")
.body(names, StringWrapper.class)
.retrieve()
.bodyToFlux(Integer.class);
private Flux<CartoonCharacter> mapIdsToDetails(Flux<Integer> ids)
return thisLocalClient
.post()
.uri("findDetails")
.body(ids, Integer.class)
.retrieve()
.bodyToFlux(CartoonCharacter.class);
// Services
@PostMapping("findIds")
Flux<Integer> getIds(@RequestBody Flux<StringWrapper> names)
return names.map(name->name.getString().hashCode());
@PostMapping("findDetails")
Flux<CartoonCharacter> getDetails(@RequestBody Flux<Integer> ids)
return ids.map(characters::get);
还有:
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class StringWrapper
private String string;
@Data
@Builder
public class CartoonRequest
private String cartoon;
private List<String> characterNames;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CartoonCharacter
Integer id;
String name;
String species;
【讨论】:
以上是关于反应式编程:Spring WebFlux:如何构建微服务调用链?的主要内容,如果未能解决你的问题,请参考以下文章
Spring 5 之 WebFlux 开发反应式 Web 应用