将 JavaDStream<String> 转换为 JavaRDD<String>

Posted

技术标签:

【中文标题】将 JavaDStream<String> 转换为 JavaRDD<String>【英文标题】:Convert JavaDStream<String> to JavaRDD<String> 【发布时间】:2014-10-31 04:11:58 【问题描述】:

我有一个从外部源获取数据的 JavaDStream。我正在尝试集成 Spark Streaming 和 SparkSQL。众所周知,JavaDStream 是由 JavaRDD 的 .当我有 JavaRDD 时,我只能应用函数 applySchema()。请帮助我将其转换为 JavaRDD。我知道 scala 中有一些函数,而且它更容易。但是请帮助我使用 Java。

【问题讨论】:

【参考方案1】:

您不能将 DStream 转换为 RDD。正如您所提到的, DStream 包含 RDD。访问 RDD 的方法是使用 foreachRDD 对 DStream 的每个 RDD 应用一个函数。查看文档:https://spark.apache.org/docs/1.1.0/api/java/org/apache/spark/streaming/api/java/JavaDStreamLike.html#foreachRDD(org.apache.spark.api.java.function.Function2)

【讨论】:

谢谢。帮了我很多!【参考方案2】:

您必须首先使用 forEachRDD 访问 DStream 中的所有 RDD:

javaDStream.foreachRDD( rdd => 
    rdd.collect.foreach(
        ...
    )
)

【讨论】:

【参考方案3】:

我希望这有助于将 JavaDstream 转换为 JavaRDD!

    JavaDStream<String> lines = stream.map(ConsumerRecord::value);

    //Create JavaRDD<Row>
    lines.foreachRDD(new VoidFunction<JavaRDD<String>>() 
        @Override
        public void call(JavaRDD<String> rdd) 
            JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() 
                @Override
                public Row call(String msg) 
                    Row row = RowFactory.create(msg);
                    return row;
                
            );
            //Create Schema
            StructType schema = DataTypes.createStructType(new StructField[] 
                    DataTypes.createStructField("value", DataTypes.StringType, true));
            //Get Spark 2.0 session
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            Dataset msgDataFrame = spark.createDataFrame(rowRDD, schema);
            msgDataFrame.show();

【讨论】:

以上是关于将 JavaDStream<String> 转换为 JavaRDD<String>的主要内容,如果未能解决你的问题,请参考以下文章

将 System.List<string> 转换为 Java.Lang.String[]

Java递归方法遍历二叉树的代码

使用点分隔符将 List 转换为 String 到 Map<String, Object>

使用 JPA 存储 Map<String,String>

参数类型 'List<Series<dynamic, dynamic>>' 不能分配给参数类型 'List<Series<dynamic, String>&g

javascript将object转string字符串