从 Spark 调用休息服务
Posted
技术标签:
【中文标题】从 Spark 调用休息服务【英文标题】:Calling a rest service from Spark 【发布时间】:2018-07-18 06:41:56 【问题描述】:我正在尝试找出从 Spark 调用 Rest 端点的最佳方法。
我目前的方法(解决方案 [1])看起来像这样 -
val df = ... // some dataframe
val repartitionedDf = df.repartition(numberPartitions)
lazy val restEndPoint = new restEndPointCaller() // lazy evaluation of the object which creates the connection to REST. lazy vals are also initialized once per JVM (executor)
val enrichedDf = repartitionedDf
.map(rec => restEndPoint.getResponse(rec)) // calls the rest endpoint for every record
.toDF
我知道我可以使用 .mapPartitions() 而不是 .map(),但是查看 DAG,看起来 spark 优化了重新分区 -> 无论如何映射到 mapPartition。
在第二种方法(解决方案 [2])中,为每个分区创建一次连接,并为分区内的所有记录重复使用。
val newDs = myDs.mapPartitions(partition =>
val restEndPoint = new restEndPointCaller /*creates a db connection per partition*/
val newPartition = partition.map(record =>
restEndPoint.getResponse(record, connection)
).toList // consumes the iterator, thus calls readMatchingFromDB
restEndPoint.close() // close dbconnection here
newPartition.iterator // create a new iterator
)
在这第三种方法(解决方案 [3])中,每个 JVM(执行程序)创建一次连接,并在执行程序处理的所有分区中重复使用。
lazy val connection = new DbConnection /*creates a db connection per partition*/
val newDs = myDs.mapPartitions(partition =>
val newPartition = partition.map(record =>
readMatchingFromDB(record, connection)
).toList // consumes the iterator, thus calls readMatchingFromDB
newPartition.iterator // create a new iterator
)
connection.close() // close dbconnection here
[a] 对于非常相似的解决方案 [1] 和 [3],我对 lazy val 如何工作的理解是否正确?目的是将每个执行程序/JVM 的连接数限制为 1,并重用打开的连接来处理后续请求。我会为每个 JVM 创建 1 个连接还是每个分区创建 1 个连接?
[b] 还有其他方法可以控制我们向其余端点发出的请求数 (RPS) 吗?
[c] 如果有更好、更有效的方法,请告诉我。
谢谢!
【问题讨论】:
【参考方案1】:IMO 使用mapPartitions
的第二种解决方案更好。首先,你明确地告诉你期望实现什么。转换的名称和实现的逻辑非常清楚地说明了这一点。对于第一个选项,您需要了解 Apache Spark 如何优化处理。现在对您来说可能很明显,但您还应该考虑将在 6 个月、1 年、2 年等等之后为您的代码工作的人,或者只是考虑您。他们应该比repartition
+ map
更了解mapPartitions
。
此外,使用 map 进行重新分区的优化可能会在内部发生变化(我不相信,但您仍然可以认为这是一个有效点),此时您的工作会表现得更差。
最后,使用第二种解决方案,您可以避免在序列化过程中可能遇到的许多问题。在您编写的代码中,驱动程序将创建端点对象的一个实例,将其序列化并发送给执行程序。所以是的,也许它会是一个单一的实例,但前提是它是可序列化的。
[编辑]
感谢您的澄清。您可以通过不同的方式实现您的目标。要让每个 JVM 恰好有 1 个连接,您可以使用一种称为单例的设计模式。在 Scala 中,它很容易表达为 object
(我在 Google https://alvinalexander.com/scala/how-to-implement-singleton-pattern-in-scala-with-object 上找到的第一个链接)
而且它非常好,因为您不需要序列化任何东西。单例直接从执行器端的类路径中读取。有了它,您肯定会拥有给定对象的一个实例。
[a] 与解决方案 [1] 和 [3] 非常相似,是我的 了解lazy val 如何正确工作?目的是 将每个执行程序/JVM 的连接数限制为 1 并重用 用于处理后续请求的打开连接。我会不会 每个 JVM 或每个分区创建 1 个连接? 它将为每个分区创建 1 个连接。你可以执行这个小测试来看看:
class SerializationProblemsTest extends FlatSpec
val conf = new SparkConf().setAppName("Spark serialization problems test").setMaster("local")
val sparkContext = SparkContext.getOrCreate(conf)
"lazy object" should "be created once per partition" in
lazy val restEndpoint = new NotSerializableRest()
sparkContext.parallelize(0 to 120).repartition(12)
.mapPartitions(numbers =>
//val restEndpoint = new NotSerializableRest()
numbers.map(nr => restEndpoint.enrich(nr))
)
.collect()
class NotSerializableRest()
println("Creating REST instance")
def enrich(id: Int): String = s"$id"
它应该打印 Creating REST instance 12 次(分区数)
[b] 有什么方法可以控制请求数 (RPS) 我们到其余端点?
要控制请求的数量,您可以使用类似于数据库连接池的方法:HTTP 连接池(一个快速找到的链接:HTTP connection pooling using HttpClient)。
但也许另一种有效的方法是处理较小的数据子集?因此,您可以将其拆分为不同的较小的微批次(如果它是流式作业),而不是处理 30000 行。它应该给你的网络服务更多的“休息”。
否则,您也可以尝试发送批量请求(Elasticsearch 会同时索引/删除多个文档https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html)。但这取决于网络服务是否允许您这样做。
【讨论】:
感谢@bartosz25。我已经用额外的说明和问题编辑了这个问题。同意,该解决方案 [2] 看起来更简洁、更具可读性,并且在所有这些情况下,驱动程序都需要序列化端点对象。 嗨@Yash,您能分享一下您是如何解决问题的,并提供一些您测试过的解决方案的优缺点反馈吗? 嗨,很抱歉这里的回复很晚。我采用了方法 [3],由于多种原因,它似乎运作良好。 1. 持续打开的连接——服务器上更好的 cpu 时间使用、请求管道、网络 tcp 拥塞、握手延迟。 2. 除此之外,数据如何分区无关紧要,除非可以批量发出请求,并且可以通过分区大小控制批量大小。 别担心,亚什。感谢您分享您的反馈! 此代码的python版本是否可用?以上是关于从 Spark 调用休息服务的主要内容,如果未能解决你的问题,请参考以下文章
Spring RestTemplate 配置策略从单个 API 调用多个休息服务
从托管在被 CORS 策略阻止的 Asp.net 核心 Web 服务器中的 Angular 应用程序调用我的休息服务