如何从 Zeppelin 中的控制台流接收器获取输出?

Posted

技术标签:

【中文标题】如何从 Zeppelin 中的控制台流接收器获取输出?【英文标题】:How to get the output from console streaming sink in Zeppelin? 【发布时间】:2018-05-01 15:07:20 【问题描述】:

当从 Zeppelin 运行时,我正在努力让 console 接收器与 PySpark Structured Streaming 一起工作。基本上,我没有看到任何结果打印到屏幕或我找到的任何日志文件中。

我的问题:有没有人有一个使用 PySpark 结构化流和接收器的工作示例,该接收器产生在 Apache Zeppelin 中可见的输出?理想情况下,它也会使用套接字源,因为这很容易测试。

我正在使用:

Ubuntu 16.04 spark-2.2.0-bin-hadoop2.7 zeppelin-0.7.3-bin-all Python3

我的代码基于structured_network_wordcount.py example。它在从 PySpark shell (./bin/pyspark --master local[2]) 运行时工作;我看到每批的表格。

%pyspark
# structured streaming
from pyspark.sql.functions import *
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 9999)\
    .option('includeTimestamp', 'true')\
    .load()

# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, '10 seconds', '1 seconds'),
    words.word
).count().orderBy('window')

# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .option('truncate', 'false')\
    .start()

print("Starting...")
query.awaitTermination(20)

我希望看到每批结果的打印输出,但我只看到Starting...,然后是False,返回值query.awaitTermination(20)

在一个单独的终端中,我在上面运行时将一些数据输入到nc -lk 9999 netcat 会话中。

【问题讨论】:

【参考方案1】:

zeppelin-0.7.3-bin-all 使用 Spark 2.1.0(不幸的是,没有rate 格式来测试结构化流)。


确保当您start 时,带有socketnc -lk 9999 的流式查询已经启动(否则查询将停止)。

还要确保查询确实已启动并正在运行。

val lines = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load
val q = lines.writeStream.format("console").start

您确实无法在 Zeppelin 笔记本中看到输出可能,因为:

    流式查询从它们自己的线程开始(这似乎超出了 Zeppelin 的范围)

    console sink writes to standard output(在该单独线程上使用 Dataset.show 运算符)。

所有这些都使得“拦截”输出在 Zeppelin 中不可用。

所以我们来回答真正的问题:

Zeppelin 中的标准输出写入何处?

好吧,由于对 Zeppelin 内部的了解非常有限,我认为它可能是 logs/zeppelin-interpreter-spark-[hostname].log,但遗憾的是找不到来自 console 接收器的输出。在那里您可以找到使用 log4j 但 console sink 不使用的 Spark(尤其是结构化流)的日志。

看起来您唯一的长期解决方案是编写自己的 console 类自定义接收器并使用 log4j 记录器。老实说,这并不像听起来那么难。关注the sources of console sink。

【讨论】:

仅作记录 - Zeppelin 可以与用户提供的 Spark 安装一起使用,独立于嵌入式安装,它支持 Spark 2.2(如问题所示)。因此,您可以根据需要使用“rate”格式。【参考方案2】:

控制台接收器不是基于笔记本的交互式工作流的好选择。即使在可以捕获输出的 Scala 中,它也需要在同一段落中调用 awaitTermination(或等效项),从而有效地阻止注释。

%spark

spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .option("includeTimestamp", "true")
  .load()
  .writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination() // Block execution, to force Zeppelin to capture the output

链式awaitTermination 可以替换为独立调用在同一段落中也可以:

%spark

val query = df
  .writeStream
  ...
  .start()

query.awaitTermination()

没有它,Zeppelin 没有理由等待任何输出。 PySpark 只是在此之上增加了另一个问题 - 间接执行。因此,即使在这里阻止查询也无济于事。

此外,在浏览笔记时,流的连续输出可能会导致呈现问题和内存问题(可以通过InterpreterContext 或 REST API 使用 Zeppelin 显示系统,以实现更明智的行为,其中输出为被覆盖或定期清除)。

