Spring Cloud Stream 绑定可视化

Posted

技术标签:

【中文标题】Spring Cloud Stream 绑定可视化【英文标题】:Spring Cloud Stream binding visualisation 【发布时间】:2018-12-24 17:35:30 【问题描述】:

通过设置以下属性启用绑定执行器端点后:management.endpoints.web.exposure.include=bindings,我应该看到绑定(消费者)属性。

但是,在我添加了 spring cloud bus 依赖并启用了 spring cloud bus 之后,我只能看到 springCloudBus 的绑定属性,但看不到我在项目中创建的绑定。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

问题:如何确保我们在启用spring cloud bus后仍然可以看到我们创建的绑定?

当我设置spring.cloud.bus.enabled=false时,我得到了以下结果

[
    
        "name": "TestInput",
        "group": null,
        "pausable": true,
        "state": "running",
        "extendedInfo": 
            "bindingDestination": "KafkaConsumerDestinationconsumerDestinationName='TestInput', partitions=1, dlqName='null'",
            "ExtendedConsumerProperties": 
                "concurrency": 1,
                "instanceCount": 1,
                "maxAttempts": 3,
                "backOffInitialInterval": 1000,
                "backOffMaxInterval": 10000,
                "backOffMultiplier": 2,
                "extension": 
                    "ackEachRecord": false,
                    "autoRebalanceEnabled": true,
                    "autoCommitOffset": true,
                    "autoCommitOnError": null,
                    "startOffset": null,
                    "resetOffsets": false,
                    "enableDlq": false,
                    "dlqName": null,
                    "dlqProducerProperties": 
                        "bufferSize": 16384,
                        "compressionType": "none",
                        "sync": false,
                        "batchTimeout": 0,
                        "messageKeyExpression": null,
                        "headerPatterns": null,
                        "configuration": ,
                        "admin": 
                            "replicationFactor": null,
                            "replicasAssignments": ,
                            "configuration": 
                        
                    ,
                    "recoveryInterval": 5000,
                    "trustedPackages": null,
                    "standardHeaders": "none",
                    "converterBeanName": null,
                    "idleEventInterval": 30000,
                    "configuration": ,
                    "admin": 
                        "replicationFactor": null,
                        "replicasAssignments": ,
                        "configuration": 
                    
                
            
        
    ,
    
        "name": "TestInput2",
        "group": null,
        "pausable": true,
        "state": "running",
        "extendedInfo": 
            "bindingDestination": "KafkaConsumerDestinationconsumerDestinationName='TestInput2', partitions=1, dlqName='null'",
            "ExtendedConsumerProperties": 
                "concurrency": 1,
                "instanceCount": 1,
                "maxAttempts": 3,
                "backOffInitialInterval": 1000,
                "backOffMaxInterval": 10000,
                "backOffMultiplier": 2,
                "extension": 
                    "ackEachRecord": false,
                    "autoRebalanceEnabled": true,
                    "autoCommitOffset": true,
                    "autoCommitOnError": null,
                    "startOffset": null,
                    "resetOffsets": false,
                    "enableDlq": false,
                    "dlqName": null,
                    "dlqProducerProperties": 
                        "bufferSize": 16384,
                        "compressionType": "none",
                        "sync": false,
                        "batchTimeout": 0,
                        "messageKeyExpression": null,
                        "headerPatterns": null,
                        "configuration": ,
                        "admin": 
                            "replicationFactor": null,
                            "replicasAssignments": ,
                            "configuration": 
                        
                    ,
                    "recoveryInterval": 5000,
                    "trustedPackages": null,
                    "standardHeaders": "none",
                    "converterBeanName": null,
                    "idleEventInterval": 30000,
                    "configuration": ,
                    "admin": 
                        "replicationFactor": null,
                        "replicasAssignments": ,
                        "configuration": 
                    
                
            
        
    
]

设置spring.cloud.bus.enabled=true后,只能看到springCloudBus的属性

[

    "name": "springCloudBus",
    "group": null,
    "pausable": true,
    "state": "running",
    "extendedInfo": 
        "bindingDestination": "KafkaConsumerDestinationconsumerDestinationName='springCloudBus', partitions=1, dlqName='null'",
        "ExtendedConsumerProperties": 
            "concurrency": 1,
            "instanceCount": 1,
            "maxAttempts": 3,
            "backOffInitialInterval": 1000,
            "backOffMaxInterval": 10000,
            "backOffMultiplier": 2,
            "extension": 
                "ackEachRecord": false,
                "autoRebalanceEnabled": true,
                "autoCommitOffset": true,
                "autoCommitOnError": null,
                "startOffset": null,
                "resetOffsets": false,
                "enableDlq": false,
                "dlqName": null,
                "dlqProducerProperties": 
                    "bufferSize": 16384,
                    "compressionType": "none",
                    "sync": false,
                    "batchTimeout": 0,
                    "messageKeyExpression": null,
                    "headerPatterns": null,
                    "configuration": ,
                    "admin": 
                        "replicationFactor": null,
                        "replicasAssignments": ,
                        "configuration": 
                    
                ,
                "recoveryInterval": 5000,
                "trustedPackages": null,
                "standardHeaders": "none",
                "converterBeanName": null,
                "idleEventInterval": 30000,
                "configuration": ,
                "admin": 
                    "replicationFactor": null,
                    "replicasAssignments": ,
                    "configuration": 
                
            
        
    

]

复制步骤:

    从https://github.com/HLTan94/SpringCloudStreamBindingsDemo克隆项目

    设置spring.cloud.bus.enabled

    执行curl -X GET http://localhost:9999/bindings

【问题讨论】:

你的意思是http://localhost:9999/actuator/bindings 我设置了management.endpoints.web.base-path=/。所以它是http://localhost:9999/bindings,但不是http://localhost:9999/actuator/bindings 【参考方案1】:

我能够重现它并发现这实际上是我们这边的一个错误。您可以跟踪它here。无论如何,我们应该在 7 月底之前发布服务版本(即 2.0.2)

【讨论】:

以上是关于Spring Cloud Stream 绑定可视化的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud Stream kafka绑定配置最大请求

如何为开发目的禁用 Spring Cloud Stream 绑定?

Spring Cloud Stream教程编程模型

使用 Spring Cloud Stream 将 RabbitMQ 消费者绑定到现有队列

具有多个路由键的Spring引导流绑定队列

Spring cloud stream 3.1 rocketmq踩坑记录