Spark:使用 Spark 处理大量文件的数据显示 SocketException:读取超时

Posted

技术标签:

【中文标题】Spark:使用 Spark 处理大量文件的数据显示 SocketException:读取超时【英文标题】:Spark : Data processing using Spark for large number of files says SocketException : Read timed out 【发布时间】:2015-04-09 13:24:21 【问题描述】:

我在两台具有这些配置的机器上以独立模式运行 Spark

    500GB 内存,4 核,7.5 RAM 250GB 内存,8 核,15 RAM

我在 8core 机器上创建了一个 master 和一个 slave,给 worker 7 个 core。我在具有 3 个工作核心的 4 核机器上创建了另一个从站。 UI 显示 8 核和 4 核的可用 RAM 分别为 13.7 和 6.5 G。

现在,我必须在 15 天内处理用户评分的汇总。我正在尝试使用 Pyspark 来做到这一点 此数据存储在 s3 存储桶中按天目录中的按小时文件中,每个文件必须在 100MB 左右,例如

s3://some_bucket/2015-04/2015-04-09/data_files_hour1

我正在阅读这样的文件

a = sc.textFile(files, 15).coalesce(7*sc.defaultParallelism) #to restrict partitions

其中 files 是这种形式的字符串 's3://some_bucket/2015-04/2015-04-09/*,s3://some_bucket/2015-04/2015-04-09/* '

然后我做一系列的地图和过滤器并持久化结果

a.persist(StorageLevel.MEMORY_ONLY_SER)

然后我需要做一个 reduceByKey 来获得几天内的总分。

b = a.reduceByKey(lambda x, y: x+y).map(aggregate)
b.persist(StorageLevel.MEMORY_ONLY_SER)

然后我需要为用户评分的项目的实际术语进行redis调用,所以我这样调用mapPartitions

final_scores = b.mapPartitions(get_tags)

get_tags 函数每次调用都会创建一个 redis 连接并调用 redis 并生成一个 (user, item, rate) 元组 (redis的hash存放在4core中)

我已将 SparkConf 的设置调整为

conf = (SparkConf().setAppName(APP_NAME).setMaster(master)
        .set("spark.executor.memory", "5g")
        .set("spark.akka.timeout", "10000")
        .set("spark.akka.frameSize", "1000")
        .set("spark.task.cpus", "5")
        .set("spark.cores.max", "10")
        .set("spark.serializer",      "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryoserializer.buffer.max.mb", "10")
        .set("spark.shuffle.consolidateFiles", "True")
        .set("spark.files.fetchTimeout", "500")
        .set("spark.task.maxFailures", "5"))

我在客户端模式下使用 2g 的驱动程序内存运行该作业,因为这里似乎不支持集群模式。 上述过程需要很长时间才能获取 2 天的数据(大约 2.5 小时)并完全放弃 14 天。

这里有什么需要改进的地方?

    这个基础设施在 RAM 和内核方面是否不足(这是离线的,可能需要几个小时,但必须在 5 小时左右完成) 我应该增加/减少分区数吗? Redis 可能会降低系统速度,但键的数量太大,无法一次性调用。 我不确定任务失败的地方是读取文件还是减少。 我不应该使用 Python,因为在 Scala 中有更好的 Spark API,这也有助于提高效率吗?

这是异常跟踪

Lost task 4.1 in stage 0.0 (TID 11, <node>): java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
    at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
    at sun.security.ssl.InputRecord.read(InputRecord.java:509)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:934)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:891)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
    at org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:200)
    at org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:103)
    at org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:164)
    at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:227)
    at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:174)
    at org.apache.http.util.EntityUtils.consume(EntityUtils.java:88)
    at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.releaseConnection(HttpMethodReleaseInputStream.java:102)
    at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.close(HttpMethodReleaseInputStream.java:194)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.seek(NativeS3FileSystem.java:152)
    at org.apache.hadoop.fs.BufferedFSInputStream.seek(BufferedFSInputStream.java:89)
    at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:63)
    at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:126)
    at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:236)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:212)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)

