将读取文件的架构存储到 spark scala 中的 csv 文件中
Posted
技术标签:
【中文标题】将读取文件的架构存储到 spark scala 中的 csv 文件中【英文标题】:Store Schema of Read File Into csv file in spark scala 【发布时间】:2020-05-07 09:11:09 【问题描述】:我正在使用以下命令在数据框中启用的 inferschema 选项读取 csv 文件。
df2 = spark.read.options(Map("inferSchema"->"true","header"->"true")).csv("s3://Bucket-Name/Fun/Map/file.csv")
df2.printSchema()
Output:
root
|-- CC|Fun|Head|Country|SendType: string (nullable = true)
现在我想将上面的输出只存储到一个 csv 文件中,该文件只有这些列名和这些列的数据类型,如下所示。
column_name,datatype
CC,string
Fun,string
Head,string
Country,string
SendType,string
我尝试使用以下选项将其写入 csv,但这是使用整个数据写入文件。
df2.coalesce(1).write.format("csv").mode("append").save("schema.csv")
问候 鲯鳅
【问题讨论】:
【参考方案1】:df.schema.fields
获取字段及其数据类型。
检查下面的代码。
scala> val schema = df.schema.fields.map(field => (field.name,field.dataType.typeName)).toList.toDF("column_name","datatype")
schema: org.apache.spark.sql.DataFrame = [column_name: string, datatype: string]
scala> schema.show(false)
+---------------+--------+
|column_name |datatype|
+---------------+--------+
|applicationName|string |
|id |string |
|requestId |string |
|version |long |
+---------------+--------+
scala> schema.write.format("csv").save("/tmp/schema")
【讨论】:
您好,Srini,感谢您的回复,它以单行而不是如您上面列出的那样逐列输入。 .toList 不工作,我想你能帮忙吗?val df2 = spark.read.options(Map("inferSchema"->"true","header"->"true")).csv("file.csv")
df2.printSchema()
val df_schema = df2.schema.fields.map(field => (field.name,field.dataType.typeName)).toList.toDF("column_name","datatype")
df_schema.show(false)
@AlexandrosBiratsis,我同意这一点,我不知道为什么 mahi 将列详细信息存储在文件中。
@AlexandrosBiratsis:基本上他们要求存储我们将要处理的文件的模式,然后用户想要进一步比较
您可以始终使用df.schema
来获取新的或更新的数据框的架构。这与您保存此架构的方式无关。请仔细阅读解决方案,您会更好地了解它们的工作原理。测量执行时间也很好,然后您就会明白,当您执行you_schema.toDF("column_name","datatype").write.save
时,您正在触发一个新的 Spark 作业,而您可以通过 df.schema 获取架构,然后使用简单的文件编写器保存它
【参考方案2】:
尝试如下使用coalesce(1)
和.option("header","true")
与标题一起输出
import java.io.FileWriter
object SparkSchema
def main(args: Array[String]): Unit =
val fw = new FileWriter("src/main/resources/csv.schema", true)
fw.write("column_name,datatype\n")
val spark = Constant.getSparkSess
import spark.implicits._
val df = List(("", "", "", 1l)).toDF("applicationName", "id", "requestId", "version")
val columnList : List[(String, String)] = df.schema.fields.map(field => (field.name, field.dataType.typeName))
.toList
try
val outString = columnList.map(col =>
col._1 + "," + col._2
).mkString("\n")
fw.write(outString)
finally fw.close()
val newColumnList : List[(String, String)] = List(("newColumn","integer"))
val finalColList = columnList ++ newColumnList
writeToS3("s3://bucket/newFileName.csv",finalColList)
def writeToS3(s3FileNameWithpath : String,finalColList : List[(String,String)])
val outString = finalColList.map(col =>
col._1 + "," + col._2
).mkString("\\n")
import org.apache.hadoop.fs._
import org.apache.hadoop.conf.Configuration
val conf = new Configuration()
conf.set("fs.s3a.access.key", "YOUR ACCESS KEY")
conf.set("fs.s3a.secret.key", "YOUR SECRET KEY")
val dest = new Path(s3FileNameWithpath)
val fs = dest.getFileSystem(conf)
val out = fs.create(dest, true)
out.write( outString.getBytes )
out.close()
【讨论】:
嗨 QuickSilver,这里我们首先使用推断模式读取文件,然后从该模式中提取输出。根据文件,我们读取每个文件的列名可以不同 输出独立于模式无关紧要会改变从文件@Mahi读取的ans ti 您好 QuickSilver 感谢您的输入,上面的 srini 建议了类似的步骤,但是当我尝试运行它时,正如您在上面的 cmets 中看到的那样,它不是按列行创建列,而只是创建一个单行其中的所有列名和数据类型为字符串一个值 @QuickSilver 使用 Spark 存储 5 行文件是一种开销。您应该只使用 Scala I/O API 来导出列列表并保存到文件中 @AlexandrosBiratsis 同意【参考方案3】:@QuickSilver 和 @Srinivas 解决方案的替代方案是使用模式的 DDL 表示,它们都应该工作。使用df.schema.toDDL
,您将获得:
CC STRING, fun STRING, Head STRING, Country STRING, SendType STRING
这是模式的字符串表示,然后您可以拆分和替换,如下所示:
import java.io.PrintWriter
val schema = df.schema.toDDL.split(",")
// Array[String] = Array(`CC` STRING, `fun` STRING, `Head` STRING, `Country` STRING, `SendType` STRING)
val writer = new PrintWriter("/tmp/schema.csv")
writer.write("column_name,datatype\n")
schema.foreach r => writer.write(r.replace(" ", ",") + "\n")
writer.close()
要写入 S3,您可以使用 Hadoop API 作为已经实现的 QuickSilver 或第三方库,例如 MINIO:
import io.minio.MinioClient
val minioClient = new MinioClient("https://play.min.io", "ACCESS_KEY", "SECRET_KEY")
minioClient.putObject("YOUR_BUCKET","schema.csv", "/tmp/schema.csv", null)
甚至better 生成一个字符串,将其存储到缓冲区中,然后通过 InputStream 将其发送到 S3:
import java.io.ByteArrayInputStream
import io.minio.MinioClient
val minioClient = new MinioClient("https://play.min.io", "ACCESS_KEY", "SECRET_KEY")
val schema = df.schema.toDDL.split(",")
val schemaBuffer = new StringBuilder
schemaBuffer ++= "column_name,datatype\n"
schema.foreach r => schemaBuffer ++= r.replace(" ", ",") + "\n"
val inputStream = new ByteArrayInputStream(schemaBuffer.toString.getBytes("UTF-8"))
minioClient.putObject("YOUR_BUCKET", "schema.csv", inputStream, new PutObjectOptions(inputStream.available(), -1))
inputStream.close
【讨论】:
【参考方案4】:@PySpark
df_schema = spark.createDataFrame([(i.name, str(i.dataType)) for i in df.schema.fields], ['column_name', 'datatype'])
df_schema.show()
这将为现有数据框的架构创建新的数据框用例:
当你想用数据框的 Schema 创建表并且你不能使用下面的代码时很有用,因为 pySpark 用户可能没有被授权在数据库上执行 DDL 命令。
df.createOrReplaceTempView("tmp_output_table")
spark.sql("""drop table if exists schema.output_table""")
spark.sql("""create table schema.output_table as select * from tmp_output_table""")
【讨论】:
【参考方案5】:在 Pyspark - 您可以使用 df.dtypes 找到 PySpark DataFrame 的所有列名和数据类型 (DataType)。点击此链接了解更多详情pyspark.sql.DataFrame.dtypes
话虽如此,请尝试使用以下代码-
data = df.dtypes
cols = ["col_name", "datatype"]
df = spark.createDataFrame(data=data,schema=cols)
df.show()
【讨论】:
以上是关于将读取文件的架构存储到 spark scala 中的 csv 文件中的主要内容,如果未能解决你的问题,请参考以下文章
Spark-Scala 无法推断架构(将输入路径验证推迟到数据源中)
无法从 synapse spark scala notebook 读取 csv 文件