如何确保我的 Apache Spark 设置代码只运行一次?

Posted

技术标签:

【中文标题】如何确保我的 Apache Spark 设置代码只运行一次?【英文标题】:How do I ensure that my Apache Spark setup code runs only once? 【发布时间】:2019-11-14 15:16:30 【问题描述】:

我正在使用 Scala 编写一个 Spark 作业,它读取 S3 上的 parquet 文件,进行一些简单的转换,然后将它们保存到 DynamoDB 实例。每次运行时,我们都需要在 Dynamo 中创建一个新表,因此我编写了一个负责创建表的 Lambda 函数。我的 Spark 作业要做的第一件事是生成一个表名,调用我的 Lambda 函数(将新的表名传递给它),等待创建表,然后正常执行 ETL 步骤。

但是,我的 Lambda 函数似乎一直被调用两次。我无法解释。以下是代码示例:

def main(spark: SparkSession, pathToParquet: String) 

  // generate a unique table name
  val tableName = generateTableName()

  // call the lambda function
  val result = callLambdaFunction(tableName)

  // wait for the table to be created
  waitForTableCreation(tableName)

  // normal ETL pipeline
  var parquetRDD = spark.read.parquet(pathToParquet)
  val transformedRDD = parquetRDD.map((row: Row) => transformData(row), encoder=kryo[(Text, DynamoDBItemWritable)])
  transformedRDD.saveAsHadoopDataset(getConfiguration(tableName))
  spark.sparkContext.stop()

等待创建表的代码非常简单,如您所见:

def waitForTableCreation(tableName: String) 
  val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder.defaultClient()
  val waiter: Waiter[DescribeTableRequest] = client.waiters().tableExists()
  try 
    waiter.run(new WaiterParameters[DescribeTableRequest](new DescribeTableRequest(tableName)))
   catch 
      case ex: WaiterTimedOutException =>
        LOGGER.error("Timed out waiting to create table: " + tableName)
        throw ex
      case t: Throwable => throw t
  

而 lambda 调用同样简单:

def callLambdaFunction(tableName: String) 
  val myLambda = LambdaInvokerFactory.builder()
    .lambdaClient(AWSLambdaClientBuilder.defaultClient)
    .lambdaFunctionNameResolver(new LambdaByName(LAMBDA_FUNCTION_NAME))
    .build(classOf[MyLambdaContract])
  myLambda.invoke(new MyLambdaInput(tableName))

就像我说的,当我在这段代码上运行spark-submit 时,它肯定会命中 Lambda 函数。但我无法解释为什么它会击中两次。结果是我在 DynamoDB 中预置了两个表。

在将此作为 Spark 作业运行的上下文中,等待步骤似乎也失败了。但是当我对等待的代码进行单元测试时,它似乎可以自己正常工作。它成功阻塞,直到表准备好。

起初我推测可能spark-submit 正在将此代码发送到所有工作节点,并且它们独立运行整个事情。最初我有一个 Spark 集群,有 1 个 master 和 2 个 worker。然而,我在另一个有 1 个主节点和 5 个工作节点的集群上对此进行了测试,它再次准确地命中了 Lambda 函数两次,然后显然未能等待创建表,因为它在调用 Lambda 后不久就死了。

有人对 Spark 可能在做什么有任何线索吗?我错过了什么明显的东西吗?

更新:这是我的 spark-submit 参数,在 EMR 的“步骤”选项卡上可见。

spark-submit --deploy-mode cluster --class com.mypackage.spark.MyMainClass s3://my-bucket/my-spark-job.jar

这是我的getConfiguration 函数的代码:

def getConfiguration(tableName: String) : JobConf = 
  val conf = new Configuration()
  conf.set("dynamodb.servicename", "dynamodb")
  conf.set("dynamodb.input.tableName", tableName)
  conf.set("dynamodb.output.tableName", tableName)
  conf.set("dynamodb.endpoint", "https://dynamodb.us-east-1.amazonaws.com")
  conf.set("dynamodb.regionid", "us-east-1")
  conf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
  conf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
  new JobConf(conf)

这里还有a Gist,其中包含我在尝试运行它时看到的一些异常日志。

【问题讨论】:

