Dynamic Thread Pool DynamicTp (Integration Guide)

Posted gogoed

There are no great ideas in this world, only down-to-earth results.

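Thread pool series:

[Executors] Four common ways to create a thread pool

[ThreadPoolExecutor] Custom thread pools explained in depth

Dynamic thread pool DynamicTp (Basics)

Dynamic thread pool DynamicTp (Integration Guide)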

Preface

DynamicTp project links:

Official site: Home | dynamic-tp

Gitee: https://gitee.com/dromara/dynamic-tp

GitHub: https://github.com/dromara/dynamic-tp

This article explains how to integrate dynamic-tp into an application.

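1. Configuration center

The rest of this article uses a Spring Boot application integrated with Nacos as the example.

Tips

  1. Keep the dynamic thread pool configuration in its own file in Nacos, for example user-center-dtp-dev.yml.

  2. If the configuration center supports yml, prefer a yml file; it is easier to read and to edit.

  3. The sample configuration in this article lists every available option. Items you do not use, or that keep their default value, can be deleted to shrink the file.

  4. More than 90% of integration failures come from incorrect usage or version incompatibility. For version compatibility problems, open an issue or report them in the community group.

  5. nacos-config-spring-boot-starter 0.2.10 and earlier matches Spring Boot 2.3.12.RELEASE and earlier.

  6. nacos-config-spring-boot-starter 0.2.11-beta and later matches Spring Boot 2.4.0 and later; see the official notes for details.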
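
Tips 5 and 6 can be written down as a small check. The sketch below only encodes those two rules for Spring Boot 2.x; the class and method names are hypothetical, and anything outside 2.x is rejected because the tips do not cover it.

```java
final class NacosStarterCompat {

    /**
     * Returns the nacos-config-spring-boot-starter range that the tips recommend
     * for a Spring Boot 2.x version string such as "2.3.12.RELEASE".
     */
    static String recommendedStarter(String springBootVersion) {
        String[] parts = springBootVersion.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        if (major != 2) {
            // The tips only talk about Spring Boot 2.x, so do not guess.
            throw new IllegalArgumentException("not covered by the tips: " + springBootVersion);
        }
        return minor >= 4 ? "0.2.11-beta or later" : "0.2.10 or earlier";
    }

    public static void main(String[] args) {
        System.out.println(recommendedStarter("2.3.12.RELEASE")); // 0.2.10 or earlier
        System.out.println(recommendedStarter("2.4.0"));          // 0.2.11-beta or later
    }
}
```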

2. Maven dependency

<dependency>
    <groupId>cn.dynamictp</groupId>
    <artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId>
    <version>1.1.0</version>
</dependency>

3. Thread pool configuration file

3.1 yml format

spring:
  dynamic:
    tp:
      enabled: true
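      enabledCollect: true                    # whether to collect monitoring metrics, default false
      collectorTypes: micrometer,logging      # metrics collector types (logging | micrometer | internal_logging), default micrometer
      logPath: /home/logs                     # path for monitoring log data, default ${user.home}/logs; only needed when the collector type is logging
      monitorInterval: 5                      # monitoring interval (alarm checks, metrics collection), default 5s
      nacos:                                  # Nacos settings; defaults apply if omitted (for example user-center-dev.yml)
        dataId: dynamic-tp-demo-dev.yml       # dataId of this configuration file, e.g. user-center-dtp-dev.yml as mentioned above
        group: DEFAULT_GROUP
      configType: yml                         # configuration file type
      platforms:                              # notification and alarm platforms
        - platform: wechat
          urlKey: 3a700-127-4bd-a798-c53d8b69c     # replace with your own
          receivers: test1,test2                   # WeCom (Enterprise WeChat) names of the recipients
        - platform: ding
          urlKey: f80dad441fcd655438f4a08dcd6a     # replace with your own
          secret: SECb5441fa6f375d5b9d21           # replace with your own; can be omitted when not using sign mode
          receivers: 18888888888                   # phone number of the DingTalk account
        - platform: lark
          urlKey: 0d944ae7-b24a-40                 # replace with your own
          receivers: test1,test2                   # Lark (Feishu) names or openids of the recipients
        - platform: email
          receivers: 123456@qq.com,789789@qq.com   # email recipients
      executors:                                   # dynamic thread pools; every item has a default, so items that keep the default can be omitted
        - threadPoolName: dtpExecutor1
          threadPoolAliasName: 测试线程池             # thread pool alias
          executorType: common                     # thread pool type: common or eager (eager suits IO-intensive workloads)
          corePoolSize: 6
          maximumPoolSize: 8
          queueCapacity: 200
          queueType: VariableLinkedBlockingQueue   # task queue type, see the QueueTypeEnum enum in the source
          rejectedHandlerType: CallerRunsPolicy    # rejection policy, see the RejectedTypeEnum enum
          keepAliveTime: 50
          allowCoreThreadTimeOut: false                  # whether core threads are allowed to time out
          threadNamePrefix: test                         # thread name prefix
          waitForTasksToCompleteOnShutdown: false        # graceful shutdown, modeled on Spring's thread pool design
          awaitTerminationSeconds: 5                     # in seconds
          preStartAllCoreThreads: false                  # whether to prestart all core threads, default false
          runTimeout: 200                                # task execution timeout threshold, currently used only for alarms, in ms
          queueTimeout: 100                              # queue wait timeout threshold, currently used only for alarms, in ms
          taskWrapperNames: ["ttl", "mdc"]               # task wrapper names; wrappers implement the TaskWrapper interface
          notifyEnabled: true                            # whether alarms are enabled, default true
          notifyItems:                     # alarm items; if omitted, defaults are applied (change notice, capacity, liveness, reject and task timeout alarms)
            - type: capacity               # alarm item type, see the NotifyTypeEnum enum in the source
              enabled: true
              threshold: 80                # alarm threshold
              platforms: [ding,wechat]     # optional; defaults to all platforms configured at the top level
              interval: 120                # alarm interval, in seconds
            - type: change
              enabled: true
            - type: liveness
              enabled: true
              threshold: 80
            - type: reject
              enabled: true
              threshold: 1
            - type: run_timeout
              enabled: true
              threshold: 1
            - type: queue_timeout
              enabled: true
              threshold: 1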
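
To make the capacity and liveness thresholds in the configuration above more concrete, here is a minimal sketch using only JDK classes. It assumes that capacity means queued tasks divided by total queue capacity, that liveness means active threads divided by maximumPoolSize, and that an alarm fires once the percentage reaches the threshold. These formulas and all names in the class are assumptions for illustration, not dynamic-tp's own code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class AlarmThresholdSketch {

    /** Queue usage in percent: queued tasks / (queued tasks + remaining capacity). */
    static int capacityPercent(ThreadPoolExecutor pool) {
        long queued = pool.getQueue().size();
        long total = queued + pool.getQueue().remainingCapacity();
        return total == 0 ? 0 : (int) (queued * 100 / total);
    }

    /** Liveness in percent: threads currently running a task / maximumPoolSize. */
    static int livenessPercent(ThreadPoolExecutor pool) {
        return pool.getActiveCount() * 100 / pool.getMaximumPoolSize();
    }

    /** Assumed rule: alarm once the measured percentage reaches the threshold. */
    static boolean exceeds(int percent, int threshold) {
        return percent >= threshold;
    }

    /** Fills a small pool in a controlled way and returns {capacityPercent, livenessPercent}. */
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        CountDownLatch started = new CountDownLatch(2);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // 7 tasks: 2 occupy the core threads, 5 wait in the queue of capacity 10.
            for (int i = 0; i < 7; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        gate.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(); // both core threads are now inside a task
            return new int[] {capacityPercent(pool), livenessPercent(pool)};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            gate.countDown(); // release the blocked tasks so the pool can drain
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("capacity=" + r[0] + "% liveness=" + r[1] + "%"); // capacity=50% liveness=50%
        System.out.println("capacity alarm at threshold 80: " + exceeds(r[0], 80)); // false
    }
}
```

Under these assumptions, a threshold of 80 on a queue of capacity 200 would correspond to 160 queued tasks.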

3.2 properties format

spring.dynamic.tp.enabled=true
spring.dynamic.tp.enabledCollect=true
spring.dynamic.tp.collectorTypes=micrometer,logging
spring.dynamic.tp.logPath=/home/logs
spring.dynamic.tp.monitorInterval=5
spring.dynamic.tp.nacos.dataId=dynamic-tp-demo-dev.properties
spring.dynamic.tp.nacos.group=DEFAULT_GROUP
spring.dynamic.tp.configType=properties
spring.dynamic.tp.platforms[0].platform=wechat
spring.dynamic.tp.platforms[0].urlKey=3a700-127-4bd-a798-c53d8b69c
spring.dynamic.tp.platforms[0].receivers=test1,test2
spring.dynamic.tp.platforms[1].platform=ding
spring.dynamic.tp.platforms[1].urlKey=f80dad441fcd655438f4a08dcd6a
spring.dynamic.tp.platforms[1].secret=SECb5441fa6f375d5b9d21
spring.dynamic.tp.platforms[1].receivers=18888888888
spring.dynamic.tp.platforms[2].platform=lark
spring.dynamic.tp.platforms[2].urlKey=0d944ae7-b24a-40
spring.dynamic.tp.platforms[2].receivers=test1,test2
spring.dynamic.tp.platforms[3].platform=email
spring.dynamic.tp.platforms[3].receivers=123456@qq.com,789789@qq.com
spring.dynamic.tp.executors[0].threadPoolName=dtpExecutor1
spring.dynamic.tp.executors[0].threadPoolAliasName=测试线程池
spring.dynamic.tp.executors[0].executorType=common
spring.dynamic.tp.executors[0].corePoolSize=6
spring.dynamic.tp.executors[0].maximumPoolSize=8
spring.dynamic.tp.executors[0].queueCapacity=200
spring.dynamic.tp.executors[0].queueType=VariableLinkedBlockingQueue
spring.dynamic.tp.executors[0].rejectedHandlerType=CallerRunsPolicy
spring.dynamic.tp.executors[0].keepAliveTime=50
spring.dynamic.tp.executors[0].allowCoreThreadTimeOut=false
spring.dynamic.tp.executors[0].threadNamePrefix=test
spring.dynamic.tp.executors[0].waitForTasksToCompleteOnShutdown=false
spring.dynamic.tp.executors[0].awaitTerminationSeconds=5
spring.dynamic.tp.executors[0].preStartAllCoreThreads=false
spring.dynamic.tp.executors[0].runTimeout=200
spring.dynamic.tp.executors[0].queueTimeout=100
spring.dynamic.tp.executors[0].taskWrapperNames[0]=ttl
spring.dynamic.tp.executors[0].taskWrapperNames[1]=mdc
spring.dynamic.tp.executors[0].notifyEnabled=true
spring.dynamic.tp.executors[0].notifyItems[0].type=capacity
spring.dynamic.tp.executors[0].notifyItems[0].enabled=true
spring.dynamic.tp.executors[0].notifyItems[0].threshold=80
spring.dynamic.tp.executors[0].notifyItems[0].platforms[0]=ding
spring.dynamic.tp.executors[0].notifyItems[0].platforms[1]=wechat
spring.dynamic.tp.executors[0].notifyItems[0].interval=120
spring.dynamic.tp.executors[0].notifyItems[1].type=change
spring.dynamic.tp.executors[0].notifyItems[1].enabled=true
spring.dynamic.tp.executors[0].notifyItems[2].type=liveness
spring.dynamic.tp.executors[0].notifyItems[2].enabled=true
spring.dynamic.tp.executors[0].notifyItems[2].threshold=80
spring.dynamic.tp.executors[0].notifyItems[3].type=reject
spring.dynamic.tp.executors[0].notifyItems[3].enabled=true
spring.dynamic.tp.executors[0].notifyItems[3].threshold=1
spring.dynamic.tp.executors[0].notifyItems[4].type=run_timeout
spring.dynamic.tp.executors[0].notifyItems[4].enabled=true
spring.dynamic.tp.executors[0].notifyItems[4].threshold=1
spring.dynamic.tp.executors[0].notifyItems[5].type=queue_timeout
spring.dynamic.tp.executors[0].notifyItems[5].enabled=true
spring.dynamic.tp.executors[0].notifyItems[5].threshold=1
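
The indexed keys in the properties format carry the same structure as the YAML lists: every executors[i] prefix corresponds to one list element. In a real application Spring Boot performs this binding; the hand-rolled sketch below (hypothetical class and method names, JDK only) just groups the scalar fields by index to show the correspondence.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class IndexedPropertiesSketch {

    // Matches keys such as spring.dynamic.tp.executors[0].corePoolSize
    private static final Pattern EXECUTOR_KEY =
            Pattern.compile("spring\\.dynamic\\.tp\\.executors\\[(\\d+)\\]\\.(\\w+)");

    /** Groups executors[i].field entries by index i, one map per YAML list element. */
    static Map<Integer, Map<String, String>> groupExecutors(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<Integer, Map<String, String>> byIndex = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            Matcher m = EXECUTOR_KEY.matcher(key);
            // Only scalar fields match; nested lists such as notifyItems[0].type are skipped.
            if (m.matches()) {
                byIndex.computeIfAbsent(Integer.valueOf(m.group(1)), i -> new TreeMap<>())
                        .put(m.group(2), props.getProperty(key));
            }
        }
        return byIndex;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "spring.dynamic.tp.executors[0].threadPoolName=dtpExecutor1",
                "spring.dynamic.tp.executors[0].corePoolSize=6",
                "spring.dynamic.tp.executors[0].maximumPoolSize=8");
        System.out.println(groupExecutors(text));
    }
}
```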

3.3 application.yml configuration

nacos:
  config:
    server-addr: localhost:8848
    type: yaml
    data-ids: user-center-dev.yml,user-center-dtp-dev.yml  # comma-separated; the first is the application's main configuration, the second is the dynamic thread pool configuration
    auto-refresh: true
    group: DEFAULT_GROUP
    bootstrap:
      enable: true
      log-enable: true
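
With auto-refresh enabled, a change to the thread pool file in Nacos is meant to reach the running pool without a restart. This article does not show how dynamic-tp applies the change internally; in plain JDK terms it comes down to calling the setters on a live ThreadPoolExecutor. The sketch below illustrates only that underlying mechanism, with a hypothetical helper name. The order of the two setters matters because, on JDK 9 and later, setCorePoolSize rejects a value above the current maximum.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RuntimeResizeSketch {

    /** Applies new core and maximum sizes to a running pool in a safe order. */
    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newCore > newMax) {
            throw new IllegalArgumentException("need 0 <= core <= max and max > 0");
        }
        if (newMax >= pool.getMaximumPoolSize()) {
            // Growing (or unchanged maximum): raise the ceiling first, then the core size.
            pool.setMaximumPoolSize(newMax);
            pool.setCorePoolSize(newCore);
        } else {
            // Shrinking: lower the core size first so it never exceeds the maximum.
            pool.setCorePoolSize(newCore);
            pool.setMaximumPoolSize(newMax);
        }
    }

    /** Starts from the sample values 6/8, grows to 10/12, then shrinks to 2/3. */
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                6, 8, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));
        try {
            resize(pool, 10, 12);
            int grownCore = pool.getCorePoolSize();
            int grownMax = pool.getMaximumPoolSize();
            resize(pool, 2, 3);
            return new int[] {grownCore, grownMax, pool.getCorePoolSize(), pool.getMaximumPoolSize()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("grown to " + r[0] + "/" + r[1] + ", shrunk to " + r[2] + "/" + r[3]);
    }
}
```

The queue capacity is left out on purpose: JDK queues cannot be resized, which is presumably why the sample configuration uses VariableLinkedBlockingQueue.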

4. Demo implementation

Create a new Spring Boot project.

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.5.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.dzw</groupId>
    <artifactId>dynamic</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>dynamic</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.3.12.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.nacos</groupId>
                <artifactId>nacos-client</artifactId>
                <version>1.2.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-undertow</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>nacos-config-spring-boot-starter</artifactId>
            <version>0.2.7</version>
        </dependency>
        <dependency>
            <groupId>cn.dynamictp</groupId>
            <artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
        </dependency>
        <!-- Needed for @Slf4j in TestController; the version is managed by spring-boot-dependencies -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

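4.2 Add the @EnableDynamicTp annotation to the startup class

import com.dtp.core.spring.EnableDynamicTp; // package as of dynamic-tp 1.1.0; check it against your version
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableDynamicTp
@SpringBootApplication
public class DynamicApplication {

	public static void main(String[] args) {
		SpringApplication.run(DynamicApplication.class, args);
	}
}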

4.3 Add a thread pool configuration class

package com.dzw.dynamic;

import com.dtp.common.em.QueueTypeEnum;
import com.dtp.common.em.RejectedTypeEnum;
import com.dtp.core.support.ThreadPoolBuilder;
import com.dtp.core.support.ThreadPoolCreator;
import com.dtp.core.thread.DtpExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Declares the thread pools of the demo as Spring beans.
 *
 * @Author lyb
 * @Date Created in 2023/2/3
 */
@Configuration
public class ThreadPoolConfiguration {

	// Dynamic thread pool created with default parameters.
	@Bean
	public DtpExecutor dtpExecutor() {
		return ThreadPoolCreator.createDynamicFast("dynamic-tp-test-1");
	}

	// Dynamic thread pool built explicitly: 2 threads, a queue of 2, rejected tasks are discarded.
	// The threadPoolName is what an entry under spring.dynamic.tp.executors has to match.
	@Bean
	public ThreadPoolExecutor dynamicExecutor() {
		return ThreadPoolBuilder.newBuilder()
				.threadPoolName("dynamicExecutor")
				.corePoolSize(2)
				.maximumPoolSize(2)
				.keepAliveTime(6000L)
				.timeUnit(TimeUnit.MILLISECONDS)
				.workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE.getName(), 2, true)
				.rejectedExecutionHandler(RejectedTypeEnum.DISCARD_POLICY.getName())
				.buildDynamic();
	}

	// Second pool with the same parameters under a different name.
	@Bean
	public ThreadPoolExecutor dynamicExecutor2() {
		return ThreadPoolBuilder.newBuilder()
				.threadPoolName("dynamicExecutor2")
				.corePoolSize(2)
				.maximumPoolSize(2)
				.keepAliveTime(6000L)
				.timeUnit(TimeUnit.MILLISECONDS)
				.workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE.getName(), 2, true)
				.rejectedExecutionHandler(RejectedTypeEnum.DISCARD_POLICY.getName())
				.buildDynamic();
	}
}
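
For readers more familiar with the JDK API, the builder calls for dynamicExecutor correspond roughly to the constructor call below. This is a sketch under assumptions: it treats the third workQueue argument as the fairness flag of ArrayBlockingQueue and DISCARD_POLICY as the JDK's DiscardPolicy, and it leaves out everything dynamic-tp adds on top (naming, monitoring, runtime refresh). The class name is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PlainJdkEquivalent {

    /** A static JDK pool with the same sizing as the dynamicExecutor bean. */
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                        // corePoolSize
                2,                                        // maximumPoolSize
                6000L, TimeUnit.MILLISECONDS,             // keepAliveTime
                new ArrayBlockingQueue<>(2, true),        // bounded queue of 2, fair ordering
                new ThreadPoolExecutor.DiscardPolicy());  // silently drop rejected tasks
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println("core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " queueCapacity=" + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```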

4.4 Write a test endpoint

package com.dzw.dynamic;

import com.dtp.core.DtpRegistry;
import com.dtp.core.thread.DtpExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * Test endpoints for the dynamic thread pool demo.
 *
 * @Author lyb
 * @Date Created in 2023/2/3
 */
@Slf4j
@RestController
public class TestController {

	// Several ThreadPoolExecutor beans exist, so Spring falls back to the field name
	// and injects the "dynamicExecutor" bean.
	@Autowired
	private ThreadPoolExecutor dynamicExecutor;

	/**
	 * @Description: Tests the custom thread pool
	 * @Author: lyb
	 * @Date: 2023/2/6 11:18 AM
	 * @Version: 1.0
	 * @Return:
	 */
	@GetMapping("/thread/customize/test")
	public String customizeTest() {
		// Submit from a separate thread so the HTTP request returns immediately.
		new Thread(() -> {
			try {
				task();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}).start();
		return "success";
	}

	public void task() throws InterruptedException {
		// Alternative: look the pool up by name through the registry.
//		DtpExecutor threadPoolExecutor = DtpRegistry.getDtpExecutor("dynamicExecutor");
		for (int i = 0; i < 100; i++) {
			dynamicExecutor.execute(() -> {
				try {
					log.info("i am dynamic-tp-test-2 task");
					Thread.sleep(2000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			});
		}
	}

	/**
	 * @Description: Tests the Undertow thread pool
	 * @Author: lyb
	 * @Date: 2023/2/6 11:22 AM
	 * @Version: 1.0
	 * @Return:
	 */
	@GetMapping("/thread/undertow/test")
	public String undertowTest() {
		// Hold the request for 10 seconds to keep an Undertow worker thread busy.
		try {
			Thread.sleep(10000L);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return "success";
	}
}
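
It helps to know what to expect when calling /thread/customize/test: 100 tasks are submitted in a tight loop to a pool with 2 threads, a queue of 2 and a discard policy. The sketch below reproduces that load with JDK classes only, assuming the dynamic pool follows the same acceptance rules as ThreadPoolExecutor. A latch replaces the 2-second sleep so the result does not depend on timing; the class name is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class DiscardDemo {

    /** Submits 100 blocking tasks and returns how many of them actually ran. */
    static int runOnce() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 6000L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2, true),
                new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger executed = new AtomicInteger();
        try {
            for (int i = 0; i < 100; i++) {
                pool.execute(() -> {
                    try {
                        gate.await(); // stand-in for the slow work in the controller
                        executed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            gate.countDown();   // all 100 submissions are done, let the accepted tasks finish
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
            return executed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            gate.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("executed " + runOnce() + " of 100"); // executed 4 of 100
    }
}
```

Two tasks occupy the threads, two wait in the queue, and the remaining 96 are dropped without any exception, so the log line in the controller should appear only a handful of times per request.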

Demo repository:

GitHub - 754466488/dynamictp
