通过 Apache-Spark 从 AWS S3 加载数据

Posted

技术标签:

【中文标题】通过 Apache-Spark 从 AWS S3 加载数据【英文标题】:Loading data from AWS S3 through Apache-Spark 【发布时间】:2015-10-13 01:13:00 【问题描述】:

我编写了一个 python 代码,通过 Apache-Spark 从 Amazon Web Service (AWS) S3 加载文件。具体来说,该代码使用SparkContext().wholeTextFiles("s3n://ruofan-bucket/data") 在AWS S3 上的我的存储桶ruofan-bucket 中创建RDD 并从目录data 加载所有csv 文件。代码如下:

import os, sys, inspect

### Current directory path.
curr_dir = os.path.split(inspect.getfile(inspect.currentframe()))[0]

### Setup the environment variables
spark_home_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../spark-1.4.0")))
python_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
os.environ["SPARK_HOME"] = spark_home_dir
os.environ["PYTHONPATH"] = python_dir

### Setup pyspark directory path
pyspark_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
sys.path.append(pyspark_dir)

### Import the pyspark
from pyspark import SparkConf, SparkContext

def main():
    ### Initialize the SparkConf and SparkContext
    conf = SparkConf().setAppName("ruofan").setMaster("local")
    sc = SparkContext(conf = conf)

    ### Create a RDD containing metadata about files in directory "data"
    datafile = sc.wholeTextFiles("s3n://ruofan-bucket/data")    ### Read data directory from S3 storage.

    ### Collect files from the RDD
    datafile.collect()


if __name__ == "__main__":
    main()

在我运行我的代码之前,我已经导出了环境变量:AWS_SECRET_ACCESS_KEYAWS_ACCESS_KEY_ID。但是当我运行我的代码时,它会显示错误:

IOError: [Errno 2] No such file or directory: 's3n://ruofan-bucket/data/test1.csv'

我确定我在 AWS S3 上拥有该目录和文件,但我不知道该错误。如果有人帮助我解决问题,我真的很感激。

【问题讨论】:

您好,能否分享一下您是如何解决问题的? 【参考方案1】:

wholeTextFiles 似乎不适用于 Amazon S3。

见:

SparkContext.wholeTextFiles Doesn't work with S3 Buckets Processing whole files from S3 with Spark

但是,Hadoop 版本之间可能存在差异,所以不要认为它是确定的。

【讨论】:

@RuofanKong 能否请您发布您是如何解决您的问题的,以及您如何验证“sc.wholeTextFiles("s3n://ruofan-bucket/data")”【参考方案2】:

您可以尝试以下方法将数据从 S3 加载到 RDD 中,然后将结果循环并打印出来。使用 Spark SQL 后,您可以进行任何转换。

      val spark = SparkSession
    .builder()
    .appName("Spark SQL POC")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  val sparkContext = spark.sparkContext

  sparkContext.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
  sparkContext.hadoopConfiguration.set("fs.s3.awsAccessKeyId", accessKey)
  sparkContext.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", secret)
  sparkContext.hadoopConfiguration.set("fs.s3.endpoint",region)

  // The schema is encoded in a string
  val schemaString = "country displayName alias status userName userID exportTime city title email registrationDate firstName lastName dateOfBirth address1 address2 address3 postCode telephoneNumber nickName exclusionExpiryDate exclusionPeriod blocked blockReason lastLoginDate gender mobileNumber marketingSms marketingEmail affiliateMarker depositLimitDate depositLimitPeriod depositLimitAmount depositLimitCurrency depositLimitWaitForUpdate depositLimitUpdatePeriod depositLimitUpdateAmount depositLimitUpdateCurrency userModified selfExclusionModified userBlockModified registeredBy userNote"

  // Generate the schema based on the string of schema
  val fields = schemaString.split(" ")
    .map(fieldName => StructField(fieldName, StringType, nullable = true))

  val schema = StructType(fields)

  var s3Users = spark.sqlContext.read.schema(schema).json("s3://asgaard-data/users/*/*/*/*/").rdd

  // Apply the schema to the RDD
  val usersDataFrame = spark.createDataFrame(s3Users, schema)

  // Creates a temporary view using the DataFrame
  usersDataFrame.createOrReplaceTempView("users")

  // SQL can be run over a temporary view created using DataFrames
  val results = spark.sql("SELECT userName FROM users limit 10")

  results.map(attributes => "UserName: " + attributes(0)).show()

版本如下

        <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.1.0</version>
    </dependency> 

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>

【讨论】:

以上是关于通过 Apache-Spark 从 AWS S3 加载数据的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 gsutil 通过 http 链接从谷歌云存储下载到 AWS 实例或 s3 存储桶?

如何从 aws-sdk 中的 s3 存储桶创建 Elastic Beanstalk 实例?

AWS:通过使用 amazon-data-pipeline 将数据从 S3 传输到 Redshift 来实现除 COPY 之外的其他功能

AWS S3日志文件通过服务器上传到elk

如何检查从 aws S3 到雪花的数据加载结果

从 EC2 运行时,sp​​ring-cloud-aws 无法将文件放入 S3