创建线程池的7种方法及自定义线程池

lz 1年前 ⋅ 799 阅读

1. 什么是线程池

线程池(ThreadPool)是一种基于池化思想管理和使用线程的机制。它是将多个线程预先存储在一个“池子”内,当有任务出现时可以避免重新创建和销毁线程所带来性能开销,只需要从“池子”内取出相应的线程执行对应的任务即可。

使用线程池主要有以下优点:

1.降低资源消耗
2.提高响应速度
3.提高线程的可管理性

同时,阿里巴巴在其《Java开发手册》中也强制规定:线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。

2. 使用线程池

线程池的创建方法总共有 7 种,但总体来说可分为 2 类:

  • 通过 ThreadPoolExecutor 创建的线程池
  • 通过 Executors 创建的线程池

线程池的创建方式总共包含以下 7 种(其中 6 种是通过 Executors 创建的,1 种是通过 ThreadPoolExecutor 创建的):

  1. Executors.newFixedThreadPool:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待;
  2. Executors.newCachedThreadPool:创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程;
  3. Executors.newSingleThreadExecutor:创建单个线程数的线程池,它可以保证先进先出的执行顺序;
  4. Executors.newScheduledThreadPool:创建一个可以执行延迟任务的线程池;
  5. Executors.newSingleThreadScheduledExecutor:创建一个单线程的可以执行延迟任务的线程池;
  6. Executors.newWorkStealingPool:创建一个抢占式执行的线程池(任务执行顺序不确定)JDK 1.8 添加
  7. ThreadPoolExecutor:最原始的创建线程池的方式,它包含了 7 个参数可供设置,后面会详细讲

单线程池的意义: 虽然 newSingleThreadExecutor 和 newSingleThreadScheduledExecutor 是单线程池,但提供了工作队列,生命周期管理,工作线程维护等功能。

2.1 FixedThreadPool

FixedThreadPool:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • corePoolSize 与 maximumPoolSize 相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
  • keepAliveTime = 0 该参数默认对核心线程无效,而 FixedThreadPool 全部为核心线程;
  • workQueue 为 LinkedBlockingQueue(无界阻塞队列),队列最大值为 Integer.MAX_VALUE如果任务提交速度持续大于任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势
  • FixedThreadPool 的任务执行是无序的;

适用场景:可用于 Web 服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。

public class NewFixedThreadPoolTest {

    public static void main(String[] args) {
        System.out.println("主线程启动");
        // 1.创建1个有2个线程的线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
            }
        };
        // 2.线程池执行任务(添加4个任务,每次执行2个任务,得执行两次)
        threadPool.submit(runnable);
        threadPool.execute(runnable);
        threadPool.execute(runnable);
        threadPool.execute(runnable);
        System.out.println("主线程结束");
    }
}

上述代码:创建了一个有2个线程的线程池,但一次给它分配了4个任务,每次只能执行2个任务,所以,得执行两次。

该线程池重用固定数量的线程在共享的无界队列中运行。 在任何时候,最多 nThreads 线程将是活动的处理任务。如果在所有线程都处于活动状态时提交了其他任务,它们将在队列中等待,直到有线程可用。所以,它会一次执行 2 个任务(2 个活跃的线程),另外 2 个任务在工作队列中等待着。

submit() 方法和 execute() 方法都是执行任务的方法。它们的区别是:submit() 方法有返回值,而 execute() 方法没有返回值。

2.2 CachedThreadPool

CachedThreadPool:创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
  • keepAliveTime = 60s,线程空闲 60s 后自动结束
  • workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为 CachedThreadPool 线程创建无限制,不会有队列等待,所以使用 SynchronousQueue

适用场景:快速处理大量耗时较短的任务,如 Netty 的 NIO 接受请求时,可使用 CachedThreadPool

