如何仅从 kafka 资源中获取值以激发火花?
Posted
技术标签:
【中文标题】如何仅从 kafka 资源中获取值以激发火花?【英文标题】:How to get only values from kafka sources to spark? 【发布时间】:2019-07-22 02:29:32 【问题描述】:我正在从 kafka 来源获取日志,并将其放入 spark 中。
保存在我的 hadoop_path 中的日志格式如下所示"value":"\"Name\":\"Amy\",\"Age\":\"22\""
"value":"\"Name\":\"Jin\",\"Age\":\"26\""
但是,我想让这个像\"Name\":\"Amy\",\"Age\":\"22\"
\"Name\":\"Jin\",\"Age\":\"26\"
任何类型的解决方案都会很棒。 (使用纯 Java 代码、Spark SQL 或 Kafka)
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MYApp").getOrCreate();
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", Kafka_source)
.option("subscribe", Kafka_topic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss",false)
.load();
Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)");
StreamingQuery queryone = dg.writeStream()
.format("json")
.outputMode("append")
.option("checkpointLocation",Hadoop_path)
.option("path",Hadoop_path)
.start();
【问题讨论】:
【参考方案1】:使用以下内容:
Dataframe<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", Kafka_source)
.option("subscribe", Kafka_topic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss",false)
.load();
df.printSchema();
StreamingQuery queryone = df.selectExpr("CAST(value AS STRING)")
.writeStream()
.format("json")
.outputMode("append")
.option("checkpointLocation",Hadoop_path)
.option("path",Hadoop_path)
.start();
确保架构包含 value
作为列。
【讨论】:
首先,它返回的结果和我的一样。这意味着,Hadoop_path 中的 json 文件也包含 ""value":" 部分。此外,我不确定你的代码和我的代码有什么区别,因为我的代码也使用dg=df.selectExpr("CAST(value AS STRING)")
。请问您对为什么我不能只获取值而不包括 ""value":" 部分有何看法?
您是否检查了您的 df 或 ds 的架构?我的猜测是您的数据不是正确的 json 格式?
Kafka 源抛出 7 维模式,包括键、值、时间戳等。“值”的值显然是 json。但是当我使用printschema
时,它只显示“value”:[something]。【参考方案2】:
您可以使用 Spark 获得预期的结果,如下所示:
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MYApp").getOrCreate();
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", Kafka_source)
.option("subscribe", Kafka_topic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss",false)
.load();
Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)")
.withColumn("Name", functions.json_tuple(functions.col("value"),"Name"))
.withColumn("Age", functions.json_tuple(functions.col("value"),"Age"));
StreamingQuery queryone = dg.writeStream()
.format("json")
.outputMode("append")
.option("checkpointLocation",Hadoop_path)
.option("path",Hadoop_path)
.start();
基本上,您必须为 value 列中 json 字符串内的每个字段创建单独的列。
【讨论】:
【参考方案3】:我已经用 from_json 函数完成了!!
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MYApp").getOrCreate();
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", Kafka_source)
.option("subscribe", Kafka_topic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss",false)
.load();
Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)");
Dataset<Row> dz = dg.select(
from_json(dg.col("value"), DataTypes.createStructType(
new StructField[]
DataTypes.createStructField("Name", StringType,true)
)).getField("Name").alias("Name")
,from_json(dg.col("value"), DataTypes.createStructType(
new StructField[]
DataTypes.createStructField("Age", IntegerType,true)
)).getField("Age").alias("Age")
StreamingQuery queryone = dg.writeStream()
.format("json")
.outputMode("append")
.option("checkpointLocation",Hadoop_path)
.option("path",Hadoop_path)
.start();
【讨论】:
以上是关于如何仅从 kafka 资源中获取值以激发火花?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 pyspark 将经过火花转换的数据写回 kafka 代理?