How to create a parquet table in Scala?

Posted: 2022-01-19 12:30:00

I want to create a parquet table with fields of specific types:

name_process: String
id_session: Int
time_write: LocalDate or Timestamp
key: String
value: String

name_process id_session time_write key value
OtherClass jsdfsadfsf 43434883477 schema0.table0.csv Success
OtherClass jksdfkjhka 23212123323 schema1.table1.csv Success
OtherClass alskdfksjd 23343212234 schema2.table2.csv Failure
ExternalClass sdfjkhsdfd 34455453434 schema3.table3.csv Success

I want to write such a table correctly, with the right data types, and then read individual partitions from it. I am trying to implement both the write and the read, but so far the results are bad.
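For reference, the intended types can be written down as an explicit Spark schema and passed to the CSV reader instead of relying on inference. A minimal sketch, assuming TimestampType for time_write (the sample values also suggest it could be a plain epoch long):

import org.apache.spark.sql.types._

// Explicit schema for the journal fields above. IntegerType follows the
// stated type for id_session, although the sample values look like strings.
val journalSchema = StructType(Seq(
  StructField("name_process", StringType,    nullable = false),
  StructField("id_session",   IntegerType,   nullable = false),
  StructField("time_write",   TimestampType, nullable = true),
  StructField("key",          StringType,    nullable = true),
  StructField("value",        StringType,    nullable = true)
))

// Usage: spark.read.schema(journalSchema).option("header", true).csv(path)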

import java.io.File

import org.apache.spark.sql.{SaveMode, SparkSession}

def createHiveTable(implicit spark: SparkSession): Unit = {

  val schema = "test_schema"
  val table = "test_table"
  val partitionName = "name_process"
  val columnNames = "name_process" :: "id_session" :: "time_write" :: "key" :: "value" :: Nil

  spark.sql(s"CREATE DATABASE IF NOT EXISTS test_db")
  //val createTableSql = s"CREATE TABLE IF NOT EXISTS $schema.$table ($columnNames) PARTITIONED BY $partitionName STORED AS parquet"

  val path = new File(".").getAbsolutePath ++ "/src/test/data-lineage/test_data_journal.csv"

  // The header row supplies the column names; every column is read as a string.
  val df = spark.read.option("delimiter", ",")
    .option("header", true)
    .csv(path)

  df.show()

  // Writes a partitioned parquet table into the warehouse
  // (spark-warehouse/test_db.db/test_table/name_process=...).
  df.write.mode(SaveMode.Append).partitionBy(partitionName).format("parquet").saveAsTable(s"test_db.$table")
}


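As an aside, the commented-out createTableSql above would not run as written: $columnNames interpolates a Scala List, and PARTITIONED BY needs a parenthesized, typed column list. A sketch of the DDL it seems to aim for, with column types assumed from the question:

// Hive-style DDL sketch; the partition column appears only in the
// PARTITIONED BY clause, not in the main column list.
val createTableSql =
  """CREATE TABLE IF NOT EXISTS test_db.test_table (
    |  id_session STRING,
    |  time_write BIGINT,
    |  key        STRING,
    |  value      STRING
    |) PARTITIONED BY (name_process STRING)
    |STORED AS PARQUET""".stripMargin
// spark.sql(createTableSql)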

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max}
import org.apache.spark.sql.types.{LongType, StringType}

def getLastSession(processName: String)(implicit spark: SparkSession): Unit = {

  // Reading a single partition directory directly: this is the line that
  // triggers the AnalysisException below, because name_process exists only
  // as a directory name here, not as a column inside the parquet files.
  val df = spark.read.parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")
                     .select(
                              col("name_process").cast(StringType),
                              col("id_session").cast(StringType),
                              col("time_write").cast(LongType),
                              col("key").cast(StringType),
                              col("value").cast(StringType)
                     )

  val lastTime = df.select(col("time_write")).select(max("time_write")).collect()(0).get(0)
  val lastSession = df.filter(col("time_write").equalTo(lastTime)).select("id_session").head().getString(0)

  println(lastSession)
  println(TimeStamp.getCurrentTime)
}
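As an aside, the two collect round trips above (max, then filter) can be folded into one ordered read. A sketch, assuming the same df as in the function:

import org.apache.spark.sql.functions.col

// One-pass alternative: sort by time_write descending and take the first row.
// headOption avoids an exception when the partition is empty.
val lastSessionOpt: Option[String] =
  df.orderBy(col("time_write").desc)
    .select("id_session")
    .limit(1)
    .collect()
    .headOption
    .map(_.getString(0))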

Log output from Spark:

21/12/16 14:51:19 INFO TaskSchedulerImpl: Killing all running tasks in stage 3: Stage finished
21/12/16 14:51:19 INFO DAGScheduler: Job 3 finished: parquet at DataLineageJournal.scala:28, took 0,076899 s
 
org.apache.spark.sql.AnalysisException: cannot resolve '`name_process`' given input columns: [id_session, key, time_write, value];
'Project [unresolvedalias(cast('name_process as string), None), cast(id_session#78 as string) AS id_session#86, cast(time_write#79 as bigint) AS time_write#87L, cast(key#80 as string) AS key#88, cast(value#81 as string) AS value#89]
+- Relation[id_session#78,time_write#79,key#80,value#81] parquet
 
 
                at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
                at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:155)
                at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:342)
                at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
                at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:342)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:339)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
                at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
                at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
                at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
                at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:339)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:339)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
                at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)


Answer 1:

Problem:

When you do

spark.read.parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")

you are reading from one specific partition directory, which is why the name_process column is missing.

Solution:

You can do the following instead:

spark.read.parquet(s"spark-warehouse/test_db.db/test_table").filter(col("name_process") === processName)

Filtering on the partition column is pushed down as a partition filter, so Spark still scans only the matching directory.

Comments:

Just use spark.read with the basePath option pointing at the top level.
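In Scala, that basePath suggestion could look like the sketch below: pointing basePath at the table root makes Spark treat name_process=... as a partition directory, so the column reappears in the schema even when only one partition is read:

import org.apache.spark.sql.SparkSession

// Read a single partition directory while keeping name_process as a column.
def readOnePartition(processName: String)(implicit spark: SparkSession) =
  spark.read
    .option("basePath", "spark-warehouse/test_db.db/test_table")
    .parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")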
