spark collect method taking too much time when processing records stored in RDD[String]

Posted: 2017-02-27 13:00:21

Problem description:

I have a requirement where I need to pull Parquet files from S3, process them, convert them into another object format, and store the result in S3 in both JSON and Parquet formats.

I have implemented the changes below for this problem statement, but the Spark job takes far too much time when the collect statement is invoked. Please let me know how I can optimize it. Below is the complete code that reads the Parquet files from S3, processes them, and stores the results back to S3. I am very new to Spark and big-data technologies.

package com.expedia.www.lambda

import java.io._

import com.amazonaws.ClientConfiguration
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing}
import com.expedia.hendrix.lambda.HotelInfosite
import com.expedia.www.hendrix.signals.definition.local.HotelInfoSignal
import com.expedia.www.options.HendrixHistoricalOfflineProcessorOptions
import com.expedia.www.user.interaction.v1.UserInteraction
import com.expedia.www.util._
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.lang.exception.ExceptionUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.Random


object GenericLambdaMapper {

  private def currentTimeMillis: Long = System.currentTimeMillis

  /** The below Generic mapper object is built for creating json similar to the Signal pushed by hendrix */
  def populateSignalRecord(genericRecord: GenericRecord, uisMessage: UserInteraction, signalType: String): HotelInfoSignal = {

    val objectMapper:ObjectMapper = new ObjectMapper
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    objectMapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true)

    val hotelInfoObject = objectMapper.readValue( genericRecord.toString, classOf[com.expedia.www.hendrix.signals.definition.local.HotelInfosite])
    val userKey = UserKeyUtil.createUserKey(uisMessage)
    val hotelInfoSignal:HotelInfoSignal = new HotelInfoSignal
    hotelInfoSignal.setSignalType(signalType)
    hotelInfoSignal.setData(hotelInfoObject)
    hotelInfoSignal.setUserKey(userKey)
    hotelInfoSignal.setGeneratedAtTimestamp(currentTimeMillis)
    hotelInfoSignal
  }
}


class GenericLambdaMapper extends Serializable {

  var LOGGER:Logger = LoggerFactory.getLogger("GenericLambdaMapper")
  var bw : BufferedWriter  = null
  var fw :FileWriter = null
  val random: Random = new Random
  var counter: Int = 0
  var fileName: String= null
  val s3Util = new S3Util


  /** Object Mapper function for serializing and deserializing objects**/
  def objectMapper: ObjectMapper = {
    val mapper = new ObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true)
    mapper // return the configured mapper
  }


  def process(sparkContext: SparkContext, options: HendrixHistoricalOfflineProcessorOptions): Unit = { //ObjectListing

    try {
      LOGGER.info("Start Date : " + options.startDate)
      LOGGER.info("END Date : " + options.endDate)
      val listOfFilePath: List[String] = DateTimeUtil.getDateRangeStrFromInput(options.startDate, options.endDate)
      /** Looping through each folder based on start and end date **/
      listOfFilePath.foreach(
        path => applyLambdaForGivenPathAndPushToS3Signal(sparkContext, path, options)
      )
    } catch {
      case ex: Exception =>
        LOGGER.error("Exception in downloading data :" + options.rawBucketName + options.rawS3UploadRootFolder + options.startDate)
        LOGGER.error("Stack Trace :" + ExceptionUtils.getFullStackTrace(ex))
    }
  }

  // TODO: Currently the Lambda is hardcoded only to HotelInfoSite to be made generic
  def prepareUisObjectAndApplyLambda(uisMessage: UserInteraction, options: HendrixHistoricalOfflineProcessorOptions): List[GenericRecord] = {
    try {
      val schemaDefinition = Source.fromInputStream(getClass.getResourceAsStream("/" + options.avroSchemaName)).getLines.mkString("\n")
      val schemaHotelInfo = new Schema.Parser().parse(schemaDefinition)
      HotelInfosite.apply(uisMessage, schemaHotelInfo).toList
    } catch {
      case ex: Exception =>
        LOGGER.error("Exception while preparing UIS Object" + ex.toString)
        List.empty
    }
  }

  /** Below method is used to extract userInteraction Data from Raw file **/
  private def constructUisObject(uisMessageRaw: String): UserInteraction = objectMapper.readValue( uisMessageRaw, classOf[UserInteraction])


  /** Below function contains logic to apply the lambda for the given range of dates and push to signals folder in S3 **/
  def applyLambdaForGivenPathAndPushToS3Signal(sparkContext: SparkContext, dateFolderPath: String, options: HendrixHistoricalOfflineProcessorOptions): Unit = {

    var awsS3Client: AmazonS3Client = null

    try {

      if ("sandbox".equals(options.environment)) {
        val clientConfiguration = new ClientConfiguration()
          .withConnectionTimeout(options.awsConnectionTimeout)
          .withSocketTimeout(options.awsSocketTimeout)
          .withTcpKeepAlive(true)

        awsS3Client = S3Client.getAWSConnection(options.awsS3AccessKey, options.awsS3SecretKey, clientConfiguration)
      } else {
        awsS3Client = S3Client.getAWSConnection
      }

      /** Validate if destination path has any gzip file if so then just skip that date and process next record **/
      LOGGER.info("Validating if the destination folder path is empty: " + dateFolderPath)
      var objectListing: ObjectListing = null
      var listObjectsRequest: ListObjectsRequest = new ListObjectsRequest().withBucketName(options.destinationBucketName).withPrefix(options.s3SignalRootFolder + options.signalType + "/" + dateFolderPath.toString)
      objectListing = awsS3Client.listObjects(listObjectsRequest)
      if (objectListing.getObjectSummaries.size > 0) {
        LOGGER.warn("Record already present at the below location, so skipping the processing of record for the folder path :" + dateFolderPath.toString)
        LOGGER.warn("s3n://" + options.destinationBucketName + "/" + options.s3SignalRootFolder + options.signalType + "/" + dateFolderPath.toString)
        return
      }
      LOGGER.info("Validated the destination folder path :" + dateFolderPath + " and found it to be empty ")
      /** End of validation **/


      /*Selecting all the files under the source path and iterating*/
      counter = 0
      listObjectsRequest = new ListObjectsRequest().withBucketName(options.rawBucketName).withPrefix(options.rawS3UploadRootFolder + dateFolderPath.toString)
      objectListing = awsS3Client.listObjects(listObjectsRequest)
      val rddListOfParquetFileNames = objectListing.getObjectSummaries.asScala.map(_.getKey).toList
      rddListOfParquetFileNames.flatMap { key =>
        processIndividualParquetFileAndUploadToS3(sparkContext, awsS3Client, options, key, dateFolderPath)
        "COMPLETED Processing=>" + key
      }

    } catch {
      case ex: Exception =>
        LOGGER.error("Exception occured while processing records for the path " + dateFolderPath)
        LOGGER.error("Exception in Apply Lambda method Message :" + ex.getMessage + "\n Stack Trace :" + ex.getStackTrace)
    } finally {
      awsS3Client.shutdown
      LOGGER.info("JOB Complete ")
    }
  }

  def processIndividualParquetFileAndUploadToS3(sparkContext: SparkContext, awsS3Client: AmazonS3Client, options: HendrixHistoricalOfflineProcessorOptions, parquetFilePath: String, dateFolderPath: String): Unit = {

    try {
      LOGGER.info("Currently Processing the Parquet file: "+parquetFilePath)

      LOGGER.info("Starting to reading Parquet File Start Time: "+System.currentTimeMillis)
      val dataSetString: RDD[String] = ParquetHelper.readParquetData(sparkContext, options, parquetFilePath)
      LOGGER.info("Data Set returned from Parquet file Successful Time: "+System.currentTimeMillis)

      val lambdaSignalRecords: Array[HotelInfoSignal] = dataSetString.map(x => constructUisObject(x))
        .filter(_ != null)
        .map(userInteraction => processIndividualRecords(userInteraction, options))
        .filter(_ != null)
        .collect

      LOGGER.info("Successfully Generated "+lambdaSignalRecords.length+" Signal Records")

      if (lambdaSignalRecords.length > 0) {

        // Write to Parquet file: start
        val parquetFileName: String = getFileNameForParquet(dateFolderPath, counter)
        val parquetWriter = ParquetHelper.newParquetWriter(HotelInfoSignal.getClassSchema, dateFolderPath, parquetFileName, options)
        LOGGER.info("Initialized Parquet Writer")
        lambdaSignalRecords.foreach(signalRecord => parquetWriter.write(signalRecord))
        LOGGER.info("Completed writing the data in Parquet format")
        parquetWriter.close
        // Parquet write complete

        /*val avroSignalString = lambdaSignalRecords.mkString("\n")
        val sparkSession = SparkSession.builder.getOrCreate
        uploadProceessedDataToS3(sparkSession, awsS3Client, dateFolderPath, avroSignalString, options)*/
      }

    } catch {
      case ex: Exception =>
        LOGGER.error("Skipping processing of record :" + parquetFilePath + " because of Exception: " + ExceptionUtils.getFullStackTrace(ex))
    }
    LOGGER.info("Completed data processing for file :" + options.rawBucketName + options.rawS3UploadRootFolder + parquetFilePath)
  }

  def uploadProceessedDataToS3(sparkSession: SparkSession, awsS3Client: AmazonS3Client, filePath: String, genericSignalRecords: String, options: HendrixHistoricalOfflineProcessorOptions): Unit = {

    var jsonFile: File = null
    var gzFile: File = null

    try {
      //Building the file name based on the folder accessed
      fileName = getFileName (filePath, counter)
      jsonFile = IOUtil.createS3JsonFile (genericSignalRecords, fileName)
      gzFile =  IOUtil.gzipIt (jsonFile)
      s3Util.uploadToS3(awsS3Client, options.destinationBucketName, options.s3SignalRootFolder + options.signalType + "/" + filePath, gzFile)
      counter += 1 // Increment counter
    } catch {
      case ex: RuntimeException =>
        LOGGER.error("Exception while uploading file to path :" + options.s3SignalRootFolder + options.signalType + "/" + filePath + "/" + fileName)
        LOGGER.error("Stack Trace for S3 Upload :" + ExceptionUtils.getFullStackTrace(ex))
    } finally {
      // Clean up the temp files created after the upload to S3; a temp dir could be used if required.
      jsonFile.delete
      gzFile.delete
    }
  }


  def processIndividualRecords(userInteraction: UserInteraction, options: HendrixHistoricalOfflineProcessorOptions): HotelInfoSignal = {
    try {
      // Applying the lambda for the individual UserInteraction
      val list: List[GenericRecord] = prepareUisObjectAndApplyLambda(userInteraction, options)
      if (list.nonEmpty) return GenericLambdaMapper.populateSignalRecord(list.head, userInteraction, options.signalType)
    } catch {
      case ex: Exception =>
        LOGGER.error("Error while creating signal record from UserInteraction for Signal Type :" + options.signalType + " For Interaction " + userInteraction.toString)
        LOGGER.error("Stack Trace while processIndividualRecords :" + ExceptionUtils.getFullStackTrace(ex))
    }
    null
  }


  /** This method is used to prepare the exact file name which has processed date and the no of files counter **/
  def getFileName(filePath: String, counter: Int): String =
    filePath.replace("/", "-") + "_" + counter + "_" + random.alphanumeric.take(5).mkString + ".json"

  /** This method is used to prepare the exact file name which has processed date and the no of files counter **/
  def getFileNameForParquet(filePath: String, counter: Int): String =
    filePath.replace("/", "-") + "_" + counter + "_" + random.alphanumeric.take(5).mkString + ".parquet"
}





package com.expedia.www.util

import com.expedia.www.options.HendrixHistoricalOfflineProcessorOptions
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetWriter, AvroSchemaConverter}
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
import org.apache.parquet.schema.MessageType
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}

/**
  * Created by prasubra on 2/17/17.
  */

object ParquetHelper {

  val LOGGER:Logger = LoggerFactory.getLogger("ParquetHelper")
  def newParquetWriter(signalSchema: Schema, folderPath: String, fileName: String, options: HendrixHistoricalOfflineProcessorOptions): ParquetWriter[GenericRecord] = {
    val blockSize: Int = 256 * 1024 * 1024
    val pageSize: Int = 64 * 1024

    val compressionCodec = if (options.parquetCompressionToGzip) CompressionCodecName.GZIP else CompressionCodecName.UNCOMPRESSED
    val path: Path = new Path("s3n://" + options.destinationBucketName + "/" + options.parquetSignalFolderName + options.signalType + "/" + folderPath + "/" + fileName);
    val parquetSchema: MessageType = new AvroSchemaConverter().convert(signalSchema);
    // var writeSupport:WriteSupport = new AvroWriteSupport(parquetSchema, signalSchema);
    //(path, writeSupport, compressionCodec, blockSize, pageSize)
    //var parquetWriter:ParquetWriter[GenericRecord] = new ParquetWriter(path, writeSupport, compressionCodec, blockSize, pageSize);

    if ("sandbox".equals(options.environment)) 
      val hadoopConf = new Configuration
      hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
      hadoopConf.set("fs.s3n.awsAccessKeyId", options.awsS3AccessKey)
      hadoopConf.set("fs.s3n.awsSecretAccessKey", options.awsS3SecretKey)
      hadoopConf.set("fs.s3n.maxRetries", options.awsFileReaderRetry)

      AvroParquetWriter.builder(path)
        .withSchema(signalSchema)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withCompressionCodec(compressionCodec)
        .withConf(hadoopConf)
        .build()
    } else {

      AvroParquetWriter.builder(path)
        .withSchema(signalSchema)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withCompressionCodec(compressionCodec)
        .withPageSize(pageSize)
        .build()
    }
  }

  def readParquetData(sc: SparkContext, options: HendrixHistoricalOfflineProcessorOptions, filePath: String): RDD[String] = {
    val filePathOfParquet = "s3n://"+options.rawBucketName+"/"+ filePath
    LOGGER.info("Reading Parquet file from path :"+filePathOfParquet)
    val sparkSession  = SparkSession.builder.getOrCreate
    val dataFrame = sparkSession.sqlContext.read.parquet(filePathOfParquet)
    //dataFrame.printSchema()
    dataFrame.toJSON.rdd
  }
}


Answer 1:

First of all, you should really improve your question with a minimal code example. It is very hard to see what is going on in your code...

collect retrieves all elements of the RDD into a single collection on the driver. If your RDD is large, this will of course take a long time (and may cause an OutOfMemoryError if the contents do not fit in the driver's main memory).

You can write the contents of a DataFrame/Dataset directly as Parquet. That will definitely be faster and more scalable; see the sketch below.
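For illustration, here is a minimal sketch of that suggestion, assuming the transformation can be expressed with DataFrame/Dataset operations. The bucket names, paths, and the placeholder projection are assumptions for the example, not the asker's actual options or schema:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Minimal sketch: keep the data distributed and let Spark write it out,
// instead of collect-ing to the driver and writing with a local ParquetWriter.
// Bucket names and paths below are placeholders.
object DirectWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("DirectWriteSketch").getOrCreate()

    // Read every Parquet file under the date folder in one call; no per-file loop needed.
    val raw: DataFrame = spark.read.parquet("s3a://raw-bucket/uploads/2017-02-27/")

    // Apply the transformation with DataFrame operations (here just a projection as an
    // illustration) so the work stays on the executors rather than the driver.
    val transformed = raw.select(raw.columns.map(raw.col): _*)

    // Spark writes both output formats in parallel, directly to S3.
    transformed.write.mode("overwrite").json("s3a://dest-bucket/signals/json/2017-02-27/")
    transformed.write.mode("overwrite").parquet("s3a://dest-bucket/signals/parquet/2017-02-27/")

    spark.stop()
  }
}
```

With this approach the driver never materializes the full dataset, so the collect bottleneck (and its memory risk) disappears.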


Answer 2:

Use s3a:// URLs. s3n:// has a bug that seriously hurts ORC/Parquet performance, and it has now been superseded by s3a. A configuration sketch follows.
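As a rough sketch of the switch, assuming the hadoop-aws module and a matching AWS SDK jar are on the classpath (the bucket names are placeholders, and credentials are only needed when an instance profile or the default credential chain is not available):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of switching the job from s3n:// to s3a://.
object S3aSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("S3aSketch").getOrCreate()

    // Standard hadoop-aws credential properties for the s3a filesystem.
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3a.access.key", sys.env.getOrElse("AWS_ACCESS_KEY_ID", ""))
    hadoopConf.set("fs.s3a.secret.key", sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", ""))

    // Same read/write paths as before, just with the s3a scheme.
    val df = spark.read.parquet("s3a://raw-bucket/uploads/2017-02-27/")
    df.write.mode("overwrite").parquet("s3a://dest-bucket/signals/parquet/2017-02-27/")

    spark.stop()
  }
}
```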

Comments:

Thanks Steve for the input; I will change to s3a and monitor the performance.
