使用 testcontainers 测试 kafka 和 spark
Posted
技术标签:
【中文标题】使用 testcontainers 测试 kafka 和 spark【英文标题】:testing kafka and spark with testcontainers 【发布时间】:2021-10-25 02:18:01 【问题描述】:我正在尝试与 testcontainers 一起检查流式管道作为集成测试,但我不知道如何获取 bootstrapServers,至少在最后一个 testcontainers 版本中并在那里创建特定主题。如何使用 'containerDef' 提取引导服务器并添加主题?
import com.dimafeng.testcontainers.ContainerDef, KafkaContainer
import com.dimafeng.testcontainers.scalatest.TestContainerForAll
import munit.FunSuite
import org.apache.spark.sql.SparkSession
class Mykafkatest extends FunSuite with TestContainerForAll
//val kafkaContainer: KafkaContainer = KafkaContainer("confluentinc/cp-kafka:5.4.3")
override val containerDef: ContainerDef = KafkaContainer.Def()
test("do something")(withContainers container =>
val sparkSession: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("Unit testing")
.getOrCreate()
// How add a topic in that container?
// This is not posible:
val servers=container.bootstrapServers
val df = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "topic1")
.load()
df.show(false)
)
我的 sbt 配置:
lazy val root = project
.in(file("./pipeline"))
.settings(
organization := "org.example",
name := "spark-stream",
version := "0.1",
scalaVersion := "2.12.10",
libraryDependencies := Seq(
"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.3" % Compile,
"org.apache.spark" %% "spark-sql" % "3.0.3" % Compile,
"com.dimafeng" %% "testcontainers-scala-munit" % "0.39.5" % Test,
"org.dimafeng" %% "testcontainers-scala-kafka" % "0.39.5" % Test,
"org.scalameta" %% "munit" % "0.7.28" % Test
),
testFrameworks += new TestFramework("munit.Framework"),
Test / fork := true
)
文档未显示完整示例:https://www.testcontainers.org/modules/kafka/
【问题讨论】:
这个问题是否与 MuleSoft 的应用程序测试框架 MUnit (docs.mulesoft.com/munit/latest) 相关,用于为您的 Mule 应用程序构建自动化测试?container.getBootstrapServers()
不起作用?
不是,不是
如果没有这个测试用例,理想情况下你会如何“在那个容器中添加一个主题?”就像流程的工作原理一样,contianer 对象上有哪些可用的方法?
【参考方案1】:
这里唯一的问题是您将 KafkaContainer.Def
显式转换为 ContainerDef
。
withContianers
、Containter
提供的容器类型由path dependent type
在提供的ContainerDef
决定,
trait TestContainerForAll extends TestContainersForAll self: Suite =>
val containerDef: ContainerDef
final override type Containers = containerDef.Container
override def startContainers(): containerDef.Container =
containerDef.start()
// inherited from TestContainersSuite
def withContainers[A](runTest: Containers => A): A =
val c = startedContainers.getOrElse(throw IllegalWithContainersCall())
runTest(c)
trait ContainerDef
type Container <: Startable with Stoppable
protected def createContainer(): Container
def start(): Container =
val container = createContainer()
container.start()
container
当您在 override val containerDef: ContainerDef = KafkaContainer.Def()
中明确指定类型 ContainerDef
时,这会破坏整个“类型诡计”,因此 Scala 编译器会留下 type Container <: Startable with Stoppable
而不是 KafkaContainer
。
因此,只需删除转换为 ContainerDef
的显式类型,val servers = container.bootstrapServers
就会按预期工作。
import com.dimafeng.testcontainers.KafkaContainer
import com.dimafeng.testcontainers.munit.TestContainerForAll
import munit.FunSuite
class Mykafkatest extends FunSuite with TestContainerForAll
override val containerDef = KafkaContainer.Def()
test("do something")(withContainers container =>
//...
val servers = container.bootstrapServers
println(servers)
//...
)
【讨论】:
你能提供更多细节吗?此代码无法编译 您的意思是答案中的Mykafkatest
代码吗?你得到什么编译错误?
添加了正确的导入。请注意,我们需要导入 com.dimafeng.testcontainers.KafkaContainer
而不是 org.testcontainers.containers.KafkaContainer
。
现在找不到了:repo1.maven.org/maven2/org/dimafeng/… 真的在这个 url repo1.maven.org/maven2/org 不存在 'dimafeng' 文件夹
你为什么要在那个 url 上寻找那个文件夹? "com.dimafeng" %% "testcontainers-scala-munit" % "0.39.5"
可在 maven central - mvnrepository.com/artifact/com.dimafeng/… 上找到。 sbt
将自动下载并将其添加到您的项目中。你的 sbt 项目有什么问题吗?以上是关于使用 testcontainers 测试 kafka 和 spark的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot 集成测试 Sybase 和 Testcontainers
使用 Testcontainers 进行 Dropwizard 集成测试
使用 Testcontainers 执行运行集成测试的 Gitlab CI 管道作业时出现问题