如何在scala中创建镶木地板?
Posted
技术标签:
【中文标题】如何在scala中创建镶木地板?【英文标题】:How create parquet table in scala? 【发布时间】:2022-01-19 12:30:00 【问题描述】:我想创建一个包含特定类型字段的 parquet 表:
name_process: 字符串 id_session:整数 time_write:本地日期或时间戳 键:字符串 值:字符串
name_process | id_session | time_write | key | value |
---|---|---|---|---|
OtherClass | jsdfsadfsf | 43434883477 | schema0.table0.csv | Success |
OtherClass | jksdfkjhka | 23212123323 | schema1.table1.csv | Success |
OtherClass | alskdfksjd | 23343212234 | schema2.table2.csv | Failure |
ExternalClass | sdfjkhsdfd | 34455453434 | schema3.table3.csv | Success |
我想正确地写出这样的表格。使用正确的数据类型。然后我要从中读取分区。我正在尝试实现读写。但到目前为止结果很糟糕。
def createHiveTable(implicit spark: SparkSession)
val schema = "test_schema"
val table = "test_table"
val partitionName = "name_process"
val columnNames = "name_process" :: "id_session" :: "time_write" :: "key" :: "value" :: Nil
spark.sql(s"CREATE DATABASE IF NOT EXISTS test_db")
//val createTableSql = s"CREATE TABLE IF NOT EXISTS $schema.$table ($columnNames) PARTITIONED BY $partitionName STORED AS parquet"
val path = new File(".").getAbsolutePath ++ "/src/test/data-lineage/test_data_journal.csv"
val df = spark.read.option("delimiter", ",")
.option("header", true)
.csv(path)
df.show()
df.write.mode(SaveMode.Append).partitionBy(partitionName).format("parquet").saveAsTable(s"test_db.$table")
def getLastSession(processName: String)(implicit spark: SparkSession): Unit =
val df = spark.read.parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")
.select(
col("name_process").cast(StringType),
col("id_session").cast(StringType),
col("time_write").cast(LongType),
col("key").cast(StringType),
col("value").cast(StringType)
)
val lastTime = df.select(col("time_write")).select(max("time_write")).collect()(0).get(0)
val lastSession = df.filter(col("time_write").equalTo(lastTime)).select("id_session").head().getString(0)
println(lastSession)
println(TimeStamp.getCurrentTime)
来自 spark 的日志:
21/12/16 14:51:19 INFO TaskSchedulerImpl: Killing all running tasks in stage 3: Stage finished
21/12/16 14:51:19 INFO DAGScheduler: Job 3 finished: parquet at DataLineageJournal.scala:28, took 0,076899 s
org.apache.spark.sql.AnalysisException: cannot resolve '`name_process`' given input columns: [id_session, key, time_write, value];
'Project [unresolvedalias(cast('name_process as string), None), cast(id_session#78 as string) AS id_session#86, cast(time_write#79 as bigint) AS time_write#87L, cast(key#80 as string) AS key#88, cast(value#81 as string) AS value#89]
+- Relation[id_session#78,time_write#79,key#80,value#81] parquet
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:155)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:342)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:342)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:339)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:339)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:339)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
【问题讨论】:
【参考方案1】:问题
当你这样做时
spark.read.parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")
您正在从特定目录读取,这就是缺少 name_process
列的原因。
解决方案:
您可以执行以下操作
spark.read.parquet(s"spark-warehouse/test_db.db/test_table").filter(f.col('name_process') == processName)
【讨论】:
只需使用spark.read
和选项basePath
指向顶层以上是关于如何在scala中创建镶木地板?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 AvroParquetWriter 从 scala 案例类制作镶木地板文件?
Scala - 当 Row.get(i) 将检索 null 时如何避免 java.lang.IllegalArgumentException