Spark 2.0.0 使用 jdbc 从 Redshift 表中截断
Posted
技术标签:
【中文标题】Spark 2.0.0 使用 jdbc 从 Redshift 表中截断【英文标题】:Spark 2.0.0 truncate from Redshift table using jdbc 【发布时间】:2016-12-05 11:09:30 【问题描述】:您好,我正在使用带有 Redshift 的 Spark SQL(2.0.0),我想截断我的表。我正在使用这个spark-redshift 包,我想知道如何截断我的表格。有人可以分享这个例子吗??
【问题讨论】:
【参考方案1】:我无法使用 Spark 和您上面列出的 spark-redshift 存储库中的代码来完成此操作。
但是,我能够使用带有 psycopg2 的 AWS Lambda 来截断红移表。然后我使用 boto3 通过 AWS Glue 开始我的 spark 工作。
下面重要的代码是cur.execute("truncate table yourschema.yourtable")
from __future__ import print_function
import sys
import psycopg2
import boto3
def lambda_handler(event, context):
db_database = "your_redshift_db_name"
db_user = "your_user_name"
db_password = "your_password"
db_port = "5439"
db_host = "your_redshift.hostname.us-west-2.redshift.amazonaws.com"
try:
print("attempting to connect...")
conn = psycopg2.connect(dbname=db_database, user=db_user, password=db_password, host=db_host, port=db_port)
print("connected...")
conn.autocommit = True
cur = conn.cursor()
count_sql = "select count(pivotid) from yourschema.yourtable"
cur.execute(count_sql)
results = cur.fetchone()
print("countBefore: ", results[0])
countOfPivots = results[0]
if countOfPivots > 0:
cur.execute("truncate table yourschema.yourtable")
print("truncated yourschema.yourtable")
cur.execute(count_sql)
results = cur.fetchone()
print("countAfter: ", results[0])
cur.close()
conn.close()
glueClient = boto3.client("glue")
startTriiggerResponse = glueClient.start_trigger(Name="your-awsglue-ondemand-trigger")
print("startedTrigger:", startTriiggerResponse.Name)
return results
except Exception as e:
print(e)
raise e
【讨论】:
【参考方案2】:在调用 save 之前,您需要将 mode
指定给库。例如:
my_dataframe.write
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://my_cluster.qwertyuiop.eu-west-1.redshift.amazonaws.com:5439/my_database?user=my_user&password=my_password")
.option("dbtable", "my_table")
.option("tempdir", "s3://my-bucket")
.option("diststyle", "KEY")
.option("distkey", "dist_key")
.option("sortkeyspec", "COMPOUND SORTKEY(key_1, key_2)")
.option("extracopyoptions", "TRUNCATECOLUMNS COMPUPDATE OFF STATUPDATE OFF")
.mode("overwrite") // "append" / "error"
.save()
【讨论】:
提供的代码中允许用户截断任何表的内容是什么?是否暗示 my_dataframe 是一个空数据框?.mode("overwrite")
将导致现有数据被删除。 Redshift URL 中指定的用户必须具有必要的权限,否则会出错。
大概 my_dataframe 是建立在“my_table”之上的?如果该假设正确,我认为 .mode("overwrite") 会将 my_table 中的所有数据写回我的表中,并且实际上不会截断任何内容。我错过了什么?以上是关于Spark 2.0.0 使用 jdbc 从 Redshift 表中截断的主要内容,如果未能解决你的问题,请参考以下文章