How to write a Spark DataFrame to Avro file format in a Jupyter notebook?
【Posted】: 2019-05-02 07:36:02
【Question】:
I have configured an Amazon EMR cluster with 1 master node and 2 core nodes. The following software is installed on EMR: Hive 2.3.4, Pig 0.17.0, Hue 4.3.0, Ganglia 3.7.2, Spark 2.4.0, TensorFlow 1.12.0.
I have not configured any bootstrap actions. The cluster is now up and waiting for steps. I launched a notebook from EMR; the code details are below.
sdf = spark.read.csv('hdfs://i....:8020/user/root/temp.csv')
This runs fine, and I can see my DataFrame with sdf.show().
However, when I try to write it out as an Avro file, it fails:
sdf.write.format("avro").save("avro_file.avro")
Error:
u'Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".;'
Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 736, in save
self._jwrite.save(path)
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".;'
I then tried:
sdf.write.format("org.apache.spark.sql.avro").save("avro_file.avro")
Same error:
u'Failed to find data source: org.apache.spark.sql.avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".;'
Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 736, in save
self._jwrite.save(path)
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'Failed to find data source: org.apache.spark.sql.avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".;'
I also tried from an interactive Spark session:
[ec2-user@ip-xxxx conf]$ sudo pyspark --packages org.apache.spark:spark-avro_2.12:2.4.2
Python 2.7.16 (default, Mar 18 2019, 18:38:44)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e8c82e1e-629a-4d83-844d-a86057fc5ae7;1.0
confs: [default]
found org.apache.spark#spark-avro_2.12;2.4.2 in central
found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 209ms :: artifacts dl 6ms
:: modules in use:
org.apache.spark#spark-avro_2.12;2.4.2 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 2 | 0 | 0 | 0 || 2 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-e8c82e1e-629a-4d83-844d-a86057fc5ae7
confs: [default]
0 artifacts copied, 2 already retrieved (0kB/6ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/05/02 07:23:00 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
19/05/02 07:23:03 WARN Client: Same path resource file:///root/.ivy2/jars/org.apache.spark_spark-avro_2.12-2.4.2.jar added multiple times to distributed cache.
19/05/02 07:23:03 WARN Client: Same path resource file:///root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar added multiple times to distributed cache.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 2.7.16 (default, Mar 18 2019 18:38:44)
SparkSession available as 'spark'.
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> df.write.format("avro").save("avro_file.avro")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 736, in save
self._jwrite.save(path)
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o83.save.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:630)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:244)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.FileFormat.$init$(Lorg/apache/spark/sql/execution/datasources/FileFormat;)V
at org.apache.spark.sql.avro.AvroFileFormat.<init>(AvroFileFormat.scala:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
... 24 more
>>>
I also tried updating /etc/spark/conf/spark-defaults.conf to contain:
spark.jars.packages org.apache.spark:spark-avro_2.12:2.4.2, com.databricks:spark-csv_2.11:1.5.0
However, with this configuration in place, the Jupyter notebook fails to start Spark and gives the following error:
The code failed because of a fatal error:
Session 4 did not start up in 60 seconds..
Some things to try:
a) Make sure Spark has enough available resources for Jupyter to create a Spark context.
b) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.
c) Restart the kernel.
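【Comments】:
What is your Spark version?
I selected 2.4.0 when configuring EMR, and the pyspark banner seems to confirm it. Is there a command I should run to check (perhaps in the notebook)?
You need the following dependency:
val sparkVersion = "2.4.0"
"org.apache.spark" %% "spark-avro" % sparkVersion
If you are suggesting org.apache.spark:spark-avro_2.12:2.4.0, that also gives the "Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated" error!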
【Answer 1】:
For Spark 2.4.3: rolling the spark-avro version back to org.apache.spark:spark-avro_2.11:2.4.3 solved the problem for me.
Also, in your Jupyter notebook, add the following lines before starting the Spark context:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
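For the Spark 2.4.0 cluster in the question, the same idea would presumably need a coordinate matching that Spark version. The following is only a minimal sketch under two assumptions: that the cluster's Spark is built against Scala 2.11, and that the notebook kernel launches Spark from the local Python process (a Livy/sparkmagic-backed EMR notebook may not honor this variable). `set_avro_submit_args` is a hypothetical helper name, not part of PySpark.

```python
import os


def set_avro_submit_args(spark_version="2.4.0", scala_binary="2.11"):
    """Point PYSPARK_SUBMIT_ARGS at a spark-avro build for the given versions.

    Both default values are assumptions taken from the question's setup.
    """
    package = "org.apache.spark:spark-avro_{}:{}".format(scala_binary, spark_version)
    # This only has an effect if it runs before the first SparkContext or
    # SparkSession is created in this Python process.
    os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages {} pyspark-shell".format(package)
    return package


print(set_avro_submit_args())
```

If the package resolves and the session starts afterwards, a call such as sdf.write.format("avro").save("avro_file.avro") should then be able to find the data source.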
【Comments】:
Note: this assumes you are running Spark 2.4.3, but the question is about 2.4.0.
Oh, sorry, I missed that. I ran into the same problem with spark-avro_2.12 on Spark 2.4.2. Thanks for pointing it out; I have edited my answer.
Also, 2.12 vs. 2.11 depends on the Scala version Spark was built with ;)
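The Scala-version point can be checked before picking a package. Below is a small sketch, not an official API: `scala_binary_version` and `package_matches_scala` are hypothetical helpers that compare the `_2.11` / `_2.12` suffix of a Maven coordinate with a Scala version string. In a live PySpark session that string can usually be read through the py4j gateway with `spark.sparkContext._jvm.scala.util.Properties.versionString()`; note that `_jvm` is a private attribute, and the sample string `"version 2.11.12"` used here is an assumed value for illustration.

```python
import re


def scala_binary_version(version_string):
    """Extract the Scala binary version (e.g. '2.11') from 'version 2.11.12'."""
    match = re.search(r"(\d+)\.(\d+)\.\d+", version_string)
    if match is None:
        raise ValueError("unrecognised Scala version string: {!r}".format(version_string))
    return "{}.{}".format(match.group(1), match.group(2))


def package_matches_scala(coordinate, version_string):
    """Return True if the artifact suffix matches the runtime Scala binary version."""
    artifact = coordinate.split(":")[1]      # e.g. 'spark-avro_2.12'
    suffix = artifact.rsplit("_", 1)[1]      # e.g. '2.12'
    return suffix == scala_binary_version(version_string)


# The version string below is an assumed example, not read from a real cluster.
runtime = "version 2.11.12"
print(package_matches_scala("org.apache.spark:spark-avro_2.12:2.4.2", runtime))  # False
print(package_matches_scala("org.apache.spark:spark-avro_2.11:2.4.0", runtime))  # True
```

A mismatch of this kind is consistent with the NoSuchMethodError on FileFormat.$init$ shown in the question, which is typical of a Scala 2.12 artifact loaded into a Scala 2.11 runtime.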