将读取文件的架构存储到 spark scala 中的 csv 文件中

Posted

技术标签:

【中文标题】将读取文件的架构存储到 spark scala 中的 csv 文件中【英文标题】:Store Schema of Read File Into csv file in spark scala 【发布时间】:2020-05-07 09:11:09 【问题描述】:

我正在使用以下命令在数据框中启用的 inferschema 选项读取 csv 文件。

df2 = spark.read.options(Map("inferSchema"->"true","header"->"true")).csv("s3://Bucket-Name/Fun/Map/file.csv")
df2.printSchema()

Output:

root
 |-- CC|Fun|Head|Country|SendType: string (nullable = true)

现在我想将上面的输出只存储到一个 csv 文件中,该文件只有这些列名和这些列的数据类型,如下所示。

column_name,datatype
CC,string
Fun,string
Head,string
Country,string
SendType,string

我尝试使用以下选项将其写入 csv,但这是使用整个数据写入文件。

df2.coalesce(1).write.format("csv").mode("append").save("schema.csv")

问候 鲯鳅

【问题讨论】:

【参考方案1】:

df.schema.fields 获取字段及其数据类型。

检查下面的代码。

scala> val schema = df.schema.fields.map(field => (field.name,field.dataType.typeName)).toList.toDF("column_name","datatype")
schema: org.apache.spark.sql.DataFrame = [column_name: string, datatype: string]

scala> schema.show(false)
+---------------+--------+
|column_name    |datatype|
+---------------+--------+
|applicationName|string  |
|id             |string  |
|requestId      |string  |
|version        |long    |
+---------------+--------+


scala> schema.write.format("csv").save("/tmp/schema")

【讨论】:

您好,Srini,感谢您的回复,它以单行而不是如您上面列出的那样逐列输入。 .toList 不工作,我想你能帮忙吗? val df2 = spark.read.options(Map("inferSchema"->"true","header"->"true")).csv("file.csv") df2.printSchema() val df_schema = df2.schema.fields.map(field => (field.name,field.dataType.typeName)).toList.toDF("column_name","datatype") df_schema.show(false) @AlexandrosBiratsis,我同意这一点,我不知道为什么 mahi 将列详细信息存储在文件中。 @AlexandrosBiratsis:基本上他们要求存储我们将要处理的文件的模式,然后用户想要进一步比较 您可以始终使用df.schema 来获取新的或更新的数据框的架构。这与您保存此架构的方式无关。请仔细阅读解决方案,您会更好地了解它们的工作原理。测量执行时间也很好,然后您就会明白,当您执行you_schema.toDF("column_name","datatype").write.save 时,您正在触发一个新的 Spark 作业,而您可以通过 df.schema 获取架构,然后使用简单的文件编写器保存它 【参考方案2】:

尝试如下使用coalesce(1).option("header","true") 与标题一起输出

import java.io.FileWriter

object SparkSchema 

  def main(args: Array[String]): Unit = 

    val fw = new FileWriter("src/main/resources/csv.schema", true)
    fw.write("column_name,datatype\n")

    val spark = Constant.getSparkSess

    import spark.implicits._

    val df = List(("", "", "", 1l)).toDF("applicationName", "id", "requestId", "version")
    val columnList : List[(String, String)] = df.schema.fields.map(field => (field.name, field.dataType.typeName))
      .toList
    try 
      val outString = columnList.map(col => 
        col._1 + "," + col._2
      ).mkString("\n")
      fw.write(outString)
    
    finally fw.close()

    val newColumnList : List[(String, String)] = List(("newColumn","integer"))

    val finalColList = columnList ++ newColumnList
    writeToS3("s3://bucket/newFileName.csv",finalColList)

  

  def writeToS3(s3FileNameWithpath : String,finalColList : List[(String,String)]) 

    val outString =  finalColList.map(col => 
      col._1 + "," + col._2
    ).mkString("\\n")

    import org.apache.hadoop.fs._
    import org.apache.hadoop.conf.Configuration
    val conf = new Configuration()
    conf.set("fs.s3a.access.key", "YOUR ACCESS KEY")
    conf.set("fs.s3a.secret.key", "YOUR SECRET KEY")

    val dest = new Path(s3FileNameWithpath)
    val fs = dest.getFileSystem(conf)
    val out = fs.create(dest, true)
    out.write( outString.getBytes )
    out.close()
  