public class NewCachedThreadPool {

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            threadPool.execute(() -> {
                System.out.println("任务被执行,线程:" + Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

从最终结果来看,线程池创建了 10 个线程来执行相应的任务。

2.3 SingleThreadExecutor

SingleThreadExecutor:创建单个线程数的线程池,它可以保证先进先出的执行顺序。

public class SingleThreadExecutorTest {

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            threadPool.execute(() -> {
                System.out.println(index + ":任务被执行");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

执行结果如下:

2.4 ScheduledThread

ScheduledThreadPool:创建一个可以执行延迟任务的线程池。

public class ScheduledThreadTest {

    public static void main(String[] args) {
        ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
        System.out.println("添加任务,时间:" + new Date());
        threadPool.schedule(() -> {
            System.out.println("任务被执行,时间:" + new Date());
        }, 2, TimeUnit.SECONDS);
    }
}

执行结果如下:

从上述结果可以看出,任务在 2 秒之后被执行了,符合我们的预期。

2.5 SingleThreadScheduledExecutor

SingleThreadScheduledExecutor:创建一个单线程的可以执行延迟任务的线程池。

public class SingleThreadScheduledExecutorTest {

    public static void main(String[] args) {
        ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
        System.out.println("添加任务,时间:" + new Date());
        threadPool.schedule(() -> {
            System.out.println("任务被执行,时间:" + new Date());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
        }, 2, TimeUnit.SECONDS);
    }
}

2.6 NewWorkStealingPool

NewWorkStealingPool:创建一个抢占式执行的线程池(任务执行顺序不确定),注意此方法只有在 JDK 1.8+ 版本中才能使用。

public class NewWorkStealingPoolTest {

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newWorkStealingPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            threadPool.execute(() -> {
                System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());
            });
        }
        // 确保任务执行完成
        while (!threadPool.isTerminated()) {
        }
    }
}

从上述代码来看,任务的执行顺序是不确定的,因为它是抢占式执行的。

2.7 ThreadPoolExecutor

ThreadPoolExecutor:最原始的创建线程池的方式

public class ThreadPoolExecutorTest {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        // 执行任务
        for (int i = 0; i < 10; i++) {
            final int index = i;
            threadPool.execute(() -> {
                System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

ThreadPoolExecutor 最多可以设置 7 个参数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
                          
}

7 个参数代表的含义如下:

  1. corePoolSize:核心线程数,线程池中始终存活的线程数。

  2. maximumPoolSize:最大线程数,线程池中允许的最大线程数,当线程池的任务队列满了之后可以创建的最大线程数。

  3. keepAliveTime:最大线程数可以存活的时间,当线程中没有任务执行时,最大线程就会销毁一部分,最终保持核心线程数量的线程。

  4. unit:单位是和参数 3 存活时间配合使用的,合在一起用于设定线程的存活时间。参数 keepAliveTime 的时间单位有以下 7 种可选:
  • TimeUnit.DAYS:天
  • TimeUnit.HOURS:小时
  • TimeUnit.MINUTES:分
  • TimeUnit.SECONDS:秒
  • TimeUnit.MILLISECONDS:毫秒
  • TimeUnit.MICROSECONDS:微妙
  • TimeUnit.NANOSECONDS:纳秒

   5.workQueue:一个阻塞队列,用来存储线程池等待执行的任务,均为线程安全。它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种,包含以下 7 种类型:

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列,即直接提交给线程不保持它们。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

较常用的是 LinkedBlockingQueue 和 SynchronousQueue,线程池的排队策略与 BlockingQueue 有关。       6.threadFactory:线程工厂,主要用来创建线程。

  7.handler:拒绝策略,拒绝处理任务时的策略,系统提供了 4 种可选:

  • AbortPolicy:拒绝并抛出异常。
  • CallerRunsPolicy:使用当前调用的线程来执行此任务。
  • DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务。
  • DiscardPolicy:忽略并抛弃当前任务。

默认策略为 AbortPolicy。

任务队列

任务队列:一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列。

1、直接提交队列:设置为 SynchronousQueue 队列,SynchronousQueue 是一个特殊的BlockingQueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之,每一个删除操作也都要等待对应的插入操作。

public class SynchronousQueueTest {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 100, TimeUnit.SECONDS, new SynchronousQueue<>());
        for (int i = 0; i < 3; i++) {
            threadPool.execute(() -> System.out.println(Thread.currentThread().getName()));
        }
    }
}

可以看到,当任务队列为 SynchronousQueue,创建的线程数大于 maximumPoolSize 时,直接执行了拒绝策略抛出异常。

使用 SynchronousQueue 队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略。

2、有界的任务队列:有界的任务队列可以使用 ArrayBlockingQueue 实现。

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, 
new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用 ArrayBlockingQueue 有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到 corePoolSize 时,则会将新的任务加入到等待队列中。若等待队列已满,即超过 ArrayBlockingQueue 初始化的容量,则继续创建线程,直到线程数量达到 maximumPoolSize 设置的最大线程数量,若大于 maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在 corePoolSize 以下,反之当任务队列已满时,则会以 maximumPoolSize 为最大线程数上限。

3、无界的任务队列:无界任务队列可以使用 LinkedBlockingQueue 实现

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, 
new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你 corePoolSize 设置的数量,也就是说在这种情况下 maximumPoolSize 这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到 corePoolSize 后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

线程池的执行流程

ThreadPoolExecutor 关键节点的执行流程如下:

  1. 当线程数小于核心线程数时,创建线程。
  2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
  3. 当线程数大于等于核心线程数,且任务队列已满:若线程数小于最大线程数,创建线程;若线程数等于最大线程数,抛出异常,拒绝任务。

线程拒绝策略

我们来演示一下 ThreadPoolExecutor 的拒绝策略的触发,我们使用 DiscardPolicy 的拒绝策略,它会忽略并抛弃当前任务的策略,实现代码如下:

public class ThreadPoolStrategyTest {

    public static void main(String[] args) {
        // 线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 100,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());
        // 任务
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("当前任务被执行,执行时间:" + new Date() +
                        " 执行线程:" + Thread.currentThread().getName());

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        // 开启4个任务
        threadPool.execute(runnable);
        threadPool.execute(runnable);
        threadPool.execute(runnable);
        threadPool.execute(runnable);
    }
}

我们创建了一个核心线程数和最大线程数都为 1 的线程池,并且给线程池的任务队列设置为 1,这样当我们有 2 个以上的任务时就会触发拒绝策略,执行的结果如下图所示:

从上述结果可以看出只有两个任务被正确执行了,其他多余的任务就被舍弃并忽略了。其他拒绝策略的使用类似,这里就不一一赘述了。

自定义拒绝策略

除了 Java 自身提供的 4 种拒绝策略之外,我们也可以自定义拒绝策略,示例代码如下:

public class MyThreadPoolStrategyTest {

    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("当前任务被执行,执行时间:" + new Date() +
                        " 执行线程:" + Thread.currentThread().getName());

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 100,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // 执行自定义拒绝策略的相关操作
                System.out.println("我是自定义拒绝策略~");
            }
        });

        threadPool.execute(runnable);
        threadPool.execute(runnable);
        threadPool.execute(runnable);
        threadPool.execute(runnable);
    }
}

程序的执行结果如下:

3. 自定义线程池

以下是自定义线程池,使用了有界队列,自定义 ThreadFactory 和拒绝策略的demo:

public class MyThreadPoolTest {

