Common Spark Operations

Posted by wangbin2188

A quick reference for common Spark operations in the Java API: reading and writing data, building schemas, and converting between RDD, DataFrame (Dataset<Row>), and typed Dataset.

// Reading data with Spark
Dataset<String> text = spark.read().textFile(currentSrcPath);   // textFile takes only paths and returns Dataset<String>, not Dataset<Row>
Dataset<Row> df = spark.read().json(path);
Dataset<Row> orc = spark.read().orc(path);
Dataset<Row> parquet = spark.read().parquet(path);
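
All snippets assume an existing SparkSession named spark; a minimal sketch of building one (the appName is illustrative):

// Sketch: minimal SparkSession setup assumed by the snippets below
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("spark-common-ops")   // hypothetical application name
        .master("local[*]")            // local mode for testing; drop this on a cluster
        .getOrCreate();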

// Writing data with Spark
df.write().mode("overwrite").text(outputPath);      // text() requires a DataFrame with a single string column
df.write().mode("overwrite").parquet(outputPath);
df.write().mode("overwrite").orc(outputPath);

// RDD<Row> to Dataset<Row>, using a pre-built schema (see AdjustSchema.row below)
Dataset<Row> df = spark.createDataFrame(rowRDD, AdjustSchema.row);

// List to Dataset
Dataset<String> dataset = spark.createDataset(Collections.singletonList(Long.toString(startTime)), Encoders.STRING());
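
createDataset also accepts a bean encoder for a typed list; a sketch, assuming the Person bean shown further down:

// Sketch: List<Person> to a typed Dataset<Person>
Dataset<Person> people = spark.createDataset(personList, Encoders.bean(Person.class));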

// Getting a Hadoop FileSystem from Spark's configuration
FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
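
A common use of that FileSystem handle is clearing an output directory before a job writes to it; a sketch:

// Sketch: delete the output path if it already exists
import org.apache.hadoop.fs.Path;

Path out = new Path(outputPath);
if (fs.exists(out)) {
    fs.delete(out, true);   // true = recursive delete
}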

// Building a schema
public static StructType row = DataTypes.createStructType(
        Arrays.asList(
                DataTypes.createStructField("phone_name", DataTypes.StringType, true),
                DataTypes.createStructField("app_id", DataTypes.StringType, true)
                // ... more fields elided in the original
        ));
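
Rows fed to createDataFrame must line up with this schema field-for-field; a sketch that builds matching rows (lines is a hypothetical JavaRDD<String> of comma-separated records):

// Sketch: build Row objects matching AdjustSchema.row, then create the DataFrame
JavaRDD<Row> rowRDD = lines.map(line -> {
    String[] parts = line.split(",");
    return RowFactory.create(parts[0], parts[1]);   // phone_name, app_id
});
Dataset<Row> adjusted = spark.createDataFrame(rowRDD, AdjustSchema.row);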

// RDD/JavaRDD to DataFrame (Dataset<Row>)
Dataset<Row> personDF = spark.createDataFrame(personRDD, Person.class);       // schema reflected from the bean
Dataset<Row> personDF2 = spark.createDataFrame(personRowRdd, PersonSchema);   // explicit StructType; the RDD must contain Row objects
personDF = spark.createDataFrame(personJavaRDD, Person.class);

// RDD to typed Dataset via a bean Encoder
Encoder<Person> personEncoder = Encoders.bean(Person.class);
personDS = spark.createDataset(personJavaRDD.rdd(), personEncoder);
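
Both createDataFrame(rdd, Person.class) and Encoders.bean(Person.class) reflect over a JavaBean: it needs a no-argument constructor plus getters and setters. A minimal sketch of the Person class these snippets assume (fields are public so the direct person.name / person.age access below also compiles):

// Sketch of the Person bean assumed throughout
public static class Person implements java.io.Serializable {
    public String name;
    public int age;

    public Person() {}   // no-arg constructor required for bean reflection
    public Person(String name, int age) { this.name = name; this.age = age; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}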

// Building a DataFrame directly from a List of beans
Dataset<Row> personDF = spark.createDataFrame(personList, Person.class);

// JavaRDD<Person> to Dataset<Row> via JavaRDD<Row> and an explicit schema
JavaRDD<Row> personRowRdd = personJavaRDD.map(person -> RowFactory.create(person.age, person.name));
personDF = spark.createDataFrame(personRowRdd, rowAgeNameSchema);

//Dataset<Person> -> JavaRDD<Person>
personJavaRDD = personDS.toJavaRDD();

//Dataset<Row> -> JavaRDD<Person>
personJavaRDD = personDF.toJavaRDD().map(row -> {
    String name = row.getAs("name");
    int age = row.getAs("age");
    return new Person(name, age);
});

//Dataset<Person> -> Dataset<Row>
ExpressionEncoder<Row> rowEncoder = RowEncoder.apply(rowSchema);
Dataset<Row> personDF_fromDS = personDS.map(
        (MapFunction<Person, Row>) person -> {
            List<Object> objectList = new ArrayList<>();
            objectList.add(person.name);
            objectList.add(person.age);
            return RowFactory.create(objectList.toArray());
        },
        rowEncoder
);

//Dataset<Row> -> Dataset<Person>
personDS = personDF.map(new MapFunction<Row, Person>() {
    @Override
    public Person call(Row value) throws Exception {
        return new Person(value.getAs("name"), value.getAs("age"));
    }
}, personEncoder);
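
The anonymous MapFunction can equivalently be written as a lambda with a cast (the cast selects the right map overload):

// Sketch: the same conversion as a lambda
personDS = personDF.map(
        (MapFunction<Row, Person>) row -> new Person(row.getAs("name"), row.getAs("age")),
        personEncoder);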
