在所有 spark executors 和 Driver 上执行脚本或小函数,而不使用 DataFrame 或 RDD

Posted

技术标签:

【中文标题】在所有 spark executors 和 Driver 上执行脚本或小函数,而不使用 DataFrame 或 RDD【英文标题】:Execute a script or small function on all spark executors and Driver without using a DataFrame or RDD 【发布时间】:2020-07-12 03:51:17 【问题描述】:

我正在尝试使用 Spark 结构化流从 Kafka 主题中读取数据。 Kafka Brokers 启用了 SSL。所以我需要将私有 CA 证书安装/导入到 spark 驱动程序和执行程序上的 TrustStore 文件中。

我无法在主 spark 提交命令之前使用单独的步骤导入证书,因为 spark 脚本是动态提交的(从 s3 下载)。来自 s3 的这个 spark 脚本包含有关私有 CA 证书文件 (.pem) 所在位置的信息(在单独的 s3 位置上)。

我查找了执行此操作的方法。大多数解决方案都需要创建 RDD 或 DataFrame 并在其上调用 Map 或 MapPartition 函数(本质上是定义分区)。但这对我来说就像一个循环依赖。如果不先导入私有 ca 证书,我既不能创建 Dataframe 或 RDD,也不能在不创建 DataFrame 或 RDD 的情况下导入 ca 证书。

我可以创建一个虚拟 DataFrame 并尝试将它们分布在所有 executor 上,但这种解决方案并不总是有效(例如,如果 executor 节点崩溃然后恢复,或者 DataFrame 由于分区而没有正确分布在所有 executor 节点上怎么办?算法限制)。

谁能提出一种更好的方法来在 Driver 和所有执行程序上执行一个小函数,而无需创建 DataFrame 或 RDD?

【问题讨论】:

您在哪里运行 Spark 应用程序?它在 AWS EMR 上吗? 您能否添加一个代码 sn-p、您想要的结果以及确切的错误消息?这将使这个问题更容易回答。 @AjayKrChoudhary:更多的是设计限制。我的应用程序包含各种微服务。一项服务接受 docker 映像并运行它。我的微服务提交 spark 命令和 docker 镜像。所以从本质上讲,我在 Spark 工作中只能迈出一步。 @Powers:我得到的错误是常规 SSL 握手失败异常。原因是 ca 证书未在所有执行程序中导入。更多的是关于如何在所有 spark 执行器上执行函数而不创建 rdd/dataframe 的设计问题 @Shashank 在 EMR 中添加引导操作不需要您添加额外的步骤。它是作为启动 EMR 集群的一部分完成的。因此,您仍然可以将其视为仅在您的应用程序要求时提交一个步骤。 【参考方案1】:

如果您在 AWS EMR 上运行 Spark 应用程序,则可以通过 EMR 中的引导操作来解决您的问题。

从bootstrap actionbootstrap action的官方文档中,你会发现这个

您可以使用引导操作安装其他软件或 自定义集群实例的配置。 引导操作 是在 Amazon EMR 启动后在集群上运行的脚本 使用 Amazon Linux Amazon 系统映像 (AMI) 的实例。引导程序 在 Amazon EMR 安装您的应用程序之前运行的操作 指定创建集群的时间和集群节点开始之前 处理数据。如果您将节点添加到正在运行的集群,请引导 操作也以相同的方式在这些节点上运行。您可以创建自定义 引导操作并在您创建集群时指定它们。

您可以根据用例使这些脚本在驱动程序或执行程序节点或两者上运行。默认情况下,它将在 EMR 中的所有实例上运行。

您可以将引导脚本放在 S3 上,也可以在从 AWS 控制台创建集群时粘贴整个脚本。我个人更喜欢将脚本放在 S3 中,并在启动 EMR 时在引导操作中指定此文件路径。

现在,为了满足您的用例,您可以将下载 CA 证书的逻辑放在脚本中,以及您希望在集群中的所有节点上执行的任何其他自定义逻辑。

【讨论】:

引导操作在我的用例中是不可能的。我只能执行单个 spark 步骤(即 spark-submit 命令) @Shashank 我仍然不明白为什么你不能运行引导操作。您能否通过编辑问题来更多地解释您的执行环境,以便我能以更好的方式提供帮助。如果我正确理解你的想法,这个解决方案仍然需要你只提交一个 spark 步骤。只是您通过放置引导操作在所有实例中预先执行某些事情。 将整个应用程序视为各种微服务的组合。其中一个微服务只接受一个 docker 图像 url 和命令来执行它。这个微服务抽象了 EMR 启动和命令执行。所以我无法控制这个微服务。我唯一能控制的是 API 调用和传递 Docker 映像 URL 以及执行命令。我的 Docker 映像包含一个使用 spark 结构化流的自定义库。所以一旦 spark-submit commad 启动,我的库代码就会被调用。 啊好吧。知道了。那么上面的方法就行不通了。

以上是关于在所有 spark executors 和 Driver 上执行脚本或小函数,而不使用 DataFrame 或 RDD的主要内容,如果未能解决你的问题,请参考以下文章

在所有 spark executors 和 Driver 上执行脚本或小函数,而不使用 DataFrame 或 RDD

SPARK 资源调度源码总结

在 Spark 中,是不是可以在两个 executor 之间共享数据?

spark:blockManagerbroadcastcachecheckpoint

Spark 宏观架构&执行步骤

2022-02-26-Spark-46(性能调优SparkUI)