我真的需要一些帮助,在此先感谢

这是我的主要代码的样子

def main(sc): f=get_files() a=sc.textFile(f, 15) .coalesce(7*sc.defaultParallelism) .map(lambda line: line.split(",")) .filter(len(line)>0) .map(lambda line: (line[18], line[2], line[13], line[15])).map(scoring) .map(lambda line: ((line[0], line[1]), line[2])).persist(StorageLevel.MEMORY_ONLY_SER) b=a.reduceByKey(lambda x, y: x+y).map(aggregate) b.persist(StorageLevel.MEMORY_ONLY_SER) c=taggings.mapPartitions(get_tags) c.saveAsTextFile("f") a.unpersist() b.unpersist()

get_tags 函数是

def get_tags(partition):
 rh = redis.Redis(host=settings['REDIS_HOST'], port=settings['REDIS_PORT'], db=0)
 for element in partition:
    user = element[0]
    song = element[1]
    rating = element[2]
    tags = rh.hget(settings['REDIS_HASH'], song)
    if tags:
        tags = json.loads(tags)
    else:
        tags = scrape(song, rh)
    if tags:
        for tag in tags:
            yield (user, tag, rating)

get_files 函数如下:

def get_files():
 paths = get_path_from_dates(DAYS)
 base_path = 's3n://acc_key:sec_key@bucket/'
 files = list()
 for path in paths:
    fle = base_path+path+'/file_format.*'
    files.append(fle)
 return ','.join(files)

get_path_from_dates(DAYS) 是

def get_path_from_dates(last):
 days = list()
 t = 0
 while t <= last:
    d = today - timedelta(days=t)
    path = d.strftime('%Y-%m')+'/'+d.strftime('%Y-%m-%d')
    days.append(path)
    t += 1
 return days

【问题讨论】:

这个问题有很多细节。我建议您将其分解为几个单独的问题。例如,您可以选择一个问题来帮助您解决异常,而另一个问题来帮助您提高性能。 谢谢,我同意,但我自己一直无法深入研究原因和解决方案。因此,我提供了整个上下文,以便对问题感兴趣的任何人都能获得整个上下文并做出相应的回答。它可能看起来像一个性能调整代码审查,但我非常需要它:( 我很欣赏你的情况。我会尽力提供帮助。关于性能问题,从单个输入拆分开始(我假设这是一个 100MB 的文件)。您能否调整在您的机器上本地运行的作业(即删除 AWS 组件)?你的代码高效吗?通常这就是效率低下的地方。你能分享更多你的代码吗? @Myles Baker,PFA 部分 MR 工作 为了优化,我可以广播redis-connection吗? 【参考方案1】:

作为一个小的优化,我创建了两个独立的任务,一个从 s3 读取并获得加和,第二个从 redis 读取转换。第一个任务有很多分区,因为要读取大约 2300 个文件。第二个分区的数量要少得多,以防止 redis 连接延迟,并且只有一个文件要读取,该文件位于 EC2 集群本身上。这只是部分内容,仍在寻找改进建议...

【讨论】:

【参考方案2】:

我在一个类似的用例中:在具有 300,000 多个分区的 RDD 上执行 coalesce。不同之处在于我使用的是 s3a(来自S3AFileSystem.waitAysncCopySocketTimeoutException)。最后通过设置更大的fs.s3a.connection.timeout(Hadoop 的core-site.xml)解决了这个问题。希望你能得到一个线索。

【讨论】:

以上是关于Spark:使用 Spark 处理大量文件的数据显示 SocketException:读取超时的主要内容,如果未能解决你的问题,请参考以下文章

Spark核心编程---创建RDD

Spark 处理非结构化文件

大数据处理为何选择spark?

Spark 处理小文件

【大数据】Spark 递归读取 HDFS

Apache Spark 和 Hudi:大量的输出文件