1. 什么是线程池
线程池(ThreadPool)是一种基于池化思想管理和使用线程的机制。它是将多个线程预先存储在一个“池子”内,当有任务出现时可以避免重新创建和销毁线程所带来性能开销,只需要从“池子”内取出相应的线程执行对应的任务即可。
使用线程池主要有以下优点:
1.降低资源消耗
2.提高响应速度
3.提高线程的可管理性
同时,阿里巴巴在其《Java开发手册》中也强制规定:线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
2. 使用线程池
线程池的创建方法总共有 7 种,但总体来说可分为 2 类:
- 通过
ThreadPoolExecutor
创建的线程池 - 通过
Executors
创建的线程池
线程池的创建方式总共包含以下 7 种(其中 6 种是通过 Executors
创建的,1 种是通过 ThreadPoolExecutor
创建的):
Executors.newFixedThreadPool
:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待;Executors.newCachedThreadPool
:创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程;Executors.newSingleThreadExecutor
:创建单个线程数的线程池,它可以保证先进先出的执行顺序;Executors.newScheduledThreadPool
:创建一个可以执行延迟任务的线程池;Executors.newSingleThreadScheduledExecutor
:创建一个单线程的可以执行延迟任务的线程池;Executors.newWorkStealingPool
:创建一个抢占式执行的线程池(任务执行顺序不确定)JDK 1.8 添加ThreadPoolExecutor
:最原始的创建线程池的方式,它包含了 7 个参数可供设置,后面会详细讲
单线程池的意义: 虽然 newSingleThreadExecutor
和 newSingleThreadScheduledExecutor
是单线程池,但提供了工作队列,生命周期管理,工作线程维护等功能。
2.1 FixedThreadPool
FixedThreadPool
:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- corePoolSize 与 maximumPoolSize 相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
- keepAliveTime = 0 该参数默认对核心线程无效,而
FixedThreadPool
全部为核心线程; - workQueue 为
LinkedBlockingQueue
(无界阻塞队列),队列最大值为Integer.MAX_VALUE
。如果任务提交速度持续大于任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势; FixedThreadPool
的任务执行是无序的;
适用场景:可用于 Web 服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。
public class NewFixedThreadPoolTest {
public static void main(String[] args) {
System.out.println("主线程启动");
// 1.创建1个有2个线程的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
}
};
// 2.线程池执行任务(添加4个任务,每次执行2个任务,得执行两次)
threadPool.submit(runnable);
threadPool.execute(runnable);
threadPool.execute(runnable);
threadPool.execute(runnable);
System.out.println("主线程结束");
}
}
上述代码:创建了一个有2个线程的线程池,但一次给它分配了4个任务,每次只能执行2个任务,所以,得执行两次。
该线程池重用固定数量的线程在共享的无界队列中运行。 在任何时候,最多 nThreads 线程将是活动的处理任务。如果在所有线程都处于活动状态时提交了其他任务,它们将在队列中等待,直到有线程可用。所以,它会一次执行 2 个任务(2 个活跃的线程),另外 2 个任务在工作队列中等待着。
submit()
方法和 execute()
方法都是执行任务的方法。它们的区别是:submit()
方法有返回值,而 execute()
方法没有返回值。
2.2 CachedThreadPool
CachedThreadPool
:创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
- keepAliveTime = 60s,线程空闲 60s 后自动结束
- workQueue 为
SynchronousQueue
同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool
线程创建无限制,不会有队列等待,所以使用SynchronousQueue
适用场景:快速处理大量耗时较短的任务,如 Netty
的 NIO 接受请求时,可使用 CachedThreadPool
。
public class NewCachedThreadPool {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
从最终结果来看,线程池创建了 10 个线程来执行相应的任务。
2.3 SingleThreadExecutor
SingleThreadExecutor:创建单个线程数的线程池,它可以保证先进先出的执行顺序。
public class SingleThreadExecutorTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
threadPool.execute(() -> {
System.out.println(index + ":任务被执行");
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
执行结果如下:
2.4 ScheduledThread
ScheduledThreadPool:创建一个可以执行延迟任务的线程池。
public class ScheduledThreadTest {
public static void main(String[] args) {
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
System.out.println("添加任务,时间:" + new Date());
threadPool.schedule(() -> {
System.out.println("任务被执行,时间:" + new Date());
}, 2, TimeUnit.SECONDS);
}
}
执行结果如下:
从上述结果可以看出,任务在 2 秒之后被执行了,符合我们的预期。
2.5 SingleThreadScheduledExecutor
SingleThreadScheduledExecutor:创建一个单线程的可以执行延迟任务的线程池。
public class SingleThreadScheduledExecutorTest {
public static void main(String[] args) {
ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
System.out.println("添加任务,时间:" + new Date());
threadPool.schedule(() -> {
System.out.println("任务被执行,时间:" + new Date());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
}, 2, TimeUnit.SECONDS);
}
}
2.6 NewWorkStealingPool
NewWorkStealingPool:创建一个抢占式执行的线程池(任务执行顺序不确定),注意此方法只有在 JDK 1.8+ 版本中才能使用。
public class NewWorkStealingPoolTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newWorkStealingPool();
for (int i = 0; i < 10; i++) {
final int index = i;
threadPool.execute(() -> {
System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());
});
}
// 确保任务执行完成
while (!threadPool.isTerminated()) {
}
}
}
从上述代码来看,任务的执行顺序是不确定的,因为它是抢占式执行的。
2.7 ThreadPoolExecutor
ThreadPoolExecutor:最原始的创建线程池的方式
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
// 执行任务
for (int i = 0; i < 10; i++) {
final int index = i;
threadPool.execute(() -> {
System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
ThreadPoolExecutor 最多可以设置 7 个参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
7 个参数代表的含义如下:
-
corePoolSize:核心线程数,线程池中始终存活的线程数。
-
maximumPoolSize:最大线程数,线程池中允许的最大线程数,当线程池的任务队列满了之后可以创建的最大线程数。
-
keepAliveTime:最大线程数可以存活的时间,当线程中没有任务执行时,最大线程就会销毁一部分,最终保持核心线程数量的线程。
- unit:单位是和参数 3 存活时间配合使用的,合在一起用于设定线程的存活时间。参数 keepAliveTime 的时间单位有以下 7 种可选:
- TimeUnit.DAYS:天
- TimeUnit.HOURS:小时
- TimeUnit.MINUTES:分
- TimeUnit.SECONDS:秒
- TimeUnit.MILLISECONDS:毫秒
- TimeUnit.MICROSECONDS:微妙
- TimeUnit.NANOSECONDS:纳秒
5.workQueue:一个阻塞队列,用来存储线程池等待执行的任务,均为线程安全。它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种,包含以下 7 种类型:
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列,即直接提交给线程不保持它们。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
较常用的是 LinkedBlockingQueue 和 SynchronousQueue,线程池的排队策略与 BlockingQueue 有关。 6.threadFactory:线程工厂,主要用来创建线程。
7.handler:拒绝策略,拒绝处理任务时的策略,系统提供了 4 种可选:
- AbortPolicy:拒绝并抛出异常。
- CallerRunsPolicy:使用当前调用的线程来执行此任务。
- DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务。
- DiscardPolicy:忽略并抛弃当前任务。
默认策略为 AbortPolicy。
任务队列
任务队列:一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列。
1、直接提交队列:设置为 SynchronousQueue
队列,SynchronousQueue 是一个特殊的BlockingQueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之,每一个删除操作也都要等待对应的插入操作。
public class SynchronousQueueTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 100, TimeUnit.SECONDS, new SynchronousQueue<>());
for (int i = 0; i < 3; i++) {
threadPool.execute(() -> System.out.println(Thread.currentThread().getName()));
}
}
}
可以看到,当任务队列为 SynchronousQueue
,创建的线程数大于 maximumPoolSize 时,直接执行了拒绝策略抛出异常。
使用 SynchronousQueue
队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略。
2、有界的任务队列:有界的任务队列可以使用 ArrayBlockingQueue
实现。
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用 ArrayBlockingQueue
有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到 corePoolSize 时,则会将新的任务加入到等待队列中。若等待队列已满,即超过 ArrayBlockingQueue 初始化的容量,则继续创建线程,直到线程数量达到 maximumPoolSize 设置的最大线程数量,若大于 maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在 corePoolSize 以下,反之当任务队列已满时,则会以 maximumPoolSize 为最大线程数上限。
3、无界的任务队列:无界任务队列可以使用 LinkedBlockingQueue
实现
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你 corePoolSize 设置的数量,也就是说在这种情况下 maximumPoolSize 这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到 corePoolSize 后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。
线程池的执行流程
ThreadPoolExecutor 关键节点的执行流程如下:
- 当线程数小于核心线程数时,创建线程。
- 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
- 当线程数大于等于核心线程数,且任务队列已满:若线程数小于最大线程数,创建线程;若线程数等于最大线程数,抛出异常,拒绝任务。
线程拒绝策略
我们来演示一下 ThreadPoolExecutor
的拒绝策略的触发,我们使用 DiscardPolicy
的拒绝策略,它会忽略并抛弃当前任务的策略,实现代码如下:
public class ThreadPoolStrategyTest {
public static void main(String[] args) {
// 线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 100,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());
// 任务
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("当前任务被执行,执行时间:" + new Date() +
" 执行线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
}
};
// 开启4个任务
threadPool.execute(runnable);
threadPool.execute(runnable);
threadPool.execute(runnable);
threadPool.execute(runnable);
}
}
我们创建了一个核心线程数和最大线程数都为 1 的线程池,并且给线程池的任务队列设置为 1,这样当我们有 2 个以上的任务时就会触发拒绝策略,执行的结果如下图所示:
从上述结果可以看出只有两个任务被正确执行了,其他多余的任务就被舍弃并忽略了。其他拒绝策略的使用类似,这里就不一一赘述了。
自定义拒绝策略
除了 Java 自身提供的 4 种拒绝策略之外,我们也可以自定义拒绝策略,示例代码如下:
public class MyThreadPoolStrategyTest {
public static void main(String[] args) {
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("当前任务被执行,执行时间:" + new Date() +
" 执行线程:" + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 100,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 执行自定义拒绝策略的相关操作
System.out.println("我是自定义拒绝策略~");
}
});
threadPool.execute(runnable);
threadPool.execute(runnable);
threadPool.execute(runnable);
threadPool.execute(runnable);
}
}
程序的执行结果如下:
3. 自定义线程池
以下是自定义线程池,使用了有界队列,自定义 ThreadFactory
和拒绝策略的demo:
public class MyThreadPoolTest {
public static void main(String[] args) throws Exception {
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
NameThreadFactory threadFactory = new NameThreadFactory();
RejectedExecutionHandler handler = new MyIgnorePolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 100, TimeUnit.SECONDS,
workQueue, threadFactory, handler);
// 预启动所有核心线程
executor.prestartAllCoreThreads();
for (int i = 1; i <= 10; i++) {
MyTask task = new MyTask(String.valueOf(i));
executor.execute(task);
}
//阻塞主线程
System.in.read();
}
static class NameThreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}
static class MyIgnorePolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
doLog(r, executor);
}
private void doLog(Runnable r, ThreadPoolExecutor e) {
// 可做日志记录等
System.err.println( r.toString() + " rejected");
System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
}
}
static class MyTask implements Runnable {
private String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println(this.toString() + " is running!");
// 让任务执行慢点
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String getName() {
return name;
}
@Override
public String toString() {
return "MyTask [name=" + name + "]";
}
}
}
其中线程1-4先占满了核心线程和最大线程数量,然后4、5、6任务进入等待队列,7-10任务五被直接忽略拒绝执行,等1-4任务中有线程执行完后通知4、5、6任务继续执行。
4. 究竟选用哪种线程池?
经过以上的学习我们对整个线程池也有了一定的认识了,那究竟该如何选择线程池呢?
我们来看下阿里巴巴《Java开发手册》给我们的答案:
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。2)CachedThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
所以,综上情况所述,我们推荐使用 ThreadPoolExecutor
的方式进行线程池的创建,因为这种创建方式更可控,并且更加明确了线程池的运行规则,可以规避一些未知的风险。
--END--