如何使用 TestContainers 创建 apache spark 独立集群以进行集成测试?

Posted

技术标签:

【中文标题】如何使用 TestContainers 创建 apache spark 独立集群以进行集成测试?【英文标题】:How to create apache spark standalone cluster for integration testing using TestContainers? 【发布时间】:2020-04-02 08:22:03 【问题描述】:

有谁知道如何使用 testContainers https://www.testcontainers.org/创建一个用于集成测试的 apache-spark 集群

请提供任何正在运行的示例,我正在努力寻找。

【问题讨论】:

【参考方案1】:

我能够使用 GenericContainer 类和 bitnami/spark 图像创建这种集成测试。它是以下代码(我为 library 编写了它,它将数据帧写入 AWS SQS)。

这个想法是创建一个 Spark 容器(在这种情况下,它不是一个集群,而只是一个主节点),复制运行测试所需的所有文件(一些 Python 文件和所有依赖项),发出 spark-submit 命令并检查最终状态(另一个容器中 Localstack 的 SQS 服务中的消息)。

    @Testcontainers
    public class SparkIntegrationTest 
    
        private static Network network = Network.newNetwork();
    
        @Container
        public LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.13"))
                .withNetwork(network)
                .withNetworkAliases("localstack")
                .withServices(SQS);
    
        @Container
        public GenericContainer spark = new GenericContainer(DockerImageName.parse("bitnami/spark:3.1.2"))
                .withCopyFileToContainer(MountableFile.forHostPath("build/resources/test/.", 0744), "/home/")
                .withCopyFileToContainer(MountableFile.forHostPath("build/libs/.", 0555), "/home/")
                .withNetwork(network)
                .withEnv("AWS_ACCESS_KEY_ID", "test")
                .withEnv("AWS_SECRET_KEY", "test")
                .withEnv("SPARK_MODE", "master");
    
        @Test
        public void shouldPutASQSMessageInLocalstackUsingSpark() throws IOException, InterruptedException 
            String expectedBody = "my message body";    // the same value in resources/sample.txt
    
            AmazonSQS sqs = AmazonSQSClientBuilder.standard()
                    .withEndpointConfiguration(localstack.getEndpointConfiguration(SQS))
                    .withCredentials(localstack.getDefaultCredentialsProvider())
                    .build();
            sqs.createQueue("my-test");
    
            org.testcontainers.containers.Container.ExecResult lsResult =
                    spark.execInContainer("spark-submit",
                            "--jars", "/home/spark-aws-messaging-0.3.1.jar,/home/deps/aws-java-sdk-core-1.12.12.jar,/home/deps/aws-java-sdk-sqs-1.12.12.jar",
                            "--master", "local",
                            "/home/sqs_write.py",
                            "/home/sample.txt",
                            "http://localstack:4566");
    
            System.out.println(lsResult.getStdout());
            System.out.println(lsResult.getStderr());
    
            assertEquals(0, lsResult.getExitCode());
    
            String queueUrl = sqs.getQueueUrl("my-test").getQueueUrl()
                    .replace("localstack", localstack.getContainerIpAddress());
            List<Message> messages = sqs.receiveMessage(queueUrl)
                    .getMessages();
            assertEquals(expectedBody, messages.get(0).getBody());
        
    

还有一个缺点:它是一个黑盒子,我无法衡量代码覆盖率。

【讨论】:

以上是关于如何使用 TestContainers 创建 apache spark 独立集群以进行集成测试?的主要内容,如果未能解决你的问题,请参考以下文章

使用 testcontainers 测试 kafka 和 spark

如何解决 Spring Boot 上的 org.testcontainers.containers.ContainerFetchException?

我们如何在 Testcontainers R2DBC 中初始化模式?

在 Corda 中使用 TestContainers 进行 API 测试

testcontainers-java 新增对 TiDB 的支持

无法连接到 SpringTest 中由 TestContainers 创建的容器