如何根据模式动态生成数据集?

Posted

技术标签:

【中文标题】如何根据模式动态生成数据集?【英文标题】:How to generate datasets dynamically based on schema? 【发布时间】:2018-11-30 07:21:46 【问题描述】:

我有多个模式,如下所示,具有不同的列名和数据类型。 我想使用 DataFrame 和 Scala 为每个模式生成测试/模拟数据并将其保存到镶木地板文件中。

下面是示例模式(来自示例 json),用于动态生成数据,其中包含虚拟值。

val schema1 = StructType(
  List(
    StructField("a", DoubleType, true),
    StructField("aa", StringType, true)
    StructField("p", LongType, true),
    StructField("pp", StringType, true)
  )
)

我需要这样的 rdd/dataframe,每行 1000 行,基于上述架构中的列数。

val data = Seq(
  Row(1d, "happy", 1L, "Iam"),
  Row(2d, "sad", 2L, "Iam"),
  Row(3d, "glad", 3L, "Iam")
)

基本上.. 像这 200 个数据集,我需要为其动态生成数据,为每个方案编写单独的程序对我来说简直是不可能的。

请。帮我想想你的想法或实现。因为我是新手。

是否可以根据不同类型的模式生成动态数据?

【问题讨论】:

见我的linked in article i.e. Test data generation using Spark by using simple Json data descriptor with Columns and DataTypes to load in dwh like Hive. 这是除了下面的答案之外的另一种方式...... 【参考方案1】:

使用@JacekLaskowski 的建议,您可以根据您期望的字段/类型使用带有ScalaCheck (Gen) 的生成器生成动态数据。

它可能看起来像这样:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row, SaveMode
import org.scalacheck._

import scala.collection.JavaConverters._

val dynamicValues: Map[(String, DataType), Gen[Any]] = Map(
  ("a", DoubleType) -> Gen.choose(0.0, 100.0),
  ("aa", StringType) -> Gen.oneOf("happy", "sad", "glad"),
  ("p", LongType) -> Gen.choose(0L, 10L),
  ("pp", StringType) -> Gen.oneOf("Iam", "You're")
)

val schemas = Map(
  "schema1" -> StructType(
    List(
      StructField("a", DoubleType, true),
      StructField("aa", StringType, true),
      StructField("p", LongType, true),
      StructField("pp", StringType, true)
    )),
  "schema2" -> StructType(
    List(
      StructField("a", DoubleType, true),
      StructField("pp", StringType, true),
      StructField("p", LongType, true)
    )
  )
)

val numRecords = 1000

schemas.foreach 
  case (name, schema) =>
    // create a data frame
    spark.createDataFrame(
      // of #numRecords records
      (0 until numRecords).map  _ =>
        // each of them a row
        Row.fromSeq(schema.fields.map(field => 
          // with fields based on the schema's fieldname & type else null
          dynamicValues.get((field.name, field.dataType)).flatMap(_.sample).orNull
        ))
      .asJava, schema)
      // store to parquet
      .write.mode(SaveMode.Overwrite).parquet(name)

【讨论】:

【参考方案2】:

ScalaCheck 是一个生成数据的框架,您可以使用自定义生成器根据架构生成原始数据。

访问ScalaCheck Documentation。

【讨论】:

【参考方案3】:

你可以这样做

import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame, SparkSession
import org.json4s
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._

import scala.util.Random

object Test extends App 

  val structType: StructType = StructType(
    List(
      StructField("a", DoubleType, true),
      StructField("aa", StringType, true),
      StructField("p", LongType, true),
      StructField("pp", StringType, true)
    )
  )

  val spark = SparkSession
    .builder()
    .master("local[*]")
    .config(new SparkConf())
    .getOrCreate()

  import spark.implicits._

  val df = createRandomDF(structType, 1000)

  def createRandomDF(structType: StructType, size: Int, rnd: Random = new Random()): DataFrame =
    spark.read.schema(structType).json((0 to size).map  _ => compact(randomJson(rnd, structType)).toDS())
  

  def randomJson(rnd: Random, dataType: DataType): JValue = 

    dataType match 
      case v: DoubleType =>
        json4s.JDouble(rnd.nextDouble())
      case v: StringType =>
        JString(rnd.nextString(10))
      case v: IntegerType =>
        JInt(rnd.nextInt())
      case v: LongType =>
        JInt(rnd.nextLong())
      case v: FloatType =>
        JDouble(rnd.nextFloat())
      case v: BooleanType =>
        JBool(rnd.nextBoolean())
      case v: ArrayType =>
        val size = rnd.nextInt(10)
        JArray(
          (0 to size).map(_ => randomJson(rnd, v.elementType)).toList
        )
      case v: StructType =>
        JObject(
          v.fields.flatMap 
            f =>
              if (f.nullable && rnd.nextBoolean())
                None
              else
                Some(JField(f.name, randomJson(rnd, f.dataType)))
          .toList
        )
    
  

【讨论】:

以上是关于如何根据模式动态生成数据集?的主要内容,如果未能解决你的问题,请参考以下文章

如何从动态生成的 UITextField 中检索数据?

mPDF 中动态生成页面的页码

Java运行时动态生成类几种方式

Cglib根据数据库表数据动态生成对象

如何根据动态SQL代码自动生成DTO

乐道网站建设管理系统如何开启伪静态