基于Spring Cloud的微服务落地
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Spring Cloud的微服务落地相关的知识,希望对你有一定的参考价值。
微服务架构模式的核心在于如何识别服务的边界,设计出合理的微服务。但如果要将微服务架构运用到生产项目上,并且能够发挥该架构模式的重要作用,则需要微服务框架的支持。
在Java生态圈,目前使用较多的微服务框架就是集成了包括Netfilix OSS以及Spring的Spring Cloud。它包括:
Spring Cloud Config:配置管理工具,支持使用Git存储配置内容,可以实现应用配置的外部化存储,支持客户端配置信息刷新、加密/解密配置内容等。
Spring Cloud Netflix:对Netflix OSS进行了整合。其中又包括:
Eureka:服务治理组件,包含服务注册中心、服务注册与发现。
Hystrix:容器管理组件,实现断路器模式,倘若依赖的服务出现延迟或故障,则提供强大的容错功能。
Ribbon:客户端负载均衡的服务调用组件。
Feign:基于Ribbon和Hystrix的声明式服务调用组件。
Zuul:网关组件,提供智能路由、访问过滤等功能。
Archaius:外部化配置组件。
Spring Cloud Bus:事件、消息总线。
Spring Cloud Cluster:针对ZooKeeper、Redis、Hazelcast、Consul的选举算法和通用状态模式的实现。
Spring Cloud Cloudfoundry:与Pivotal Cloudfoundry的整合支持。
Spring Cloud Consul:服务发现与配置管理工具。
Spring Cloud Stream:通过Redis、Rabbit或者Kafka实现的消息驱动的微服务。
Spirng Cloud AWS:简化和整合Amazon Web Service。
Spring Cloud Security:安全工具包,提供Zuul代理中对OAuth2客户端请求的中继器。
Spring Cloud Sleuth:Spring Cloud应用的分布式跟踪实现,可以整合Zipkin。
Spring Cloud ZooKeeper:基于ZooKeeper的服务发现与配置管理组件。
Spring Cloud Starters:Spring Cloud的基础组件,是基于Spring Boot风格项目的基础依赖模块。
Spring Cloud CLI:用于在Groovy中快速创建Spring Cloud应用的Spring Boot CLI插件。
服务治理
当一个系统的微服务数量越来越多的时候,我们就需要对服务进行治理,提供统一的服务注册中心,然后在其框架下提供发现服务的功能。这样就避免了对多个微服务的配置,以及微服务之间以及与客户端之间的耦合。
Spring Cloud Eureka是对Netflix Eureka的包装,用以实现服务注册与发现。Eureka服务端即服务注册中心,支持高可用配置。它依托于强一致性提供良好的服务实例可用性,并支持集群模式部署。Eureka客户端则负责处理服务的注册与发现。客户端服务通过annotation与参数配置的方式,嵌入在客户端应用程序代码中。在运行应用程序时,Eureka客户端向注册中心注册自身提供的服务,并周期性地发送心跳更新它的服务租约。
搭建服务注册中心
服务注册中心是一个独立部署的服务(你可以认为它也是一个微服务),所以需要单独为它创建一个项目,并在pom.xml中添加Eureka的依赖:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-eureka-server</artifactId></dependency>
创建Spring Boot Application:
@[email protected] class Application { public static void main(String[] args) { new SpringApplicationBuilder(Application.class).web(true).run(args);}}
注册服务提供者
要让自己编写的微服务能够注册到Eureka服务器中,需要在服务的Spring Boot Application中添加 @EnableDiscoveryClient
注解,如此才能让Eureka服务器发现该服务。当然,pom.xml文件中也需要添加相关依赖:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-eureka</artifactId></dependency>
同时,我们还需要为服务命名,并指定地址。这些信息都可以在application.properties配置文件中配置:
spring.application.name=demo-serviceeureka.client.serviceUrl.defaultZone=http://localhost:1111/eureka/
说明:Spring更推荐使用yml文件来维护系统的配置,yml文件可以体现出配置节的层次关系,表现力比单纯的key-value形式更好。如果结合使用后面讲到的Spring Cloud Config,则客户端的配置文件必须命名为bootstrap.properties或者bootstrap.yml。与上述配置相同的yml文件配置为:
spring: application: name: demo-serviceeureka: client: serviceUrl:defaultZone: http://localhost:1111/eureka/
服务发现与消费
在微服务架构下,许多微服务可能会扮演双重身份。一方面它是服务的提供者,另一方面它又可能是服务的消费者。注册在Eureka Server中的微服务可能会被别的服务消费。此时,就相当于在服务中创建另一个服务的客户端,并通过RestTemplate发起对服务的调用。为了更好地提高性能,可以在服务的客户端引入Ribbon,作为客户端负载均衡。
现在假定我们要为demo-service创建一个服务消费者demo-consumer。该消费者自身也是一个Spring Boot微服务,同时也能够被Eureka服务器注册。这时,就需要在该服务的pom.xml中添加eureka与ribbon的依赖:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-eureka</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-ribbon</artifactId></dependency>
然后在主应用类 ConosumerApplication
中注入 RestTemplate
,并引入 @LoadBalanced
注解开启客户端负载均衡:
@[email protected] class ConsumerApplication { @[email protected] restTemplate() { return new RestTemplate();} public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args)}}
假设消费demo-service的客户端代码写在demo-consumer服务的其中一个Controller中:
@RestControllerpublic class ConsumerController { @AutowiredRestTemplate restTemplate; @RequestMapping(value = "/demo-consumer", method = RequestMethod.Get)public String helloConsumer() { return restTemplate.getForEntity("http://demo-service/demo", String.class).getBody();}}
通过 RestTemplate
就可以发起对demo-service的消费调用。
声明式服务调用
通过Ribbon和Hystrix可以实现对微服务的调用以及容错保护,但Spring Cloud还提供了另一种更简单的声明式服务调用方式,即Spring Cloud Feign。Feign实际上就是对Ribbon与Hystrix的进一步封装。通过Feign,我们只需创建一个接口并用annotation的方式配置,就可以完成对服务供应方的接口(REST API)绑定。
假设我们有三个服务:
Notification Service
Account Service
Statistics Service
服务之间的依赖关系如下图所示:
要使用Feign来完成声明式的服务调用,需要在作为调用者的服务中创建Client。Client通过Eureka Server调用注册的对应服务,这样可以解除服务之间的耦合。结构如下图所示:
为了使用Feign,需要对应微服务的pom.xml文件中添加如下依赖:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-feign</artifactId></dependency>
同时,还需要在被消费的微服务Application中添加 @EnableFeignClients
注解。例如在Statistics服务的应用程序类中:
@[email protected]@EnableFeignClientspublic class StatisticsApplication { public static void main(String[] args) { SpringApplication.run(StatisticsApplication.class, args);}}
由于Account服务需要调用Statistics服务,因此需要在Account服务项目中增加对应的client接口:
@FeignClient(name = "statistics-service")public interface StatisticsServiceClient { @RequestMapping(method = RequestMethod.PUT, value = "/statistics/{accountName}", consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)void updateStatistics(@PathVariable("accountName") String accountName, Account account);}
StatisticsServiceClient接口的 updateStatistics()
方法会调用URI为 /statistics/{accountName}
的REST服务,且HTTP动词为put。这个服务其实对应就是Statistics Service中StatisticsController类中的 saveStatistics()
方法:
@RestControllerpublic class StatisticsController { @Autowiredprivate StatisticsService statisticsService; @RequestMapping(value = "/{accountName}", method = RequestMethod.PUT)public void saveStatistics(@PathVariable String accountName, @Valid @RequestBody Account account) { statisticsService.save(accountName, account);}}
在Account服务中,如果要调用Statistics服务,都应该通过StatisticsServiceClient接口进行调用。例如,Account服务中的AccountServiceImpl要调用 updateStatistics()
方法,就可以在该类的实现中通过 @autowired
注入StatisticsServiceClient接口:
@Servicepublic class AccountServiceImpl implements AccountService { @Autowiredprivate StatisticsServiceClient statisticsClient; @Autowiredprivate AccountRepository repository; @Overridepublic void saveChanges(String name, Account update) { //...statisticsClient.updateStatistics(name, account);}}
Notification服务对Account服务的调用如法炮制。
服务容错保护
在微服务架构中,微服务之间可能存在依赖关系,例如Notification Service会调用Account Service,Account Service调用Statistics Service。真实产品中,微服务之间的调用会更加寻常。倘若上游服务出现了故障,就可能会因为依赖关系而导致故障的蔓延,最终导致整个系统的瘫痪。
Spring Cloud Hystrix通过实现断路器(Circuit Breaker)模式以及线程隔离等功能,实现服务的容错保护。
仍然参考前面的例子。现在系统的微服务包括:
上游服务:demo-service
下游服务:demo-consumer
Eureka服务器:eureka-server
假设上游服务可能会出现故障,为保证系统的健壮性,需要在下游服务中加入容错包含功能。首先需要在demo-consumer服务中添加对hystrix的依赖:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-hystrix</artifactId></dependency>
然后在demo-consumer的应用程序类中加入 @EnableCircuitBreaker
开启断路器功能:
@EnableCircuitBrea[email protected]@SpringBootApplicationpublic class ConsumerApplication { @[email protected] restTemplate() { return new RestTemplate();} public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args)}}
注意:Spring Cloud提供了 @SpringCloudApplication
注解简化如上代码。该注解事实上已经包含了前面所述的三个注解。 @SpringCloudApplication
注解的定义如下所示:
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@[email protected]@[email protected]@EnableCircuitBreakerpublic @interface SpringCloudApplication {}
接下来,需要引入一个新的服务类来封装hystrix提供的断路器保护功能,主要是定义当故障发生时需要执行的回调逻辑,即代码中指定的fallbackMethod:
@Servicepublic class ConsumerService { @AutowiredRestTemplate restTemplate; @HystrixCommand(fallbackMethod = "consumerFallback")public String consume() { return restTemplate.getForEntity("http://demo-service/demo", String.class).getBody();} public String consumerFallback() { return "error";}}@RestControllerpublic class ConsumerController { @AutowiredConsumerService consumerService; @RequestMapping(value = "/demo-consumer", method = RequestMethod.Get)public String helloConsumer() { return consumerService.consume();}}
服务监控
微服务架构将服务的粒度分解的足够细,这使得它在保证服务足够灵活、足够独立的优势下,也带来了管理和监控上的挑战,服务与服务之间的依赖也变得越来越复杂。因此,对服务健康度和运行指标的监控就变得非常重要。
Hystrix提供了Dashboard用以监控Hystrix的各项指标信息。为了监控整个系统的微服务,我们需要为Hystrix Dashboard建立一个Spring Boot微服务。在该服务项目的pom文件中,添加如下依赖:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-hystrix</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-hystrix-dashboard</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-actuator</artifactId></dependency>
服务的Application类需要添加 @EnableHystrixDashboard
,以启用Hystrix Dashboard功能。同时,可能需要根据实际情况修改application.properties配置文件,例如选择可用的端口号等。
如果要实现对集群的监控,则需要加入Turbine。
API网关
理论上,客户端可以直接向每个微服务直接发送请求。但是这种方式是存在挑战和限制的,调用者需要知道所有端点的地址,分别对每一段信息执行http请求,然后将结果合并到客户端。
一般而言,针对微服务架构模式的系统,采用的都是 前后端分离 的架构。为了明显地隔离开前端与后端的边界,我们通常可以专门为前端的消费者定义更加粗粒度的Open Service。这些Open Service是对外的RESTful API服务,可以通过F5、nginx等网络设备或工具软件实现对各个微服务的路由与负载均衡,并公开给外部的客户端调用(注意,内部微服务之间的调用并不需要通过Open Service)。这种对外公开的Open Service通常又被称为边缘服务(edge service)。
如果这些Open Service需要我们自己去开发实现并进行服务的运维,在系统规模不断增大的情况下,会变得越来越困难。例如,当增加了新的微服务又或者IP地址发生变动时,都需要运维人员手工维护这些路由规则与服务实例列表。又例如针对所有垂直分隔的微服务,不可避免存在重用的横切关注点,例如用户身份认证、授权或签名校验等机制。我们不能在所有微服务中都去添加这些相同的功能,因为这会造成横切关注点的冗余。
解决的办法是引入API网关(API Gateway)。它是系统的单个入口点,用于通过将请求路由到适当的后端服务或者通过调用多个后端服务并聚合结果来处理请求。此外,它还可以用于认证、insights、压力测试、金丝雀测试(canary testing)、服务迁移、静态响应处理和主动变换管理。Spring Cloud为API网关提供的解决方案就是Spring Cloud Zuul,它是对Netflix Zuul的包装。
路由规则与服务实例维护
Zuul解决路由规则与服务实例维护的方法是通过Spring Cloud Eureka。API Gateway自身就是一个Spring Boot服务,该服务自身被注册为Eureka服务治理下的应用,同时它会从Eureka中获得所有其他微服务的实例信息。这样的设计符合DRY原则,因为Eureka已经维护了一套服务实例信息,Zuul直接重用了这些信息,无需人工介入。
对于路由规则,Zuul默认会将服务名作为ContextPath创建路由映射,基本上这种路由映射机制就可以满足微服务架构的路由需求。倘若需要一些特殊的配置,Zuul也允许我们自定义路由规则,可以通过在API网关的Application类中创建PatternServiceRouteMapper来定义自己的规则。
横切关注点
诸如授权认证、签名校验等业务逻辑本身与微服务应用所要处理的业务逻辑没有直接关系,我们将这些可能横跨多个微服务的功能称为“横切关注点”。这些横切关注点往往会作为“装饰”功能在服务方法的前后被调用。Spring Cloud Zuul提供了一套 过滤器机制 ,允许开发者创建各种过滤器,并指定哪些规则的请求需要执行哪个过滤器。
自定义的过滤器继承自ZuulFilter类。例如我们要求客户端发过来的请求在路由之前需要先验证请求中是否包含accessToken参数,如果有就进行路由,否则就拒绝,并返回401 Unauthorized错误,则可以定义AccessFilter类:
public class AccessFilter extends ZuulFilter { private static Logger log = LoggerFactory.getLogger(AccessFilter.class); @Overridepublic String filterType() { return "pre"} @Overridepublic int filterOrder() { return 0;} @Overridepublic boolean shouldFilter() { return true;} @Overridepublic Object run() {RequestContext ctx = RequestContext.getCurrentContext();HttpServletRequest request = ctx.getRequest();log.info("send {} request to {}", request.getMethod(), request.getRequestURL().toString());Object accessToken = request.getParameter("accessToken"); if (accessToken == null) {log.warn("access token is empty");ctx.setSendZuulResponse(false);ctx.setResponseStatusCode(401); return null;}log.info("access token ok"); return null;}}
要让该自定义过滤器生效,还需要在Zuul服务的Application中创建具体的Bean:
@[email protected] class ZuulApplication { public static void main(String[] args) { new SpringApplicatonBuilder(ZuulApplication.class).web(true).run(args);}@Beanpublic AccessFilter accessFilter() { return new AccessFilter();}}
Zuul一共提供了四种过滤器:
pre filter
routing filter
post filter
error filter
下图来自官网,它展现了客户端请求到达Zuul API网关的生命周期与过滤过程:
通过starter添加Zuul的依赖时,自身包含了spring-cloud-starter-hystrix与spring-cloud-starter-ribbon模块的依赖,因此Zuul自身就拥有线程隔离与断路器的服务容错功能,以及客户端负载均衡。但是,倘若我们使用path与url的映射关系来配置路由规则,则路由转发的请求并不会采用HystrixCommand来包装,因而这类路由是没有服务容错与客户端负载均衡作用的。所以在使用Zuul时,应尽量使用path和serviceId的组合对路由进行配置。
分布式配置中心
为什么要引入一个分布式配置中心?一个微服务就需要至少一个配置文件,怎么管理分散在各个微服务中的配置文件呢?如果微服务采用的是不同的技术栈,如何来统一微服务的配置呢?微服务是部署在不同的节点中,显然我们无法在单机中实现对分布式节点的配置管理。这就是引入Spring Cloud Config的目的。
Spring Cloud Config提供了服务端和客户端支持。服务端是一个独立的微服务,同样可以注册到Eureka服务器中。每个需要使用分布式配置中心的微服务都是Spring Cloud Config的客户端。Spring Cloud Config默认实现基于Git仓库,既可以进行版本管理,还可以通过本地Git库起到缓存作用。Spring Cloud Config不限于基于Spring Cloud开发的系统,而是可以用于任何语言开发的程序,并支持自定义实现。
配置中心服务端
Spring Cloud Config Server作为配置中心服务端,提供如下功能:
拉取配置时更新git仓库副本,保证是最新结果
支持数据结构丰富,yml, json, properties等
配合Eureke可实现服务发现,配合cloud bus可实现配置推送更新
配置存储基于git仓库,可进行版本管理
简单可靠,有丰富的配套方案
建立一个Config服务,需要添加如下依赖:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-config-server</artifactId></dependency>
服务的Application类需要添加 @EnableConfigServer
注解:
@[email protected] class ConfigApplication { public static void main(String[] args) { SpringApplication.run(ConfigApplication.class, args);}}
配置服务的基本信息和Git仓库的信息放在application.yml文件中:
spring: cloud: config: server: git: uri: http://localhost/workspace/springcloud-demo username: user password: passwordserver: port: 8888security: user: password: ${CONFIG_SERVICE_PASSWORD}
Git库与配置服务
在Config服务中配置了Git服务器以及Git库的信息后,我们就可以在git库中提交配置文件。存储在git库中配置文件的名字以及分支名(默认为master分支)会组成访问Config服务的URI。假设有一个服务为Notification服务,则它在配置中心服务端的配置文件为notification-dev.yml,内容如下:
devMode: truespring: application: name: notification jdbc: host: localhost port: 3306 user: root password: 123456logging: file: demo
配置中心客户端
需要读取配置中心服务端信息的微服务都是配置中心的客户端,为了能够读取配置服务端的信息,这些微服务需要:
在pom中添加对spring-cloud-starter-config的依赖
在bootstrap.properties或者bootstrap.yml中配置获取配置的config-server位置
例如,Account服务的配置是由Spring Cloud Config进行管理的。在它的资源目录下,提供了bootstrap.yml配置文件,内容如下所示:
spring: application: name: account-service cloud: config: uri: http://config:8888 fail-fast: true password: ${CONFIG_SERVICE_PASSWORD} username: user
注意,该配置文件除了配置了该Account服务应用的name之外,主要是支持该应用获得配置服务端的信息。微服务自身的配置信息则统一放到配置中心服务端的文件中,并由Git库进行管理。例如,Account服务的详细配置在配置中心服务端的account-dev.yml文件中:
security: oauth2: client: clientId: account-service clientSecret: ${ACCOUNT_SERVICE_PASSWORD} accessTokenUri: http://auth-service:5000/uaa/oauth/token grant-type: client_credentials scope: serverspring: data: mongodb: host: account-mongodb username: user password: ${MONGODB_PASSWORD} database: piggymetrics port: 27017server: context-path: /accounts port: 6000
Spring Cloud Config通过Git实现分布式的配置管理。当配置中心服务端的配置信息发生变更时,各个作为配置客户端的微服务会向Git库提交pull更新,获得最新的配置信息。
当然,Spring Cloud Config还可以使用SVN库进行配置管理,也支持简单的本地文件系统的存储方式。此时需要将 spring.profiles.active
设置为native,并设置搜索配置文件的路径。如果不配置路径,默认在 src/main/resources
目录下搜索。如下配置文件:
spring: cloud: config: server: native: search-locations: classpath:/shared profiles: active: native
搜索路径放在classpath下的shared目录下,那么在代码中,目录就是 resources/shared
。如果使用本地文件系统管理配置文件,则无法支持分布式配置管理以及版本管理,因此在生产系统下,还是推荐使用Git库的方式。
总结
在实施微服务时,我们可以将微服务视为两个不同的边界。一个是与前端UI的通信,称为Open Service(Edge Service),通过引入API Gateway来实现与前端UI的通信。另一个是在边界内业务微服务之间的通信,通过Feign实现微服务之间的协作。所有的微服务都会通过Eureka来完成微服务的注册与发现。一个典型的基于Spring Cloud的微服务架构如下所示:
微服务的集成可以通过Feign+Ribbon以RESTful方式实现通信,也可以基于RPC方式(可以结合Protocol Buffer)完成服务之间的通信,甚至可以通过发布事件与订阅事件的机制。事件机制可以使微服务之间更加松散耦合。这时,我们可以引入RabbitMQ或Kafka来做到服务与服务之间的解耦。事件机制是异步和非阻塞的,在某些业务场景下,它的性能会更加的好。Spring Cloud也提供了相关的组件Spring Cloud Stream来支持这种事件机制。
看到一篇关于线程池写的很透彻的文章
一简介
线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的。在jdk1.5之后这一情况有了很大的改观。Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用。为我们在开发中处理线程的问题提供了非常大的帮助。
二:线程池
线程池的作用:
线程池作用就是限制系统中执行线程的数量。
根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。
为什么要用线程池:
1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。
比较重要的几个类:
ExecutorService真正的线程池接口。
ScheduledExecutorService能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。
ThreadPoolExecutorExecutorService的默认实现。
ScheduledThreadPoolExecutor继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。
要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。
1. newSingleThreadExecutor
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
2.newFixedThreadPool
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
3. newCachedThreadPool
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,
那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
4.newScheduledThreadPool
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
实例
1:newSingleThreadExecutor
MyThread.java
publicclassMyThread extends Thread {
@Override
publicvoid run() {
System.out.println(Thread.currentThread().getName() + "正在执行。。。");
}
}
TestSingleThreadExecutor.java
publicclassTestSingleThreadExecutor {
publicstaticvoid main(String[] args) {
//创建一个可重用固定线程数的线程池
ExecutorService pool = Executors. newSingleThreadExecutor();
//创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//将线程放入池中进行执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
//关闭线程池
pool.shutdown();
}
}
输出结果
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
2newFixedThreadPool
TestFixedThreadPool.Java
publicclass TestFixedThreadPool {
publicstaticvoid main(String[] args) {
//创建一个可重用固定线程数的线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
//创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//将线程放入池中进行执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
//关闭线程池
pool.shutdown();
}
}
输出结果
pool-1-thread-1正在执行。。。
pool-1-thread-2正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-2正在执行。。。
pool-1-thread-1正在执行。。。
3 newCachedThreadPool
TestCachedThreadPool.java
publicclass TestCachedThreadPool {
publicstaticvoid main(String[] args) {
//创建一个可重用固定线程数的线程池
ExecutorService pool = Executors.newCachedThreadPool();
//创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
Thread t1 = new MyThread();
Thread t2 = new MyThread();
Thread t3 = new MyThread();
Thread t4 = new MyThread();
Thread t5 = new MyThread();
//将线程放入池中进行执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
//关闭线程池
pool.shutdown();
}
}
输出结果:
pool-1-thread-2正在执行。。。
pool-1-thread-4正在执行。。。
pool-1-thread-3正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-5正在执行。。。
4newScheduledThreadPool
TestScheduledThreadPoolExecutor.java
publicclass TestScheduledThreadPoolExecutor {
publicstaticvoid main(String[] args) {
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);
exec.scheduleAtFixedRate(new Runnable() {//每隔一段时间就触发异常
@Override
publicvoid run() {
//throw new RuntimeException();
System.out.println("================");
}
}, 1000, 5000, TimeUnit.MILLISECONDS);
exec.scheduleAtFixedRate(new Runnable() {//每隔一段时间打印系统时间,证明两者是互不影响的
@Override
publicvoid run() {
System.out.println(System.nanoTime());
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
}
}
输出结果
================
8384644549516
8386643829034
8388643830710
================
8390643851383
8392643879319
8400643939383
三:ThreadPoolExecutor详解
ThreadPoolExecutor的完整构造方法的签名是:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) .
corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize-池中允许的最大线程数。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit - keepAliveTime 参数的时间单位。
workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务。
threadFactory - 执行程序创建新线程时使用的工厂。
handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
ThreadPoolExecutor是Executors类的底层实现。
在JDK帮助文档中,有如此一段话:
“强烈建议程序员使用较为方便的Executors工厂方法Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)Executors.newSingleThreadExecutor()(单个后台线程)
它们均为大多数使用场景预定义了设置。”
下面介绍一下几个类的源码:
ExecutorService newFixedThreadPool (int nThreads):固定大小线程池。
可以看到,corePoolSize和maximumPoolSize的大小是一样的(实际上,后面会介绍,如果使用无界queue的话maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表名什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是无界的。
1. public static ExecutorService newFixedThreadPool(int nThreads) {
2. return new ThreadPoolExecutor(nThreads, nThreads,
3. 0L, TimeUnit.MILLISECONDS,
4. new LinkedBlockingQueue());
5. }
ExecutorService newSingleThreadExecutor():单线程
1. public static ExecutorService newSingleThreadExecutor() {
2. return new FinalizableDelegatedExecutorService
3. (new ThreadPoolExecutor(1, 1,
4. 0L, TimeUnit.MILLISECONDS,
5. new LinkedBlockingQueue()));
6. }
ExecutorService newCachedThreadPool():无界线程池,可以进行自动线程回收
这个实现就有意思了。首先是无界的线程池,所以我们可以发现maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。可能对于该BlockingQueue有些陌生,简单说:该QUEUE中,每个插入操作必须等待另一个线程的对应移除操作。
1. public static ExecutorService newCachedThreadPool() {
2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3. 60L, TimeUnit.SECONDS,
4. new SynchronousQueue());
}
先从BlockingQueue workQueue这个入参开始说起。在JDK中,其实已经说得很清楚了,一共有三种类型的queue。
所有BlockingQueue 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:
如果运行的线程少于 corePoolSize,则 Executor始终首选添加新的线程,而不进行排队。(如果当前运行的线程小于corePoolSize,则任务根本不会存放,添加到queue中,而是直接抄家伙(thread)开始运行)
如果运行的线程等于或多于 corePoolSize,则 Executor始终首选将请求加入队列,而不添加新的线程。
如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
queue上的三种类型。
排队有三种通用策略:
直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
有界队列。当使用有限的 maximumPoolSizes时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
BlockingQueue的选择。
例子一:使用直接提交策略,也即SynchronousQueue。
首先SynchronousQueue是无界的,也就是说他存数任务的能力是没有限制的,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加。在这里不是核心线程便是新创建的线程,但是我们试想一样下,下面的场景。
我们使用一下参数构造ThreadPoolExecutor:
1. new ThreadPoolExecutor(
2. 2, 3, 30, TimeUnit.SECONDS,
3. new SynchronousQueue(),
4. new RecorderThreadFactory("CookieRecorderPool"),
new ThreadPoolExecutor.CallerRunsPolicy());
new ThreadPoolExecutor(
2, 3, 30, TimeUnit.SECONDS,
new SynchronousQueue(),
new RecorderThreadFactory("CookieRecorderPool"),
new ThreadPoolExecutor.CallerRunsPolicy());
当核心线程已经有2个正在运行.
此时继续来了一个任务(A),根据前面介绍的“如果运行的线程等于或多于 corePoolSize,则 Executor始终首选将请求加入队列,而不添加新的线程。”,所以A被添加到queue中。
又来了一个任务(B),且核心2个线程还没有忙完,OK,接下来首先尝试1中描述,但是由于使用的SynchronousQueue,所以一定无法加入进去。
此时便满足了上面提到的“如果无法将请求加入队列,则创建新的线程,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝。”,所以必然会新建一个线程来运行这个任务。
暂时还可以,但是如果这三个任务都还没完成,连续来了两个任务,第一个添加入queue中,后一个呢?queue中无法插入,而线程数达到了maximumPoolSize,所以只好执行异常策略了。
所以在使用SynchronousQueue通常要求maximumPoolSize是无界的,这样就可以避免上述情况发生(如果希望限制就直接使用有界队列)。对于使用SynchronousQueue的作用jdk中写的很清楚:此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。
什么意思?如果你的任务A1,A2有内部关联,A1需要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue我们可以保证,A1必定先被执行,在A1么有被执行前,A2不可能添加入queue中。
例子二:使用无界队列策略,即LinkedBlockingQueue
这个就拿newFixedThreadPool来说,根据前文提到的规则:
如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。那么当任务继续增加,会发生什么呢?
如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。OK,此时任务变加入队列之中了,那什么时候才会添加新线程呢?
如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。这里就很有意思了,可能会出现无法加入队列吗?不像SynchronousQueue那样有其自身的特点,对于无界队列来说,总是可以加入的(资源耗尽,当然另当别论)。换句说,永远也不会触发产生新的线程!corePoolSize大小的线程数会一直运行,忙完当前的,就从队列中拿任务开始运行。所以要防止任务疯长,比如任务运行的实行比较长,而添加任务的速度远远超过处理任务的时间,而且还不断增加,不一会儿就爆了。
例子三:有界队列,使用ArrayBlockingQueue。
这个是最为复杂的使用,所以JDK不推荐使用也有些道理。与上面的相比,最大的特点便是可以防止资源耗尽的情况发生。
举例来说,请看如下构造方法:
1. new ThreadPoolExecutor(
2. 2, 4, 30, TimeUnit.SECONDS,
3. new ArrayBlockingQueue(2),
4. new RecorderThreadFactory("CookieRecorderPool"),
5. new ThreadPoolExecutor.CallerRunsPolicy());
new ThreadPoolExecutor(
2, 4, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue(2),
new RecorderThreadFactory("CookieRecorderPool"),
new ThreadPoolExecutor.CallerRunsPolicy());
假设,所有的任务都永远无法执行完。
对于首先来的A,B来说直接运行,接下来,如果来了C,D,他们会被放到queue中,如果接下来再来E,F,则增加线程运行E,F。但是如果再来任务,队列无法再接受了,线程数也到达最大的限制了,所以就会使用拒绝策略来处理。
keepAliveTime
jdk中的解释是:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
有点拗口,其实这个不难理解,在使用了“池”的应用中,大多都有类似的参数需要配置。比如数据库连接池,DBCP中的maxIdle,minIdle参数。
什么意思?接着上面的解释,后来向老板派来的工人始终是“借来的”,俗话说“有借就有还”,但这里的问题就是什么时候还了,如果借来的工人刚完成一个任务就还回去,后来发现任务还有,那岂不是又要去借?这一来一往,老板肯定头也大死了。
合理的策略:既然借了,那就多借一会儿。直到“某一段”时间后,发现再也用不到这些工人时,便可以还回去了。这里的某一段时间便是keepAliveTime的含义,TimeUnit为keepAliveTime值的度量。
RejectedExecutionHandler
另一种情况便是,即使向老板借了工人,但是任务还是继续过来,还是忙不过来,这时整个队伍只好拒绝接受了。
RejectedExecutionHandler接口提供了对于拒绝任务的处理的自定方法的机会。在ThreadPoolExecutor中已经默认包含了4中策略,因为源码非常简单,这里直接贴出来。
CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2. if (!e.isShutdown()) {
3. r.run();
4. }
5. }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。
AbortPolicy:处理程序遭到拒绝将抛出运行时RejectedExecutionException
1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2. throw new RejectedExecutionException();
3. }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
}
这种策略直接抛出异常,丢弃任务。
DiscardPolicy:不能执行的任务将被删除
1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2. }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2. if (!e.isShutdown()) {
3. e.getQueue().poll();
4. e.execute(r);
5. }
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。
设想:如果其他线程都还在运行,那么新来任务踢掉旧任务,缓存在queue中,再来一个任务又会踢掉queue中最老任务。
总结:
keepAliveTime和maximumPoolSize及BlockingQueue的类型均有关系。如果BlockingQueue是无界的,那么永远不会触发maximumPoolSize,自然keepAliveTime也就没有了意义。
反之,如果核心数较小,有界BlockingQueue数值又较小,同时keepAliveTime又设的很小,如果任务频繁,那么系统就会频繁的申请回收线程。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
大家可以点击加入群:454377428【JAVA大牛学习交流】
里面有Java高级大牛直播讲解知识点 走的就是高端路线
(如果你想跳槽换工作 但是技术又不够 或者工作上遇到了
瓶颈 我这里有一个JAVA的免费直播课程 讲的是高端的知识点
基础不好的误入哟 只要你有1-5年的开发经验
可以加群找我要课堂链接 注意:是免费的 没有开发经验误入哦)
1、具有1-5工作经验的,面对目前流行的技术不知从何下手,
需要突破技术瓶颈的。
2、在公司待久了,过得很安逸,
但跳槽时面试碰壁。需要在短时间内进修、跳槽拿高薪的。
3、如果没有工作经验,但基础非常扎实,对java工作机制,
常用设计思想,常用java开发框架掌握熟练的。
4、觉得自己很牛B,一般需求都能搞定。
但是所学的知识点没有系统化,很难在技术领域继续突破的。
群号:高级架构群 454377428备注好信息!
6.阿里Java高级大牛直播讲解知识点,分享知识,
多年工作经验的梳理和总结,带着大家全面、
科学地建立自己的技术体系和技术认知!
对于微服务之间的协作,到底选择Feign这种REST方式、事件机制或者RPC方式,取决于业务场景是否需要同步方式,还是异步方式;是高性能高并发,还是普通方式;是要求彻底解耦,还是做到一般的松散耦合。我们需要针对实际情况作出实际的判断,作出正确的选择。没有谁坏谁好之分,而是看谁更加的适合。
以上是关于基于Spring Cloud的微服务落地的主要内容,如果未能解决你的问题,请参考以下文章
基于Spring Cloud的微服务构建学习-3 服务治理:Spring Cloud Eureka
基于Spring Cloud的微服务构建学习-3 Spring Cloud Eureka配置详解