Spark2 Kafka结构化流Java不知道from_json函数
Posted
技术标签:
【中文标题】Spark2 Kafka结构化流Java不知道from_json函数【英文标题】:Spark2 Kafka Structured Streaming Java doesn't know from_json function 【发布时间】:2018-09-24 05:47:32 【问题描述】:我有一个关于 Kafka 流上的 Spark 结构化流的问题。
我有一个类型的架构:
StructType schema = new StructType()
.add("field1", StringType)
.add("field2", StringType)
.add("field3", StringType)
.add("field4", StringType)
.add("field5", StringType);
我从 Kafka 主题引导我的流,例如:
Dataset<Row> ds1 = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "brokerlist")
.option("zookeeper.connect", "zk_url")
.option("subscribe", "topic")
.option("startingOffsets", "earliest")
.option("max.poll.records", 10)
.option("failOnDataLoss", false)
.load();
接下来转换成字符串,字符串类型:
Dataset<Row> df1 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
现在我想将值字段(这是一个 JSON)转换为之前转换的模式,这应该会使 SQL 查询更容易:
Dataset<Row> df2 = df1.select(from_json("value", schema=schema).as("data").select("single_column_field");
Spark 2.3.1 好像不知道from_json
函数?
这是我的进口:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
关于如何解决这个问题的任何想法?请注意,我不是在寻找 Scala 解决方案,而是纯粹基于 Java 的解决方案!
【问题讨论】:
import static org.apache.spark.sql.functions.from_json
【参考方案1】:
这段代码对我有用。希望对你有帮助
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.34.216:9092")
.option("subscribe", "topicName")
.load()
//df.show();
import spark.implicits._
val comingXDR = df.select("value").as[String].withColumn("_tmp", split($"value", "\\,")).withColumn("MyNewColumnName1", $"_tmp".getItem(0)).withColumn("MyNewColumnName2", $"_tmp".getItem(1)).withColumn("MyNewColumnName3", $"_tmp".getItem(2)).withColumn("MyNewColumnName4", $"_tmp".getItem(3)).drop("value").drop("_tmp")
【讨论】:
以上是关于Spark2 Kafka结构化流Java不知道from_json函数的主要内容,如果未能解决你的问题,请参考以下文章
结构化流 Kafka 2.1->Zeppelin 0.8->Spark 2.4:spark 不使用 jar
结构化流 + Kafka 集成 - SSL 和 Kerberos 集成?
我可以使用spark 2.3.0和pyspark从Kafka进行流处理吗?
Spark2.x(六十):在Structured Streaming流处理中是如何查找kafka的DataSourceProvider?