spark collect method taking too much time when processing records stored in RDD[String]

Posted: 2017-02-27 13:00:21

【Question】: I have a requirement where I have to pull Parquet files from S3, process them, convert them into another object format, and store the output in S3 in both JSON and Parquet formats.

I have implemented the problem statement with the changes below, but the Spark job takes far too much time when the collect statement is invoked. Please let me know how this can be optimized. Below is the complete code that reads the Parquet files from S3, processes them, and stores the results back to S3. I am very new to Spark and big-data technologies.
package com.expedia.www.lambda

import java.io._

import com.amazonaws.ClientConfiguration
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing}
import com.expedia.hendrix.lambda.HotelInfosite
import com.expedia.www.hendrix.signals.definition.local.HotelInfoSignal
import com.expedia.www.options.HendrixHistoricalOfflineProcessorOptions
import com.expedia.www.user.interaction.v1.UserInteraction
import com.expedia.www.util._
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.lang.exception.ExceptionUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.Random

object GenericLambdaMapper {

  private def currentTimeMillis: Long = System.currentTimeMillis

  /** The below generic mapper object is built for creating json similar to the Signal pushed by hendrix */
  def populateSignalRecord(genericRecord: GenericRecord, uisMessage: UserInteraction, signalType: String): HotelInfoSignal = {
    val objectMapper: ObjectMapper = new ObjectMapper
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    objectMapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true)

    val hotelInfoObject = objectMapper.readValue(genericRecord.toString, classOf[com.expedia.www.hendrix.signals.definition.local.HotelInfosite])
    val userKey = UserKeyUtil.createUserKey(uisMessage)
    val hotelInfoSignal: HotelInfoSignal = new HotelInfoSignal
    hotelInfoSignal.setSignalType(signalType)
    hotelInfoSignal.setData(hotelInfoObject)
    hotelInfoSignal.setUserKey(userKey)
    hotelInfoSignal.setGeneratedAtTimestamp(currentTimeMillis)
    hotelInfoSignal
  }
}
class GenericLambdaMapper extends Serializable {

  var LOGGER: Logger = LoggerFactory.getLogger("GenericLambdaMapper")
  var bw: BufferedWriter = null
  var fw: FileWriter = null
  val random: Random = new Random
  var counter: Int = 0
  var fileName: String = null
  val s3Util = new S3Util

  /** Object mapper used for serializing and deserializing objects **/
  def objectMapper: ObjectMapper = {
    val mapper = new ObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true)
    mapper // return the configured mapper
  }
  def process(sparkContext: SparkContext, options: HendrixHistoricalOfflineProcessorOptions): Unit = { // ObjectListing
    try {
      LOGGER.info("Start Date : " + options.startDate)
      LOGGER.info("END Date : " + options.endDate)
      val listOfFilePath: List[String] = DateTimeUtil.getDateRangeStrFromInput(options.startDate, options.endDate)
      /** Looping through each folder based on start and end date **/
      listOfFilePath.map(
        path => applyLambdaForGivenPathAndPushToS3Signal(sparkContext, path, options)
      )
    } catch {
      case ex: Exception =>
        LOGGER.error("Exception in downloading data :" + options.rawBucketName + options.rawS3UploadRootFolder + options.startDate)
        LOGGER.error("Stack Trace :" + ExceptionUtils.getFullStackTrace(ex))
    }
  }
  // TODO: Currently the lambda is hardcoded to HotelInfoSite only; to be made generic
  def prepareUisObjectAndApplyLambda(uisMessage: UserInteraction, options: HendrixHistoricalOfflineProcessorOptions): List[GenericRecord] = {
    try {
      val schemaDefinition = Source.fromInputStream(getClass.getResourceAsStream("/" + options.avroSchemaName)).getLines.mkString("\n")
      val schemaHotelInfo = new Schema.Parser().parse(schemaDefinition)
      HotelInfosite.apply(uisMessage, schemaHotelInfo).toList
    } catch {
      case ex: Exception =>
        LOGGER.error("Exception while preparing UIS Object" + ex.toString)
        List.empty
    }
  }

  /** Below method is used to extract userInteraction data from the raw file **/
  private def constructUisObject(uisMessageRaw: String): UserInteraction =
    objectMapper.readValue(uisMessageRaw, classOf[UserInteraction])
  /** Below function contains the logic to apply the lambda for the given range of dates and push to the signals folder in S3 **/
  def applyLambdaForGivenPathAndPushToS3Signal(sparkContext: SparkContext, dateFolderPath: String, options: HendrixHistoricalOfflineProcessorOptions): Unit = {
    var awsS3Client: AmazonS3Client = null
    try {
      if ("sandbox".equals(options.environment)) {
        val clientConfiguration = new ClientConfiguration()
          .withConnectionTimeout(options.awsConnectionTimeout)
          .withSocketTimeout(options.awsSocketTimeout)
          .withTcpKeepAlive(true)
        awsS3Client = S3Client.getAWSConnection(options.awsS3AccessKey, options.awsS3SecretKey, clientConfiguration)
      } else {
        awsS3Client = S3Client.getAWSConnection
      }

      /** Validate if the destination path already has any gzip file; if so, skip that date and process the next record **/
      LOGGER.info("Validating if the destination folder path is empty: " + dateFolderPath)
      var objectListing: ObjectListing = null
      var listObjectsRequest: ListObjectsRequest = new ListObjectsRequest().withBucketName(options.destinationBucketName).withPrefix(options.s3SignalRootFolder + options.signalType + "/" + dateFolderPath.toString)
      objectListing = awsS3Client.listObjects(listObjectsRequest)
      if (objectListing.getObjectSummaries.size > 0) {
        LOGGER.warn("Record already present at the below location, so skipping the processing of record for the folder path :" + dateFolderPath.toString)
        LOGGER.warn("s3n://" + options.destinationBucketName + "/" + options.s3SignalRootFolder + options.signalType + "/" + dateFolderPath.toString)
        return
      }
      LOGGER.info("Validated the destination folder path :" + dateFolderPath + " and found it to be empty ")
      /** End of validation **/

      /* Selecting all the files under the source path and iterating */
      counter = 0
      listObjectsRequest = new ListObjectsRequest().withBucketName(options.rawBucketName).withPrefix(options.rawS3UploadRootFolder + dateFolderPath.toString)
      objectListing = awsS3Client.listObjects(listObjectsRequest)
      val rddListOfParquetFileNames = objectListing.getObjectSummaries.asScala.map(_.getKey).toList
      rddListOfParquetFileNames.flatMap { key =>
        processIndividualParquetFileAndUploadToS3(sparkContext, awsS3Client, options, key, dateFolderPath)
        "COMPLETED Processing=>" + key
      }
    } catch {
      case ex: Exception =>
        LOGGER.error("Exception occured while processing records for the path " + dateFolderPath)
        LOGGER.error("Exception in Apply Lambda method Message :" + ex.getMessage + "\n Stack Trace :" + ex.getStackTrace)
    } finally {
      awsS3Client.shutdown
      LOGGER.info("JOB Complete ")
    }
  }
  def processIndividualParquetFileAndUploadToS3(sparkContext: SparkContext, awsS3Client: AmazonS3Client, options: HendrixHistoricalOfflineProcessorOptions, parquetFilePath: String, dateFolderPath: String): Unit = {
    try {
      LOGGER.info("Currently Processing the Parquet file: " + parquetFilePath)
      LOGGER.info("Starting to read Parquet file, Start Time: " + System.currentTimeMillis)
      val dataSetString: RDD[String] = ParquetHelper.readParquetData(sparkContext, options, parquetFilePath)
      LOGGER.info("Data set returned from Parquet file, Time: " + System.currentTimeMillis)

      val lambdaSignalRecords: Array[HotelInfoSignal] = dataSetString.map(x => constructUisObject(x))
        .filter(_ != null)
        .map(userInteraction => processIndividualRecords(userInteraction, options))
        .filter(_ != null)
        .collect

      LOGGER.info("Successfully Generated " + lambdaSignalRecords.length + " Signal Records")

      if (lambdaSignalRecords.length > 0) {
        // Write to Parquet file: start
        val parquetFileName: String = getFileNameForParquet(dateFolderPath, counter)
        val parquetWriter = ParquetHelper.newParquetWriter(HotelInfoSignal.getClassSchema, dateFolderPath, parquetFileName, options)
        LOGGER.info("Initialized Parquet Writer")
        lambdaSignalRecords.map(signalRecord => parquetWriter.write(signalRecord))
        LOGGER.info("Completed writing the data in Parquet format")
        parquetWriter.close
        // Parquet write complete

        /* val avroSignalString = lambdaSignalRecords.mkString("\n")
           val sparkSession = SparkSession.builder.getOrCreate
           uploadProceessedDataToS3(sparkSession, awsS3Client, dateFolderPath, avroSignalString, options) */
      }
    } catch {
      case ex: Exception =>
        LOGGER.error("Skipping processing of record :" + parquetFilePath + " because of Exception: " + ExceptionUtils.getFullStackTrace(ex))
    }
    LOGGER.info("Completed data processing for file :" + options.rawBucketName + options.rawS3UploadRootFolder + parquetFilePath)
  }
  def uploadProceessedDataToS3(sparkSession: SparkSession, awsS3Client: AmazonS3Client, filePath: String, genericSignalRecords: String, options: HendrixHistoricalOfflineProcessorOptions): Unit = {
    var jsonFile: File = null
    var gzFile: File = null
    try {
      // Building the file name based on the folder accessed
      fileName = getFileName(filePath, counter)
      jsonFile = IOUtil.createS3JsonFile(genericSignalRecords, fileName)
      gzFile = IOUtil.gzipIt(jsonFile)
      s3Util.uploadToS3(awsS3Client, options.destinationBucketName, options.s3SignalRootFolder + options.signalType + "/" + filePath, gzFile)
      counter += 1 // Increment counter
    } catch {
      case ex: RuntimeException =>
        LOGGER.error("Exception while uploading file to path :" + options.s3SignalRootFolder + options.signalType + "/" + filePath + "/" + fileName)
        LOGGER.error("Stack Trace for S3 Upload :" + ExceptionUtils.getFullStackTrace(ex))
    } finally {
      // Cleaning up the temp files created after the upload to S3; we can create a temp dir if required.
      jsonFile.delete
      gzFile.delete
    }
  }
  def processIndividualRecords(userInteraction: UserInteraction, options: HendrixHistoricalOfflineProcessorOptions): HotelInfoSignal = {
    try {
      // Applying the lambda for the individual UserInteraction
      val list: List[GenericRecord] = prepareUisObjectAndApplyLambda(userInteraction, options)
      if (list.nonEmpty) return GenericLambdaMapper.populateSignalRecord(list.head, userInteraction, options.signalType)
    } catch {
      case ex: Exception =>
        LOGGER.error("Error while creating signal record from UserInteraction for Signal Type :" + options.signalType + " For Interaction " + userInteraction.toString)
        LOGGER.error("Stack Trace while processIndividualRecords :" + ExceptionUtils.getFullStackTrace(ex))
    }
    null
  }

  /** This method is used to prepare the exact file name, which contains the processed date and the file counter **/
  def getFileName(filePath: String, counter: Int): String =
    filePath.replace("/", "-") + "_" + counter + "_" + random.alphanumeric.take(5).mkString + ".json"

  /** This method is used to prepare the exact file name, which contains the processed date and the file counter **/
  def getFileNameForParquet(filePath: String, counter: Int): String =
    filePath.replace("/", "-") + "_" + counter + "_" + random.alphanumeric.take(5).mkString + ".parquet"
}
package com.expedia.www.util

import com.expedia.www.options.HendrixHistoricalOfflineProcessorOptions
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetWriter, AvroSchemaConverter}
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
import org.apache.parquet.schema.MessageType
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}

/**
 * Created by prasubra on 2/17/17.
 */
object ParquetHelper {

  val LOGGER: Logger = LoggerFactory.getLogger("ParquetHelper")
  def newParquetWriter(signalSchema: Schema, folderPath: String, fileName: String, options: HendrixHistoricalOfflineProcessorOptions): ParquetWriter[GenericRecord] = {
    val blockSize: Int = 256 * 1024 * 1024
    val pageSize: Int = 64 * 1024
    val compressionCodec = if (options.parquetCompressionToGzip) CompressionCodecName.GZIP else CompressionCodecName.UNCOMPRESSED
    val path: Path = new Path("s3n://" + options.destinationBucketName + "/" + options.parquetSignalFolderName + options.signalType + "/" + folderPath + "/" + fileName)
    val parquetSchema: MessageType = new AvroSchemaConverter().convert(signalSchema)
    // var writeSupport: WriteSupport = new AvroWriteSupport(parquetSchema, signalSchema)
    // var parquetWriter: ParquetWriter[GenericRecord] = new ParquetWriter(path, writeSupport, compressionCodec, blockSize, pageSize)

    if ("sandbox".equals(options.environment)) {
      val hadoopConf = new Configuration
      hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
      hadoopConf.set("fs.s3n.awsAccessKeyId", options.awsS3AccessKey)
      hadoopConf.set("fs.s3n.awsSecretAccessKey", options.awsS3SecretKey)
      hadoopConf.set("fs.s3n.maxRetries", options.awsFileReaderRetry)
      AvroParquetWriter.builder(path)
        .withSchema(signalSchema)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withCompressionCodec(compressionCodec)
        .withConf(hadoopConf)
        .build()
    } else {
      AvroParquetWriter.builder(path)
        .withSchema(signalSchema)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withCompressionCodec(compressionCodec)
        .withPageSize(pageSize)
        .build()
    }
  }
  def readParquetData(sc: SparkContext, options: HendrixHistoricalOfflineProcessorOptions, filePath: String): RDD[String] = {
    val filePathOfParquet = "s3n://" + options.rawBucketName + "/" + filePath
    LOGGER.info("Reading Parquet file from path :" + filePathOfParquet)
    val sparkSession = SparkSession.builder.getOrCreate
    val dataFrame = sparkSession.sqlContext.read.parquet(filePathOfParquet)
    // dataFrame.printSchema()
    dataFrame.toJSON.rdd
  }
}
【Comments】:
【Answer 1】: First of all, you should really improve your question with a minimal code example; it is hard to see what is actually going on in your code.

collect retrieves all the elements of your RDD to the driver. If your RDD is large, this will of course take a lot of time, and it may cause an OutOfMemoryError if the contents do not fit into the driver's main memory.

You could instead write the contents of the DataFrame/Dataset directly as Parquet. That will definitely be faster and more scalable.
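A minimal sketch of that suggestion, keeping the per-record transformation on the executors and letting Spark write both output formats itself; the HotelSignal case class, the toSignal function, and the S3 paths below are placeholders standing in for the asker's own types and locations, not part of the original answer:

// Sketch only: HotelSignal, toSignal, and the s3a paths are assumed placeholders.
import org.apache.spark.sql.SparkSession

case class HotelSignal(userKey: String, signalType: String, payload: String, generatedAt: Long)

// Plug the existing per-record lambda (the logic currently in processIndividualRecords) in here.
def toSignal(rawJson: String): Option[HotelSignal] = ???

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

val raw = spark.read.parquet("s3a://raw-bucket/uis/2017-02-20/")

// The mapping runs on the executors; the data stays distributed.
val signals = raw.toJSON.flatMap(line => toSignal(line))

// Spark writes both formats in parallel; nothing is collected to the driver.
signals.write.mode("overwrite").parquet("s3a://dest-bucket/signals/parquet/2017-02-20/")
signals.write.mode("overwrite").json("s3a://dest-bucket/signals/json/2017-02-20/")

With this shape, the hand-rolled ParquetWriter, the per-file gzip/JSON upload, and the collect call all disappear, and the job scales with the number of executors rather than the driver's memory.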
【Comments】:
【Answer 2】: Use s3a:// URLs. s3n:// has a bug that seriously hurts ORC/Parquet performance, and it has since been superseded by s3a.
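As a hedged illustration of that switch (the property names are the standard hadoop-aws s3a settings; the options fields are the ones already used in the question's code):

// Point the job at s3a:// instead of s3n:// and supply credentials via the s3a keys.
// Requires the hadoop-aws jar (and a matching aws-java-sdk) on the classpath.
val hadoopConf = sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", options.awsS3AccessKey)
hadoopConf.set("fs.s3a.secret.key", options.awsS3SecretKey)

val dataFrame = sparkSession.read.parquet("s3a://" + options.rawBucketName + "/" + filePath)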
【Comments】:
Thanks Steve for your input; I will switch to s3a and monitor the performance.