String length exceeds DDL length in Glue (Python, PySpark)

[Title] String length exceeds DDL Length in Glue (python,pyspark) [Posted] 2018-07-06 07:37:21 [Question]:

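I have a 4 MB JSON file in S3. I crawled the data with an AWS Glue crawler, which generated the corresponding Data Catalog table. I then created a job (using the ETL console in AWS Glue) to load the data into Amazon Redshift.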

Files in the same format but of other sizes load into the database correctly. But once the file reaches 4 MB, the job fails with this error:

"An error occurred while calling o146.pyWriteDynamicFrame. Error (code 1204) while loading data into Redshift: "String length exceeds DDL length""

Can anyone help me solve this? My script is below. It was generated by the Glue console.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "tga", table_name = "db", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("attachments", "string", "attachments", "string"), ("classifications.Classification", "array", "`classifications.Classification`", "string"), ("code", "string", "code", "string"), ("completionmapping.NrtCompletion", "array", "`completionmapping.NrtCompletion`", "string"), ("componenttype", "string", "componenttype", "string"), ("contacts.Contact", "array", "`contacts.Contact`", "string"), ("createddate.DateTime", "string", "`createddate.DateTime`", "string"), ("createddate.OffsetMinutes", "string", "`createddate.OffsetMinutes`", "string"), ("currencyperiods.NrtCurrencyPeriod", "array", "`currencyperiods.NrtCurrencyPeriod`", "string"), ("currencystatus", "string", "currencystatus", "string"), ("datamanagers.DataManagerAssignment", "array", "`datamanagers.DataManagerAssignment`", "string"), ("islegacydata", "boolean", "islegacydata", "boolean"), ("isstreamlined", "boolean", "isstreamlined", "boolean"), ("mappinginformation.Mapping", "array", "`mappinginformation.Mapping`", "string"), ("recognitionmanagers.RecognitionManagerAssignment", "array", "`recognitionmanagers.RecognitionManagerAssignment`", "string"), ("restrictions", "string", "restrictions", "string"), ("reversemappinginformation.Mapping", "array", "`reversemappinginformation.Mapping`", "string"), ("title", "string", "title", "string"), ("updateddate.DateTime", "string", "`updateddate.DateTime`", "string"), ("updateddate.OffsetMinutes", "string", "`updateddate.OffsetMinutes`", "string"), ("_code", "string", "_code", "string"), ("_salesforceid", "string", "_salesforceid", "string"), ("_api", "string", "_api", "string"), ("_timestamp", "string", "_timestamp", "string"), ("parentcode", "string", "parentcode", "string"), ("parenttitle", "string", "parenttitle", "string"), ("releases.Release", "array", "`releases.Release`", "string"), ("usagerecommendations.UsageRecommendation", "array", "`usagerecommendations.UsageRecommendation`", "string")], 
transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "redshift", connection_options = {"dbtable": "trainingcomponentservicegetdetails", "database": "db"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()

Scala script...

import com.amazonaws.services.glue.ChoiceOption
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.ResolveSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.MetadataBuilder
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("TempDir","JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    val datasource0 = glueContext.getCatalogSource(database = "tga", tableName = "trainingcomponentservicegetdetails", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame()
    val applymapping1 = datasource0.applyMapping(mappings = Seq(("attachments", "string", "attachments", "string"), ("classifications.Classification", "array", "`classifications.Classification`", "string"), ("code", "string", "code", "string"), ("completionmapping.NrtCompletion", "array", "`completionmapping.NrtCompletion`", "string"), ("componenttype", "string", "componenttype", "string"), ("contacts.Contact", "array", "`contacts.Contact`", "string"), ("createddate.DateTime", "string", "`createddate.DateTime`", "string"), ("createddate.OffsetMinutes", "string", "`createddate.OffsetMinutes`", "string"), ("currencyperiods.NrtCurrencyPeriod", "array", "`currencyperiods.NrtCurrencyPeriod`", "string"), ("currencystatus", "string", "currencystatus", "string"), ("datamanagers.DataManagerAssignment", "array", "`datamanagers.DataManagerAssignment`", "string"), ("islegacydata", "boolean", "islegacydata", "boolean"), ("isstreamlined", "boolean", "isstreamlined", "boolean"), ("mappinginformation.Mapping", "array", "`mappinginformation.Mapping`", "string"), ("recognitionmanagers.RecognitionManagerAssignment", "array", "`recognitionmanagers.RecognitionManagerAssignment`", "string"), ("restrictions", "string", "restrictions", "string"), ("reversemappinginformation.Mapping", "array", "`reversemappinginformation.Mapping`", "string"), ("title", "string", "title", "string"), ("updateddate.DateTime", "string", "`updateddate.DateTime`", "string"), ("updateddate.OffsetMinutes", "string", "`updateddate.OffsetMinutes`", "string"), ("_code", "string", "_code", "string"), ("_salesforceid", "string", "_salesforceid", "string"), ("_api", "string", "_api", "string"), ("_timestamp", "long", "_timestamp", "long"), ("industrysectors.TrainingComponentIndustrySector", "array", "`industrysectors.TrainingComponentIndustrySector`", "string"), ("occupations.TrainingComponentOccupation", "array", "`occupations.TrainingComponentOccupation`", "string"), ("parentcode", "string", "parentcode", "string"), 
("parenttitle", "string", "parenttitle", "string"), ("releases.Release", "array", "`releases.Release`", "string"), ("usagerecommendations.UsageRecommendation", "array", "`usagerecommendations.UsageRecommendation`", "string"), ("tpdevelopercode", "string", "tpdevelopercode", "string")), caseSensitive = false, transformationContext = "applymapping1")
    val resolvechoice2 = applymapping1.resolveChoice(choiceOption = Some(ChoiceOption("make_cols")), transformationContext = "resolvechoice2")
    val dropnullfields3 = resolvechoice2.dropNulls(transformationContext = "dropnullfields3")
    val datasink4 = glueContext.getJDBCSink(catalogConnection = "redshift", options = JsonOptions("""{"dbtable": "trainingcomponentservicegetdetails", "database": "dbanasightmla"}"""), redshiftTmpDir = args("TempDir"), transformationContext = "datasink4").writeDynamicFrame(dropnullfields3)

    Job.commit()
  }
}

I found this, but I could not get it to work.

https://github.com/databricks/spark-redshift

    val columnLengthMap = Map(
"attachments" ->4000, 
"classifications.Classification" ->4000, 
"code" ->4000, 
"completionmapping.NrtCompletion" ->4000, 
"componenttype" ->4000, 
"contacts.Contact" ->4000, 
"createddate.DateTime" ->4000,  
"createddate.OffsetMinutes" ->4000, 
"currencyperiods.NrtCurrencyPeriod" ->4000, 
"currencystatus" ->4000, 
"datamanagers.DataManagerAssignment" ->4000, 
"`datamanagers.DataManagerAssignment`" ->4000, 
"islegacydata" ->4000,
"isstreamlined" ->4000,
"mappinginformation.Mapping" ->4000,
"recognitionmanagers.RecognitionManagerAssignment" ->4000,
"restrictions" ->4000,
"reversemappinginformation.Mapping" ->4000,
"title" ->4000,
"updateddate.DateTime" ->4000, 
"updateddate.OffsetMinutes" ->4000,
"_code" ->4000,
"_salesforceid" ->4000,
"_api" ->4000,
"_timestamp" ->4000,
"industrysectors.TrainingComponentIndustrySector" ->4000,
"occupations.TrainingComponentOccupation" ->4000,
"parentcode" ->4000,
"parenttitle" ->4000,
"releases.Release" ->4000,
"usagerecommendations.UsageRecommendation" ->4000,
"tpdevelopercode" ->4000)


val df: DataFrame = sqlContext.read
.format("com.databricks.spark.redshift")
.option("jdbc:redshift://anasight-redshift-mla.cf2ow8sevrix.ap-southeast-2.redshift.amazonaws.com:5439/dbanasightmla")
.option("trainingcomponentservicegetdetails", "trainingcomponentservicegetdetails")
.option("tempdir", "s3://redshift-anasight-2018/EMPLOYMENT")
.load()

columnLengthMap.foreach {
  case (colName, length) =>
    val metadata = new MetadataBuilder().putLong("maxlength", length).build()
    df = df.withColumn(colName, df(colName).as(colName, metadata))
}

[Answer 1]:

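According to the Redshift documentation, "The maximum size of a single row loaded by using the COPY command is 4 MB. For more information, see COPY in the Amazon Redshift Database Developer Guide." I believe that at some point a record exceeds the 4 MB limit. Can you check the record size?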

https://docs.aws.amazon.com/redshift/latest/mgmt/amazon-redshift-limits.html
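One way to check the record size, as a minimal sketch: the function below measures each record's UTF-8 size and reports those above the 4 MB COPY row limit quoted above. It assumes the file holds one JSON document per line; the function name and that layout are illustrative assumptions, not something the question confirms.

```python
import json

# Redshift's documented limit for a single row loaded with COPY.
COPY_ROW_LIMIT_BYTES = 4 * 1024 * 1024


def oversized_records(lines, limit=COPY_ROW_LIMIT_BYTES):
    """Return (line_number, size_in_bytes) for every record larger than limit.

    Assumes one JSON document per line (a hypothetical layout); adapt the
    splitting if the file is a single JSON array instead.
    """
    found = []
    for number, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        json.loads(line)  # fail early if the line is not valid JSON
        size = len(line.encode("utf-8"))
        if size > limit:
            found.append((number, size))
    return found
```

Passing an open file handle as `lines` works, since iterating a text file yields one line at a time. An empty result means no single record is over the limit, in which case the column widths are the more likely cause of the error.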

I think the row size limit comes from the Redshift side, and I believe the Glue connection also uses the COPY command internally. See this AWS documentation link.

See the discussion in the AWS forums: https://forums.aws.amazon.com/thread.jspa?threadID=150345

Reference for DMS S3 to Redshift

[Comments]:

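Thanks yuva, but I am not using the COPY command. I am using a Python script in AWS Glue.

Hi Beni, I have updated my answer with some additional information.

Thanks Yuva. I understand the limit. I was wondering whether there is a workaround? Thanks!

If the data is not large, you could try INSERT INTO table ... and so on. Another option you could try is S3 to Redshift using AWS DMS. I am adding some links for your reference so you can try them. I have not tested the INSERT INTO option, but it should work.

Unfortunately that is not an option. We receive several JSON files, and the best way to load them is the AWS Glue service, so that we can schedule jobs and upload the data to Redshift automatically. I found another option, which is Scala, but my code has errors. I will update the post so you can see it. Please let me know how to increase the string size of the columns. Thanks!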
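On the last comment's question about increasing the string size of the columns, here is a rough sketch of one possible approach, not something tested in this thread: measure the longest value per field, then widen only the columns that need it with Redshift's `ALTER TABLE ... ALTER COLUMN ... TYPE VARCHAR(n)`. The helper names, the assumed current width of 256 bytes, and measuring only top-level fields are all assumptions; check the table's real DDL and flatten nested fields to match the column names Glue produced.

```python
import json

REDSHIFT_MAX_VARCHAR = 65535  # Redshift's maximum VARCHAR length, in bytes


def max_byte_lengths(records):
    """Map each top-level field to the longest UTF-8 byte length seen.

    Arrays and objects are measured as their JSON text, which only
    approximates what the Glue job writes after casting them to string.
    """
    longest = {}
    for record in records:
        for name, value in record.items():
            if value is None or isinstance(value, bool):
                continue  # nulls and booleans are not string columns
            text = value if isinstance(value, str) else json.dumps(value, ensure_ascii=False)
            size = len(text.encode("utf-8"))
            longest[name] = max(longest.get(name, 0), size)
    return longest


def widen_statements(table, lengths, current=256):
    """Build ALTER TABLE statements for columns longer than `current` bytes.

    `current` is an assumed existing column width, not a value taken
    from the question.
    """
    statements = []
    for name, length in sorted(lengths.items()):
        if length <= current:
            continue
        if length > REDSHIFT_MAX_VARCHAR:
            raise ValueError("%s needs %d bytes, above the VARCHAR maximum" % (name, length))
        statements.append(
            'ALTER TABLE %s ALTER COLUMN "%s" TYPE VARCHAR(%d);' % (table, name, length)
        )
    return statements
```

The generated statements would be run against Redshift before the Glue job writes to the existing table. A value longer than 65535 bytes cannot fit in any VARCHAR column, so the sketch raises instead of silently truncating.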
