Getting the 'Exception thrown in awaitResult:' error when trying to copy a table in Glue to Redshift

Question (posted 2020-07-24 23:19:26):

I have been trying to copy a table from Glue into a Redshift table. I created a job with the following code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "booker", table_name = "relationalized_parquet_itemdetailsnew_output_itemdetails", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "booker", table_name = "relationalized_parquet_itemdetailsnew_output_itemdetails", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("id", "long", "id", "long"), ("index", "int", "index", "int"), ("`itemdetails.val.calculatedweight`", "double", "calculated_weight", "double"), ("`itemdetails.val.weightsigma`", "double", "weight_sigma", "double"), ("`itemdetails.val.itemaggregationid.long`", "long", "item_aggregation_id_long", "long"), ("`itemdetails.val.quantity`", "int", "quantity", "int"), ("`itemdetails.val.expectedweight`", "double", "expected_weight", "double"), ("`itemdetails.val.weightoverridden`", "boolean", "weight_overridden", "boolean"), ("`itemdetails.val.itemid`", "string", "item_id", "string"), ("`itemdetails.val.itemaggregationid.int`", "int", "item_aggregation_id_int", "int"), ("`itemdetails.val.type`", "int", "type", "int")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "long", "id", "long"), ("index", "int", "index", "int"), ("`itemdetails.val.calculatedweight`", "double", "calculated_weight", "double"), ("`itemdetails.val.weightsigma`", "double", "weight_sigma", "double"), ("`itemdetails.val.itemaggregationid.long`", "long", "item_aggregation_id_long", "long"), ("`itemdetails.val.quantity`", "int", "quantity", "int"), ("`itemdetails.val.expectedweight`", "double", "expected_weight", "double"), ("`itemdetails.val.weightoverridden`", "boolean", "weight_overridden", "boolean"), ("`itemdetails.val.itemid`", "string", "item_id", "string"), ("`itemdetails.val.itemaggregationid.int`", "int", "item_aggregation_id_int", "int"), ("`itemdetails.val.type`", "int", "type", "int")], transformation_ctx = "applymapping1")
## @type: SelectFields
## @args: [paths = ["expected_weight", "quantity", "item_aggregation_id_int", "item_id", "length", "index", "type", "gift_option", "weight_overridden", "width", "id", "weight_sigma", "calculated_weight", "item_aggregation_id_long", "height"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["expected_weight", "quantity", "item_aggregation_id_int", "item_id", "length", "index", "type", "gift_option", "weight_overridden", "width", "id", "weight_sigma", "calculated_weight", "item_aggregation_id_long", "height"], transformation_ctx = "selectfields2")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "delphi_redshift", table_name = "delphi_shajeec_sd_item_details", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "delphi_redshift", table_name = "delphi_shajeec_sd_item_details", transformation_ctx = "resolvechoice3")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")
## @type: DataSink
## @args: [database = "delphi_redshift", table_name = "delphi_shajeec_sd_item_details", redshift_tmp_dir = TempDir, transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "delphi_redshift", table_name = "delphi_shajeec_sd_item_details", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()

After running this job, I get the error 'An error occurred while calling o116.pyWriteDynamicFrame. Exception thrown in awaitResult:'. Specifically, the error log says:

2020-07-24 22:00:47,493 WARN  [Thread-9] redshift.RedshiftWriter (RedshiftWriter.scala:retry$1(132)) - Exception thrown while running copy query. Exception message: Exception thrown in awaitResult: .Retrying 2 more times.
2020-07-24 22:00:47,524 WARN  [Thread-9] redshift.RedshiftWriter (RedshiftWriter.scala:retry$1(135)) - Sleeping 30000 milliseconds before proceeding to retry redshift copy
2020-07-24 22:00:49,236 INFO  [dispatcher-event-loop-2] cluster.YarnSchedulerBackend$YarnDriverEndpoint (Logging.scala:logInfo(54)) - Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.31.1.131:44498) with ID 4
2020-07-24 22:00:49,237 INFO  [spark-listener-group-executorManagement] spark.ExecutorAllocationManager (Logging.scala:logInfo(54)) - New executor 4 has registered (new total is 3)
2020-07-24 22:00:49,353 INFO  [dispatcher-event-loop-0] storage.BlockManagerMasterEndpoint (Logging.scala:logInfo(54)) - Registering block manager ip-172-31-1-131.ec2.internal:43747 with 2.8 GB RAM, BlockManagerId(4, ip-172-31-1-131.ec2.internal, 43747, None)
2020-07-24 22:00:50,043 INFO  [dispatcher-event-loop-2] cluster.YarnSchedulerBackend$YarnDriverEndpoint (Logging.scala:logInfo(54)) - Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.31.2.115:56504) with ID 3
2020-07-24 22:00:50,044 INFO  [spark-listener-group-executorManagement] spark.ExecutorAllocationManager (Logging.scala:logInfo(54)) - New executor 3 has registered (new total is 4)
2020-07-24 22:00:50,144 INFO  [dispatcher-event-loop-0] storage.BlockManagerMasterEndpoint (Logging.scala:logInfo(54)) - Registering block manager ip-172-31-2-115.ec2.internal:42197 with 2.8 GB RAM, BlockManagerId(3, ip-172-31-2-115.ec2.internal, 42197, None)
2020-07-24 22:01:18,950 WARN  [Thread-9] redshift.RedshiftWriter (RedshiftWriter.scala:retry$1(132)) - Exception thrown while running copy query. Exception message: Exception thrown in awaitResult: .Retrying 1 more times.
2020-07-24 22:01:18,988 WARN  [Thread-9] redshift.RedshiftWriter (RedshiftWriter.scala:retry$1(135)) - Sleeping 30000 milliseconds before proceeding to retry redshift copy
2020-07-24 22:01:45,785 INFO  [spark-dynamic-executor-allocation] spark.ExecutorAllocationManager (Logging.scala:logInfo(54)) - Request to remove executorIds: 2
2020-07-24 22:01:45,786 INFO  [spark-dynamic-executor-allocation] cluster.YarnClusterSchedulerBackend (Logging.scala:logInfo(54)) - Requesting to kill executor(s) 2
2020-07-24 22:01:45,788 INFO  [spark-dynamic-executor-allocation] cluster.YarnClusterSchedulerBackend (Logging.scala:logInfo(54)) - Actual list of executor(s) to be killed is 2
2020-07-24 22:01:45,789 INFO  [dispatcher-event-loop-2] yarn.ApplicationMaster$AMEndpoint (Logging.scala:logInfo(54)) - Driver requested to kill executor(s) 2.
2020-07-24 22:01:45,790 INFO  [spark-dynamic-executor-allocation] spark.ExecutorAllocationManager (Logging.scala:logInfo(54)) - Removing executor 2 because it has been idle for 60 seconds (new desired total will be 3)
2020-07-24 22:01:45,891 INFO  [spark-dynamic-executor-allocation] spark.ExecutorAllocationManager (Logging.scala:logInfo(54)) - Request to remove executorIds: 1
2020-07-24 22:01:45,891 INFO  [spark-dynamic-executor-allocation] cluster.YarnClusterSchedulerBackend (Logging.scala:logInfo(54)) - Requesting to kill executor(s) 1
2020-07-24 22:01:45,891 INFO  [spark-dynamic-executor-allocation] cluster.YarnClusterSchedulerBackend (Logging.scala:logInfo(54)) - Actual list of executor(s) to be killed is 1
2020-07-24 22:01:45,891 INFO  [dispatcher-event-loop-3] yarn.ApplicationMaster$AMEndpoint (Logging.scala:logInfo(54)) - Driver requested to kill executor(s) 1.
2020-07-24 22:01:45,892 INFO  [spark-dynamic-executor-allocation] spark.ExecutorAllocationManager (Logging.scala:logInfo(54)) - Removing executor 1 because it has been idle for 60 seconds (new desired total will be 2)
2020-07-24 22:01:49,290 INFO  [dispatcher-event-loop-3] cluster.YarnSchedulerBackend$YarnDriverEndpoint (Logging.scala:logInfo(54)) - Disabling executor 1.
2020-07-24 22:01:49,293 INFO  [dag-scheduler-event-loop] scheduler.DAGScheduler (Logging.scala:logInfo(54)) - Executor lost: 1 (epoch 1)
2020-07-24 22:01:49,294 INFO  [dispatcher-event-loop-2] storage.BlockManagerMasterEndpoint (Logging.scala:logInfo(54)) - Trying to remove executor 1 from BlockManagerMaster.
2020-07-24 22:01:49,295 INFO  [dispatcher-event-loop-2] storage.BlockManagerMasterEndpoint (Logging.scala:logInfo(54)) - Removing block manager BlockManagerId(1, ip-172-31-7-142.ec2.internal, 44815, None)
2020-07-24 22:01:49,295 INFO  [dag-scheduler-event-loop] storage.BlockManagerMaster (Logging.scala:logInfo(54)) - Removed 1 successfully in removeExecutor
2020-07-24 22:01:49,297 INFO  [spark-dynamic-executor-allocation] spark.ExecutorAllocationManager (Logging.scala:logInfo(54)) - Request to remove executorIds: 4
2020-07-24 22:01:49,297 INFO  [spark-dynamic-executor-allocation] cluster.YarnClusterSchedulerBackend (Logging.scala:logInfo(54)) - Requesting to kill executor(s) 4
2020-07-24 22:01:49,297 INFO  [spark-dynamic-executor-allocation] cluster.YarnClusterSchedulerBackend (Logging.scala:logInfo(54)) - Actual list of executor(s) to be killed is 4
2020-07-24 22:01:49,297 INFO  [dispatcher-event-loop-1] yarn.ApplicationMaster$AMEndpoint (Logging.scala:logInfo(54)) - Driver requested to kill executor(s) 4.
2020-07-24 22:01:49,298 INFO  [spark-dynamic-executor-allocation] spark.ExecutorAllocationManager (Logging.scala:logInfo(54)) - Removing executor 4 because it has been idle for 60 seconds (new desired total will be 1)
2020-07-24 22:01:49,302 INFO  [dispatcher-event-loop-3] cluster.YarnClusterScheduler (Logging.scala:logInfo(54)) - Executor 1 on ip-172-31-7-142.ec2.internal killed by driver.
2020-07-24 22:01:49,303 INFO  [spark-listener-group-executorManagement] spark.ExecutorAllocationManager (Logging.scala:logInfo(54)) - Existing executor 1 has been removed (new total is 3)
2020-07-24 22:01:49,818 INFO  [dispatcher-event-loop-2] cluster.YarnSchedulerBackend$YarnDriverEndpoint (Logging.scala:logInfo(54)) - Disabling executor 2.
2020-07-24 22:01:49,819 INFO  [dag-scheduler-event-loop] scheduler.DAGScheduler (Logging.scala:logInfo(54)) - Executor lost: 2 (epoch 1)
2020-07-24 22:01:49,819 INFO  [dispatcher-event-loop-2] cluster.YarnClusterScheduler (Logging.scala:logInfo(54)) - Executor 2 on ip-172-31-11-12.ec2.internal killed by driver.
2020-07-24 22:01:49,819 INFO  [dispatcher-event-loop-3] storage.BlockManagerMasterEndpoint (Logging.scala:logInfo(54)) - Trying to remove executor 2 from BlockManagerMaster.
2020-07-24 22:01:49,819 INFO  [dispatcher-event-loop-3] storage.BlockManagerMasterEndpoint (Logging.scala:logInfo(54)) - Removing block manager BlockManagerId(2, ip-172-31-11-12.ec2.internal, 35007, None)
2020-07-24 22:01:49,820 INFO  [dag-scheduler-event-loop] storage.BlockManagerMaster (Logging.scala:logInfo(54)) - Removed 2 successfully in removeExecutor
2020-07-24 22:01:49,820 INFO  [spark-listener-group-executorManagement] spark.ExecutorAllocationManager (Logging.scala:logInfo(54)) - Existing executor 2 has been removed (new total is 2)
2020-07-24 22:01:50,099 INFO  [spark-dynamic-executor-allocation] spark.ExecutorAllocationManager (Logging.scala:logInfo(54)) - Request to remove executorIds: 3
2020-07-24 22:01:51,044 INFO  [dispatcher-event-loop-3] cluster.YarnSchedulerBackend$YarnDriverEndpoint (Logging.scala:logInfo(54)) - Disabling executor 4.
2020-07-24 22:01:51,044 INFO  [dag-scheduler-event-loop] scheduler.DAGScheduler (Logging.scala:logInfo(54)) - Executor lost: 4 (epoch 1)
2020-07-24 22:01:51,045 INFO  [dispatcher-event-loop-1] storage.BlockManagerMasterEndpoint (Logging.scala:logInfo(54)) - Trying to remove executor 4 from BlockManagerMaster.
2020-07-24 22:01:51,045 INFO  [dispatcher-event-loop-3] cluster.YarnClusterScheduler (Logging.scala:logInfo(54)) - Executor 4 on ip-172-31-1-131.ec2.internal killed by driver.
2020-07-24 22:01:51,045 INFO  [dispatcher-event-loop-1] storage.BlockManagerMasterEndpoint (Logging.scala:logInfo(54)) - Removing block manager BlockManagerId(4, ip-172-31-1-131.ec2.internal, 43747, None)
2020-07-24 22:01:51,045 INFO  [dag-scheduler-event-loop] storage.BlockManagerMaster (Logging.scala:logInfo(54)) - Removed 4 successfully in removeExecutor
2020-07-24 22:01:51,046 INFO  [spark-listener-group-executorManagement] spark.ExecutorAllocationManager (Logging.scala:logInfo(54)) - Existing executor 4 has been removed (new total is 1)
2020-07-24 22:01:52,841 ERROR [Thread-9] redshift.RedshiftWriter (RedshiftWriter.scala:retry$1(142)) - SQLException thrown while running COPY query; will attempt to retrieve more information by querying the STL_LOAD_ERRORS table
java.sql.SQLException: Exception thrown in awaitResult: 
    at com.databricks.spark.redshift.JDBCWrapper.com$databricks$spark$redshift$JDBCWrapper$$executeInterruptibly(RedshiftJDBCWrapper.scala:133)
    at com.databricks.spark.redshift.JDBCWrapper.executeInterruptibly(RedshiftJDBCWrapper.scala:109)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$doRedshiftLoad$1$$anonfun$apply$mcV$sp$2.apply(RedshiftWriter.scala:218)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$doRedshiftLoad$1$$anonfun$apply$mcV$sp$2.apply(RedshiftWriter.scala:215)
    at scala.Option.foreach(Option.scala:257)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$doRedshiftLoad$1.apply$mcV$sp(RedshiftWriter.scala:215)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$doRedshiftLoad$1.apply(RedshiftWriter.scala:195)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$doRedshiftLoad$1.apply(RedshiftWriter.scala:195)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.spark.redshift.RedshiftWriter.retry$1(RedshiftWriter.scala:129)
    at com.databricks.spark.redshift.RedshiftWriter.doRedshiftLoad(RedshiftWriter.scala:195)
    at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:437)
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:122)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at com.amazonaws.services.glue.util.RedshiftWrapper.writeDF(JDBCUtils.scala:975)
    at com.amazonaws.services.glue.RedshiftDataSink.writeDynamicFrame(RedshiftDataSink.scala:199)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:55)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: [Amazon](500310) Invalid operation: Load into table 'sd_item_details' failed.  Check 'stl_load_errors' system table for details.;
    at com.amazon.redshift.client.messages.inbound.ErrorResponse.toErrorException(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.handleErrorResponse(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.handleMessage(Unknown Source)
    at com.amazon.jdbc.communications.InboundMessagesPipeline.getNextMessageOfClass(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.doMoveToNextClass(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.getErrorResponse(Unknown Source)
    at com.amazon.redshift.client.PGClient.handleErrorsScenario2ForPrepareExecution(Unknown Source)
    at com.amazon.redshift.client.PGClient.handleErrorsPrepareExecute(Unknown Source)
    at com.amazon.redshift.client.PGClient.executePreparedStatement(Unknown Source)
    at com.amazon.redshift.dataengine.PGQueryExecutor.executePreparedStatement(Unknown Source)
    at com.amazon.redshift.dataengine.PGQueryExecutor.execute(Unknown Source)
    at com.amazon.jdbc.common.SPreparedStatement.executeWithParams(Unknown Source)
    at com.amazon.jdbc.common.SPreparedStatement.execute(Unknown Source)
    at com.databricks.spark.redshift.JDBCWrapper$$anonfun$executeInterruptibly$1.apply(RedshiftJDBCWrapper.scala:109)
    at com.databricks.spark.redshift.JDBCWrapper$$anonfun$executeInterruptibly$1.apply(RedshiftJDBCWrapper.scala:109)
    at com.databricks.spark.redshift.JDBCWrapper$$anonfun$2.apply(RedshiftJDBCWrapper.scala:127)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Caused by: com.amazon.support.exceptions.ErrorException: [Amazon](500310) Invalid operation: Load into table 'sd_item_details' failed.  Check 'stl_load_errors' system table for details.;
    ... 20 more
2020-07-24 22:01:52,962 ERROR [Thread-9] redshift.RedshiftWriter (RedshiftWriter.scala:saveToRedshift(442)) - Exception thrown during Redshift load; will roll back transaction
java.sql.SQLException: Exception thrown in awaitResult: 
    at com.databricks.spark.redshift.JDBCWrapper.com$databricks$spark$redshift$JDBCWrapper$$executeInterruptibly(RedshiftJDBCWrapper.scala:133)
    at com.databricks.spark.redshift.JDBCWrapper.executeInterruptibly(RedshiftJDBCWrapper.scala:109)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$doRedshiftLoad$1$$anonfun$apply$mcV$sp$2.apply(RedshiftWriter.scala:218)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$doRedshiftLoad$1$$anonfun$apply$mcV$sp$2.apply(RedshiftWriter.scala:215)
    at scala.Option.foreach(Option.scala:257)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$doRedshiftLoad$1.apply$mcV$sp(RedshiftWriter.scala:215)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$doRedshiftLoad$1.apply(RedshiftWriter.scala:195)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$doRedshiftLoad$1.apply(RedshiftWriter.scala:195)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.spark.redshift.RedshiftWriter.retry$1(RedshiftWriter.scala:129)
    at com.databricks.spark.redshift.RedshiftWriter.doRedshiftLoad(RedshiftWriter.scala:195)
    at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:437)
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:122)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at com.amazonaws.services.glue.util.RedshiftWrapper.writeDF(JDBCUtils.scala:975)
    at com.amazonaws.services.glue.RedshiftDataSink.writeDynamicFrame(RedshiftDataSink.scala:199)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:55)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: [Amazon](500310) Invalid operation: Load into table 'sd_item_details' failed.  Check 'stl_load_errors' system table for details.;
    at com.amazon.redshift.client.messages.inbound.ErrorResponse.toErrorException(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.handleErrorResponse(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.handleMessage(Unknown Source)
    at com.amazon.jdbc.communications.InboundMessagesPipeline.getNextMessageOfClass(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.doMoveToNextClass(Unknown Source)
    at com.amazon.redshift.client.PGMessagingContext.getErrorResponse(Unknown Source)
    at com.amazon.redshift.client.PGClient.handleErrorsScenario2ForPrepareExecution(Unknown Source)
    at com.amazon.redshift.client.PGClient.handleErrorsPrepareExecute(Unknown Source)
    at com.amazon.redshift.client.PGClient.executePreparedStatement(Unknown Source)
    at com.amazon.redshift.dataengine.PGQueryExecutor.executePreparedStatement(Unknown Source)
    at com.amazon.redshift.dataengine.PGQueryExecutor.execute(Unknown Source)
    at com.amazon.jdbc.common.SPreparedStatement.executeWithParams(Unknown Source)
    at com.amazon.jdbc.common.SPreparedStatement.execute(Unknown Source)
    at com.databricks.spark.redshift.JDBCWrapper$$anonfun$executeInterruptibly$1.apply(RedshiftJDBCWrapper.scala:109)
    at com.databricks.spark.redshift.JDBCWrapper$$anonfun$executeInterruptibly$1.apply(RedshiftJDBCWrapper.scala:109)
    at com.databricks.spark.redshift.JDBCWrapper$$anonfun$2.apply(RedshiftJDBCWrapper.scala:127)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Caused by: com.amazon.support.exceptions.ErrorException: [Amazon](500310) Invalid operation: Load into table 'sd_item_details' failed.  Check 'stl_load_errors' system table for details.;
    ... 20 more
Traceback (most recent call last):
  File "script_2020-07-24-21-59-59.py", line 45, in <module>
    datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "delphi_redshift", table_name = "delphi_shajeec_sd_item_details", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
  File "/mnt/yarn/usercache/root/appcache/application_1595627243772_0003/container_1595627243772_0003_01_000001/PyGlue.zip/awsglue/dynamicframe.py", line 657, in from_catalog
    return self._glue_context.write_dynamic_frame_from_catalog(frame, db, table_name, redshift_tmp_dir, transformation_ctx, additional_options, catalog_id)
  File "/mnt/yarn/usercache/root/appcache/application_1595627243772_0003/container_1595627243772_0003_01_000001/PyGlue.zip/awsglue/context.py", line 296, in write_dynamic_frame_from_catalog
    return DataSink(j_sink, self).write(frame)
  File "/mnt/yarn/usercache/root/appcache/application_1595627243772_0003/container_1595627243772_0003_01_000001/PyGlue.zip/awsglue/data_sink.py", line 35, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/mnt/yarn/usercache/root/appcache/application_1595627243772_0003/container_1595627243772_0003_01_000001/PyGlue.zip/awsglue/data_sink.py", line 31, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/mnt/yarn/usercache/root/appcache/application_1595627243772_0003/container_1595627243772_0003_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/mnt/yarn/usercache/root/appcache/application_1595627243772_0003/container_1595627243772_0003_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/mnt/yarn/usercache/root/appcache/application_1595627243772_0003/container_1595627243772_0003_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o116.pyWriteDynamicFrame.
: java.sql.SQLException: Exception thrown in awaitResult: 
    at com.databricks.spark.redshift.JDBCWrapper.com$databricks$spark$redshift$JDBCWrapper$$executeInterruptibly(RedshiftJDBCWrapper.scala:133)
    at com.databricks.spark.redshift.JDBCWrapper.executeInterruptibly(RedshiftJDBCWrapper.scala:109)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$doRedshiftLoad$1$$anonfun$apply$mcV$sp$2.apply(RedshiftWriter.scala:218)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$doRedshiftLoad$1$$anonfun$apply$mcV$sp$2.apply(RedshiftWriter.scala:215)
    at scala.Option.foreach(Option.scala:257)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$doRedshiftLoad$1.apply$mcV$sp(RedshiftWriter.scala:215)
    at com.databricks.spark.redshift.RedshiftWriter$$anonfun$doRedshiftLoad$1.apply(RedshiftWriter.scala:195)
    ... 20 more

How can I fix this error?

Comments:

The actual error (taken from your stack trace) is: [Amazon](500310) Invalid operation: Load into table 'sd_item_details' failed. Check 'stl_load_errors' system table for details. Can you check stl_load_errors? There may be more details there.

Answer 1:

I ran into a similar error when copying from S3 into a Redshift table. Things that helped:

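Make sure the S3 location referenced by RedshiftTempDir is accessible to Redshift. RedshiftTempDir contains a manifest file listing the S3 object paths that need to be loaded into Redshift. More information can be found here: COPY from Amazon S3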

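The COPY command in Redshift returns an error if the specified manifest file is not found or is not correctly formatted. The COPY command also needs authorization to access data in other AWS resources, including Amazon S3, Amazon EMR, Amazon DynamoDB, and Amazon EC2.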
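To make the manifest point concrete, here is a small Python sketch that writes and sanity-checks a manifest in the layout described in the Redshift COPY documentation (`{"entries": [{"url": ..., "mandatory": ...}]}`). It only checks the file's shape, not whether Redshift is authorized to read the listed objects; `build_manifest` and `manifest_problems` are hypothetical helper names, and the S3 URL in the usage line is a placeholder.

```python
"""Sketch: build and sanity-check a Redshift COPY manifest.

Layout assumed from the Redshift COPY docs:
{"entries": [{"url": "s3://...", "mandatory": true}, ...]}
Helper names are hypothetical; S3 permissions are NOT checked here.
"""
import json


def build_manifest(urls, mandatory=True):
    """Return manifest JSON text listing the given S3 object URLs."""
    entries = [{"url": url, "mandatory": mandatory} for url in urls]
    return json.dumps({"entries": entries}, indent=2)


def manifest_problems(text):
    """Return a list of format problems; an empty list means the shape looks OK."""
    try:
        doc = json.loads(text)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(doc, dict):
        return ["top-level value must be a JSON object"]
    entries = doc.get("entries")
    if not isinstance(entries, list) or not entries:
        return ['missing or empty "entries" list']
    problems = []
    for i, entry in enumerate(entries):
        if not isinstance(entry, dict):
            problems.append(f"entry {i}: must be a JSON object")
            continue
        url = entry.get("url")
        if not isinstance(url, str) or not url.startswith("s3://"):
            problems.append(f"entry {i}: url must be an s3:// URL")
        if "mandatory" in entry and not isinstance(entry["mandatory"], bool):
            problems.append(f"entry {i}: mandatory must be true or false")
    return problems


manifest = build_manifest(["s3://your-temp-dir/part-00000"])  # placeholder URL
print(manifest_problems(manifest))  # -> []
```

A well-formed manifest can still fail to load if the cluster lacks permission to read the listed objects, so this only rules out the "incorrectly formatted" case.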

The command I used in the AWS Glue script (Scala) to copy the data from S3 to Redshift:
val redshiftOutput = glueContext.getJDBCSink(
  catalogConnection = "your-connection-name",
  options = JsonOptions(Map("database" -> "yourDB", "dbtable" -> "your_table")),
  redshiftTmpDir = "s3://yourRedshiftTmpDirPath/"
).writeDynamicFrame(yourframetocopy)
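For a Python job like the one in the question, a rough counterpart of the Scala call above is `glueContext.write_dynamic_frame.from_jdbc_conf`. The sketch below wraps it; the connection, database, and table names are placeholders, and the optional `aws_iam_role` entry assumes an IAM role attached to the Redshift cluster that can read the temp dir (the authorization point above). The helper names are hypothetical.

```python
"""Sketch: PySpark counterpart of a Glue JDBC sink write to Redshift.

Assumptions: glue_context is an awsglue GlueContext, the names passed in are
placeholders, and iam_role_arn (optional) is a role attached to the Redshift
cluster that can read tmp_dir. Helper names are hypothetical.
"""


def redshift_connection_options(database, table, iam_role_arn=None):
    """Build connection_options for the Redshift write."""
    options = {"database": database, "dbtable": table}
    if iam_role_arn:
        # Assumption: this role is used by Redshift's COPY to read the
        # staged files in the temp dir.
        options["aws_iam_role"] = iam_role_arn
    return options


def write_frame_to_redshift(glue_context, frame, connection_name, database, table,
                            tmp_dir, iam_role_arn=None):
    """Write a DynamicFrame to Redshift through a Glue catalog connection."""
    return glue_context.write_dynamic_frame.from_jdbc_conf(
        frame=frame,
        catalog_connection=connection_name,
        connection_options=redshift_connection_options(database, table, iam_role_arn),
        redshift_tmp_dir=tmp_dir,
    )
```

For example, `write_frame_to_redshift(glueContext, resolvechoice4, "your-connection-name", "yourDB", "your_table", args["TempDir"])` would take the place of the `from_catalog` sink in the question's script; the connection, database, and table names are placeholders.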

Answer 2:

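While working on a job that moves data from RDS to Redshift, I got a similar error: "o132.pyWriteDynamicFrame. Exception thrown in awaitResult:". I checked the write_dynamic_frame call several times and found no problem there.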

Then I looked at the data type mappings and made some changes, for example:

Mapping before the fix:
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("client", "string", "client", "string"), ("id", "int", "id", "long")...

Mapping after the fix:
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("client", "string", "client", "string"), ("id", "int", "id", "numeric(20,0)")...

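What was the problem with the mapping? Redshift does not support integer types with a specified precision, i.e. INT(11,0), so the job failed when the column was previously mapped to BIGINT. Changing the target to a numeric type with an explicit precision solved the problem.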
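A small check like the following can catch a malformed mapping list before the job runs. ApplyMapping expects each entry to be a 4-tuple (source column, source type, target column, target type), and a tuple missing one of those fields is easy to overlook in a long list. `validate_mappings` is a hypothetical helper, not part of awsglue; it checks only the tuple shape, not whether Redshift supports the target type.

```python
"""Sketch: catch malformed ApplyMapping entries before the Glue job runs.

Each mapping should be (source column, source type, target column, target type).
validate_mappings is a hypothetical helper, not part of awsglue.
"""


def validate_mappings(mappings):
    """Return a list of problems found in an ApplyMapping `mappings` list."""
    problems = []
    for i, mapping in enumerate(mappings):
        if len(mapping) != 4:
            problems.append(f"mapping {i} has {len(mapping)} fields, expected 4: {mapping!r}")
            continue
        # Every field should be a non-empty string such as "id" or "long".
        if not all(isinstance(part, str) and part for part in mapping):
            problems.append(f"mapping {i} has an empty or non-string field: {mapping!r}")
    return problems


# Example: the second tuple below is missing its source type, so it is reported.
print(validate_mappings([("client", "string", "client", "string"),
                         ("id", "id", "numeric(20,0)")]))
```

Running it on the full `mappings` list right before `ApplyMapping.apply` keeps the failure in Python, where it is easier to read than a Redshift load error.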

The following page gives guidance on how data types are mapped: https://docs.aws.amazon.com/redshift/latest/dg/federated-data-types.html

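The 'pyWriteDynamicFrame. Exception thrown in awaitResult:' error may not be caused by the write_dynamic_frame call itself; it can instead point to other problems in the script.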
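Along those lines, the informative part of a Py4JJavaError is usually its "Caused by:" lines; in the question's log that is the Redshift message pointing at STL_LOAD_ERRORS. The sketch below extracts those lines and provides a query for that table. It assumes `str(err)` contains the Java stack trace (as the question's traceback suggests) and a DB-API driver with `%s` placeholders such as psycopg2; `root_causes` and `LOAD_ERRORS_SQL` are hypothetical names.

```python
"""Sketch: find the real failure behind 'Exception thrown in awaitResult:'.

root_causes() pulls the "Caused by:" lines out of a Java stack trace, and
LOAD_ERRORS_SQL reads Redshift's STL_LOAD_ERRORS table that the Redshift
message points to. Both names are hypothetical helpers, not Glue APIs.
"""


def root_causes(trace_text):
    """Return the text after each 'Caused by:' line, outermost first, without repeats."""
    causes = []
    for line in trace_text.splitlines():
        line = line.strip()
        if line.startswith("Caused by:"):
            cause = line[len("Caused by:"):].strip()
            if cause not in causes:
                causes.append(cause)
    return causes


# Most recent COPY errors first; TRIM strips the CHAR padding of these columns.
# Assumes a DB-API driver using %s placeholders (e.g. psycopg2).
LOAD_ERRORS_SQL = """
SELECT starttime, TRIM(filename) AS filename, line_number,
       TRIM(colname) AS colname, TRIM(type) AS col_type,
       TRIM(raw_field_value) AS raw_value, err_code, TRIM(err_reason) AS err_reason
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT %s
"""

# Usage in a Glue script (the question's traceback shows a Py4JJavaError):
#   from py4j.protocol import Py4JJavaError
#   try:
#       datasink5 = glueContext.write_dynamic_frame.from_catalog(...)
#   except Py4JJavaError as err:
#       for cause in root_causes(str(err)):
#           print(cause)
#       raise
# Then, with a DB-API cursor `cur` connected to the cluster:
#   cur.execute(LOAD_ERRORS_SQL, (10,))
```

The `err_reason`, `colname`, and `raw_field_value` columns usually show which column and value the COPY rejected, which is where a type-mapping problem like the one above would surface.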

Comments:

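I get this error when I put single quotes around my stored procedure parameters. When I remove the single quotes, I get an error saying that no such column exists. Have you called a stored procedure from Glue using a dynamic frame? ...,"postactions":"call stored_proc('"+myString+"','"+myDate+"')"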
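On the quoting question: string and date arguments in a `call` statement do need to be SQL string literals, because Redshift reads an unquoted word such as `abc` as a column name, which matches the "no such column" error. Below is a hedged sketch that builds the `postactions` statement with embedded single quotes escaped by doubling; `sql_literal` and `call_statement` are hypothetical helpers, and the procedure name is assumed to be trusted because it is not quoted.

```python
"""Sketch: build a Redshift CALL statement for Glue's "postactions" option.

Every argument is rendered as a SQL string literal, with embedded single quotes
doubled. Helper names are hypothetical; the procedure name is not escaped.
"""


def sql_literal(value):
    """Quote a value as a SQL string literal, doubling any embedded single quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def call_statement(procedure, *args):
    """Return "call procedure('a','b')" with every argument quoted as a literal."""
    return "call {}({})".format(procedure, ",".join(sql_literal(a) for a in args))


# Hypothetical usage in the connection options passed to the Redshift writer:
#   options = {"dbtable": "your_table", "database": "yourDB",
#              "postactions": call_statement("stored_proc", my_string, my_date)}
print(call_statement("stored_proc", "abc", "2020-07-24"))
```

Correct quoting only rules out the "no such column" problem; if the job still fails with awaitResult, the load itself may be failing, and STL_LOAD_ERRORS is the place to look.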
