如何使用 api rest 传递 flink 流作为参数并返回转换后的流

Posted

技术标签:

【中文标题】如何使用 api rest 传递 flink 流作为参数并返回转换后的流【英文标题】:How to consume api rest passing flink stream as parameter and return this stream transformed 【发布时间】:2020-02-08 17:02:33 【问题描述】:

我是 apache flink 的新手。我有一个使用来自 kafka 集群的数据的 flink scala 项目,我需要将流结果作为参数传递以使用返回转换后的流的 api。这是我的代码

class Testing 
  def main(args: Array[String]): Unit = 
  def streamTest(): Unit = 
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "test1.server.local:9092,test2.server.local:9092,test3.server.local:9092")
    val consumer_test = new FlinkKafkaConsumer[String]("topic_test", new SimpleStringSchema(), properties)
    consumer_test.setStartFromEarliest()
    val stream =  env.addSource(consumer_test).setParallelism(5)
    val api_test = "http://api-test.server.local/test/?msg=%s"
    // Here I need pass stream as parameter to api and return transformed stream
    env.execute()
     

有什么帮助吗?

【问题讨论】:

将流结果作为参数传递是什么意思?您是否只想使用来自流的参数执行对 API 的请求?也许你可以提供一个例子?? 我认为 OP 想做一张地图。 stream.map(x -> call_api(x)) 我使用 stream.flatMap 调用函数来连接 api rest 与 out: Collector[String] 变量。谢谢 【参考方案1】:

您应该使用任何您熟悉的 http/rest 库,然后使用asyncIO

【讨论】:

【参考方案2】:

这是我的最终代码。希望对你有帮助

class Testing extends Serializable
  def main(args: Array[String]): Unit = 
  def streamTest(): Unit = 
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "test1.server.local:9092,test2.server.local:9092,test3.server.local:9092")
    val consumer_test = new FlinkKafkaConsumer[String]("topic_test", new SimpleStringSchema(), properties)
    consumer_test.setStartFromEarliest()
    val stream =  env.addSource(consumer_test)
    // Here I need pass stream as parameter to api and return transformed stream
    val result = stream.flatMap
      (str, out: Collector[String]) =>
        val api_test = "http://api-test.server.local/test/?msg=%s"
        out.collect 
          getUrl(api_test.format(URLEncoder.encode(str, "UTF-8")))
                
        
    env.execute()
  

  def getUrl(url: String): String = 
    val timeout = 5
    val config = RequestConfig.custom.setConnectTimeout(timeout * 1000).setConnectionRequestTimeout(timeout * 1000).setSocketTimeout(timeout * 1000).build
    val client: CloseableHttpClient = HttpClientBuilder.create.setDefaultRequestConfig(config).build
    val request = new HttpGet(url)
    val response = client.execute(request)
    val entity = response.getEntity
    val get_result = EntityUtils.toString(entity)
    get_result
       

【讨论】:

以上是关于如何使用 api rest 传递 flink 流作为参数并返回转换后的流的主要内容,如果未能解决你的问题,请参考以下文章

如何将 SPARK/Flink 流数据处理创建为微服务(REST API)

Flink 监控指南 被动拉取 Rest API

如何在 REST API 中传递多个参数

如何将 JSON 传递给 REST API (IBKR)

如何通过restful api传递部分对象(json)

如何将 Azure 通知 REST API 与 Google 云消息传递一起使用