CSV 数据源不支持二进制数据类型

Posted

技术标签:

【中文标题】CSV 数据源不支持二进制数据类型【英文标题】:CSV data source does not support binary data type 【发布时间】:2019-04-26 16:31:12 【问题描述】:

我正在尝试运行一个 spark-streaming 应用程序,它从 kafka 流中读取数据并对其进行处理。我正在运行以下内容。

val schema = new StructType()
      .add("InvoiceNo", LongType)
      .add("StockCode", LongType)
      .add("Description", StringType)
      .add("Quantity", ShortType)
      .add("InvoiceDate", StringType)
      .add("UnitPrice", DoubleType)
      .add("CustomerID", IntegerType)
      .add("Country", StringType)


    val df = spark.readStream.
      format("kafka").
      option("kafka.bootstrap.servers", conf.get("spark.kafka_bootstrap_servers")).
      option("subscribe", "webserver").
      option("kafka.security.protocol", "SASL_SSL").
      option("kafka.sasl.mechanism", "PLAIN").
      option("kafka.ssl.protocol", "TLSv1.2").
      option("kafka.ssl.enabled.protocols", "TLSv1.2").
      option("failOnDataLoss", "false").
      load()

我收到以下错误。

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: CSV data source does not support binary data type.

我在流中给出的 csv 是

536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,01/12/10 8:26,2.55,17850,United Kingdom

此错误的原因可能是什么?

【问题讨论】:

【参考方案1】:

spark.readStream.format("kafka") 始终将数据读取为二进制,而不是字符串。

值总是使用 ByteArrayDeserializer 反序列化为字节数组。使用 DataFrame 操作显式反序列化值 - https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

不清楚您在哪里使用了.csv()schema 变量。

你可以在docs how it casts keys and values to strings看到

【讨论】:

我如何执行架构? 好吧,首先您必须将 kafka 消息转换为至少一个字符串。我没有流式传输 CSV 的经验,因为这只是放入 Kafka 的一种糟糕的格式。 Json 或 Avro 效果最好。例如databricks.com/blog/2017/04/26/… 你可能想看到这个***.com/questions/47903276/…

以上是关于CSV 数据源不支持二进制数据类型的主要内容,如果未能解决你的问题,请参考以下文章

除Innodb和MyISAM外MySQL所支持的存储引擎

java之二进制与数据类型

将数据插入十进制类型的数据库表字段(12,2)

redis常见面试题

Qt生成CSV 文件

如何查询blob类型中存的是啥格式的文件