Snowflake ETL job using PySpark works locally but not on Dataproc


I created a Spark job and tested it locally first, where it works perfectly. However, after submitting the Spark job to Dataproc, it returns the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o78.load.
: net.snowflake.client.jdbc.SnowflakeSQLException: Private key provided is invalid or not supported: Please use java.security.interfaces.RSAPrivateCrtKey.class

PySpark code

sfOptions = ConnectToSnowflake(creds_path='creds.json').get_spark_sf_creds()

spark = SparkSession \
    .builder \
    .config("spark.jars", "jars/snowflake-jdbc-3.8.0.jar,jars/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.repl.local.jars",
            "jars/snowflake-jdbc-3.8.0.jar,jars/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .getOrCreate()

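# Enable query pushdown so the Snowflake connector pushes supported
# operations (filters, projections, aggregations) down into Snowflake.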
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(
    spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

snowflake_source_name = 'net.snowflake.spark.snowflake'

df = spark.read.format(snowflake_source_name) \
    .options(**sfOptions) \
    .option("query", query) \
    .load()

df.show()
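The exception is about how the private key carried in creds.json is parsed on the cluster. For context, here is a minimal sketch of what get_spark_sf_creds() is assumed to return when key-pair authentication is used; the option names (sfURL, sfUser, ..., pem_private_key) are the Spark connector's documented options, while the creds.json field names are hypothetical and the real snowflake_connector.py may differ:

import json

def get_spark_sf_creds(creds_path='creds.json'):
    # Hypothetical creds.json layout; the actual helper may differ.
    with open(creds_path) as f:
        creds = json.load(f)
    return {
        'sfURL': creds['account'] + '.snowflakecomputing.com',
        'sfUser': creds['user'],
        'sfDatabase': creds['database'],
        'sfSchema': creds['schema'],
        'sfWarehouse': creds['warehouse'],
        # Key-pair auth: the connector expects the PKCS#8 private key body
        # (base64 text without the BEGIN/END header lines) in this option.
        'pem_private_key': creds['private_key'],
    }

If the key does not reach the driver in a form the JDBC driver can turn into an RSA private key, it fails with the RSAPrivateCrtKey error shown above.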

Dataproc job submission

gcloud dataproc jobs submit pyspark --cluster featurelib-cluster \
    --jars gs://dataproc-featurelib/spark-lib/snowflake-jdbc-3.8.0.jar,gs://dataproc-featurelib/spark-lib/spark-snowflake_2.11-2.4.13-spark_2.4.jar \
    --py-files snowflake_connector.py \
    --files creds.json,daodl_access.json \
    sf_loader.py

Full stack trace of the error

net.snowflake.client.jdbc.SnowflakeSQLException: Private key provided is invalid or not supported: Please use java.security.interfaces.RSAPrivateCrtKey.class
        at net.snowflake.client.jdbc.SnowflakeConnectionV1.<init>(SnowflakeConnectionV1.java:136)
        at net.snowflake.client.jdbc.SnowflakeDriver.connect(SnowflakeDriver.java:148)
        at java.sql.DriverManager.getConnection(DriverManager.java:664)
        at java.sql.DriverManager.getConnection(DriverManager.java:208)
        at net.snowflake.spark.snowflake.JDBCWrapper.getConnector(SnowflakeJDBCWrapper.scala:180)
        at net.snowflake.spark.snowflake.SnowflakeRelation$$anonfun$schema$1.apply(SnowflakeRelation.scala:56)
        at net.snowflake.spark.snowflake.SnowflakeRelation$$anonfun$schema$1.apply(SnowflakeRelation.scala:53)
        at scala.Option.getOrElse(Option.scala:121)
        at net.snowflake.spark.snowflake.SnowflakeRelation.schema$lzycompute(SnowflakeRelation.scala:53)
        at net.snowflake.spark.snowflake.SnowflakeRelation.schema(SnowflakeRelation.scala:52)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:403)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
        at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:28)
        at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)
        at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:35)
        at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:37)
        at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:39)
        at $line17.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:41)
        at $line17.$read$$iw$$iw$$iw$$iw.<init>(<console>:43)
        at $line17.$read$$iw$$iw$$iw.<init>(<console>:45)
        at $line17.$read$$iw$$iw.<init>(<console>:47)
        at $line17.$read$$iw.<init>(<console>:49)
        at $line17.$read.<init>(<console>:51)
        at $line17.$read$.<init>(<console>:55)
        at $line17.$read$.<clinit>(<console>)
        at $line17.$eval$.$print$lzycompute(<console>:7)
        at $line17.$eval$.$print(<console>:6)
        at $line17.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
        at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
        at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:819)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:691)
        at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:404)
        at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:425)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:285)
        at org.apache.spark.repl.SparkILoop.runClosure(SparkILoop.scala:159)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:182)
        at org.apache.spark.repl.Main$.doMain(Main.scala:78)
        at org.apache.spark.repl.Main$.main(Main.scala:58)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Answer

To pass the creds.json key file, use the --jars flag instead of the --files flag, so that it is added to the classpath:

gcloud dataproc jobs submit pyspark --cluster featurelib-cluster \
    --jars gs://dataproc-featurelib/spark-lib/snowflake-jdbc-3.8.0.jar,gs://dataproc-featurelib/spark-lib/spark-snowflake_2.11-2.4.13-spark_2.4.jar,creds.json \
    --py-files snowflake_connector.py \
    --files daodl_access.json \
    sf_loader.py

If that does not work, try upgrading the Snowflake jars to the latest version.
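As a sketch, the upgraded submission could look like the command below; the jar versions are only illustrative, so check Maven Central for the newest snowflake-jdbc and spark-snowflake_2.11 (spark_2.4) builds and upload them to the bucket first:

gcloud dataproc jobs submit pyspark --cluster featurelib-cluster \
    --jars gs://dataproc-featurelib/spark-lib/snowflake-jdbc-3.13.14.jar,gs://dataproc-featurelib/spark-lib/spark-snowflake_2.11-2.9.3-spark_2.4.jar,creds.json \
    --py-files snowflake_connector.py \
    --files daodl_access.json \
    sf_loader.py

Note that the spark.jars and spark.repl.local.jars paths hard-coded in the SparkSession builder still point at the 3.8.0 / 2.4.13 jars and would need to be kept in sync.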
