如何设置 Spark 流式接收器频率?
Posted
技术标签:
【中文标题】如何设置 Spark 流式接收器频率?【英文标题】:How to set the Spark streaming receiver frequency? 【发布时间】:2017-06-12 06:19:02 【问题描述】:我的要求是处理股票市场的每小时数据。 即,每个流式传输间隔从源获取一次数据并通过 DStream 进行处理。
我已经实现了一个自定义接收器,通过实现 onStart() 和 onStop() 方法及其工作来废弃/监控网站。
遇到的挑战:
接收线程连续获取数据,即每个间隔多次获取数据。 无法协调接收器和 DStream 执行时间间隔。我尝试过的选项:
-
接收线程休眠几秒钟(等于流传输间隔)。
在这种情况下,数据不是处理时的最新数据。
class CustomReceiver(interval: Int)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2)
def onStart()
new Thread("Website Scrapper")
override def run() receive()
.start()
def onStop()
/** Create a socket connection and receive data until receiver is stopped */
private def receive()
println("Entering receive:" + new Date());
try
while (!isStopped)
val scriptsLTP = StockMarket.getLiveStockData()
for ((script, ltp) <- scriptsLTP)
store(script + "," + ltp)
println("sent data")
System.out.println("going to sleep:" + new Date());
Thread.sleep(3600 * 1000);
System.out.println("awaken from sleep:" + new Date());
println("Stopped receiving")
restart("Trying to connect again")
catch
case t: Throwable =>
restart("Error receiving data", t)
println("Exiting receive:" + new Date());
如何使 Spark Streaming 接收器与 DStream 处理同步?
【问题讨论】:
是否可以在流式传输间隔开始时获取数据? 【参考方案1】:这个用例似乎不太适合 Spark Streaming。间隔足够长,可以将其视为常规批处理作业。这样,我们可以更好地利用集群资源。
我将通过并行化目标代码将其重写为 Spark 作业,使用 mapPartitions
将执行程序用作分布式网络抓取工具,然后按预期进行处理。
然后使用 cron
或更高级的替代方案(例如 Chronos)安排 Spark 作业每小时运行一次。
【讨论】:
根据您的观点,流式作业的最大可行间隔是多少? @VijayInnamuri 这取决于用例,所以我不会很乐意输入数字。如果您必须处理的数据不断出现,但您只需要 1 个报告/小时,我会考虑使用 Spark Streaming。这里的重点是 Spark Streaming 是关于 流式传输 数据的,您的用例更像是需要在特定时间运行的批处理作业。另请注意,Spark Streaming 将在其运行时分配集群资源。您的用例看起来像每小时处理几分钟。不断阻塞大量资源以供如此低的使用率是有害的, 感谢您的澄清。以上是关于如何设置 Spark 流式接收器频率?的主要内容,如果未能解决你的问题,请参考以下文章
Spark 2.4重磅发布:优化深度学习框架集成,提供更灵活的流式接收器
如何在 Spark Streaming 中使用基于数据集的转换?
使用自定义接收器从数据集中接收流数据 [Spark Streaming]