如何在 scala 中创建未初始化的 Dataframe 变量。因此可以在 if else 条件下初始化相同的变量

Posted

技术标签:

【中文标题】如何在 scala 中创建未初始化的 Dataframe 变量。因此可以在 if else 条件下初始化相同的变量【英文标题】:How to create an uninitialised Dataframe variable in scala. So that same variable can be initialized in if else condition 【发布时间】:2021-10-14 21:25:46 【问题描述】:

我需要创建一个未初始化的Dataframe 变量。因此,初始化其中的值后,我可以将其添加到Seq

var df: org.apache.spark.sql.DataFrame = spark.emptyDataFrame

queries.foreach(q=>
    var view_name = q._1
    var sourceType = q._2
    var query = q._3
    var df: org.apache.spark.sql.DataFrame = spark.emptyDataFrame

    if(sourceType == "sqlserver")
        df = jdbcConn.option("query", query).load()
    else if(sourceType == "mongodb")
        var connectionString = connectionInt.setCollection(view_name);
        df = spark.read.format("com.mongodb.spark.sql.DefaultSource").
                                option("spark.mongodb.input.partitioner", "MongoSinglePartitioner").
                                option("uri", connectionString).    
                                load();
    

    df.createOrReplaceTempView(view_name)
    var tup: Tuple2[String, org.apache.spark.sql.DataFrame] = dataframes :+ (view_name, df)
    dataframes = dataframes :+ tup
);

我收到以下错误

input.scala:102: 错误:类型不匹配; 找到:Seq[(String, org.apache.spark.sql.DataFrame)] (扩展为)Seq[(String, org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])] 必需:(字符串,org.apache.spark.sql.DataFrame) (扩展为)(字符串,org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]) var tup: Tuple2[String, org.apache.spark.sql.DataFrame] = dataframes :+ (view_name, df) ^

【问题讨论】:

【参考方案1】:

使用“map”而不是“foreach”看起来更好:

  val dataframes = queries.map( case (view_name, sourceType, query) => 
  val df: org.apache.spark.sql.DataFrame =
    if (sourceType == "sqlserver") 
      jdbcConn.option("query", query).load()
     else if (sourceType == "mongodb") 
      var connectionString = connectionInt.setCollection(view_name);
      spark.read.format("com.mongodb.spark.sql.DefaultSource").
        option("spark.mongodb.input.partitioner", "MongoSinglePartitioner").
        option("uri", connectionString).
        load();
    
    else 
      spark.emptyDataFrame
    

  df.createOrReplaceTempView(view_name)
  (view_name, df)

);

【讨论】:

学习了使用if else 条件的新方法,谢谢

以上是关于如何在 scala 中创建未初始化的 Dataframe 变量。因此可以在 if else 条件下初始化相同的变量的主要内容,如果未能解决你的问题,请参考以下文章

如何像docker一样在linux中创建未命名的网络名称空间?

为啥 SELECT 在 SQL Server 中创建未提交的事务?

如何在 Scala 中创建异构数组?

如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame

如何在scala中创建镶木地板?

我应该如何在 Scala 中创建 Int 的子类型?