如何使用 TestContainers 创建 apache spark 独立集群以进行集成测试?
Posted
技术标签:
【中文标题】如何使用 TestContainers 创建 apache spark 独立集群以进行集成测试?【英文标题】:How to create apache spark standalone cluster for integration testing using TestContainers? 【发布时间】:2020-04-02 08:22:03 【问题描述】:有谁知道如何使用 testContainers https://www.testcontainers.org/创建一个用于集成测试的 apache-spark 集群
请提供任何正在运行的示例,我正在努力寻找。
【问题讨论】:
【参考方案1】:我能够使用 GenericContainer 类和 bitnami/spark 图像创建这种集成测试。它是以下代码(我为 library 编写了它,它将数据帧写入 AWS SQS)。
这个想法是创建一个 Spark 容器(在这种情况下,它不是一个集群,而只是一个主节点),复制运行测试所需的所有文件(一些 Python 文件和所有依赖项),发出 spark-submit 命令并检查最终状态(另一个容器中 Localstack 的 SQS 服务中的消息)。
@Testcontainers
public class SparkIntegrationTest
private static Network network = Network.newNetwork();
@Container
public LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.13"))
.withNetwork(network)
.withNetworkAliases("localstack")
.withServices(SQS);
@Container
public GenericContainer spark = new GenericContainer(DockerImageName.parse("bitnami/spark:3.1.2"))
.withCopyFileToContainer(MountableFile.forHostPath("build/resources/test/.", 0744), "/home/")
.withCopyFileToContainer(MountableFile.forHostPath("build/libs/.", 0555), "/home/")
.withNetwork(network)
.withEnv("AWS_ACCESS_KEY_ID", "test")
.withEnv("AWS_SECRET_KEY", "test")
.withEnv("SPARK_MODE", "master");
@Test
public void shouldPutASQSMessageInLocalstackUsingSpark() throws IOException, InterruptedException
String expectedBody = "my message body"; // the same value in resources/sample.txt
AmazonSQS sqs = AmazonSQSClientBuilder.standard()
.withEndpointConfiguration(localstack.getEndpointConfiguration(SQS))
.withCredentials(localstack.getDefaultCredentialsProvider())
.build();
sqs.createQueue("my-test");
org.testcontainers.containers.Container.ExecResult lsResult =
spark.execInContainer("spark-submit",
"--jars", "/home/spark-aws-messaging-0.3.1.jar,/home/deps/aws-java-sdk-core-1.12.12.jar,/home/deps/aws-java-sdk-sqs-1.12.12.jar",
"--master", "local",
"/home/sqs_write.py",
"/home/sample.txt",
"http://localstack:4566");
System.out.println(lsResult.getStdout());
System.out.println(lsResult.getStderr());
assertEquals(0, lsResult.getExitCode());
String queueUrl = sqs.getQueueUrl("my-test").getQueueUrl()
.replace("localstack", localstack.getContainerIpAddress());
List<Message> messages = sqs.receiveMessage(queueUrl)
.getMessages();
assertEquals(expectedBody, messages.get(0).getBody());
还有一个缺点:它是一个黑盒子,我无法衡量代码覆盖率。
【讨论】:
以上是关于如何使用 TestContainers 创建 apache spark 独立集群以进行集成测试?的主要内容,如果未能解决你的问题,请参考以下文章
使用 testcontainers 测试 kafka 和 spark
如何解决 Spring Boot 上的 org.testcontainers.containers.ContainerFetchException?
我们如何在 Testcontainers R2DBC 中初始化模式?
在 Corda 中使用 TestContainers 进行 API 测试