如何仅从 kafka 资源中获取值以激发火花?

Posted

技术标签:

【中文标题】如何仅从 kafka 资源中获取值以激发火花?【英文标题】:How to get only values from kafka sources to spark? 【发布时间】:2019-07-22 02:29:32 【问题描述】:

我正在从 kafka 来源获取日志,并将其放入 spark 中。 保存在我的 hadoop_path 中的日志格式如下所示"value":"\"Name\":\"Amy\",\"Age\":\"22\"""value":"\"Name\":\"Jin\",\"Age\":\"26\""

但是,我想让这个像\"Name\":\"Amy\",\"Age\":\"22\"\"Name\":\"Jin\",\"Age\":\"26\"

任何类型的解决方案都会很棒。 (使用纯 Java 代码、Spark SQL 或 Kafka)

        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("MYApp").getOrCreate();
        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", Kafka_source)
                .option("subscribe", Kafka_topic)
                .option("startingOffsets", "earliest")
                .option("failOnDataLoss",false)
                .load();
        Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)");
        StreamingQuery queryone = dg.writeStream()
                .format("json")
                .outputMode("append")
                .option("checkpointLocation",Hadoop_path)
                .option("path",Hadoop_path)
                .start();

【问题讨论】:

【参考方案1】:

使用以下内容:

Dataframe<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", Kafka_source)
                .option("subscribe", Kafka_topic)
                .option("startingOffsets", "earliest")
                .option("failOnDataLoss",false)
                .load();
df.printSchema();
StreamingQuery queryone = df.selectExpr("CAST(value AS STRING)")
            .writeStream()
            .format("json")
            .outputMode("append")
            .option("checkpointLocation",Hadoop_path)
            .option("path",Hadoop_path)
            .start();

确保架构包含 value 作为列。

【讨论】:

首先,它返回的结果和我的一样。这意味着,Hadoop_path 中的 json 文件也包含 ""value":" 部分。此外,我不确定你的代码和我的代码有什么区别,因为我的代码也使用dg=df.selectExpr("CAST(value AS STRING)")。请问您对为什么我不能只获取值而不包括 ""value":" 部分有何看法? 您是否检查了您的 df 或 ds 的架构?我的猜测是您的数据不是正确的 json 格式? Kafka 源抛出 7 维模式,包括键、值、时间戳等。“值”的值显然是 json。但是当我使用printschema 时,它只显示“value”:[something]。【参考方案2】:

您可以使用 Spark 获得预期的结果,如下所示:

SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("MYApp").getOrCreate();

Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", Kafka_source)
                .option("subscribe", Kafka_topic)
                .option("startingOffsets", "earliest")
                .option("failOnDataLoss",false)
                .load();

Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)")
        .withColumn("Name", functions.json_tuple(functions.col("value"),"Name"))
        .withColumn("Age", functions.json_tuple(functions.col("value"),"Age"));

StreamingQuery queryone = dg.writeStream()
                .format("json")
                .outputMode("append")
                .option("checkpointLocation",Hadoop_path)
                .option("path",Hadoop_path)
                .start();

基本上,您必须为 value 列中 json 字符串内的每个字段创建单独的列。

【讨论】:

【参考方案3】:

我已经用 from_json 函数完成了!!

        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("MYApp").getOrCreate();
        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", Kafka_source)
                .option("subscribe", Kafka_topic)
                .option("startingOffsets", "earliest")
                .option("failOnDataLoss",false)
                .load();
        Dataset<Row> dg = df.selectExpr("CAST(value AS STRING)");
        Dataset<Row> dz = dg.select(
                        from_json(dg.col("value"), DataTypes.createStructType(
                        new StructField[] 
                                DataTypes.createStructField("Name", StringType,true)
                        )).getField("Name").alias("Name")
                        ,from_json(dg.col("value"), DataTypes.createStructType(
                        new StructField[] 
                                DataTypes.createStructField("Age", IntegerType,true)
                        )).getField("Age").alias("Age")
        StreamingQuery queryone = dg.writeStream()
                .format("json")
                .outputMode("append")
                .option("checkpointLocation",Hadoop_path)
                .option("path",Hadoop_path)
                .start();

【讨论】:

以上是关于如何仅从 kafka 资源中获取值以激发火花?的主要内容,如果未能解决你的问题,请参考以下文章

XMPP:客户端仅从资源绑定接收通知

当资源不足时,火花作业将等待来自纱线的资源多长时间?

如何使用 pyspark 将经过火花转换的数据写回 kafka 代理?

在 Kubernetes 中运行 Kafka 时如何管理页面缓存资源

如何将火花流 DF 写入 Kafka 主题

如何在火花结构化流式读取流中倒带 Kafka 偏移