Alpakka UDP:如何通过已绑定的套接字响应收到的数据报?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Alpakka UDP:如何通过已绑定的套接字响应收到的数据报?相关的知识,希望对你有一定的参考价值。

我正在使用Alpakkas UDP.bindFlow将传入的UDP数据报转发给Kafka代理。发送这些数据报的旧应用程序需要来自发送消息的同一端口的UDP响应。我正在努力模拟这种行为,因为它需要我将流的输出连接到它的输入。

我尝试了这个解决方案,但它不起作用,因为响应数据报是从不同的源端口发送的:

import java.net.InetSocketAddress
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.alpakka.udp.Datagram
import akka.stream.alpakka.udp.scaladsl.Udp
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

object UdpInput extends App {

  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val socket = new InetSocketAddress("0.0.0.0", 40000)
  val udpBindFlow = Udp.bindFlow(socket)
  val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  val kafkaSink = Flow[Datagram].map(toProducerRecord).to(Producer.plainSink(producerSettings))

  def toProducerRecord(datagram: Datagram) = new ProducerRecord[String, String]("udp", datagram.data.utf8String)
  def toResponseDatagram(datagram: Datagram) = Datagram(ByteString("OK"), datagram.remote)

  // Does not model the behaviour I'm looking for because
  // the response datagram is sent from a different source port
  Source.asSubscriber
    .via(udpBindFlow)
    .alsoTo(kafkaSink)
    .map(toResponseDatagram)
    .to(Udp.sendSink)
    .run
}
答案

我最终使用GraphDSL来实现循环图。感谢dvim指出我正确的方向!

import java.net.InetSocketAddress
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.alpakka.udp.Datagram
import akka.stream.alpakka.udp.scaladsl.Udp
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, MergePreferred, RunnableGraph, Source}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.util.ByteString
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

object UdpInput extends App {

  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  val socket = new InetSocketAddress("0.0.0.0", 40000)
  val udpBindFlow = Udp.bindFlow(socket)
  val udpResponseFlow = Flow[Datagram].map(toResponseDatagram)
  val kafkaSink = Flow[Datagram].map(toProducerRecord).to(Producer.plainSink(producerSettings))

  def toProducerRecord(datagram: Datagram) = new ProducerRecord[String, String]("udp", datagram.data.utf8String)
  def toResponseDatagram(datagram: Datagram) = Datagram(ByteString("OK"), datagram.remote)

  RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    val merge = b.add(MergePreferred[Datagram](1))
    val bcast = b.add(Broadcast[Datagram](2))

    Source.asSubscriber ~> merge           ~>   udpBindFlow   ~> bcast ~> kafkaSink
                           merge.preferred <~ udpResponseFlow <~ bcast
    ClosedShape
  }).run
}

以上是关于Alpakka UDP:如何通过已绑定的套接字响应收到的数据报?的主要内容,如果未能解决你的问题,请参考以下文章

在 node.js 中,如何创建一个自动绑定到可用端口的 UDP 套接字?

Linux中的双UDP套接字绑定

重新绑定 UDP 套接字

绑定多播 (UDP) 套接字是啥意思?

实现UDP高效接收/响应

使用 C 绑定到 IOCP 的 UDP 套接字