SPARK 结构化流中的 StructField 是不是存在错误

Posted

技术标签:

【中文标题】SPARK 结构化流中的 StructField 是不是存在错误【英文标题】:Is there a bug about StructField in SPARK Structured StreamingSPARK 结构化流中的 StructField 是否存在错误 【发布时间】:2017-01-06 08:47:57 【问题描述】:

当我尝试这个时:

cfg = SparkConf().setAppName('MyApp')
spark = SparkSession.builder.config(conf=cfg).getOrCreate()

lines = spark.readStream.load(format='socket', host='localhost', port=9999,
                              schema=StructType(StructField('value', StringType, True)))
words = lines.groupBy('value').count()
query = words.writeStream.format('console').outputMode("complete").start()

query.awaitTermination()

然后我得到一些错误:

AssertionError: dataType 应该是 DataType

我在 ./pyspark/sql/types.py 的第 403 行搜索源代码:

assert isinstance(dataType, DataType), "dataType should be DataType"

StringType 基于 AtomicType 而不是 DataType强>

class StringType(AtomicType):
    """String data type.
    """

    __metaclass__ = DataTypeSingleton

那么是不是搞错了?

【问题讨论】:

【参考方案1】:

在 Python 中,DataTypes 不用作单例。创建StructField 时,您必须使用实例。同样StructType 需要StructField 的序列:

StructType([StructField('value', StringType(), True)])

尽管如此,这在这里完全没有意义。 TextSocketSource is fixed and cannot be modified 的架构,带有架构参数。

【讨论】:

以上是关于SPARK 结构化流中的 StructField 是不是存在错误的主要内容,如果未能解决你的问题,请参考以下文章

Spark 结构化流中的临时视图

Spark 结构化流中的外部连接

带有自定义接收器的 Spark 结构化流中的输入行数

在索引 spark-shell/scala 处更改 Array[StructField] 中的数据类型

如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?

有没有办法将生成的 groupby 流加入到 kafka-spark 结构化流中的原始流?