Spark 结构化流式 Elasticsearch 集成问题。数据源es不支持流式写入

Posted

技术标签:

【中文标题】Spark 结构化流式 Elasticsearch 集成问题。数据源es不支持流式写入【英文标题】:Spark structured streaming Elasticsearch integration issue. Data source es does not support streamed writing 【发布时间】:2017-09-26 07:28:22 【问题描述】:

我正在编写一个 Spark 结构化流应用程序,其中使用 Spark 处理的数据需要下沉到弹性搜索。

这是我的开发环境,因此我有一个独立的 Elastic 搜索。

我尝试了以下两种方法将DataSet中的数据下沉到ES。

1.ds.writeStream().format("org.elasticsearch.spark.sql").start("spark/orders"); 2.ds.writeStream().format("es").start("spark/orders");

在这两种情况下,我都会收到以下错误:

原因:

java.lang.UnsupportedOperationException: Data source es does not support streamed writing
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287) ~[spark-sql_2.11-2.1.1.jar:2.1.1]
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:272) ~[spark-sql_2.11-2.1.1.jar:2.1.1]
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:213) ~[spark-sql_2.11-2.1.1.jar:2.1.1]

pom.xml:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>

    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>

    </dependency>
    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>5.6.1</version>
    </dependency>


Appreciate any help in resolving this issue.

【问题讨论】:

我正在运行 Spark 2.1.1 和 ES 5.6.1 你们成功了吗?我现在遇到了同样的问题 【参考方案1】:

你可以试试

 ds.write.format("org.elasticsearch.spark.sql").option("es.resource",ES_INDEX+"/"+ES_TYPE).option("es.mapping.id",ES_ID).mode("overwrite").save()

【讨论】:

它是一个流媒体应用程序。我将无法使用 write,我必须使用 writeStream 不是吗?【参考方案2】:

Elasticsearch 接收器不支持流式写入,这意味着您无法将输出流式传输到 Elasticsearch。 您可以将流输出写入 kafka 并使用 logstash 从 kafka 读取到 elasticsearch。

【讨论】:

【参考方案3】:

更新

使用 Spark 2.2.0 时,Elasticsearch 6.x 版本现在支持流式写入。

依赖:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.2.4</version>
</dependency>

写流代码:

 ds
  .writeStream
  .outputMode(OutputMode.Append()) // only append mode is currently supported
  .format("es")
  .option("checkpointLocation", "/my/checkpointLocation")
  .option("es.mapping.id", "MY_OPTIONAL_ID_ATTRIBUTE")
  .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
  .start("index/type")

【讨论】:

这个答案解决了问题吗?我试图实现一个类似的解决方案,唯一的区别是我必须使用.format("org.elasticsearch.spark.sql")。我看到以下错误org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot determine write shards for [structuredTest/testType]; likely its format is incorrect (maybe it contains illegal characters? or all shards failed?) 你能分享你的代码吗?可能是您的映射问题 我已经解决了这个问题,这是由于索引中的大写字符造成的。我不知道他们是不允许的。

以上是关于Spark 结构化流式 Elasticsearch 集成问题。数据源es不支持流式写入的主要内容,如果未能解决你的问题,请参考以下文章

Spark 结构化流式蓝/绿部署

如何在 Spark 结构化流式连接中选择最新记录

与 deviceid 对应的 Spark 结构化流式水印

Spark 结构化流式传输 - 为不同的 GroupBy 键使用不同的 Windows

从多个 Kafka 主题读取的 Spark 结构化流式应用程序

Spark结构化流式kafka在没有模式的情况下转换JSON(推断模式)