ETL Snowflake job using PySpark works locally but not on Dataproc
I created a Spark job and tested it locally first, where it works perfectly. However, after submitting the job to Dataproc, it returns the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o78.load.
: net.snowflake.client.jdbc.SnowflakeSQLException: Private key provided is invalid or not supported: Please use java.security.interfaces.RSAPrivateCrtKey.class
PySpark code
from pyspark.sql import SparkSession
from snowflake_connector import ConnectToSnowflake  # shipped via --py-files

sfOptions = ConnectToSnowflake(creds_path='creds.json').get_spark_sf_creds()

spark = SparkSession \
    .builder \
    .config("spark.jars", "jars/snowflake-jdbc-3.8.0.jar,jars/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.repl.local.jars",
            "jars/snowflake-jdbc-3.8.0.jar,jars/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .getOrCreate()
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(
spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
snowflake_source_name = 'net.snowflake.spark.snowflake'
df = spark.read.format(snowflake_source_name) \
.options(**sfOptions) \
.option("query", query) \
.load()
df.show()
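For context, the sfOptions dictionary returned by get_spark_sf_creds() carries the options consumed by the Snowflake Spark connector. Below is a minimal sketch of such a helper, assuming creds.json holds the account details and the private key used for key-pair authentication (the JSON field names are assumptions; the option keys are the connector's standard ones):

import json

def get_spark_sf_creds(creds_path="creds.json"):
    # Hypothetical helper: builds the dict passed to .options(**sfOptions).
    with open(creds_path) as f:
        creds = json.load(f)
    return {
        "sfURL": creds["account_url"],        # e.g. <account>.snowflakecomputing.com
        "sfUser": creds["user"],
        "sfDatabase": creds["database"],
        "sfSchema": creds["schema"],
        "sfWarehouse": creds["warehouse"],
        # Key-pair auth: the connector expects an unencrypted PKCS#8 RSA key here;
        # a malformed or wrongly encoded value can produce the RSAPrivateCrtKey
        # error shown above.
        "pem_private_key": creds["private_key"],
    }

The error on Dataproc therefore points at the private key the connector receives on the cluster, not at the query itself.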
Dataproc submit command
gcloud dataproc jobs submit pyspark --cluster featurelib-cluster \
--jars gs://dataproc-featurelib/spark-lib/snowflake-jdbc-3.8.0.jar,gs://dataproc-featurelib/spark-lib/spark-snowflake_2.11-2.4.13-spark_2.4.jar \
--py-files snowflake_connector.py \
--files creds.json,daodl_access.json \
sf_loader.py
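One difference from a local run worth checking: files passed with --files are distributed to the job but are not placed on the classpath; on the cluster they can be located through the driver's working directory or SparkFiles. A small diagnostic sketch (assuming the creds.json shipped in the command above, run after the SparkSession has been created):

import os
from pyspark import SparkFiles

# Verify that the file shipped with --files is actually visible to the driver.
print("cwd copy exists:", os.path.exists("creds.json"))
print("SparkFiles path:", SparkFiles.get("creds.json"))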
Full error stack trace
net.snowflake.client.jdbc.SnowflakeSQLException: Private key provided is invalid or not supported: Please use java.security.interfaces.RSAPrivateCrtKey.class
at net.snowflake.client.jdbc.SnowflakeConnectionV1.<init>(SnowflakeConnectionV1.java:136)
at net.snowflake.client.jdbc.SnowflakeDriver.connect(SnowflakeDriver.java:148)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at net.snowflake.spark.snowflake.JDBCWrapper.getConnector(SnowflakeJDBCWrapper.scala:180)
at net.snowflake.spark.snowflake.SnowflakeRelation$$anonfun$schema$1.apply(SnowflakeRelation.scala:56)
at net.snowflake.spark.snowflake.SnowflakeRelation$$anonfun$schema$1.apply(SnowflakeRelation.scala:53)
at scala.Option.getOrElse(Option.scala:121)
at net.snowflake.spark.snowflake.SnowflakeRelation.schema$lzycompute(SnowflakeRelation.scala:53)
at net.snowflake.spark.snowflake.SnowflakeRelation.schema(SnowflakeRelation.scala:52)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:403)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:28)
at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)
at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:35)
at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:37)
at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:39)
at $line17.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:41)
at $line17.$read$$iw$$iw$$iw$$iw.<init>(<console>:43)
at $line17.$read$$iw$$iw$$iw.<init>(<console>:45)
at $line17.$read$$iw$$iw.<init>(<console>:47)
at $line17.$read$$iw.<init>(<console>:49)
at $line17.$read.<init>(<console>:51)
at $line17.$read$.<init>(<console>:55)
at $line17.$read$.<clinit>(<console>)
at $line17.$eval$.$print$lzycompute(<console>:7)
at $line17.$eval$.$print(<console>:6)
at $line17.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:819)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:691)
at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:404)
at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:425)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:285)
at org.apache.spark.repl.SparkILoop.runClosure(SparkILoop.scala:159)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:182)
at org.apache.spark.repl.Main$.doMain(Main.scala:78)
at org.apache.spark.repl.Main$.main(Main.scala:58)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Answer
To pass the creds.json key, use the --jars flag rather than the --files flag, so that it is added to the classpath:
gcloud dataproc jobs submit pyspark --cluster featurelib-cluster \
--jars gs://dataproc-featurelib/spark-lib/snowflake-jdbc-3.8.0.jar,gs://dataproc-featurelib/spark-lib/spark-snowflake_2.11-2.4.13-spark_2.4.jar,creds.json \
--py-files snowflake_connector.py \
--files daodl_access.json \
sf_loader.py
If that does not work, try upgrading the Snowflake jars to the latest versions.
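Since the underlying exception is about the key format, it can also help to confirm locally that the key being sent is an unencrypted PKCS#8 RSA private key, which is what the Snowflake JDBC driver expects. A quick check using the cryptography package (the key file path below is just an example):

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

# Load the PEM that ends up in sfOptions["pem_private_key"] and confirm it
# parses as an RSA private key; an encrypted or non-PKCS#8 key will fail here,
# much like it does inside the JDBC driver.
with open("rsa_key.p8", "rb") as f:
    key = serialization.load_pem_private_key(f.read(), password=None, backend=default_backend())

print(type(key))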