如何根据模式动态生成数据集?
Posted
技术标签:
【中文标题】如何根据模式动态生成数据集?【英文标题】:How to generate datasets dynamically based on schema? 【发布时间】:2018-11-30 07:21:46 【问题描述】:我有多个模式,如下所示,具有不同的列名和数据类型。
我想使用 DataFrame
和 Scala 为每个模式生成测试/模拟数据并将其保存到镶木地板文件中。
下面是示例模式(来自示例 json),用于动态生成数据,其中包含虚拟值。
val schema1 = StructType(
List(
StructField("a", DoubleType, true),
StructField("aa", StringType, true)
StructField("p", LongType, true),
StructField("pp", StringType, true)
)
)
我需要这样的 rdd/dataframe,每行 1000 行,基于上述架构中的列数。
val data = Seq(
Row(1d, "happy", 1L, "Iam"),
Row(2d, "sad", 2L, "Iam"),
Row(3d, "glad", 3L, "Iam")
)
基本上.. 像这 200 个数据集,我需要为其动态生成数据,为每个方案编写单独的程序对我来说简直是不可能的。
请。帮我想想你的想法或实现。因为我是新手。
是否可以根据不同类型的模式生成动态数据?
【问题讨论】:
见我的linked in article i.e. Test data generation using Spark by using simple Json data descriptor with Columns and DataTypes to load in dwh like Hive. 这是除了下面的答案之外的另一种方式...... 【参考方案1】:使用@JacekLaskowski 的建议,您可以根据您期望的字段/类型使用带有ScalaCheck
(Gen
) 的生成器生成动态数据。
它可能看起来像这样:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row, SaveMode
import org.scalacheck._
import scala.collection.JavaConverters._
val dynamicValues: Map[(String, DataType), Gen[Any]] = Map(
("a", DoubleType) -> Gen.choose(0.0, 100.0),
("aa", StringType) -> Gen.oneOf("happy", "sad", "glad"),
("p", LongType) -> Gen.choose(0L, 10L),
("pp", StringType) -> Gen.oneOf("Iam", "You're")
)
val schemas = Map(
"schema1" -> StructType(
List(
StructField("a", DoubleType, true),
StructField("aa", StringType, true),
StructField("p", LongType, true),
StructField("pp", StringType, true)
)),
"schema2" -> StructType(
List(
StructField("a", DoubleType, true),
StructField("pp", StringType, true),
StructField("p", LongType, true)
)
)
)
val numRecords = 1000
schemas.foreach
case (name, schema) =>
// create a data frame
spark.createDataFrame(
// of #numRecords records
(0 until numRecords).map _ =>
// each of them a row
Row.fromSeq(schema.fields.map(field =>
// with fields based on the schema's fieldname & type else null
dynamicValues.get((field.name, field.dataType)).flatMap(_.sample).orNull
))
.asJava, schema)
// store to parquet
.write.mode(SaveMode.Overwrite).parquet(name)
【讨论】:
【参考方案2】:ScalaCheck 是一个生成数据的框架,您可以使用自定义生成器根据架构生成原始数据。
访问ScalaCheck Documentation。
【讨论】:
【参考方案3】:你可以这样做
import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame, SparkSession
import org.json4s
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
import scala.util.Random
object Test extends App
val structType: StructType = StructType(
List(
StructField("a", DoubleType, true),
StructField("aa", StringType, true),
StructField("p", LongType, true),
StructField("pp", StringType, true)
)
)
val spark = SparkSession
.builder()
.master("local[*]")
.config(new SparkConf())
.getOrCreate()
import spark.implicits._
val df = createRandomDF(structType, 1000)
def createRandomDF(structType: StructType, size: Int, rnd: Random = new Random()): DataFrame =
spark.read.schema(structType).json((0 to size).map _ => compact(randomJson(rnd, structType)).toDS())
def randomJson(rnd: Random, dataType: DataType): JValue =
dataType match
case v: DoubleType =>
json4s.JDouble(rnd.nextDouble())
case v: StringType =>
JString(rnd.nextString(10))
case v: IntegerType =>
JInt(rnd.nextInt())
case v: LongType =>
JInt(rnd.nextLong())
case v: FloatType =>
JDouble(rnd.nextFloat())
case v: BooleanType =>
JBool(rnd.nextBoolean())
case v: ArrayType =>
val size = rnd.nextInt(10)
JArray(
(0 to size).map(_ => randomJson(rnd, v.elementType)).toList
)
case v: StructType =>
JObject(
v.fields.flatMap
f =>
if (f.nullable && rnd.nextBoolean())
None
else
Some(JField(f.name, randomJson(rnd, f.dataType)))
.toList
)
【讨论】:
以上是关于如何根据模式动态生成数据集?的主要内容,如果未能解决你的问题,请参考以下文章