Spark没有将所有数据保存到红移

Posted

技术标签:

【中文标题】Spark没有将所有数据保存到红移【英文标题】:Spark not saving all data to redshift 【发布时间】:2017-07-10 22:49:59 【问题描述】:

以下代码从 S3 加载数据,使用 SparkSQL 清理和删除重复项,然后使用 JDBC 将数据保存到 Redshift。我也尝试过使用 spark-redshift maven 依赖并得到相同的结果。我正在使用 Spark 2.0。

我无法理解的是,当显示加载到内存中的结果时,总和是预期的数字,但是当 Spark 保存到 Redshift 时,它总是更少。不知何故,并非所有记录都已保存,而且我也没有在 STL_LOAD_ERRORS 中看到任何错误。有人遇到过这种情况或对为什么会发生这种情况有任何想法吗?

        // Load files that were loaded into firehose on this day
    var s3Files = spark.sqlContext.read.schema(schema).json("s3://" + job.getAWSAccessKey + ":" + job.getAWSSecretKey + "@" + job.getBucketName + "/"+ job.getAWSS3RawFileExpression + "/" + year+ "/" + monthCheck+ "/" + dayCheck + "/*/" ).rdd

    // Apply the schema to the RDD, here we will have duplicates
    val usersDataFrame = spark.createDataFrame(s3Files, schema)

    usersDataFrame.createOrReplaceTempView("results")

    // Clean and use partition by the keys to eliminate duplicates and get latest record
    var results = spark.sql(buildCleaningQuery(job,"results"))
    results.createOrReplaceTempView("filteredResults")

    // This returns the correct result!
    var check = spark.sql("select sum(Reward) from filteredResults where period=1706")
    check.show()

    var path = UUID.randomUUID().toString()

    println("s3://" + job.getAWSAccessKey + ":" + job.getAWSSecretKey + "@" + job.getAWSS3TemporaryDirectory +  "/" + path) 

    val prop = new Properties()

    results.write.jdbc(job.getRedshiftJDBC,"work.\"" + path + "\"",prop)

【问题讨论】:

【参考方案1】:

使用 jdbc 意味着 Spark 将尝试重复执行 INSERT INTO 语句 - 这在 Redshift 中非常缓慢。这就是您在 stl_load_errors 中看不到条目的原因。

我建议您改用spark-redshift 库。它经过了很好的测试,并且性能会更好。 https://github.com/databricks/spark-redshift

示例(显示许多选项):

my_dataframe.write
   .format("com.databricks.spark.redshift")
   .option("url", "jdbc:redshift://my_cluster.qwertyuiop.eu-west-1.redshift.amazonaws.com:5439/my_database?user=my_user&password=my_password")
   .option("dbtable", "my_table")
   .option("tempdir", "s3://my-bucket")
   .option("diststyle", "KEY")
   .option("distkey", "dist_key")
   .option("sortkeyspec", "COMPOUND SORTKEY(key_1, key_2)")
   .option("extracopyoptions", "TRUNCATECOLUMNS COMPUPDATE OFF STATUPDATE OFF")
   .mode("overwrite") // "append" / "error"
   .save()

【讨论】:

是的,正如我解释的那样,我两者都用过。我使用 JDBC 来消除 spark-redshift 出现问题的可能性。 可能有些文件被跳过了。防止 spark-redshift 库删除暂存数据,并根据 S3 文件列表检查 stl_load_commits

以上是关于Spark没有将所有数据保存到红移的主要内容,如果未能解决你的问题,请参考以下文章

有没有办法通过数据管道以预定义的顺序将文件从 S3 复制到红移

将 Hive 表迁移到红移

定期将数据从 S3 存储桶流式传输到红移

我们可以使用复制命令使用访问密钥和秘密密钥将数据从 S3 加载到红移表中吗(不使用 IAM 角色)

S3 到红移 nifi

将熊猫数据框上传到红移 - 关系“sqlite_master”不存在