    public static void main(String[] args) throws Exception {
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        NameThreadFactory threadFactory = new NameThreadFactory();
        RejectedExecutionHandler handler = new MyIgnorePolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 100, TimeUnit.SECONDS,
                workQueue, threadFactory, handler);
        // 预启动所有核心线程
        executor.prestartAllCoreThreads();

        for (int i = 1; i <= 10; i++) {
            MyTask task = new MyTask(String.valueOf(i));
            executor.execute(task);
        }

        //阻塞主线程
        System.in.read();
    }

    static class NameThreadFactory implements ThreadFactory {
        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
            System.out.println(t.getName() + " has been created");
            return t;
        }
    }

    static class MyIgnorePolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            doLog(r, executor);
        }

        private void doLog(Runnable r, ThreadPoolExecutor e) {
            // 可做日志记录等
            System.err.println( r.toString() + " rejected");
            System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
        }
    }

    static class MyTask implements Runnable {
        private String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(this.toString() + " is running!");
                // 让任务执行慢点
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public String getName() {
            return name;
        }

        @Override
        public String toString() {
            return "MyTask [name=" + name + "]";
        }
    }
}

 

其中线程1-4先占满了核心线程和最大线程数量,然后4、5、6任务进入等待队列,7-10任务五被直接忽略拒绝执行,等1-4任务中有线程执行完后通知4、5、6任务继续执行。

4. 究竟选用哪种线程池?

经过以上的学习我们对整个线程池也有了一定的认识了,那究竟该如何选择线程池呢?

我们来看下阿里巴巴《Java开发手册》给我们的答案:

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

 

说明:Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

2)CachedThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

所以,综上情况所述,我们推荐使用 ThreadPoolExecutor 的方式进行线程池的创建,因为这种创建方式更可控,并且更加明确了线程池的运行规则,可以规避一些未知的风险。

 

--END--

 

版权 本文为TIMO社区原创文章,转载无需和我联系,但请注明来自TIMO社区 http://timo.aikanmv.cn