Spring Kafka中通过参数配置解决超时问题

Posted 编程一生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Kafka中通过参数配置解决超时问题相关的知识,希望对你有一定的参考价值。

* Do actual reads and writes to sockets. * * timeout The maximum amount of time to wait (in ms) for responses if there are none immediately, * must be non-negative. The actual timeout will be the minimum of timeout, request timeout and * metadata timeout * now The current time in milliseconds * The list of responses received */ List<ClientResponse> timeout, now)


人家明确说了是读写时才会调用。证实了思路一不可行。


SettableListenableFuture future = SettableListenableFuture(); future. future. future.addCallback((sendResult) -> System. (Exception e) , r -> System. ); System.out.println("============end==============");

就是说KafkaTemplate的异步是靠使用SettableListenableFuture实现的,实际上它的set方法会马上触发callback,是同步的。代码是先同步调用set,并且还手动调用了get(这个方法会等待直到返回结果)。所以整体是同步的。或者直接这么看,future实现异步要有一个Callable或者Runnable的线程方法,人家SettableListenableFuture第一行源码就禁用了Callable。这个我看了2.5.17.RELEASE这个更高版本的spring-kafka,实现没有做更改。


也就是说spring-kafka自身起码在2.5.X版本里异步没有起到作用。


问题清楚了修改也很简单,比如可以加个异步注解将整个发送方法做异步,重试等逻辑也放到这个方法中。给调用方只返回受理成功。具体怎么解决交给开发小哥哥。


总结


幸亏我上周已经提前规划好周一要休假。否则现在都2点半了明天上班也没精神。主要时间花在异步不生效的问题上。其实排查异步不生效的思路是很简单清晰的。耗时长是因为:第一,不敢相信spring官方实现的,竟然使用异步的代码实际效果没有异步;第二,关于异步我在网上搜索了一下,都是按照项目中配置的那样。官方这样说,大家这样说,我总得考虑是不是自己搞错了。


所以我反复的验证、反复的debug之后也不敢下结论。仔细研究了源码仍然不敢下结论。直到终于搜索到一篇文章说要实现异步除了要使用addCallback之外还要加异步标签。人间清醒的我,马上意识到文章实际用了两种不同方法实现异步。作者之所以认为这是一个方法的两个部分大概也是发现其实spring-kafka的异步没好使吧。


Spring中通过变量和import标签来控制加载哪些bean

  需求:根据设置变量,来加载某个spring的bean的配置文件,这个配置文件中,有某些使用的bean。在一些情况下,不希望这些bean被初始化和加载进context中,也不需要被外面访问到。

  在spring中,我们通过placeholder类可以读取配置文件,里面可以设置参数,而在配置文件或容器中使用他,如我们有一个配置文件common.properties,内容如下:

jdbc.url=
jdbc.user=
jdbc.password=

  有一个配置类,为JdbcConfig,如下:

package cc.eabour.webapp.jdbc;

public class JdbcConfig {

    private String url;
    private String user;
    private String password;
    
    /**
     * @return the url
     */
    public String getUrl() {
        return url;
    }
    /**
     * @param url the url to set
     */
    public void setUrl(String url) {
        this.url = url;
    }
    /**
     * @return the user
     */
    public String getUser() {
        return user;
    }
    /**
     * @param user the user to set
     */
    public void setUser(String user) {
        this.user = user;
    }
    /**
     * @return the password
     */
    public String getPassword() {
        return password;
    }
    /**
     * @param password the password to set
     */
    public void setPassword(String password) {
        this.password = password;
    }
    
    
}

  在spring-bean.xml中配置如下:

    <context:property-placeholder location="classpath:common.properties" />

    <bean class="cc.eabour.webapp.jdbc.JdbcConfig">
        <property name="url" value="${jdbc.url}"/>
        <property name="user" value="${jdbc.user}"/>
        <property name="password" value="${jdbc.password}"/>
    </bean>

  我们假如在某些特定的场景下,才去初始化这个类,此时,我们新增一个xml配置文件,通过Spring的import标签,将配置文件引入:

    <import resource="classpath*:springfox-bean-swagger.xml"/>

  而这个springfox-bean-swagger.xml文件中,就可以像其他的配置文件一样,来配置bean。如果想控制这个import标签引入不同的配置文件,并且通过变量来实现,如:

    <import resource="classpath*:springfox-${springfox.swagger.name:default}-swagger.xml"/>

  我们给变量添加默认值default,因为不设置的时候,没有默认值是会报错的,说是无法处理placeholder。此时,这个变量在哪设置呢?在properties文件中设置肯定是不行的,因为解析的时候,还没加载呢,根据报错位置和调试发现,默认处理placeholder的时候,会从环境变量和虚拟机的系统变量中去取,像System.getProperty和System.getEnv一样。这样的话,我们可以给JVM添加参数来设置,如 -Dspringfox.swagger.name=product,eclipse设置在tomcat的VM参数设置,如下:

 

 

  这样,通过import标签和变量,就可以控制加载哪个配置XML来初始化需要的bean。

 

以上是关于Spring Kafka中通过参数配置解决超时问题的主要内容,如果未能解决你的问题,请参考以下文章

Spring中通过变量和import标签来控制加载哪些bean

kafka发送消息的时候报超时,有人遇到过吗

在 spring-data-jpa 中通过布尔属性查询而不使用方法参数

Spring Boot中通过CORS解决跨域问题

在Spring中通过构造自动装配--constructor

在 Codeigniter 3 中通过 Office365 帐户发送电子邮件 - 连接超时