AWS Glue to Redshift:是不是可以替换、更新或删除数据?

Posted

技术标签:

【中文标题】AWS Glue to Redshift:是不是可以替换、更新或删除数据?【英文标题】:AWS Glue to Redshift: Is it possible to replace, update or delete data?AWS Glue to Redshift:是否可以替换、更新或删除数据? 【发布时间】:2017-09-14 21:08:51 【问题描述】:

以下是关于我如何设置的一些要点:

我已将 CSV 文件上传到 S3,并设置了 Glue 爬虫来创建表和架构。 我有一个 Glue 作业设置,它使用 JDBC 连接将 Glue 表中的数据写入我们的 Amazon Redshift 数据库。 Job 还负责映射列和创建红移表。

通过重新运行作业,我在 redshift 中得到重复的行(如预期的那样)。但是,有没有办法在插入新数据之前使用键或胶水中设置的分区来替换或删除行?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import SelectFields

from pyspark.sql.functions import lit

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

columnMapping = [
    ("id", "int", "id", "int"),
    ("name", "string", "name", "string"),
]

datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = "table01", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = columnMapping, transformation_ctx = "applymapping1")
resolvechoice1 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice1")
dropnullfields1 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields1")
df1 = dropnullfields1.toDF()
data1 = df1.withColumn('platform', lit('test'))
data1 = DynamicFrame.fromDF(data1, glueContext, "data_tmp1")

## Write data to redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = data1, catalog_connection = "Test Connection", connection_options = "dbtable": "table01", "database": "db01", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")

job.commit()

【问题讨论】:

好问题,现在遇到同样的问题。到目前为止,你有什么进展吗? 我与 AWS Glue 支持部门取得了联系,并且能够得到解决。胶水似乎没有办法做到这一点,或者从来没有用于这种类型的工作。我能够得到一个可行的解决方案的方法是让胶水将所有行插入一个临时表,然后在胶水之外执行一个 upsert/merge。 【参考方案1】:

工作书签是关键。只需编辑作业并启用“作业书签”,它就不会处理已处理的数据。 请注意,该作业必须重新运行一次才能检测到它不必再次重新处理旧数据。

有关详细信息,请参阅: http://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

在我看来,“书签”这个名字有点牵强。如果不是在搜索过程中偶然发现它,我永远不会看它。

【讨论】:

我不确定你为什么被否决。作业书签相当于 spark 中的检查点,这听起来像是问题所在。 我也不知道。我能想到的唯一原因是重新运行相同的作业(例如通过清除书签)可能会导致 Redshift 中出现重复记录,因为该批处理再次被处理。 你真的让它工作了吗?我知道它应该按照你说的做,但我无法让它工作。我有一个目录表作为输入(由爬虫在 S3 中的 Parquet 数据集上创建)、一个简单的映射步骤和 Redshift 作为数据接收器。作业书签默认启用,所有作业运行也启用它。每次运行时仍会复制所有数据。 是的,它对我有用。我有一个每天爬行的爬虫。还有一个带有选项 --job-bookmark-option: job-bookmark-enable 的触发器(几个小时后爬虫完成)。我们没有使用 Parquet,不确定这是否会有所不同。总而言之,我使用 Glue 的经验并不是那么好:作业太大时会失败,我无法让自定义 Python 脚本正常工作。我们正在寻找替代品。 确实如此。刚刚使用 JSON 作为输入进行了测试,它可以工作。我已经向 AWS 报告了 Parquet 的错误。【参考方案2】:

这是我从 AWS Glue 支持获得的解决方案:

您可能知道,尽管您可以创建主键,但 Redshift 并不强制执行唯一性。因此,如果您正在重新运行 Glue 作业,则可能会插入重复的行。保持唯一性的一些方法是:

    使用临时表插入所有行,然后在主表中执行 upsert/merge [1],这必须在粘合之外完成。

    在您的 redshift 表 [1] 中添加另一列,例如插入时间戳,以允许重复但要知道哪一列先出现或最后一列,然后在需要时删除重复项。

    将之前插入的数据加载到dataframe中,然后比较要插入的数据,避免插入重复[3]

[1] - http://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-upsert.html 和 http://www.silota.com/blog/amazon-redshift-upsert-support-staging-table-replace-rows/

[2] - https://github.com/databricks/spark-redshift/issues/238

[3] - https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html

【讨论】:

您检查了工作书签吗?如果您的来源是 S3,那可能就足够了。如果它不适合你,我想知道你遇到了什么问题,所以我不会犯同样的错误? 我刚刚尝试使用 Glue,我的数据源和数据目标都在 Amazon Redshift 中。启用书签对我没有帮助。数据有重复。 是的,书签也不是我们的答案。从那以后,我们就不再使用 AWS 胶水了,但不幸的是,就我所知。【参考方案3】:

请查看this答案。有解释和代码示例如何使用临时表将数据插入 Redshift。同样的方法可用于在 Glue 使用 preactionspostactions 选项写入数据之前或之后运行任何 SQL 查询:

// Write data to staging table in Redshift
glueContext.getJDBCSink(
  catalogConnection = "redshift-glue-connections-test",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> staging,
    "overwrite" -> "true",
    "preactions" -> "<another SQL queries>",
    "postactions" -> "<some SQL queries>"
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)

【讨论】:

【参考方案4】:

今天我已经测试并获得了一种解决方法,可以使用 JDBC 连接从目标表中更新/删除。

我用过如下

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

import pg8000
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'PW',
    'HOST',
    'USER',
    'DB'
])
# ...
# Create Spark & Glue context

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ...
config_port = ****
conn = pg8000.connect(
    database=args['DB'], 
    user=args['USER'], 
    password=args['PW'],
    host=args['HOST'],
    port=config_port
)
query = "UPDATE table .....;"

cur = conn.cursor()
cur.execute(query)
conn.commit()
cur.close()



query1 = "DELETE  AAA FROM  AAA A, BBB B WHERE  A.id = B.id"

cur1 = conn.cursor()
cur1.execute(query1)
conn.commit()
cur1.close()
conn.close()

【讨论】:

我会测试一下,你试过 psycopg2 而不是 pg8000 吗? 是的。 Psycopg2 尚不支持用 C 语言编写。目前不支持 pandas 等库,也不支持用其他语言编写的扩展.. 要让 pg8000 在 Glue 中工作,您是否必须包含任何外部库?【参考方案5】:

Glue 中的作业书签选项应该可以解决问题,正如上面所建议的那样。当我的源是 S3 时,我一直在成功使用它。 http://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

【讨论】:

虽然我认为你是对的,但你只是在重复我的回答:)【参考方案6】:

根据我的测试(使用相同的场景),BOOKMARK 功能不起作用。多次运行作业时会插入重复数据。通过每天(通过 lambda)从 S3 位置删除文件并实施暂存和目标表,我已经解决了这个问题。数据将根据匹配的键列进行插入/更新。

【讨论】:

以上是关于AWS Glue to Redshift:是不是可以替换、更新或删除数据?的主要内容,如果未能解决你的问题,请参考以下文章

通过 AWS Glue 执行 Redshift 过程

AWS Glue ETL 到 Redshift:日期

将 Parquet 文件从 AWS Glue 加载到 Redshift

AWS Glue 作业将 Null 写入 Redshift

从 AWS Redshift 到 S3 的 AWS Glue ETL 作业失败

AWS Glue - Redshift 中具有 Json 结构的字段