【讨论】:

嗨 QuickSilver,这里我们首先使用推断模式读取文件,然后从该模式中提取输出。根据文件,我们读取每个文件的列名可以不同 输出独立于模式无关紧要会改变从文件@Mahi读取的ans ti 您好 QuickSilver 感谢您的输入,上面的 srini 建议了类似的步骤,但是当我尝试运行它时,正如您在上面的 cmets 中看到的那样,它不是按列行创建列,而只是创建一个单行其中的所有列名和数据类型为字符串一个值 @QuickSilver 使用 Spark 存储 5 行文件是一种开销。您应该只使用 Scala I/O API 来导出列列表并保存到文件中 @AlexandrosBiratsis 同意【参考方案3】:

@QuickSilver 和 @Srinivas 解决方案的替代方案是使用模式的 DDL 表示,它们都应该工作。使用df.schema.toDDL,您将获得:

CC STRING, fun STRING, Head STRING, Country STRING, SendType STRING

这是模式的字符串表示,然后您可以拆分和替换,如下所示:

import java.io.PrintWriter

val schema = df.schema.toDDL.split(",")
// Array[String] = Array(`CC` STRING, `fun` STRING, `Head` STRING, `Country` STRING, `SendType` STRING)

val writer = new PrintWriter("/tmp/schema.csv")

writer.write("column_name,datatype\n")
schema.foreach r => writer.write(r.replace(" ", ",") + "\n") 
writer.close()

要写入 S3,您可以使用 Hadoop API 作为已经实现的 QuickSilver 或第三方库,例如 MINIO:

import io.minio.MinioClient

val minioClient = new MinioClient("https://play.min.io", "ACCESS_KEY", "SECRET_KEY")

minioClient.putObject("YOUR_BUCKET","schema.csv", "/tmp/schema.csv", null)

甚至better 生成一个字符串,将其存储到缓冲区中,然后通过 InputStream 将其发送到 S3:

import java.io.ByteArrayInputStream
import io.minio.MinioClient

val minioClient = new MinioClient("https://play.min.io", "ACCESS_KEY", "SECRET_KEY")

val schema = df.schema.toDDL.split(",")
val schemaBuffer = new StringBuilder

schemaBuffer ++= "column_name,datatype\n"
schema.foreach r => schemaBuffer ++= r.replace(" ", ",") + "\n" 

val inputStream = new ByteArrayInputStream(schemaBuffer.toString.getBytes("UTF-8"))

minioClient.putObject("YOUR_BUCKET", "schema.csv", inputStream, new PutObjectOptions(inputStream.available(), -1))

inputStream.close

【讨论】:

【参考方案4】:

@PySpark

df_schema = spark.createDataFrame([(i.name, str(i.dataType)) for i in df.schema.fields], ['column_name', 'datatype'])
df_schema.show()

这将为现有数据框的架构创建新的数据框用例

当你想用数据框的 Schema 创建表并且你不能使用下面的代码时很有用,因为 pySpark 用户可能没有被授权在数据库上执行 DDL 命令。

df.createOrReplaceTempView("tmp_output_table")
spark.sql("""drop table if exists schema.output_table""")   
spark.sql("""create table schema.output_table as select * from tmp_output_table""")

【讨论】:

【参考方案5】:

在 Pyspark - 您可以使用 df.dtypes 找到 PySpark DataFrame 的所有列名和数据类型 (DataType)。点击此链接了解更多详情pyspark.sql.DataFrame.dtypes

话虽如此,请尝试使用以下代码-

data = df.dtypes
cols = ["col_name", "datatype"]

df = spark.createDataFrame(data=data,schema=cols)

df.show()

【讨论】:

以上是关于将读取文件的架构存储到 spark scala 中的 csv 文件中的主要内容,如果未能解决你的问题,请参考以下文章

Spark-Scala 无法推断架构(将输入路径验证推迟到数据源中)

无法从 synapse spark scala notebook 读取 csv 文件

关于如何在Scala编程中将文件值存储到变量中

在 Spark/Scala 中写入 HDFS,读取 zip 文件

如何从代码外部提供spark / scala中的模式

idea中spark项目Scala语言读取properties文件