ThreadPoolExecutor线程池参数设置技巧
一、ThreadPoolExecutor的重要参数
corePoolSize:核心线程数
核心线程会一直存活,及时没有任务需要执行
当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
queueCapacity:任务队列容量(阻塞队列)
当核心线程数达到最大时,新任务会放在队列中排队等待执行
maxPoolSize:最大线程数
当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
keepAliveTime:线程空闲时间
当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
如果allowCoreThreadTimeout=true,则会直到线程数量=0
allowCoreThreadTimeout:允许核心线程超时
rejectedExecutionHandler:任务拒绝处理器
两种情况会拒绝处理任务:
当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常
ThreadPoolExecutor类有几个内部实现类来处理这类情况:
AbortPolicy 丢弃任务,抛运行时异常
CallerRunsPolicy 执行任务
DiscardPolicy 忽视,什么都不会发生
DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
实现RejectedExecutionHandler接口,可自定义处理器
二、ThreadPoolExecutor执行顺序:
线程池按以下行为执行任务
当线程数小于核心线程数时,创建线程。
当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
当线程数大于等于核心线程数,且任务队列已满
若线程数小于最大线程数,创建线程
若线程数等于最大线程数,抛出异常,拒绝任务
三、如何设置参数
默认值
corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()
如何来设置
需要根据几个值来决定
tasks :每秒的任务数,假设为500~1000
taskcost:每个任务花费时间,假设为0.1s
responsetime:系统允许容忍的最大响应时间,假设为1s
做几个计算
corePoolSize = 每秒需要多少个线程处理?
threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50
根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可
queueCapacity = (coreSizePool/taskcost)*responsetime
计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
计算可得 maxPoolSize = (1000-80)/10 = 92
(最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理
keepAliveTime和allowCoreThreadTimeout采用默认通常能满足
以上都是理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器cpu load已经满了,则需要通过升级硬件(呵呵)和优化代码,降低taskcost来处理。
案例:
maven依赖
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
1、定义线程池异步任务配置类:com.zhixi.config
application.properties
# 异步线程配置
# 配置核心线程数
async.executor.thread.core_pool_size=5
# 配置最大线程数
async.executor.thread.max_pool_size=5
# 配置队列大小
async.executor.thread.queue_capacity=99999
# 配置线程池中的线程的名称前缀
async.executor.thread.name.prefix=async-service-
线程池配置类:ThreadPoolTaskConfig
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @ClassName ThreadPoolTaskConfig
* @Description 线程池配置类
*/
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolTaskConfig.class);
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor asyncServiceExecutor() {
logger.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePoolSize);
//配置最大线程数
executor.setMaxPoolSize(maxPoolSize);
//配置队列大小
executor.setQueueCapacity(queueCapacity);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// 拒绝策略:CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}
2、定义业务接口:com.zhixi.service
发送短信:AsyncEmailService
/**
* @ClassName AsyncService
* @Description 发送短信业务
*/
public interface AsyncEmailService {
/**
* 发送短信
*/
void executeAsync();
}
发送快递:syncCommodityService
/**
* @ClassName AsyncCommodityService
* @Description 发送快递的任务
*/
public interface AsyncCommodityService {
/**
* 发送快递
*/
void expressDelivery();
}
3、业务接口实现类:com.zhixi.service.impl
短信接口实现类:AsyncEmailServiceImpl
import com.zhixi.service.AsyncEmailService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class AsyncEmailServiceImpl implements AsyncEmailService {
private static final Logger logger = LoggerFactory.getLogger(AsyncEmailServiceImpl.class);
@Override
@Async("taskExecutor")
public void executeAsync() {
logger.info("发送短信事件开始执行~");
logger.info("发送短信中……");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("发送短信事件执行完毕");
}
}
发送快递接口实现类:AsyncCommodityServiceImpl
import com.zhixi.service.AsyncCommodityService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class AsyncCommodityServiceImpl implements AsyncCommodityService {
private static final Logger logger = LoggerFactory.getLogger(AsyncCommodityServiceImpl.class);
@Async("taskExecutor")
@Override
public void expressDelivery() {
logger.info("发送快递事件开始执行~");
logger.info("发送快递中……");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("发送快递事件执行完毕");
}
}
4、视图访问层:com.zhixi.controller
AsyncController
import com.zhixi.service.AsyncCommodityService;
import com.zhixi.service.AsyncEmailService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class AsyncController {
@Autowired
private AsyncEmailService emailService;
@Autowired
private AsyncCommodityService commodityService;
@RequestMapping(value = "/async")
public void async() {
/*寄快递业务方法*/
commodityService.expressDelivery();
/*发送短信业务方法*/
emailService.executeAsync();
}
}