这不应该发生 - 只要该代码是驱动程序的一部分,它只会执行一次。 main() 函数是如何调用的?这可能是 AWS 开发工具包本身的问题。您可以使用Java远程调试能力来调试运行在Spark客户端模式下的代码。 有趣的案例!您可以在 UI 中检查您的应用程序发生了什么?我想您正在使用 EMR 提交应用程序,对吧?如果是这样,您可以检查“应用程序历史记录”选项卡并验证作业是否执行了 1 次或 2 次(查找“尝试”关键字)。还要检查您是否在日志中没有看到任何异常。 提示#2:检查您的 AWS 客户端是否不是异步的。也许有一些并发问题。 提示#3:在 Waiter 的代码后添加 assert(...checkIfMyTableExists(), "Table doesn't exist") 以查看会发生什么提示#4您可以分享您的 Spark 配置和 spark-submit 参数吗? 谢谢大家。 @bartosz25 这是一个好主意,但在“应用程序历史记录”下,我每次尝试只看到 1 次执行,所以它并没有加倍。我一直在调用它的方式是在 EMR 的 Steps 选项卡中添加一个“Step”,其中 Step Type 设置为“Spark application”,我将编辑我的问题以添加上面的有效 spark-submit args。至于 Spark 配置,我还可以在 getConfiguration 函数的主体中添加。关于同步性,我没有使用涉及Futures 的异步版本——我只是使用正常/同步版本。 我觉得这里发生的事情是当第一次调用时,您的应用程序没有等待创建表,它正在触发失败并中止工作的插入。随后,您第一个作业的命令在 dynamo DB 中创建表,第二次运行插入工作,因为表已经创建。但是,您的第二次运行也会创建另一个新表。我建议将布尔返回到您的 waitforTableCreation 并查看是否创建表强制插入代码。只是为了检查该功能的有效性。 更新:如果我在“客户端”部署模式而不是“集群”部署模式下运行它,我的代码似乎可以工作?这对这里的任何人有什么提示吗?这种模式部署起来不太方便,因为集群模式允许您将 JAR 文件存储在 S3 中,而客户端模式要求您事先将文件复制到主磁盘上,作为某种引导操作。 【参考方案1】:

感谢@soapergem 添加日志记录和选项。我添加了一个答案(尝试一个),因为它可能比评论长一点:)

总结:

spark-submit 和配置选项没什么奇怪的 在https://gist.github.com/soapergem/6b379b5a9092dcd43777bdec8dee65a8#file-stderr-log 中可以看到应用程序执行了两次。它从 ACCEPTED 到 RUNNING 状态经过两次。这与 EMR 默认值 (How to prevent EMR Spark step from retrying?) 一致。为了确认这一点,您可以检查在执行该步骤后是否创建了 2 个表(我假设您正在生成具有动态名称的表;每次执行时使用不同的名称,在重试的情况下应该给出 2 个不同的名称)

最后一个问题:

如果我在“客户端”部署模式而不是“集群”部署模式下运行我的代码,它看起来可能会工作?这对这里的任何人有什么提示吗?

有关差异的更多信息,请查看https://community.hortonworks.com/questions/89263/difference-between-local-vs-yarn-cluster-vs-yarn-c.html 在您的情况下,看起来在客户端模式下执行spark-submit 的机器与 EMR 作业流具有不同的 IAM 策略。我在这里的假设是您的工作流角色不允许dynamodb:Describe*,这就是为什么您会遇到500 code 的例外情况(来自您的要点):

Caused by: com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException: Requested resource not found: Table: EmrTest_20190708143902 not found (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: V0M91J7KEUVR4VM78MF5TKHLEBVV4KQNSO5AEMVJF66Q9ASUAAJG)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:4243)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:4210)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1890)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1857)
    at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:129)
    at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:126)
at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:80)

为了确认这个假设,您需要执行创建表的部分并在本地等待创建(这里没有 Spark 代码,只需一个简单的 java 主函数命令)并且:

对于第一次执行,请确保您拥有所有权限。 IMO 它将是 dynamodb:Describe* on Resources: * (如果是这个原因,AFAIK 你应该在生产中使用 somthing Resources: Test_Emr* 以实现最低特权原则) 对于第二次执行删除 dynamodb:Describe* 并检查您是否获得了与 gist 中相同的堆栈跟踪

【讨论】:

