Unable to read csv file using spark read in azure databricks

【Title】Unable to read csv file using spark read in azure databricks 【Posted】2020-06-01 08:50:07 【Question】:

My data lives in Azure Cosmos DB, and I have mounted the dataset on Azure Databricks.

I can read the csv file with pandas and load it into a Spark DataFrame.

import pandas as pd

df = pd.read_csv('/dbfs/mnt/ajviswan/forest_efficiency/2020-04-26_2020-05-26.csv')
sdf = spark.createDataFrame(df)
sdf.head()

This works and prints the following console output, and I can do further processing on this DataFrame.

(1) Spark Jobs
sdf:pyspark.sql.dataframe.DataFrame = [Forest: string, LoadBalanceMoveReason: string ... 4 more fields]
Out[34]: Row(Forest='AUSP282', LoadBalanceMoveReason='DefaultEncryption', CompletionDate='5/26/2020 12:00:00 AM', efficiencyRopCount=None, efficiencySize=0.9966470723725392, efficiencyIOPS=None)

But when I try to read the file directly into a Spark DataFrame, it fails with a read error.

df = spark.read.csv('/dbfs/mnt/ajviswan/forest_efficiency/2020-04-26_2020-05-26.csv')
df

This returns

Py4JJavaError                             Traceback (most recent call last)
<command-4117735793908621> in <module>
----> 1 df = spark.read.csv('/dbfs/mnt/ajviswan/forest_efficiency/2020-04-26_2020-05-26.csv')
      2 df

/databricks/spark/python/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup)
    533             path = [path]
    534         if type(path) == list:
--> 535             return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    536         elif isinstance(path, RDD):
    537             def func(iterator):

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     96     def deco(*a, **kw):
     97         try:
---> 98             return f(*a, **kw)
     99         except py4j.protocol.Py4JJavaError as e:
    100             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o3781.csv.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/ReadSupport
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at com.databricks.backend.daemon.driver.ClassLoaders$ReplWrappingClassLoader.loadClass(ClassLoaders.scala:65)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:405)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
    at java.util.ServiceLoader$LazyIterator.access$700(ServiceLoader.java:323)
    at java.util.ServiceLoader$LazyIterator$2.run(ServiceLoader.java:407)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:409)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:696)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:780)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:317)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:807)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.ReadSupport
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

【Comments on the question】:

【Answer 1】:

To read from mounted storage through Spark methods, you should not include the /dbfs prefix:

df = spark.read.csv('/mnt/ajviswan/forest_efficiency/2020-04-26_2020-05-26.csv')
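The path rule above can be sketched as a small helper. This is only an illustration: `to_spark_path` is a hypothetical name, not a Databricks API, and it assumes the usual Databricks convention that local file APIs (such as pandas) see DBFS under `/dbfs`, while Spark addresses the same file as `dbfs:/...`.

```python
def to_spark_path(local_path: str) -> str:
    """Convert a local-file-API path (/dbfs/...) into a Spark path (dbfs:/...).

    Hypothetical helper for illustration; paths without the /dbfs/ prefix
    are returned unchanged.
    """
    prefix = "/dbfs/"
    if local_path.startswith(prefix):
        return "dbfs:/" + local_path[len(prefix):]
    return local_path


# The path that worked with pandas in the question.
pandas_path = "/dbfs/mnt/ajviswan/forest_efficiency/2020-04-26_2020-05-26.csv"
spark_path = to_spark_path(pandas_path)
print(spark_path)  # dbfs:/mnt/ajviswan/forest_efficiency/2020-04-26_2020-05-26.csv
```

The result could then be passed to `spark.read.csv(spark_path)`. This only addresses the path; it would not by itself resolve the NoClassDefFoundError reported in the question.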

【Comments】:

That doesn't work. I get the same error: df = spark.read.csv('/mnt/ajviswan/forest_efficiency/2020-04-26_2020-05-26.csv') returns java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/ReadSupport

【Answer 2】:

Try the following:

df=spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferschema", "true").option("mode", "DROPMALFORMED").load("/dbfs/mnt/ajviswan/forest_efficiency/2020-04-26_2020-05-26.csv")
df.show()

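Edit #1: Honestly, I believe there is a configuration problem in how your cluster was created. If your only goal is to read the Cosmos DB data, you can try the following: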

readConfig = {
  "Endpoint" : "https://<cosmos_end_point_name>.documents.azure.com:443/",
  "Masterkey" : "<master_key_value>",
  "Database" : "<database_name>",
  "preferredRegions" : "East US",
  "Collection": "<collection_name>",
  "schema_samplesize" : "1000",
  "query_pagesize" : "200000",
  "query_custom" : "SELECT * FROM c"
}

df = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()

For this, add the Spark CosmosDB Maven library under Maven packages in the cluster configuration. Assuming your environment is compatible, try 'com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:3.0.2'
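One way to check the "compatible environment" assumption is to compare the Spark and Scala versions encoded in the Maven coordinate with those of the cluster. The sketch below assumes the artifact follows the `<name>_<spark version>_<scala version>` naming seen in the coordinate above. `parse_connector_coordinate` and `is_compatible` are hypothetical helpers, and the cluster versions passed in are placeholders that you would replace with the value of `spark.version` on your own cluster.

```python
from typing import NamedTuple


class ConnectorBuild(NamedTuple):
    artifact: str
    spark_version: str
    scala_version: str
    connector_version: str


def parse_connector_coordinate(coordinate: str) -> ConnectorBuild:
    """Split 'group:artifact_<spark>_<scala>:version' into its parts."""
    _group, artifact_id, connector_version = coordinate.split(":")
    # Split from the right, since the artifact name may contain underscores.
    name, spark_version, scala_version = artifact_id.rsplit("_", 2)
    return ConnectorBuild(name, spark_version, scala_version, connector_version)


def is_compatible(build: ConnectorBuild, cluster_spark_version: str) -> bool:
    """Compare only major.minor of the Spark version (an assumed rule of thumb)."""
    return build.spark_version.split(".")[:2] == cluster_spark_version.split(".")[:2]


build = parse_connector_coordinate(
    "com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:3.0.2"
)
print(build.spark_version, build.scala_version)  # 2.4.0 2.11

# Placeholder cluster versions; on a real cluster use spark.version instead.
print(is_compatible(build, "2.4.5"))  # True
print(is_compatible(build, "3.0.0"))  # False
```

The traceback in the question references py4j-0.10.9, which is the version bundled with Spark 3.0, so a library built for Spark 2.4 being attached to a Spark 3.x cluster may be worth ruling out. This is a guess from the log, not something confirmed in the thread.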

【Comments】:

That one doesn't work either; I still get the same error.

Try the second approach from my edit. Maybe it will help.
