如何使用api rest通过flink流作为参数并返回转换后的流

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用api rest通过flink流作为参数并返回转换后的流相关的知识,希望对你有一定的参考价值。

我是Apache Flink的新手。我有一个flink scala项目,它消耗来自kafka集群的数据,我需要将流结果作为参数传递,以消耗返回此已转换流的api。这是我的代码

class Testing 
  def main(args: Array[String]): Unit = 
  def streamTest(): Unit = 
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "test1.server.local:9092,test2.server.local:9092,test3.server.local:9092")
    val consumer_test = new FlinkKafkaConsumer[String]("topic_test", new SimpleStringSchema(), properties)
    consumer_test.setStartFromEarliest()
    val stream =  env.addSource(consumer_test).setParallelism(5)
    val api_test = "http://api-test.server.local/test/?msg=%s"
    // Here I need pass stream as parameter to api and return transformed stream
    env.execute()
     

是否有帮助?

答案

您应该使用您熟悉的任何http / rest库,然后使用asyncIO

以上是关于如何使用api rest通过flink流作为参数并返回转换后的流的主要内容,如果未能解决你的问题,请参考以下文章

如何将 SPARK/Flink 流数据处理创建为微服务(REST API)

如何使用 Swift URLSession 通过 REST api 将图像和其他参数作为表单数据发布?

第三章 flink流处理API - map和flatmap

Flink 监控指南 被动拉取 Rest API

如何通过 REST API 微服务(使用 Express 构建)将 MongoDB 更改流与节点 js 一起使用

Rest api如何获取参数?