Spark Streaming:微批处理并行执行
Posted
技术标签:
【中文标题】Spark Streaming:微批处理并行执行【英文标题】:Spark Streaming: Micro batches Parallel Execution 【发布时间】:2017-12-18 11:32:40 【问题描述】:我们正在从 Kafka 接收火花流数据。一旦在 Spark Streaming 中开始执行,它只执行一个批次,其余批次开始在 Kafka 中排队。
我们的数据是独立的,可以并行处理。
我们尝试了多个配置,包括多个执行器、核心、背压和其他配置,但到目前为止没有任何效果。队列中有很多消息,一次只处理了一个微批次,其余消息仍在队列中。
我们希望最大程度地实现并行性,这样就没有任何微批处理排队,因为我们有足够的可用资源。那么我们如何通过最大限度地利用资源来减少时间。
// Start reading messages from Kafka and get DStream
final JavaInputDStream<ConsumerRecord<String, byte[]>> consumerStream = KafkaUtils.createDirectStream(
getJavaStreamingContext(), LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, byte[]>Subscribe("TOPIC_NAME",
sparkServiceConf.getKafkaConsumeParams()));
ThreadContext.put(Constants.CommonLiterals.LOGGER_UID_VAR, CommonUtils.loggerUniqueId());
JavaDStream<byte[]> messagesStream = consumerStream.map(new Function<ConsumerRecord<String, byte[]>, byte[]>()
private static final long serialVersionUID = 1L;
@Override
public byte[] call(ConsumerRecord<String, byte[]> kafkaRecord) throws Exception
return kafkaRecord.value();
);
// Decode each binary message and generate JSON array
JavaDStream<String> decodedStream = messagesStream.map(new Function<byte[], String>()
private static final long serialVersionUID = 1L;
@Override
public String call(byte[] asn1Data) throws Exception
if(asn1Data.length > 0)
try (InputStream inputStream = new ByteArrayInputStream(asn1Data);
Writer writer = new StringWriter(); )
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(asn1Data);
GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
byte[] buffer = new byte[1024];
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
int len;
while((len = gzipInputStream.read(buffer)) != -1)
byteArrayOutputStream.write(buffer, 0, len);
return new String(byteArrayOutputStream.toByteArray());
catch (Exception e)
//
producer.flush();
throw e;
return null;
);
// publish generated json gzip to kafka
cache.foreachRDD(new VoidFunction<JavaRDD<String>>()
private static final long serialVersionUID = 1L;
@Override
public void call(JavaRDD<String> jsonRdd4DF) throws Exception
//Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
if(!jsonRdd4DF.isEmpty())
//JavaRDD<String> jsonRddDF = getJavaSparkContext().parallelize(jsonRdd4DF.collect());
Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
SparkAIRMainJsonProcessor airMainJsonProcessor = new SparkAIRMainJsonProcessor();
airMainJsonProcessor.processAIRData(json, sparkSession);
);
getJavaStreamingContext().start();
getJavaStreamingContext().awaitTermination();
getJavaStreamingContext().stop();
我们正在使用的技术:
HDFS 2.7.1.2.5
YARN + MapReduce2 2.7.1.2.5
ZooKeeper 3.4.6.2.5
Ambari Infra 0.1.0
Ambari Metrics 0.1.0
Kafka 0.10.0.2.5
Knox 0.9.0.2.5
Ranger 0.6.0.2.5
Ranger KMS 0.6.0.2.5
SmartSense 1.3.0.0-1
Spark2 2.0.x.2.5
我们从不同实验中得到的统计数据:
实验 1
num_executors=6
executor_memory=8g
executor_cores=12
100 个文件处理时间 48 分钟
实验 2
spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12
100 个文件处理时间 8 分钟
实验 3
spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12
100 个文件处理时间 7 分钟
实验 4
spark.default.parallelism=16
num_executors=6
executor_memory=8g
executor_cores=12
100 个文件处理时间 10 分钟
请告知,我们如何处理最大数量以便无需排队。
【问题讨论】:
你的批处理时间是多少?处理每个批次似乎需要 30 秒 ~ 1 分钟,这就是您累积批次的原因。 我们的 spark.streaming.kafka.maxRatePerPartition =2 并且持续时间是 10 秒。所以最多 20 秒。 您必须在 kafka 级别使用单个分区。从并行性的角度来看,这个过程很好,但输入可能不是。 让我们continue this discussion in chat。 当我们确定问题中没有记录性能不佳的代码的实际部分时,我很惊讶地看到这个问题要求提供赏金答案。要么添加完整的代码,要么放弃它,因为这对任何试图提供全面答案的人都是不公平的。 【参考方案1】:我遇到了同样的问题,我尝试了一些方法来解决这个问题并得出以下结果:
首先。直觉说每个执行器必须处理一批,但相反,一次只处理一批,但作业和任务是并行处理的。
使用 spark.streaming.concurrentjobs 可以实现多个批处理,但它没有记录在案,仍然需要一些修复。问题之一是保存 Kafka 偏移量。假设我们将此参数设置为 4 并且并行处理 4 个批次,如果第 3 批次在第 4 批次之前完成,会提交哪些 Kafka 偏移量。如果批次是独立的,这个参数非常有用。
spark.default.parallelism 因为它的名字有时被认为是使事情并行。但它真正的好处在于分布式洗牌操作。尝试不同的数字并为此找到最佳数字。您将在处理时间上获得相当大的差异。这取决于您工作中的洗牌操作。设置得太高会降低性能。从您的实验结果中也可以看出。
另一种选择是使用 foreachPartitionAsync 代替 RDD 上的 foreach。但我认为 foreachPartition 更好,因为 foreachPartitionAsync 会将作业排队,而批处理似乎已处理,但它们的作业仍将在队列中或正在处理中。可能是我没有正确使用它。但它在我的 3 项服务中表现相同。
FAIR spark.scheduler.mode 必须用于具有大量任务的作业,作为任务到作业的循环分配,让较小的任务有机会在处理较大的任务时开始接收资源。
尝试调整您的批次持续时间+输入大小,并始终将其保持在处理持续时间以下,否则您会看到大量积压的批次。
这些是我的发现和建议,但是,有很多配置和方法可以进行流式传输,并且通常一组操作不适用于其他操作。 Spark Streaming 就是关于学习,将您的经验和预期结合在一起,以获得一组最佳配置。
希望对您有所帮助。如果有人能具体说明我们如何合法地并行处理批次,那将是一个很大的解脱。
【讨论】:
感谢您的详细解答。但是仍然需要更多关于 spark Stream 并行处理的细节。 Spark 似乎一次只支持一个批处理。 @Imran,这就是spark.streaming.concurrentJobs
的用途(另外,请注意大写“J”)。请注意It doesn't work with many features, and also can cause data lost.【参考方案2】:
我们希望最大程度地实现并行性,这样就不会有任何微批处理排队
流处理就是这样:您按照接收到的顺序处理数据。如果您以比到达速度慢的速度处理数据,它将被排队。另外,不要指望一条记录的处理会突然跨多个节点并行处理。
从您的屏幕截图来看,您的批处理时间似乎是 10 秒,而您的制作人在 90 秒内发布了 100 条记录。
处理 2 条记录需要 36 秒,处理 17 条记录需要 70 秒。显然,每批都有一些开销。如果这种依赖性是线性的,那么在一个小批量中处理所有 100 条记录只需 4:18,从而击败您的记录保持者。
由于您的代码不完整,因此很难判断究竟是什么花费了这么多时间。代码中的转换看起来不错,但操作(或后续转换)可能是真正的瓶颈。另外,你的代码中没有提到的 producer.flush()
是怎么回事?
【讨论】:
我们没有任何生产者要求。 Kafka 的错误队列中只写入错误。【参考方案3】:我遇到了同样的问题,我使用 Scala Futures 解决了它。
这里有一些链接显示如何使用它:
https://alvinalexander.com/scala/how-use-multiple-scala-futures-in-for-comprehension-loop
https://www.beyondthelines.net/computing/scala-future-and-execution-context/
另外,这是我使用 Scala Futures 时的一段代码:
messages.foreachRDD rdd =>
val f = Future
// sleep(100)
val newRDD = rdd.mapmessage =>
val req_message = message.value()
(message.value())
println("Request messages: " + newRDD.count())
var resultrows = newRDD.collect()//.collectAsList()
processMessage(resultrows, mlFeatures: MLFeatures, conf)
println("Inside scala future")
1
f.onComplete
case Success(messages) => println("yay!")
case Failure(exception) => println("On no!")
【讨论】:
【参考方案4】:如果不了解所有细节很难说,但解决此类问题的一般建议 - 从非常简单的应用程序开始,“Hello world”类型。只需从输入流中读取并将数据打印到日志文件中。一旦这个工作你证明问题出在应用程序中,你逐渐添加你的功能,直到你找到罪魁祸首。如果即使是最简单的应用程序也无法运行 - 您知道配置或 Spark 集群本身存在问题。希望这会有所帮助。
【讨论】:
以上是关于Spark Streaming:微批处理并行执行的主要内容,如果未能解决你的问题,请参考以下文章