如何从代码外部提供spark / scala中的模式
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何从代码外部提供spark / scala中的模式相关的知识,希望对你有一定的参考价值。
我想读取ex:schema_file的文件,它将包含模式,并希望在代码中使用它来创建DataFrame
我已阅读有关ConfigFactory
提供架构但无法使用它,因为将来可能会更改架构。
schema[
{
columnName = EXAMPLE_1
type = string
},
{
columnName = EXAMPLE_2
type = string
},
{
columnName = EXAMPLE_3
type = string
}
]
如果我使用这个,那么我必须读取每个columnName
config.getString("schema.ColumnName1")
但列不固定,可以更改列数。
此外,我尝试使用案例类,但在那,我需要指定每个字段。
任何人都可以告诉我如何从代码外部读取模式。
答案
您可以尝试使用此库来加载配置并将其映射到scala类:https://github.com/pureconfig/pureconfig
我希望这样的事能对你有用:
import scala.io.Source
import scala.util.parsing.combinator.syntactical.StandardTokenParsers
object Application extends App {
override def main(args: Array[String]): Unit = {
val fileContents = Source.fromFile("src/main/resources/schema_file").getLines.mkString
print(ConfigDSL.parseSchema(fileContents))
}
}
case class Schema(columns: List[Column])
case class Column(columnName: String, columnType: String)
object ConfigDSL extends StandardTokenParsers {
lexical.delimiters ++= List("[", "]", "{", "}", ",", " ", "=", "
")
lexical.reserved ++= List("schema", "type", "columnName")
def parseSchema(schemaString: String): Schema =
schema(new lexical.Scanner(schemaString)) match {
case Success(columns, _) => Schema(columns)
case Failure(msg, _) => throw new RuntimeException(msg)
case Error(msg, _) => throw new RuntimeException(msg)
}
def schema: Parser[List[Column]] =
"schema" ~ "[" ~ listOfColumns ~ "]" ^^ { case _ ~ _ ~ recipeList ~ _ => recipeList }
def columnDefinition: Parser[Column] =
"{" ~ "columnName" ~ "=" ~ ident ~ "type" ~ "=" ~ ident ~ "}" ^^ {
case _ ~ _ ~ _ ~ column ~ _ ~ _ ~ columnType ~ _ => Column(column, columnType)
}
def listOfColumns: Parser[List[Column]] =
repsep(columnDefinition, ",") ^^ { stepList: List[Column] => stepList}
}
以上是关于如何从代码外部提供spark / scala中的模式的主要内容,如果未能解决你的问题,请参考以下文章
Spark scala.collection.immutable.$colon$colon 不是字符串模式的有效外部类型
如何在 Spark 的 github 中查看 Functions.Scala 中的代码