如何从代码外部提供spark / scala中的模式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何从代码外部提供spark / scala中的模式相关的知识,希望对你有一定的参考价值。

我想读取ex:schema_file的文件,它将包含模式,并希望在代码中使用它来创建DataFrame

我已阅读有关ConfigFactory提供架构但无法使用它,因为将来可能会更改架构。

schema[
  {
     columnName = EXAMPLE_1
     type = string
  },
  {
     columnName = EXAMPLE_2
     type = string
  },
  {
     columnName = EXAMPLE_3
     type = string
  }
]

如果我使用这个,那么我必须读取每个columnName

    config.getString("schema.ColumnName1")

但列不固定,可以更改列数。

此外,我尝试使用案例类,但在那,我需要指定每个字段。

任何人都可以告诉我如何从代码外部读取模式。

答案

您可以尝试使用此库来加载配置并将其映射到scala类:https://github.com/pureconfig/pureconfig

我希望这样的事能对你有用:

import scala.io.Source
import scala.util.parsing.combinator.syntactical.StandardTokenParsers

object Application extends App {
  override def main(args: Array[String]): Unit = {
    val fileContents = Source.fromFile("src/main/resources/schema_file").getLines.mkString
    print(ConfigDSL.parseSchema(fileContents))
  }
}

case class Schema(columns: List[Column])
case class Column(columnName: String, columnType: String)

object ConfigDSL extends StandardTokenParsers {
  lexical.delimiters ++= List("[", "]", "{", "}", ",", " ", "=", "
")
  lexical.reserved ++= List("schema", "type", "columnName")

  def parseSchema(schemaString: String): Schema = 
    schema(new lexical.Scanner(schemaString)) match {
      case Success(columns, _) => Schema(columns)
      case Failure(msg, _) => throw new RuntimeException(msg)
      case Error(msg, _) => throw new RuntimeException(msg)
    }

  def schema: Parser[List[Column]] =
    "schema" ~ "[" ~ listOfColumns ~ "]" ^^ { case _ ~ _ ~ recipeList ~ _ => recipeList }

  def columnDefinition: Parser[Column] =
    "{" ~ "columnName" ~ "=" ~ ident ~ "type" ~ "=" ~ ident ~ "}" ^^ {
      case _ ~ _ ~ _ ~ column ~ _ ~ _ ~ columnType ~ _ => Column(column, columnType)
    }

  def listOfColumns: Parser[List[Column]] =
    repsep(columnDefinition, ",")  ^^ { stepList: List[Column] => stepList}
}

以上是关于如何从代码外部提供spark / scala中的模式的主要内容,如果未能解决你的问题,请参考以下文章

Spark scala.collection.immutable.$colon$colon 不是字符串模式的有效外部类型

如何使用 Scala 从 Spark 中的列表或数组创建行

如何在 Spark 的 github 中查看 Functions.Scala 中的代码

如何从 SPARK SCALA 中的 XML 模式中获取列名?

Spark Scala创建外部配置单元表不使用位置作为变量

如何从 Scala 中的 DataFrame 在 Spark 中创建分布式稀疏矩阵