使用 Zeppelin 进行测试的更好选择是 memory sink。这样您就可以在不阻塞的情况下启动查询:

%pyspark

query = (windowedCounts
  .writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("some_name")
  .start())

并在另一个段落中按需查询结果:

%pyspark

spark.table("some_name").show()

它可以与reactive streams 或类似的解决方案结合使用,以提供基于间隔的更新。

也可以使用 StreamingQueryListener 和 Py4j 回调来耦合 rxonQueryProgress 事件,尽管 PySpark 不支持查询侦听器,并且需要一些代码来将事情粘合在一起。 Scala 接口:

package com.example.spark.observer

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

trait PythonObserver 
  def on_next(o: Object): Unit


class PythonStreamingQueryListener(observer: PythonObserver) 
    extends StreamingQueryListener 
  override def onQueryProgress(event: QueryProgressEvent): Unit = 
    observer.on_next(event)
  
  override def onQueryStarted(event: QueryStartedEvent): Unit = 
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = 

构建一个 jar,调整构建定义以反映所需的 Scala 和 Spark 版本:

scalaVersion := "2.11.8"  

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)

把它放在 Spark 类路径上,补丁 StreamingQueryManager:

%pyspark

from pyspark.sql.streaming import StreamingQueryManager
from pyspark import SparkContext

def addListener(self, listener):
    jvm = SparkContext._active_spark_context._jvm
    jlistener = jvm.com.example.spark.observer.PythonStreamingQueryListener(
        listener
    )
    self._jsqm.addListener(jlistener)
    return jlistener


StreamingQueryManager.addListener = addListener

启动回调服务器:

%pyspark

sc._gateway.start_callback_server()

并添加监听器:

%pyspark

from rx.subjects import Subject

class StreamingObserver(Subject):
    class Java:
        implements = ["com.example.spark.observer.PythonObserver"]

observer = StreamingObserver()
spark.streams.addListener(observer)

最后你可以使用subscribe 并阻止执行:

%pyspark

(observer
    .map(lambda p: p.progress().name())
    # .filter() can be used to print only for a specific query
    .subscribe(lambda n: spark.table(n).show() if n else None))
input()  # Block execution to capture the output 

最后一步应该在您开始流式查询后执行。

也可以跳过rx 并像这样使用最少的观察者:

class StreamingObserver(object):
    class Java:
        implements = ["com.example.spark.observer.PythonObserver"]

    def on_next(self, value):
        try:
            name = value.progress().name()
            if name:
                spark.table(name).show()
        except: pass

它比Subject 提供的控制要少一些(需要注意的是,这会干扰其他代码打印到标准输出,并且只能由removing listener 停止。使用Subject,您可以轻松地dispose @987654347 @观察者,一旦你完成),但在其他方面应该或多或少相同。

请注意,任何阻塞操作都足以捕获侦听器的输出,而不必在同一个单元格中执行。例如

%pyspark

observer = StreamingObserver()
spark.streams.addListener(observer)

%pyspark

import time

time.sleep(42)

将以类似的方式工作,在定义的时间间隔内打印表格。

为了完整起见,您可以实现StreamingQueryManager.removeListener

【讨论】:

这确实会在查询运行时定期在 Zeppelin 中打印表格,哇。 由于这个答案来自 2017 年,我想知道现在 PySpark 中是否可以直接使用 StreamingQueryListener API?

以上是关于如何从 Zeppelin 中的控制台流接收器获取输出?的主要内容,如果未能解决你的问题,请参考以下文章

Java实例——从控制台接收输入的身份证号

结构化流 Kafka 2.1->Zeppelin 0.8->Spark 2.4:spark 不使用 jar

从android中的firebase函数接收json数据给出空值

如何在 Zeppelin 中基于 HadoopGroupProvider 中的组配置角色,使用 Knox 提供基于 SAML 的 SSO?

如何从 Google 助理接收答案作为字符串,而不是作为音频流

在接收端获取 Chromecast 中的队列项目