如何确保我的 Apache Spark 设置代码只运行一次?
Posted
技术标签:
【中文标题】如何确保我的 Apache Spark 设置代码只运行一次?【英文标题】:How do I ensure that my Apache Spark setup code runs only once? 【发布时间】:2019-11-14 15:16:30 【问题描述】:我正在使用 Scala 编写一个 Spark 作业,它读取 S3 上的 parquet 文件,进行一些简单的转换,然后将它们保存到 DynamoDB 实例。每次运行时,我们都需要在 Dynamo 中创建一个新表,因此我编写了一个负责创建表的 Lambda 函数。我的 Spark 作业要做的第一件事是生成一个表名,调用我的 Lambda 函数(将新的表名传递给它),等待创建表,然后正常执行 ETL 步骤。
但是,我的 Lambda 函数似乎一直被调用两次。我无法解释。以下是代码示例:
def main(spark: SparkSession, pathToParquet: String)
// generate a unique table name
val tableName = generateTableName()
// call the lambda function
val result = callLambdaFunction(tableName)
// wait for the table to be created
waitForTableCreation(tableName)
// normal ETL pipeline
var parquetRDD = spark.read.parquet(pathToParquet)
val transformedRDD = parquetRDD.map((row: Row) => transformData(row), encoder=kryo[(Text, DynamoDBItemWritable)])
transformedRDD.saveAsHadoopDataset(getConfiguration(tableName))
spark.sparkContext.stop()
等待创建表的代码非常简单,如您所见:
def waitForTableCreation(tableName: String)
val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder.defaultClient()
val waiter: Waiter[DescribeTableRequest] = client.waiters().tableExists()
try
waiter.run(new WaiterParameters[DescribeTableRequest](new DescribeTableRequest(tableName)))
catch
case ex: WaiterTimedOutException =>
LOGGER.error("Timed out waiting to create table: " + tableName)
throw ex
case t: Throwable => throw t
而 lambda 调用同样简单:
def callLambdaFunction(tableName: String)
val myLambda = LambdaInvokerFactory.builder()
.lambdaClient(AWSLambdaClientBuilder.defaultClient)
.lambdaFunctionNameResolver(new LambdaByName(LAMBDA_FUNCTION_NAME))
.build(classOf[MyLambdaContract])
myLambda.invoke(new MyLambdaInput(tableName))
就像我说的,当我在这段代码上运行spark-submit
时,它肯定会命中 Lambda 函数。但我无法解释为什么它会击中两次。结果是我在 DynamoDB 中预置了两个表。
在将此作为 Spark 作业运行的上下文中,等待步骤似乎也失败了。但是当我对等待的代码进行单元测试时,它似乎可以自己正常工作。它成功阻塞,直到表准备好。
起初我推测可能spark-submit
正在将此代码发送到所有工作节点,并且它们独立运行整个事情。最初我有一个 Spark 集群,有 1 个 master 和 2 个 worker。然而,我在另一个有 1 个主节点和 5 个工作节点的集群上对此进行了测试,它再次准确地命中了 Lambda 函数两次,然后显然未能等待创建表,因为它在调用 Lambda 后不久就死了。
有人对 Spark 可能在做什么有任何线索吗?我错过了什么明显的东西吗?
更新:这是我的 spark-submit 参数,在 EMR 的“步骤”选项卡上可见。
spark-submit --deploy-mode cluster --class com.mypackage.spark.MyMainClass s3://my-bucket/my-spark-job.jar
这是我的getConfiguration
函数的代码:
def getConfiguration(tableName: String) : JobConf =
val conf = new Configuration()
conf.set("dynamodb.servicename", "dynamodb")
conf.set("dynamodb.input.tableName", tableName)
conf.set("dynamodb.output.tableName", tableName)
conf.set("dynamodb.endpoint", "https://dynamodb.us-east-1.amazonaws.com")
conf.set("dynamodb.regionid", "us-east-1")
conf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
conf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
new JobConf(conf)
这里还有a Gist,其中包含我在尝试运行它时看到的一些异常日志。
【问题讨论】:
这不应该发生 - 只要该代码是驱动程序的一部分,它只会执行一次。main()
函数是如何调用的?这可能是 AWS 开发工具包本身的问题。您可以使用Java远程调试能力来调试运行在Spark客户端模式下的代码。
有趣的案例!您可以在 UI 中检查您的应用程序发生了什么?我想您正在使用 EMR 提交应用程序,对吧?如果是这样,您可以检查“应用程序历史记录”选项卡并验证作业是否执行了 1 次或 2 次(查找“尝试”关键字)。还要检查您是否在日志中没有看到任何异常。 提示#2:检查您的 AWS 客户端是否不是异步的。也许有一些并发问题。 提示#3:在 Waiter 的代码后添加 assert(...checkIfMyTableExists(), "Table doesn't exist")
以查看会发生什么提示#4您可以分享您的 Spark 配置和 spark-submit 参数吗?
谢谢大家。 @bartosz25 这是一个好主意,但在“应用程序历史记录”下,我每次尝试只看到 1 次执行,所以它并没有加倍。我一直在调用它的方式是在 EMR 的 Steps 选项卡中添加一个“Step”,其中 Step Type 设置为“Spark application”,我将编辑我的问题以添加上面的有效 spark-submit args。至于 Spark 配置,我还可以在 getConfiguration
函数的主体中添加。关于同步性,我没有使用涉及Future
s 的异步版本——我只是使用正常/同步版本。
我觉得这里发生的事情是当第一次调用时,您的应用程序没有等待创建表,它正在触发失败并中止工作的插入。随后,您第一个作业的命令在 dynamo DB 中创建表,第二次运行插入工作,因为表已经创建。但是,您的第二次运行也会创建另一个新表。我建议将布尔返回到您的 waitforTableCreation 并查看是否创建表强制插入代码。只是为了检查该功能的有效性。
更新:如果我在“客户端”部署模式而不是“集群”部署模式下运行它,我的代码似乎可以工作?这对这里的任何人有什么提示吗?这种模式部署起来不太方便,因为集群模式允许您将 JAR 文件存储在 S3 中,而客户端模式要求您事先将文件复制到主磁盘上,作为某种引导操作。
【参考方案1】:
感谢@soapergem 添加日志记录和选项。我添加了一个答案(尝试一个),因为它可能比评论长一点:)
总结:
spark-submit
和配置选项没什么奇怪的
在https://gist.github.com/soapergem/6b379b5a9092dcd43777bdec8dee65a8#file-stderr-log 中可以看到应用程序执行了两次。它从 ACCEPTED 到 RUNNING 状态经过两次。这与 EMR 默认值 (How to prevent EMR Spark step from retrying?) 一致。为了确认这一点,您可以检查在执行该步骤后是否创建了 2 个表(我假设您正在生成具有动态名称的表;每次执行时使用不同的名称,在重试的情况下应该给出 2 个不同的名称)
最后一个问题:
如果我在“客户端”部署模式而不是“集群”部署模式下运行我的代码,它看起来可能会工作?这对这里的任何人有什么提示吗?
有关差异的更多信息,请查看https://community.hortonworks.com/questions/89263/difference-between-local-vs-yarn-cluster-vs-yarn-c.html 在您的情况下,看起来在客户端模式下执行spark-submit
的机器与 EMR 作业流具有不同的 IAM 策略。我在这里的假设是您的工作流角色不允许dynamodb:Describe*
,这就是为什么您会遇到500 code
的例外情况(来自您的要点):
Caused by: com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException: Requested resource not found: Table: EmrTest_20190708143902 not found (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: V0M91J7KEUVR4VM78MF5TKHLEBVV4KQNSO5AEMVJF66Q9ASUAAJG)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:4243)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:4210)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1890)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1857)
at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:129)
at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:126)
at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:80)
为了确认这个假设,您需要执行创建表的部分并在本地等待创建(这里没有 Spark 代码,只需一个简单的 java
主函数命令)并且:
dynamodb:Describe*
on Resources: *
(如果是这个原因,AFAIK 你应该在生产中使用 somthing Resources: Test_Emr*
以实现最低特权原则)
对于第二次执行删除 dynamodb:Describe*
并检查您是否获得了与 gist 中相同的堆栈跟踪
【讨论】:
我觉得我们已经接近了,但事实并非如此。设置 EMR 集群时,您可以对其应用两种不同的 IAM 角色:一种用于集群,一种用于工作人员。但事实证明,这两个政策都已经有了dynamodb:*
政策。我检查的另一件事是,我尝试使用 AWS CLI 工具创建一个 Describe 表,首先使用有权访问的凭证,然后使用没有访问权限的凭证。后者返回一个AccessDeniedException
,它与ResourceNotFoundException
不同。
答,好的。也许尝试注释调用 Lambda 的代码,然后在添加步骤之前创建表并静态设置名称。它应该已经验证了Waiter
和 Spark 是否正常工作。
@soapergem, 我们可以在callLambdaFunction
方法中添加对org.apache.spark.util.Utils#getCallSite
的调用并像println(callSite.longForm) 一样打印它,这只是为了检查除了调用方法的正常 spark sumbit。
另一件事,地区。我在几个地方发现端点 url 是在没有协议的情况下编写的(aws.amazon.com/blogs/big-data/…、***.com/questions/52187885/…)所以也许也可以这样尝试?否则,您能否验证 Lambda、服务员和 Spark 执行程序使用相同的区域?另外,我想知道为什么你需要在配置中输入表?
我可以关闭协议,或者删除 dynamodb.input.tableName 属性,但目前它们没有损害任何东西。是的,我可以验证一切都在 us-east-1 中运行。【参考方案2】:
我在集群模式下也遇到了同样的问题(v2.4.0)。我通过使用 SparkLauncher 而不是 spark-submit.sh 以编程方式启动我的应用程序来解决它。您可以将 lambda 逻辑移动到启动 spark 应用程序的 main 方法中,如下所示:
def main(args: Array[String]) =
// generate a unique table name
val tableName = generateTableName()
// call the lambda function
val result = callLambdaFunction(tableName)
// wait for the table to be created
waitForTableCreation(tableName)
val latch = new CountDownLatch(1);
val handle = new SparkLauncher(env)
.setAppResource("/path/to/spark-app.jar")
.setMainClass("com.company.SparkApp")
.setMaster("yarn")
.setDeployMode("cluster")
.setConf("spark.executor.instances", "2")
.setConf("spark.executor.cores", "2")
// other conf ...
.setVerbose(true)
.startApplication(new SparkAppHandle.Listener
override def stateChanged(sparkAppHandle: SparkAppHandle): Unit =
latch.countDown()
override def infoChanged(sparkAppHandle: SparkAppHandle): Unit =
)
println("app is launching...")
latch.await()
println("app exited")
【讨论】:
你如何调用它?你基本上只是在通过 SSH 连接到主服务器时直接使用java -jar ...
运行它吗?我希望用 AWS Data Pipeline 包装这个 EMR 作业,所以我想知道调用这样的东西有多么困难。我对 SparkLauncher 不熟悉,但如果它能解决我的问题,我一定会研究它。
假设上面的代码在object MySparkLauncher
,你可以像java -cp $SPARK_HOME/jars/*:/path/to/your/spark-app.jar com.company.SparkLauncher
这样运行它,其中SPARK_HOME是安装spark二进制文件的地方。
还有:1-count锁存器的作用是什么?
这是一种在退出程序之前等待 spark 应用程序完成运行的方法。没有它,程序将立即退出并且不执行任何操作。【参考方案3】:
您的 spark 作业在实际创建表之前开始,因为逐个定义操作并不意味着它们会等到前一个操作完成
您需要更改代码,以便在创建表后启动与 spark 相关的块,并且为了实现它,您必须使用 for-comprehension
来确保每个步骤都已完成,或者将您的 spark 管道放入回调中表创建后调用的waiter
(如果有的话,很难说)
您也可以使用andThen
或简单的map
重点是main中编写的所有代码行立即一一执行,无需等待前一行完成
【讨论】:
这个链接***.com/questions/31714788/…你可能会觉得有用以上是关于如何确保我的 Apache Spark 设置代码只运行一次?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用apache spark通过列表对文本中的特定单词进行去标识化?