Micronaut 3: How to use PubSubEmulatorContainer

Posted: 2021-11-02 14:06:12

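Update: the link to the repo has been moved to the answer, because the repo has now been updated with the code from the answer below.

Problem description

The current code works, but it uses gcloud beta emulators pubsub from the google/cloud-sdk image for integration tests.

Integration tests are slow because of the size of the google/cloud-sdk image.
The pubsub emulator has to run on a fixed port, and there seems to be no way to tell Micronaut which port the emulator is running on.

I need to set the following environment variable in maven-surefire-plugin.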

<environmentVariables>
    <PUBSUB_EMULATOR_HOST>localhost:8085</PUBSUB_EMULATOR_HOST>
</environmentVariables>

How this is done in Spring Boot

According to Testcontainers | Gcloud Module, the correct way to implement integration tests with PubSubEmulatorContainer in Spring Boot is like this: https://github.com/saturnism/testcontainers-gcloud-examples/blob/main/springboot/pubsub-example/src/test/java/com/example/springboot/pubsub/PubSubIntegrationTests.java

This starts the container on a random port, which is possible because of DynamicPropertyRegistry in Spring. Micronaut seems to lack this possibility.

Docs: https://www.testcontainers.org/modules/gcloud/

I'm looking for a working example of a JUnit5 or Spock integration test implemented in Micronaut 3.x that uses PubSubEmulatorContainer as described in the docs above.

Related docs: https://micronaut-projects.github.io/micronaut-gcp/latest/guide/#emulator

There are some comments on GitHub about configuring TransportChannelProvider. I'm able to inject an instance and inspect it, but I still haven't figured out exactly what to do with it.

These are the closest leads so far: https://github.com/micronaut-projects/micronaut-gcp/issues/257 https://github.com/micronaut-projects/micronaut-gcp/pull/259

【Answer 1】:

TL;DR

We need to start the test container first, get the emulator host address, and then call ApplicationContext.run like this:

applicationContext = ApplicationContext.run(
        ["pubsub.emulator.host": emulatorHost])
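The ordering matters here: the property map can only be built once the container has reported its endpoint. As a rough, stdlib-only Java sketch of that step (EmulatorProperties and forEndpoint are hypothetical names, not part of Micronaut or Testcontainers, and the endpoint is assumed to be a host:port string such as the one returned by getEmulatorEndpoint()):

```java
import java.util.Map;

// Hypothetical helper, for illustration only.
final class EmulatorProperties {
    private EmulatorProperties() {}

    /**
     * Wraps an emulator endpoint ("host:port") in the configuration map
     * that is later handed to ApplicationContext.run.
     */
    static Map<String, Object> forEndpoint(String endpoint) {
        // Fail early if the value does not look like host:port.
        if (endpoint == null || endpoint.indexOf(':') < 0) {
            throw new IllegalArgumentException("expected host:port but got: " + endpoint);
        }
        return Map.of("pubsub.emulator.host", endpoint);
    }
}
```

The resulting map would be passed as the properties argument of ApplicationContext.run, in the same way as the Groovy map literal above.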

Small GitHub repo with example code: https://github.com/roar-skinderviken/pubsub-emulator-demo

Long answer with code

I finally managed to put together a working solution using Micronaut 3.0.2 and Spock. A related Micronaut PR got me on the right track, together with this article: Micronaut Testing Best Practices https://objectcomputing.com/files/9815/9259/7089/slide_deck_Micronaut_Testing_Best_Practices_webinar.pdf

First, a PubSubEmulator class (Groovy)

package no.myproject.testframework.testcontainers

import org.testcontainers.containers.PubSubEmulatorContainer
import org.testcontainers.utility.DockerImageName

class PubSubEmulator {
    static PubSubEmulatorContainer pubSubEmulatorContainer

    // start the container once and reuse it across specifications
    static init() {
        if (pubSubEmulatorContainer == null) {
            pubSubEmulatorContainer = new PubSubEmulatorContainer(
                    DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:emulators"))
            pubSubEmulatorContainer.start()
        }
    }
}

Then a fixture for PubSubEmulator (Groovy)

package no.myproject.testframework.testcontainers

trait PubSubEmulatorFixture {
    Map<String, Object> getPubSubConfiguration() {
        if (PubSubEmulator.pubSubEmulatorContainer == null || !PubSubEmulator.pubSubEmulatorContainer.isRunning()) {
            PubSubEmulator.init()
        }
        [
                "pubsub.emulator-host": PubSubEmulator.pubSubEmulatorContainer.getEmulatorEndpoint()
        ]
    }
}

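Then a specification base class (Groovy) that starts the container and creates the topic and subscriptions.

The key point here is to pass pubsub.emulator.host as part of the configuration when calling ApplicationContext.run.

The rest of the code is very similar to the Spring Boot example I linked to in the question.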

package no.myproject.testframework

import com.google.api.gax.core.NoCredentialsProvider
import com.google.api.gax.grpc.GrpcTransportChannel
import com.google.api.gax.rpc.AlreadyExistsException
import com.google.api.gax.rpc.FixedTransportChannelProvider
import com.google.cloud.pubsub.v1.SubscriptionAdminClient
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings
import com.google.cloud.pubsub.v1.TopicAdminClient
import com.google.cloud.pubsub.v1.TopicAdminSettings
import com.google.pubsub.v1.ProjectSubscriptionName
import com.google.pubsub.v1.PushConfig
import com.google.pubsub.v1.TopicName
import io.grpc.ManagedChannelBuilder
import io.micronaut.context.ApplicationContext
import io.micronaut.runtime.server.EmbeddedServer
import no.myproject.configuration.GcpConfigProperties
import no.myproject.configuration.PubSubConfigProperties
import no.myproject.testframework.testcontainers.PubSubEmulatorFixture
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification

// EnvironmentFixture is not shown in this answer; it is expected to supply `environments`
abstract class PubSubSpecification extends Specification
        implements PubSubEmulatorFixture, EnvironmentFixture {

    @AutoCleanup
    @Shared
    EmbeddedServer embeddedServer

    @AutoCleanup
    @Shared
    ApplicationContext applicationContext

    def setupSpec() {

        // start the pubsub emulator
        def emulatorHost = getPubSubConfiguration().get("pubsub.emulator-host")

        // start a temporary applicationContext in order to read config
        // keep any pubsub subscriptions out of context at this stage
        applicationContext = ApplicationContext.run()

        def gcpConfigProperties = applicationContext.getBean(GcpConfigProperties)
        def pubSubConfigProperties = applicationContext.getBean(PubSubConfigProperties)

        def channel = ManagedChannelBuilder.forTarget("dns:///" + emulatorHost)
                .usePlaintext()
                .build()

        def channelProvider =
                FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel))

        // START creating topic

        def topicAdminClient =
                TopicAdminClient.create(
                        TopicAdminSettings.newBuilder()
                                .setCredentialsProvider(NoCredentialsProvider.create())
                                .setTransportChannelProvider(channelProvider)
                                .build())

        def topic = TopicName.of(
                gcpConfigProperties.getProjectId(),
                pubSubConfigProperties.getTopicName())

        try {
            topicAdminClient.createTopic(topic)
        } catch (AlreadyExistsException ignored) {
            // this is fine, already created
            topicAdminClient.getTopic(topic)
        }

        // START creating subscription

        pubSubConfigProperties.getSubscriptionNames().forEach(it -> {
            def subscription =
                    ProjectSubscriptionName.of(gcpConfigProperties.getProjectId(), it)

            def subscriptionAdminClient =
                    SubscriptionAdminClient.create(
                            SubscriptionAdminSettings.newBuilder()
                                    .setTransportChannelProvider(channelProvider)
                                    .setCredentialsProvider(NoCredentialsProvider.create())
                                    .build())

            try {
                subscriptionAdminClient
                        .createSubscription(
                                subscription,
                                topic,
                                PushConfig.getDefaultInstance(),
                                100)

                System.out.println("Subscription created " + subscriptionAdminClient.getSubscription(subscription))
            } catch (AlreadyExistsException ignored) {
                // this is fine, already created
                subscriptionAdminClient.getSubscription(subscription)
            }
        })

        channel.shutdown()

        // stop the temporary applicationContext
        applicationContext.stop()

        // start the actual applicationContext, now pointing at the emulator
        embeddedServer = ApplicationContext.run(
                EmbeddedServer,
                [
                        'spec.name'           : "PubSubEmulatorSpec",
                        "pubsub.emulator.host": emulatorHost
                ],
                environments)

        applicationContext = embeddedServer.applicationContext
    }
}

Then a factory class for mocking credentials (Groovy)

package no.myproject.pubsub

import com.google.auth.oauth2.AccessToken
import com.google.auth.oauth2.GoogleCredentials
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires

// Micronaut 3 defaults to jakarta.inject; javax.inject needs its own dependency on the classpath
import javax.inject.Singleton


// only active for the context started with spec.name = PubSubEmulatorSpec
@Factory
@Requires(property = 'spec.name', value = 'PubSubEmulatorSpec')
class EmptyCredentialsFactory {

    @Singleton
    @Replaces(GoogleCredentials)
    GoogleCredentials mockCredentials() {
        return GoogleCredentials.create(new AccessToken("", new Date()))
    }
}

Finally, the Spock test specification.

package no.myproject.pubsub

import no.myproject.testframework.PubSubSpecification

import java.util.stream.IntStream

class PubSubIntegrationSpec extends PubSubSpecification {

    def NUMBER_OF_MESSAGES_IN_TEST = 5
    def DELAY_IN_MILLISECONDS_PER_MSG = 100

    def "when a number of messages is sent, same amount of messages is received"() {
        given:
        def documentPublisher = applicationContext.getBean(DocumentPublisher)
        def listener = applicationContext.getBean(IncomingDocListenerWithAck)
        def initialReceiveCount = listener.getReceiveCount()

        when:
        IntStream.rangeClosed(1, NUMBER_OF_MESSAGES_IN_TEST)
                .forEach(it -> documentPublisher.send("Hello World!"))

        // wait a bit in order to let all messages propagate through the queue
        Thread.sleep(NUMBER_OF_MESSAGES_IN_TEST * DELAY_IN_MILLISECONDS_PER_MSG)

        then:
        NUMBER_OF_MESSAGES_IN_TEST == listener.getReceiveCount() - initialReceiveCount
    }
}

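The specification above waits with a fixed Thread.sleep, which can be slower than needed on a fast machine and too short on a slow one. One common alternative, not used in the original answer, is to poll until the expected count is reached or a timeout expires. A minimal stdlib-only Java sketch of that idea follows; Await and untilAtLeast are hypothetical names, and the supplier is assumed to wrap something like listener.getReceiveCount().

```java
import java.time.Duration;
import java.util.function.IntSupplier;

// Hypothetical polling helper, for illustration only.
final class Await {
    private Await() {}

    /**
     * Polls `current` roughly every 20 ms until it returns at least `expected`.
     * Returns true when the value is reached, false when the timeout expires
     * or the waiting thread is interrupted.
     */
    static boolean untilAtLeast(IntSupplier current, int expected, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (current.getAsInt() < expected) {
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag for the caller and give up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

In the Spock test this could replace the sleep with a condition such as Await.untilAtLeast(() -> listener.getReceiveCount() - initialReceiveCount, NUMBER_OF_MESSAGES_IN_TEST, Duration.ofSeconds(5)). Spock also ships spock.util.concurrent.PollingConditions for the same purpose.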
