How to write a Spark DataFrame into Avro file format in a Jupyter notebook?

Posted: 2019-05-02 07:36:02

I have configured an Amazon EMR cluster with 1 master node and 2 core nodes. The following software is installed on EMR: Hive 2.3.4, Pig 0.17.0, Hue 4.3.0, Ganglia 3.7.2, Spark 2.4.0, TensorFlow 1.12.0.

I have not configured any bootstrap actions. The cluster is now up and waiting for steps. I launched a notebook from EMR; the code details are below.

sdf = spark.read.csv('hdfs://i....:8020/user/root/temp.csv')

This runs fine, and I can see my DataFrame with sdf.show().

However, when I try to write it to an Avro file, it fails:

sdf.write.format("avro").save("avro_file.avro")

Error:

u'Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".;'
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 736, in save
    self._jwrite.save(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".;'

I also tried:

sdf.write.format("org.apache.spark.sql.avro").save("avro_file.avro")

Same error:

u'Failed to find data source: org.apache.spark.sql.avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".;'
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 736, in save
    self._jwrite.save(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'Failed to find data source: org.apache.spark.sql.avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of "Apache Avro Data Source Guide".;'

I also tried it through an interactive Spark session:

[ec2-user@ip-xxxx conf]$ sudo pyspark --packages org.apache.spark:spark-avro_2.12:2.4.2
Python 2.7.16 (default, Mar 18 2019, 18:38:44)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e8c82e1e-629a-4d83-844d-a86057fc5ae7;1.0
        confs: [default]
        found org.apache.spark#spark-avro_2.12;2.4.2 in central
        found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 209ms :: artifacts dl 6ms
        :: modules in use:
        org.apache.spark#spark-avro_2.12;2.4.2 from central in [default]
        org.spark-project.spark#unused;1.0.0 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-e8c82e1e-629a-4d83-844d-a86057fc5ae7
        confs: [default]
        0 artifacts copied, 2 already retrieved (0kB/6ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/05/02 07:23:00 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
19/05/02 07:23:03 WARN Client: Same path resource file:///root/.ivy2/jars/org.apache.spark_spark-avro_2.12-2.4.2.jar added multiple times to distributed cache.
19/05/02 07:23:03 WARN Client: Same path resource file:///root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar added multiple times to distributed cache.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 2.7.16 (default, Mar 18 2019 18:38:44)
SparkSession available as 'spark'.
>>> df = spark.createDataFrame(
...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
...     ("id", "v"))
>>> df.write.format("avro").save("avro_file.avro")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 736, in save
    self._jwrite.save(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o83.save.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
        at java.util.ServiceLoader.fail(ServiceLoader.java:232)
        at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
        at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
        at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:630)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:244)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.FileFormat.$init$(Lorg/apache/spark/sql/execution/datasources/FileFormat;)V
        at org.apache.spark.sql.avro.AvroFileFormat.<init>(AvroFileFormat.scala:44)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
        ... 24 more

>>>

I also tried updating /etc/spark/conf/spark-defaults.conf to contain:

spark.jars.packages org.apache.spark:spark-avro_2.12:2.4.2, com.databricks:spark-csv_2.11:1.5.0

However, after this configuration change the Jupyter notebook could not start Spark and gave the following error:

The code failed because of a fatal error:
    Session 4 did not start up in 60 seconds..


Some things to try:
a) Make sure Spark has enough available resources for Jupyter to create a Spark context.
b) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.
c) Restart the kernel.

Question comments:

What is your Spark version?

I selected 2.4.0 when configuring EMR, and the pyspark banner seems to confirm it. Is there a command I should run to check (perhaps in the notebook)?

You need the following dependency: val sparkVersion = "2.4.0"; "org.apache.spark" %% "spark-avro" % sparkVersion

If you mean using org.apache.spark:spark-avro_2.12:2.4.0, that also gives the "Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated" error!!

Answer 1:

Spark 2.4.3

Reverting the spark-avro version to org.apache.spark:spark-avro_2.11:2.4.3 solved this problem for me.

Also, in your Jupyter notebook, add the following lines before starting the Spark context:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
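Putting the answer together, a minimal sketch of the whole flow might look like the following. It assumes a Scala 2.11 build of Spark (hence the _2.11 artifact) and uses an example output path; both are assumptions to adjust for your cluster.

import os

# Must be set before the SparkSession/SparkContext is created, otherwise the
# package never makes it onto the driver and executor classpaths.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("avro-write").getOrCreate()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# "avro" resolves once the spark-avro package is on the classpath;
# "avro_out" is only an example path.
df.write.format("avro").mode("overwrite").save("avro_out")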

Comments:

Note: this assumes you are running Spark 2.4.3, but the question is about 2.4.0.

Oh, sorry, I missed that. I ran into the same problem with spark-avro_2.12 on Spark 2.4.2. Thanks for pointing it out, editing my answer.

Also, 2.12 vs 2.11 depends on the Scala version your Spark was built with ;)
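If you are not sure which Scala version your Spark was built with (and therefore whether you need the _2.11 or the _2.12 artifact), one way to check from PySpark is to ask the JVM; this is a small sketch that goes through the internal _jvm gateway:

# Prints something like "version 2.11.12"; pick the spark-avro artifact whose
# suffix matches the major.minor part of this string.
print(spark.sparkContext._jvm.scala.util.Properties.versionString())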