我觉得我们已经接近了,但事实并非如此。设置 EMR 集群时,您可以对其应用两种不同的 IAM 角色:一种用于集群,一种用于工作人员。但事实证明,这两个政策都已经有了dynamodb:* 政策。我检查的另一件事是,我尝试使用 AWS CLI 工具创建一个 Describe 表,首先使用有权访问的凭证,然后使用没有访问权限的凭证。后者返回一个AccessDeniedException,它与ResourceNotFoundException不同。 答,好的。也许尝试注释调用 Lambda 的代码,然后在添加步骤之前创建表并静态设置名称。它应该已经验证了Waiter 和 Spark 是否正常工作。 @soapergem, 我们可以在callLambdaFunction 方法中添加对org.apache.spark.util.Utils#getCallSite 的调用并像println(callSite.longForm) 一样打印它,这只是为了检查除了调用方法的正常 spark sumbit。 另一件事,地区。我在几个地方发现端点 url 是在没有协议的情况下编写的(aws.amazon.com/blogs/big-data/…、***.com/questions/52187885/…)所以也许也可以这样尝试?否则,您能否验证 Lambda、服务员和 Spark 执行程序使用相同的区域?另外,我想知道为什么你需要在配置中输入表? 我可以关闭协议,或者删除 dynamodb.input.tableName 属性,但目前它们没有损害任何东西。是的,我可以验证一切都在 us-east-1 中运行。【参考方案2】:

我在集群模式下也遇到了同样的问题(v2.4.0)。我通过使用 SparkLauncher 而不是 spark-submit.sh 以编程方式启动我的应用程序来解决它。您可以将 lambda 逻辑移动到启动 spark 应用程序的 main 方法中,如下所示:

def main(args: Array[String]) = 
    // generate a unique table name
    val tableName = generateTableName()

    // call the lambda function
    val result = callLambdaFunction(tableName)

    // wait for the table to be created
    waitForTableCreation(tableName)

    val latch = new CountDownLatch(1);

    val handle = new SparkLauncher(env)
        .setAppResource("/path/to/spark-app.jar")
        .setMainClass("com.company.SparkApp")
        .setMaster("yarn")
        .setDeployMode("cluster")
        .setConf("spark.executor.instances", "2")
        .setConf("spark.executor.cores", "2")
        // other conf ... 
        .setVerbose(true)
        .startApplication(new SparkAppHandle.Listener 
            override def stateChanged(sparkAppHandle: SparkAppHandle): Unit = 
                latch.countDown()
            

            override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = 

            
        )  

    println("app is launching...")
    latch.await()
    println("app exited")

【讨论】:

你如何调用它?你基本上只是在通过 SSH 连接到主服务器时直接使用 java -jar ... 运行它吗?我希望用 AWS Data Pipeline 包装这个 EMR 作业,所以我想知道调用这样的东西有多么困难。我对 SparkLauncher 不熟悉,但如果它能解决我的问题,我一定会研究它。 假设上面的代码在object MySparkLauncher,你可以像java -cp $SPARK_HOME/jars/*:/path/to/your/spark-app.jar com.company.SparkLauncher这样运行它,其中SPARK_HOME是安装spark二进制文件的地方。 还有:1-count锁存器的作用是什么? 这是一种在退出程序之前等待 spark 应用程序完成运行的方法。没有它,程序将立即退出并且不执行任何操作。【参考方案3】:

您的 spark 作业在实际创建表之前开始,因为逐个定义操作并不意味着它们会等到前一个操作完成

您需要更改代码,以便在创建表后启动与 spark 相关的块,并且为了实现它,您必须使用 for-comprehension 来确保每个步骤都已完成,或者将您的 spark 管道放入回调中表创建后调用的waiter(如果有的话,很难说)

您也可以使用andThen 或简单的map

重点是main中编写的所有代码行立即一一执行,无需等待前一行完成

【讨论】:

这个链接***.com/questions/31714788/…你可能会觉得有用

以上是关于如何确保我的 Apache Spark 设置代码只运行一次?的主要内容,如果未能解决你的问题,请参考以下文章

如何设置 Spark 执行器的数量?

如何设置 Apache Spark Executor 内存

如何使用apache spark通过列表对文本中的特定单词进行去标识化?

在Apache Spark中使用Bigquery Connector时如何设置分区数?

调优 | Apache Hudi应用调优指南

Apache Spark Hadoop S3A SignatureDoesNotMatch