有没有办法在通过 Glue 插入 Redshift 时简单地截断列?

Posted

技术标签:

【中文标题】有没有办法在通过 Glue 插入 Redshift 时简单地截断列?【英文标题】:Is there a way to simply truncate columns while inserting into Redshift via Glue? 【发布时间】:2018-08-01 19:01:54 【问题描述】:

我有一列大于 varchar(max) 数据类型,据我了解,这是 AWS Glue 使用的最大数据类型,当我尝试加载我的因为它的桌子。我不想截断该列,因为它并不是那么重要,并且无法弄清楚如何在 Glue 中做到这一点。我知道如果我在 EC2 实例中使用 psql 连接到我的数据库并且实际上可以让我的表以这种方式成功加载,我可以使用 TRUNCATECOLUMNS 作为复制命令上的标签。但是,我的老板坚持要我使用 Glue 来完成这项工作,所以我正在寻找一种使用 Glue 脚本截断列的方法。我浏览了很多文档,但找不到类似的东西。谢谢。

这里有一些工作代码,供其他可能遇到此问题并需要完整参考的人使用。请注意,varchar(65535) 是 Redshift 中一列可以包含的最大字符数:

val truncColUdf = udf((str: String) => if (str.length > 29999) str.substring(0, 29999) else str)

val datasource30 = glueContext.getCatalogSource(database = "database", tableName = "entry", redshiftTmpDir = "", transformationContext = "datasource30").getDynamicFrame()
val revDF30 = datasource30.toDF()
  .withColumn("message", truncColUdf(col("message")))
val truncDynamicFrame30 = DynamicFrame(revDF30, glueContext)
val applymapping30 = truncDynamicFrame30.applyMapping(mappings = Seq(("id", "bigint", "id", "bigint"), ("message", "string", "message", "varchar(65535)"), ("state", "string", "state", "varchar(256)"), ("created_at", "timestamp", "created_at", "timestamp"), ("depth", "int", "depth", "int")), caseSensitive = false, transformationContext = "applymapping30")
val resolvechoice30 = applymapping30.resolveChoice(choiceOption = Some(ChoiceOption("make_cols")), transformationContext = "resolvechoice30")
val dropnullfields30 = resolvechoice30.dropNulls(transformationContext = "dropnullfields30")
val datasink30 = glueContext.getJDBCSink(catalogConnection = "databaseConnection", options = JsonOptions(""""dbtable": "entry", "database": "database""""), redshiftTmpDir = args("TempDir"), transformationContext = "datasink30").writeDynamicFrame(dropnullfields30)

这是正在读取的示例数据行:

01,"<p>Here is the message where the quotations are in case of commas within the message, like so.</p>",active,2017-08-27 23:38:40,1

【问题讨论】:

【参考方案1】:

将 DynamicFrame 转换为 spark 的 DataFrame,然后使用用户定义的函数截断列值(Scala):

import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.sql.functions._

val truncColUdf = udf((str: String) => if (str.length > 20) str.substring(0, 20) else str)
val truncDataFrame = dynamicFrame.toDF()
  .select("text_long")
  .withColumn("text_short", truncColUdf(col("text_long")))
  .withColumn("text_short_length", length(col("text_short")))

truncDataFrame.show(5, false)

val truncDynamicFrame = DynamicFrame(truncDataFrame, glueContext)

...

//write to sink

输出:

+-----------------------+--------------------+-----------------+
|text_long              |text_short          |text_short_length|
+-----------------------+--------------------+-----------------+
|I'd rather not answer  |I'd rather not answe|20               |
|Agree                  |Agree               |5                |
|Custom Answer Favorable|Custom Answer Favora|20               |
|Agree                  |Agree               |5                |
|Sometimes              |Sometimes           |9                |
+-----------------------+--------------------+-----------------+

【讨论】:

当我运行最后一行时出现错误提示 找不到 DynamicFrame。任何想法为什么? 所以我想通了,我的代码现在可以运行了。除了您回答的内容之外,我所需要的只是包括import com.amazonaws.services.glue.DynamicFrame。谢谢! 它有点工作。当我认为它应该将较短的字符串保持在当前长度时,它会将所有内容转换为我指定的长度(即 65535)。 嗯...你能分享你的代码吗?顺便说一句,我用我的输出更新了答案 OK 代码已启动。我还发布了有关时间戳问题的问题,如果您能看到任何错误。【参考方案2】:

您可以在 DynamicFrameWriter 的“extracopyoptions”参数中传递“TRUNCATECOLUMNS”:https://aws.amazon.com/premiumsupport/knowledge-center/sql-commands-redshift-glue-job/

【讨论】:

以上是关于有没有办法在通过 Glue 插入 Redshift 时简单地截断列?的主要内容,如果未能解决你的问题,请参考以下文章

通过 AWS Glue 执行 Redshift 过程

如何编写 AWS Glue 脚本以将新数据插入 Redshift 表

时间戳未从 Glue 加载到 Redshift 表中

何时通过 AWS Glue ETL 使用 Amazon Redshift 频谱来查询 Amazon S3 数据

如何在 AWS Glue PySpark 中运行并行线程?

使用 Keys [Glue] 预定义 Redshift 表