Spring Kafka中通过参数配置解决超时问题
Posted 编程一生
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Kafka中通过参数配置解决超时问题相关的知识,希望对你有一定的参考价值。
* Do actual reads and writes to sockets. 人家明确说了是读写时才会调用。证实了思路一不可行。 SettableListenableFuture future = SettableListenableFuture(); 就是说KafkaTemplate的异步是靠使用SettableListenableFuture实现的,实际上它的set方法会马上触发callback,是同步的。代码是先同步调用set,并且还手动调用了get(这个方法会等待直到返回结果)。所以整体是同步的。或者直接这么看,future实现异步要有一个Callable或者Runnable的线程方法,人家SettableListenableFuture第一行源码就禁用了Callable。这个我看了2.5.17.RELEASE这个更高版本的spring-kafka,实现没有做更改。 也就是说spring-kafka自身起码在2.5.X版本里异步没有起到作用。 问题清楚了修改也很简单,比如可以加个异步注解将整个发送方法做异步,重试等逻辑也放到这个方法中。给调用方只返回受理成功。具体怎么解决交给开发小哥哥。 总结 幸亏我上周已经提前规划好周一要休假。否则现在都2点半了明天上班也没精神。主要时间花在异步不生效的问题上。其实排查异步不生效的思路是很简单清晰的。耗时长是因为:第一,不敢相信spring官方实现的,竟然使用异步的代码实际效果没有异步;第二,关于异步我在网上搜索了一下,都是按照项目中配置的那样。官方这样说,大家这样说,我总得考虑是不是自己搞错了。 所以我反复的验证、反复的debug之后也不敢下结论。仔细研究了源码仍然不敢下结论。直到终于搜索到一篇文章说要实现异步除了要使用addCallback之外还要加异步标签。人间清醒的我,马上意识到文章实际用了两种不同方法实现异步。作者之所以认为这是一个方法的两个部分大概也是发现其实spring-kafka的异步没好使吧。 需求:根据设置变量,来加载某个spring的bean的配置文件,这个配置文件中,有某些使用的bean。在一些情况下,不希望这些bean被初始化和加载进context中,也不需要被外面访问到。 在spring中,我们通过placeholder类可以读取配置文件,里面可以设置参数,而在配置文件或容器中使用他,如我们有一个配置文件common.properties,内容如下: 有一个配置类,为JdbcConfig,如下: 在spring-bean.xml中配置如下: 我们假如在某些特定的场景下,才去初始化这个类,此时,我们新增一个xml配置文件,通过Spring的import标签,将配置文件引入: 而这个springfox-bean-swagger.xml文件中,就可以像其他的配置文件一样,来配置bean。如果想控制这个import标签引入不同的配置文件,并且通过变量来实现,如: 我们给变量添加默认值default,因为不设置的时候,没有默认值是会报错的,说是无法处理placeholder。此时,这个变量在哪设置呢?在properties文件中设置肯定是不行的,因为解析的时候,还没加载呢,根据报错位置和调试发现,默认处理placeholder的时候,会从环境变量和虚拟机的系统变量中去取,像System.getProperty和System.getEnv一样。这样的话,我们可以给JVM添加参数来设置,如 -Dspringfox.swagger.name=product,eclipse设置在tomcat的VM参数设置,如下:
这样,通过import标签和变量,就可以控制加载哪个配置XML来初始化需要的bean。 以上是关于Spring Kafka中通过参数配置解决超时问题的主要内容,如果未能解决你的问题,请参考以下文章 Spring中通过变量和import标签来控制加载哪些bean *
* timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
* must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
* metadata timeout
* now The current time in milliseconds
* The list of responses received
*/
List<ClientResponse> timeout, now)
future. future. future.addCallback((sendResult) ->
System. (Exception e)
, r ->
System. );
System.out.println("============end==============");
Spring中通过变量和import标签来控制加载哪些bean
jdbc.url=
jdbc.user=
jdbc.password=
package cc.eabour.webapp.jdbc;
public class JdbcConfig {
private String url;
private String user;
private String password;
/**
* @return the url
*/
public String getUrl() {
return url;
}
/**
* @param url the url to set
*/
public void setUrl(String url) {
this.url = url;
}
/**
* @return the user
*/
public String getUser() {
return user;
}
/**
* @param user the user to set
*/
public void setUser(String user) {
this.user = user;
}
/**
* @return the password
*/
public String getPassword() {
return password;
}
/**
* @param password the password to set
*/
public void setPassword(String password) {
this.password = password;
}
}
<context:property-placeholder location="classpath:common.properties" />
<bean class="cc.eabour.webapp.jdbc.JdbcConfig">
<property name="url" value="${jdbc.url}"/>
<property name="user" value="${jdbc.user}"/>
<property name="password" value="${jdbc.password}"/>
</bean>
<import resource="classpath*:springfox-bean-swagger.xml"/>
<import resource="classpath*:springfox-${springfox.swagger.name:default}-swagger.xml"/>