How to convert JavaPairInputDStream into DataSet/DataFrame in Spark

Asked: 2017-04-17 01:17:18

Question:

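I am trying to receive streaming data from Kafka. I am able to receive the stream and store it in a JavaPairInputDStream. Now I need to analyze this data without storing it in any database, so I want to convert this JavaPairInputDStream into a Dataset/DataFrame.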

What I have tried so far:

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalog.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.AbstractJavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;


import kafka.serializer.StringDecoder;
import scala.Tuple2;

// Streaming working code

public class KafkaToSparkStreaming {

    public static void main(String[] arr) throws InterruptedException {

        SparkConf conf = new SparkConf();
        conf.set("spark.app.name", "SparkReceiver"); // Application name; appears in the UI and in log data.
        //conf.set("spark.ui.port", "7077");    // Port for the application's dashboard, which shows memory and workload data.
        conf.set("spark.dynamicAllocation.enabled", "false");  // Do not scale the number of executors up and down with the workload.
        //conf.set("spark.cassandra.connection.host", "localhost"); // Cassandra host address/IP
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  // Serializer for objects sent over the network or cached in serialized form.
        //conf.setMaster("local");
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true");

        JavaSparkContext sc = new JavaSparkContext(conf);
        // Create the streaming context with a 2-second batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Map<String, String> kafkaParams = new HashMap<String, String>();

        kafkaParams.put("zookeeper.connect", "localhost:2181"); // ZooKeeper connection string (host:port).
        kafkaParams.put("group.id", "testgroup");   // Uniquely identifies the group of consumer processes this consumer belongs to.
        kafkaParams.put("metadata.broker.list", "localhost:9092"); // One or more brokers used to determine the leader for each topic.
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder"); // Serializer used when preparing a message for transmission to the broker.
        kafkaParams.put("request.required.acks", "1");  // Require an acknowledgement from the broker that the message was received.

        Set<String> topics = Collections.singleton("ny-2008.csv");

        // Create an input DStream that reads (key, value) messages from Kafka
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams, topics);

        //System.out.println(directKafkaStream);
        directKafkaStream.print();

        // Nothing is printed until the streaming context is started
        ssc.start();
        ssc.awaitTermination();
    }
}


Answer 1:

Here is the complete working code, using Spark 2.0.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;


public class KafkaToSparkStreaming {
    public static void main(String[] arr) throws InterruptedException {

        SparkConf conf = new SparkConf();
        conf.set("spark.app.name", "SparkReceiver"); // Application name; appears in the UI and in log data.
        //conf.set("spark.ui.port", "7077");    // Port for the application's dashboard, which shows memory and workload data.
        conf.set("spark.dynamicAllocation.enabled", "false");  // Do not scale the number of executors up and down with the workload.
        //conf.set("spark.cassandra.connection.host", "localhost"); // Cassandra host address/IP
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  // Serializer for objects sent over the network or cached in serialized form.
        conf.setMaster("local");
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true");

        JavaSparkContext sc = new JavaSparkContext(conf);
        // Create the streaming context with a 2-second batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Map<String, String> kafkaParams = new HashMap<String, String>();

        kafkaParams.put("zookeeper.connect", "localhost:2181"); // ZooKeeper connection string (host:port).
        kafkaParams.put("group.id", "testgroup");   // Uniquely identifies the group of consumer processes this consumer belongs to.
        kafkaParams.put("metadata.broker.list", "localhost:9092"); // One or more brokers used to determine the leader for each topic.
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder"); // Serializer used when preparing a message for transmission to the broker.
        kafkaParams.put("request.required.acks", "1");  // Require an acknowledgement from the broker that the message was received.

        Set<String> topics = Collections.singleton("ny-2008.csv");

        // Create an input DStream that reads (key, value) messages from Kafka
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams, topics);

        // Create JavaDStream<String> holding only the message value of each pair
        JavaDStream<String> msgDataStream = directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // Convert the RDD of every batch into a DataFrame
        msgDataStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> rdd) {
                // Create JavaRDD<Row>
                JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
                    @Override
                    public Row call(String msg) {
                        return RowFactory.create(msg);
                    }
                });
                // Create the schema: a single nullable string column
                StructType schema = DataTypes.createStructType(new StructField[] {
                        DataTypes.createStructField("Message", DataTypes.StringType, true)
                });
                // Get the Spark 2.0 session
                SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
                Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema);
                msgDataFrame.show();
            }
        });

        ssc.start();
        ssc.awaitTermination();
    }
}

// Lazily created SparkSession that is reused for every batch
class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();
        }
        return instance;
    }
}
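
The helper class in this answer creates the session on first use and hands back the same object for every later batch. The same lazy-initialization idea, stripped of Spark types so it can be run on its own, might look like the sketch below. `LazyHolder` and its `Supplier` factory are hypothetical names for illustration and are not part of Spark; the double-checked locking is an added assumption for the case where several threads call `get()` at once, which the answer's version does not guard against.

```java
import java.util.function.Supplier;

/** Lazily initialized holder; a Spark-free stand-in for JavaSparkSessionSingleton (hypothetical). */
final class LazyHolder<T> {
    private final Supplier<T> factory; // builds the value on first use; must not return null
    private volatile T instance;       // volatile so other threads see the fully built value

    LazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T local = instance;
        if (local == null) {               // first check, without locking
            synchronized (this) {
                local = instance;
                if (local == null) {       // second check, under the lock
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        LazyHolder<Object> holder = new LazyHolder<>(Object::new);
        // Both calls return the object created by the first call.
        System.out.println(holder.get() == holder.get());
    }
}
```

In the streaming job, the factory would be whatever builds the session, and `get()` would be called inside `foreachRDD` once per batch.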
      
    

Comments:

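This code does not work; JavaSparkSessionSingleton is a compile error.

I have added the complete code. It works with Spark 2.0. To run it on Spark 1.6.2, you need to modify JavaSparkSessionSingleton so that it obtains a SQLContext instead, as shown in examples/streaming/JavaSqlNetworkWordCount.java from the 1.6.2 release.

Thanks, it works. I struggled with this code for three days and you solved it in an hour.

Answer 2: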

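Technically, a DStream is a sequence of RDDs. You do not convert the DStream itself into a DataFrame; instead, you convert each RDD into a DataFrame/Dataset, as follows (this is Scala code; please translate it to Java for your case):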

stream.foreachRDD { rdd =>
  // toDF needs `import spark.implicits._` in scope, where `spark` is the SparkSession
  val dataFrame = rdd.toDF("key", "value")
}
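
The key point of this answer is the control flow: the conversion runs once per micro-batch, never once for the whole stream. A Spark-free model of that flow in plain Java is sketched below. Everything in it is a stand-in chosen for illustration: a `List` of batches plays the role of the DStream, each inner `List` of key/value entries plays the role of one RDD, and a `List<Object[]>` plays the role of the resulting DataFrame. The names `PerBatchConversion`, `entry`, and `toRows` are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class PerBatchConversion {

    /** Convenience factory for one (key, value) message. */
    static Map.Entry<String, String> entry(String key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    /** Converts ONE batch into rows; stands in for turning one RDD into a DataFrame. */
    static List<Object[]> toRows(List<Map.Entry<String, String>> batch) {
        List<Object[]> rows = new ArrayList<>();
        for (Map.Entry<String, String> kv : batch) {
            rows.add(new Object[] { kv.getKey(), kv.getValue() });
        }
        return rows;
    }

    public static void main(String[] args) {
        // Two micro-batches, standing in for the RDDs a DStream delivers over time
        List<List<Map.Entry<String, String>>> stream = new ArrayList<>();
        stream.add(Arrays.asList(entry("k1", "a"), entry("k2", "b")));
        stream.add(Arrays.asList(entry("k3", "c")));

        // Equivalent of foreachRDD: each batch is converted and analyzed on its own
        for (List<Map.Entry<String, String>> batch : stream) {
            List<Object[]> rows = toRows(batch);
            System.out.println("batch with " + rows.size() + " row(s)");
        }
    }
}
```

In the real job the loop body corresponds to the function passed to `foreachRDD`, so any analysis on the rows has to live inside that function.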

