如何创建具有指定架构的空 DataFrame?

Posted

技术标签:

【中文标题】如何创建具有指定架构的空 DataFrame?【英文标题】:How to create an empty DataFrame with a specified schema? 【发布时间】:2015-07-17 13:58:53 【问题描述】:

我想在 Scala 中使用指定架构在 DataFrame 上创建。我曾尝试使用 JSON 读取(我的意思是读取空文件),但我认为这不是最佳做法。

【问题讨论】:

【参考方案1】:

假设您想要一个具有以下架构的数据框:

root
 |-- k: string (nullable = true)
 |-- v: integer (nullable = false)

您只需为数据框定义架构并使用空的RDD[Row]

import org.apache.spark.sql.types.
    StructType, StructField, StringType, IntegerType
import org.apache.spark.sql.Row

val schema = StructType(
    StructField("k", StringType, true) ::
    StructField("v", IntegerType, false) :: Nil)

// Spark < 2.0
// sqlContext.createDataFrame(sc.emptyRDD[Row], schema) 
spark.createDataFrame(sc.emptyRDD[Row], schema)

PySpark 等效项几乎相同:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("k", StringType(), True), StructField("v", IntegerType(), False)
])

# or df = sc.parallelize([]).toDF(schema)

# Spark < 2.0 
# sqlContext.createDataFrame([], schema)
df = spark.createDataFrame([], schema)

使用带有 Product 类型的隐式编码器(仅限 Scala),例如 Tuple

import spark.implicits._

Seq.empty[(String, Int)].toDF("k", "v")

或案例类:

case class KV(k: String, v: Int)

Seq.empty[KV].toDF

spark.emptyDataset[KV].toDF

【讨论】:

这是最合适的答案 - 完整,如果您想快速重现现有数据集的架构,它也很有用。我不知道为什么它不被接受。 如何使用 trait 而不是 case 类创建 df:***.com/questions/64276952/…【参考方案2】:

从 Spark 2.0.0 开始,您可以执行以下操作。

案例分类

让我们定义一个Person 案例类:

scala> case class Person(id: Int, name: String)
defined class Person

导入sparkSparkSession 隐式Encoders

scala> import spark.implicits._
import spark.implicits._

并使用 SparkSession 创建一个空的Dataset[Person]

scala> spark.emptyDataset[Person]
res0: org.apache.spark.sql.Dataset[Person] = [id: int, name: string]

架构 DSL

您还可以使用模式“DSL”(请参阅​​org.apache.spark.sql.ColumnName 中的DataFrames 的支持函数)。

scala> val id = $"id".int
id: org.apache.spark.sql.types.StructField = StructField(id,IntegerType,true)

scala> val name = $"name".string
name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)

scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType

scala> val mySchema = StructType(id :: name :: Nil)
mySchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true))

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val emptyDF = spark.createDataFrame(sc.emptyRDD[Row], mySchema)
emptyDF: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> emptyDF.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)

【讨论】:

嗨,编译器说我的模块上不存在spark.emptyDataset,如何使用它?有一些(正确的)类似于(不正确的)val df = apache.spark.emptyDataset[RawData]? @PeterKrauss spark 是您使用 SparkSession.builder 创建的值,而不是 org.apache.spark 包的一部分。有两个 spark 名称正在使用中。这是开箱即用的spark spark-shell 谢谢雅克。我更正了:SparkSession.builder 对象作为参数传递(似乎是最好的解决方案)从第一次常规初始化开始,现在正在运行。 有没有办法使用特征而不是案例类来创建空数据框:***.com/questions/64276952/…【参考方案3】:
import scala.reflect.runtime.universe => ru
def createEmptyDataFrame[T: ru.TypeTag] =
    hiveContext.createDataFrame(sc.emptyRDD[Row],
      ScalaReflection.schemaFor(ru.typeTag[T].tpe).dataType.asInstanceOf[StructType]
    )
  case class RawData(id: String, firstname: String, lastname: String, age: Int)
  val sourceDF = createEmptyDataFrame[RawData]

【讨论】:

【参考方案4】:

在这里,您可以使用 Scala 中的 StructType 创建架构并传递 Empty RDD,以便您能够创建空表。 以下代码是相同的。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StringType



//import org.apache.hadoop.hive.serde2.objectinspector.StructField

object EmptyTable extends App 
  val conf = new SparkConf;
  val sc = new SparkContext(conf)
  //create sparksession object
  val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()

  //Created schema for three columns 
   val schema = StructType(
    StructField("Emp_ID", LongType, true) ::
      StructField("Emp_Name", StringType, false) ::
      StructField("Emp_Salary", LongType, false) :: Nil)

      //Created Empty RDD 

  var dataRDD = sc.emptyRDD[Row]

  //pass rdd and schema to create dataframe
  val newDFSchema = sparkSession.createDataFrame(dataRDD, schema)

  newDFSchema.createOrReplaceTempView("tempSchema")

  sparkSession.sql("create table Finaltable AS select * from tempSchema")


【讨论】:

【参考方案5】:

创建空DataSet的Java版本:

public Dataset<Row> emptyDataSet()

    SparkSession spark = SparkSession.builder().appName("Simple Application")
                .config("spark.master", "local").getOrCreate();

    Dataset<Row> emptyDataSet = spark.createDataFrame(new ArrayList<>(), getSchema());

    return emptyDataSet;


public StructType getSchema() 

    String schemaString = "column1 column2 column3 column4 column5";

    List<StructField> fields = new ArrayList<>();

    StructField indexField = DataTypes.createStructField("column0", DataTypes.LongType, true);
    fields.add(indexField);

    for (String fieldName : schemaString.split(" ")) 
        StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
        fields.add(field);
    

    StructType schema = DataTypes.createStructType(fields);

    return schema;

【讨论】:

【参考方案6】:

这是在 pyspark 2.0.0 或更高版本中创建空数据框的解决方案。

from pyspark.sql import SQLContext
sc = spark.sparkContext
schema = StructType([StructField('col1', StringType(),False),StructField('col2', IntegerType(), True)])
sqlContext.createDataFrame(sc.emptyRDD(), schema)

【讨论】:

【参考方案7】:

这有助于测试目的。

Seq.empty[String].toDF()

【讨论】:

如何从 trait 创建空 df :***.com/questions/64276952/…【参考方案8】:

我有一个特殊要求,其中我已经有一个数据框,但在特定条件下我必须返回一个空数据框,所以我返回了df.limit(0)

【讨论】:

【参考方案9】:

从 Spark 2.4.3 开始

val df = SparkSession.builder().getOrCreate().emptyDataFrame

【讨论】:

这并不能解决问题的架构部分。

以上是关于如何创建具有指定架构的空 DataFrame?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 GBQ 中创建具有特定模式的空表?

如何在 Spark 的空 Dataframe 中添加行记录

Apache Spark:如何使用 Java 在 dataFrame 中的空值列中插入数据

如何从列类型列表中删除 pandas DataFrame 中的空值

如何使用文件中的列和字段创建 DataFrame?

spark dataframe 和 scala Map互相转换