当 AMQ 主题中没有数据可读取时如何停止流式传输

Posted

技术标签:

【中文标题】当 AMQ 主题中没有数据可读取时如何停止流式传输【英文标题】:How to stop streaming when no data is left to read in the AMQ topic 【发布时间】:2021-11-10 22:53:05 【问题描述】:

我正在使用火花流从 AMQ 中读取数据。我希望在消息队列中没有数据时停止流式传输。我创建了一个自定义接收器,它连接到 AMQ 主题并开始读取数据,但是工作人员如何告诉驱动程序没有剩余数据,以便它可以停止流式传输。

class CustomReceiver(brokerURL, topic, ...)

    def onStart() 
      new Thread("AMQ Receiver") 
        override def run()  receive() 
      .start()
    

    def onStop() 

    private def receive() 
      activeMQStream = new ActiveMQStream(broker, topic, ...)
      val topicSubscriber = activeMQStream.getTopicSubscriber()

      while(!isStopped && !ActiveMQReceiver.stop)
         val message = topicSubscriber.receive(timeOutInMilliseconds)
         if (message != null && message.isInstanceOf[TextMessage]) 
             val textMessage = message.asInstanceOf[TextMessage];
             val text = textMessage.getText();
             store(text)
             println("ActiveMQReceiver: there is data from AMQ ....")
          else 
             ActiveMQReceiver.stop = true
             println("ActiveMQReceiver: No more data from AMQ .....")
         
    

    def checkStatus(): Boolean =
        ActiveMQReceiver.stop
    



object ActiveMQReceiver
  @volatile var stop: Boolean = false

正如您在上面看到的,当没有数据可供读取时,我尝试将停止标志设置为 true,但是当我运行以下命令时,该标志始终为 False,在搜索后我发现工作人员不共享变量。我尝试用 Accumulator 替换它,但也没有用。

var ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val customReceiver = new CustomReceiver(brokerURL, topic, ...)
val stream: DStream[String] = ssc.receiverStream(customReceiver)
var driverList = List[String]()
stream.foreachRDD  rdd =>
  if(rdd.count() > 0)
    val fromWorker = rdd.collect().toList
    driverList = driverList:::fromWorker
  
 

var stopFlag = false
var isStopped = false
val checkIntervalMillis = 10000
while (!isStopped) 
  isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
  println("Check if stop flag was raised")
  stopFlag = customReceiver.checkStatus()

  if (!isStopped && stopFlag) 
    var seq = driverList.toSeq
    import spark.implicits._
    val df = seq.toDS()
    println("Request to stop")
    ssc.stop(false, true)
  

【问题讨论】:

【参考方案1】:

依靠receive() 返回null 来表示没有剩余数据在生产中是不可靠的。这种方法消除了任何自我修复和故障转移支持,并引入了一个计时/竞争条件,您可能会变得“不走运”。作为替代方案,请查看使用消息组并将流中最后一条消息的标头设置为使用明确定义的 message 发出信号。

Message Groups

【讨论】:

以上是关于当 AMQ 主题中没有数据可读取时如何停止流式传输的主要内容,如果未能解决你的问题,请参考以下文章

当我们使用 WebView 流式传输视频时如何获得对 MoviePlayer 的控制

如何从本地目录中读取,kmeans 流式传输 pyspark

在 C++ 中连续流式传输 PCM 数据?

将数据流式传输到BigQuery

Spark 流式传输作业在被驱动程序停止后失败

iOS 8 在 10 分钟后停止在后台流式传输音频