Not able to write CSV data to AWS Redshift using Spark (sqlContext)


Posted: 2020-10-04 22:07:14

Problem description:
Spark version: 2.4.1
Artifact: spark-*_2.11
Dataset<Row> dataset = sqlContext.read().csv(url);
dataset
  .write()
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:postgresql://*:5439/dev?user=*****&password=*******")
  .option("dbtable", "dpay.testSpark")
  .option("tempdir", tempDir)
  .option("forward_spark_s3_credentials", true)
  .mode(SaveMode.Append)
  .save();

I am getting the error below. Can you help me resolve it?

java.io.InterruptedIOException: doesBucketExist on dreampay-stag-reconciliations: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:141) ~[hadoop-aws-3.0.0.jar:na]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:332) ~[hadoop-aws-3.0.0.jar:na]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:275) ~[hadoop-aws-3.0.0.jar:na]
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3288) ~[hadoop-common-3.0.0.jar:na]
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123) ~[hadoop-common-3.0.0.jar:na]
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3337) ~[hadoop-common-3.0.0.jar:na]
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3305) ~[hadoop-common-3.0.0.jar:na]
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:476) ~[hadoop-common-3.0.0.jar:na]
    at com.databricks.spark.redshift.Utils$.assertThatFileSystemIsNotS3BlockFileSystem(Utils.scala:162) ~[spark-redshift_2.11-3.0.0-preview1.jar:3.0.0-preview1]
    at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:386) ~[spark-redshift_2.11-3.0.0-preview1.jar:3.0.0-preview1]
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:108) ~[spark-redshift_2.11-3.0.0-preview1.jar:3.0.0-preview1]
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[spark-core_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270) ~[spark-sql_2.11-2.4.1.jar:2.4.1]
    at com.dreampay.reconsonsumer.service.FileConsumer.writeDataInRedShift(FileConsumer.java:281) ~[classes/:na]
    at com.dreampay.reconsonsumer.service.FileConsumer.lambda$null$1(FileConsumer.java:153) ~[classes/:na]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_261]
    at com.dreampay.reconsonsumer.service.FileConsumer.lambda$start$e3b46054$1(FileConsumer.java:95) ~[classes/:na]
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.12.jar:na]
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[scala-library-2.11.12.jar:na]
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) ~[spark-streaming_2.11-2.4.1.jar:2.4.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_261]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_261]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_261]
Caused by: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
    at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:151) ~[hadoop-aws-3.0.0.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1119) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:759) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:723) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4221) ~[aws-java-sdk-s3-1.11.133.jar:na]
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4168) ~[aws-java-sdk-s3-1.11.133.jar:na]
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1306) ~[aws-java-sdk-s3-1.11.133.jar:na]
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1263) ~[aws-java-sdk-s3-1.11.133.jar:na]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:320) ~[hadoop-aws-3.0.0.jar:na]
    ... 54 common frames omitted
Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
    at com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:180) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:159) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:141) ~[aws-java-sdk-core-1.11.133.jar:na]
    at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:129) ~[hadoop-aws-3.0.0.jar:na]
    ... 67 common frames omitted
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0_261]
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476) ~[na:1.8.0_261]
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218) ~[na:1.8.0_261]
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200) ~[na:1.8.0_261]
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394) ~[na:1.8.0_261]
    at java.net.Socket.connect(Socket.java:606) ~[na:1.8.0_261]
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[na:1.8.0_261]
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463) ~[na:1.8.0_261]
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558) ~[na:1.8.0_261]
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242) ~[na:1.8.0_261]
    at sun.net.www.http.HttpClient.New(HttpClient.java:339) ~[na:1.8.0_261]
    at sun.net.www.http.HttpClient.New(HttpClient.java:357) ~[na:1.8.0_261]
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226) ~[na:1.8.0_261]
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1162) ~[na:1.8.0_261]
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056) ~[na:1.8.0_261]
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990) ~[na:1.8.0_261]
    at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:47) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:106) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:77) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:156) ~[aws-java-sdk-core-1.11.133.jar:na]
    at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:121) ~[aws-java-sdk-core-1.11.133.jar:na]
    ... 70 common frames omitted

Comments:

(The asker re-posted the same read/write snippet from the question above.) @paul Can you help me with this?

Answer 1:

Try supplying your credentials as separate options:

Dataset<Row> dataset = sqlContext.read().csv(url);

 dataset
      .write()
      .format("com.databricks.spark.redshift")
      .option("url", "jdbc:postgresql://*:5439/dev")
      .option("user", "userid here")
      .option("password", "password here")
      .option("dbtable", "dpay.testSpark") 
      .option("tempdir", tempDir) 
      .option("forward_spark_s3_credentials", true)
      .mode(SaveMode.Append)
      .save();
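Note that the stack trace actually fails on the S3 side (`doesBucketExist` on the `tempdir` bucket): the S3A filesystem walks its credential-provider chain and the `InstanceProfileCredentialsProvider` times out contacting the EC2 instance-metadata endpoint, so no AWS credentials are found for the temporary directory. If the job does not run on an instance with an attached IAM role, a sketch of one fix is to set the S3A access/secret keys on the Hadoop configuration before the write (key names are standard Hadoop S3A properties; the values here are placeholders you must supply):

```
// Sketch, assuming you have an access key and secret with rights to the
// tempdir bucket. With forward_spark_s3_credentials=true, spark-redshift
// forwards these same credentials to Redshift for the COPY from tempdir.
org.apache.hadoop.conf.Configuration hadoopConf =
    sqlContext.sparkContext().hadoopConfiguration();
hadoopConf.set("fs.s3a.access.key", "<AWS_ACCESS_KEY_ID>");
hadoopConf.set("fs.s3a.secret.key", "<AWS_SECRET_ACCESS_KEY>");
```

If the job is meant to use an instance profile instead, the `SocketTimeoutException: connect timed out` against the metadata service suggests the environment (e.g. a container or non-EC2 host) cannot reach 169.254.169.254, in which case explicit keys or environment variables (`AWS_ACCESS_KEY_ID`/`AWS_SECRET_ACCESS_KEY`) are the usual workaround.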

Discussion:
