Spark 2.0.0 使用 jdbc 从 Redshift 表中截断

Posted

技术标签:

【中文标题】Spark 2.0.0 使用 jdbc 从 Redshift 表中截断【英文标题】:Spark 2.0.0 truncate from Redshift table using jdbc 【发布时间】:2016-12-05 11:09:30 【问题描述】:

您好,我正在使用带有 Redshift 的 Spark SQL(2.0.0),我想截断我的表。我正在使用这个spark-redshift 包,我想知道如何截断我的表格。有人可以分享这个例子吗??

【问题讨论】:

【参考方案1】:

我无法使用 Spark 和您上面列出的 spark-redshift 存储库中的代码来完成此操作。

但是,我能够使用带有 psycopg2 的 AWS Lambda 来截断红移表。然后我使用 boto3 通过 AWS Glue 开始我的 spark 工作。

下面重要的代码是cur.execute("truncate table yourschema.yourtable")

from __future__ import print_function
import sys
import psycopg2
import boto3

def lambda_handler(event, context):
    db_database = "your_redshift_db_name"
    db_user = "your_user_name"
    db_password = "your_password"
    db_port = "5439"
    db_host = "your_redshift.hostname.us-west-2.redshift.amazonaws.com"

    try:
        print("attempting to connect...")
        conn = psycopg2.connect(dbname=db_database, user=db_user, password=db_password, host=db_host, port=db_port)
        print("connected...")
        conn.autocommit = True
        cur = conn.cursor()
        count_sql = "select count(pivotid) from yourschema.yourtable"
        cur.execute(count_sql)
        results = cur.fetchone()

        print("countBefore: ", results[0])
        countOfPivots = results[0]
        if countOfPivots > 0:
            cur.execute("truncate table yourschema.yourtable")
            print("truncated yourschema.yourtable")
            cur.execute(count_sql)
            results = cur.fetchone()
            print("countAfter: ", results[0])

        cur.close()
        conn.close()

        glueClient = boto3.client("glue")
        startTriiggerResponse = glueClient.start_trigger(Name="your-awsglue-ondemand-trigger")
        print("startedTrigger:", startTriiggerResponse.Name)

        return results
    except Exception as e:
        print(e)
        raise e

【讨论】:

【参考方案2】:

在调用 save 之前,您需要将 mode 指定给库。例如:

my_dataframe.write
   .format("com.databricks.spark.redshift")
   .option("url", "jdbc:redshift://my_cluster.qwertyuiop.eu-west-1.redshift.amazonaws.com:5439/my_database?user=my_user&password=my_password")
   .option("dbtable", "my_table")
   .option("tempdir", "s3://my-bucket")
   .option("diststyle", "KEY")
   .option("distkey", "dist_key")
   .option("sortkeyspec", "COMPOUND SORTKEY(key_1, key_2)")
   .option("extracopyoptions", "TRUNCATECOLUMNS COMPUPDATE OFF STATUPDATE OFF")
   .mode("overwrite") // "append" / "error"
   .save()

【讨论】:

提供的代码中允许用户截断任何表的内容是什么?是否暗示 my_dataframe 是一个空数据框? .mode("overwrite") 将导致现有数据被删除。 Redshift URL 中指定的用户必须具有必要的权限,否则会出错。 大概 my_dataframe 是建立在“my_table”之上的?如果该假设正确,我认为 .mode("overwrite") 会将 my_table 中的所有数据写回我的表中,并且实际上不会截断任何内容。我错过了什么?

以上是关于Spark 2.0.0 使用 jdbc 从 Redshift 表中截断的主要内容,如果未能解决你的问题,请参考以下文章

Spark 无法从 SBT 找到 JDBC 驱动程序

Spark上的Hive如何从jdbc读取数据?

如何使用 JDBC 从 Oracle 读取数据集?

如何使用Spark执行MySQL(JDBC)连接?

使用 jdbc 从 Spark 2.3.1 Scala 2.11.8 连接到 Vertica

在 Spark 中使用 jdbc 驱动程序连接到 Hive