博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java中 ThreadPoolExecutor 类学习笔记
阅读量:4605 次
发布时间:2019-06-09

本文共 12226 字,大约阅读时间需要 40 分钟。

 

Java中 ThreadPoolExecutor 类学习笔记

1、定义

Java中的线程池。ThreadPoolExecutor类是接口Executor的实现类。如下图所示: 

2、作用

线程池解决两个不同的问题:由于每个任务的调用开销减少,它们通常在执行大量异步任务时提供改进的性能,并且它们提供了一种限制和管理资源(包括执行一个任务。 每个ThreadPoolExecutor还维护一些基本统计信息,例如已完成任务的数量。

3、创建类解释

ThreadPoolExecutor extends AbstractExecutorService。

构造方法:

public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue
workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

其中:

  1.   corePoolSize : 线程池的核心线程数,即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut.
  2.   maximumPoolSize : 池中允许的最大线程数
  3.        keepAliveTime :  线程的存活时间。当线程池里的线程数大于corePoolSize时,如果等了keepAliveTime时长还没有任务可执行,则线程退出。
  4.        unit  : 指定keepAliveTime的单位
  5.        workQueue :  一个阻塞队列,提交的任务将会被放到这个队列里
  6.     threadFactory :  执行程序创建新线程时使用的工厂
  7.        handler : 执行被阻止时使用的处理程序,因为达到线程限制和队列容量 

4.线程池执行流程

任务被提交到线程池,会先判断当前线程数量是否小于corePoolSize,如果小于则创建线程来执行提交的任务,否则将任务放入workQueue队列,如果workQueue满了,则判断当前线程数量是否小于maximumPoolSize,如果小于则创建线程执行任务,否则就会调用handler,以表示线程池拒绝接收任务。

 5.线程池的几个主要方法分析

5.1 主方法:ThreadPoolExector的execute

 
public void execute(Runnable command) {
if (command == null)            throw new NullPointerException();        /*         * Proceed in 3 steps:         *         * 1. If fewer than corePoolSize threads are running, try to         * start a new thread with the given command as its first         * task.  The call to addWorker atomically checks runState and         * workerCount, and so prevents false alarms that would add         * threads when it shouldn't, by returning false.         *         * 2. If a task can be successfully queued, then we still need         * to double-check whether we should have added a thread         * (because existing ones died since last checking) or that         * the pool shut down since entry into this method. So we         * recheck state and if necessary roll back the enqueuing if         * stopped, or start a new thread if there are none.         *         * 3. If we cannot queue task, then we try to add a new         * thread.  If it fails, we know we are shut down or saturated         * and so reject the task.         */             //1 当前运行的线程数量小于核心线程数量,直接将任务加入worker启动运行。        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }  

  /**

  2 运行线程数量大于核心线程数量时,上面的if分支针对大于corePoolSize,并且缓存队列加入任务操作成功的情况。
  运行中并且将任务加入缓冲队列成功,正常来说这样已经完成了处理逻辑。
  但是为了保险起见,增加了状态出现异常的确认判断,如果状态出现异常会继续remove操作,如果执行true,则按照拒绝处理策略驳回任务;
  */

        if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }

  /**

  3 这里针对运行线程数量超过了corePoolSize,并且缓存队列也已经放满的情况。
  注意第二个参数是false,可以在下面addWorker方法看到,就是针对线程池最大线程数量maximumPoolSize的判断。
  */

  else if (!addWorker(command, false))            reject(command);  } }

 5.2 关键方法:ThreadPoolExector的addWorker(增加工作线程)

private boolean addWorker(Runnable firstTask, boolean core) {        retry:        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);            // Check if queue empty only if necessary.            if (rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                   firstTask == null &&                   ! workQueue.isEmpty()))                return false;            for (;;) {                int wc = workerCountOf(c);                if (wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize)) //创建非核心线程时,即core等于false。判断当前线程数是否大于等于maximumPoolSize,如果大于等于则返回false                    return false;                if (compareAndIncrementWorkerCount(c))                    break retry;                c = ctl.get();  // Re-read ctl                if (runStateOf(c) != rs)                    continue retry;                // else CAS failed due to workerCount change; retry inner loop            }        }        boolean workerStarted = false;        boolean workerAdded = false;        Worker w = null;        try {            w = new Worker(firstTask);            final Thread t = w.thread;//创建Worker对象,同时也会实例化一个Thread,并启动。            if (t != null) {                final ReentrantLock mainLock = this.mainLock;                mainLock.lock();                try {                    // Recheck while holding lock.                    // Back out on ThreadFactory failure or if                    // shut down before lock acquired.                    int rs = runStateOf(ctl.get());                    if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive()) // precheck that t is startable                            throw new IllegalThreadStateException();                        workers.add(w);                        int s = workers.size();                        if (s > largestPoolSize)                            largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                if (workerAdded) {                    t.start();                    workerStarted = true;                }            }        } finally {            if (! workerStarted)                addWorkerFailed(w);        }        return workerStarted;    }

5.3 Worker中的runWorker方法,也是worker中的run方法主体。

/** Delegates main run loop to outer runWorker  */ public void run() {      runWorker(this); }
/**     * Main worker run loop.  Repeatedly gets tasks from queue and     * executes them, while coping with a number of issues:     *     * 1. We may start out with an initial task, in which case we     * don't need to get the first one. Otherwise, as long as pool is     * running, we get tasks from getTask. If it returns null then the     * worker exits due to changed pool state or configuration     * parameters.  Other exits result from exception throws in     * external code, in which case completedAbruptly holds, which     * usually leads processWorkerExit to replace this thread.     *     * 2. Before running any task, the lock is acquired to prevent     * other pool interrupts while the task is executing, and then we     * ensure that unless pool is stopping, this thread does not have     * its interrupt set.     *     * 3. Each task run is preceded by a call to beforeExecute, which     * might throw an exception, in which case we cause thread to die     * (breaking loop with completedAbruptly true) without processing     * the task.     *     * 4. Assuming beforeExecute completes normally, we run the task,     * gathering any of its thrown exceptions to send to afterExecute.     * We separately handle RuntimeException, Error (both of which the     * specs guarantee that we trap) and arbitrary Throwables.     * Because we cannot rethrow Throwables within Runnable.run, we     * wrap them within Errors on the way out (to the thread's     * UncaughtExceptionHandler).  Any thrown exception also     * conservatively causes thread to die.     *     * 5. After task.run completes, we call afterExecute, which may     * also throw an exception, which will also cause thread to     * die. According to JLS Sec 14.20, this exception is the one that     * will be in effect even if task.run throws.     *     * The net effect of the exception mechanics is that afterExecute     * and the thread's UncaughtExceptionHandler have as accurate     * information as we can provide about any problems encountered by     * user code.     *     * @param w the worker     */    final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {
//线程调用runWoker,会while循环调用getTask方法从workerQueue里读取任务,然后执行任务 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run();//只要getTask方法不返回null,此线程就不会退出。 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

5.4 getTask方法实现

* Performs blocking or timed wait for a task, depending on     * current configuration settings, or returns null if this worker     * must exit because of any of:     * 1. There are more than maximumPoolSize workers (due to     *    a call to setMaximumPoolSize).     * 2. The pool is stopped.     * 3. The pool is shutdown and the queue is empty.     * 4. This worker timed out waiting for a task, and timed-out     *    workers are subject to termination (that is,     *    {
@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait, and if the queue is * non-empty, this worker is not the last thread in the pool. * * @return task, or null if the worker must exit, in which case * workerCount is decremented */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;               //如果运行线程数超过了最大线程数,但是缓存队列已经空了,这时递减worker数量。        //如果有设置允许线程超时或者线程数量超过了核心线程数量,并且线程在规定时间内均未poll到任务且队列为空则递减worker数量 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

 

转载于:https://www.cnblogs.com/Edward-Wang/p/11098579.html

你可能感兴趣的文章
(转载) Android Studio你不知道的调试技巧
查看>>
POJ2231 Moo Volume 递推 C语言
查看>>
struts2类型转换的具体流程
查看>>
Hdu 1203 I NEED A OFFER!
查看>>
php文件上传类
查看>>
CF219D Choosing Capital for Treeland
查看>>
luogu P3809 【模板】后缀排序
查看>>
JVM 调优工具
查看>>
SCTF 2014 pwn题目分析
查看>>
集合以及特殊集合
查看>>
USACO 2.2 Runaround Numbers
查看>>
Matlab画图-非常具体,非常全面
查看>>
365. Water and Jug Problem
查看>>
SQL数据库数据检索top和distinct
查看>>
平衡搜索树--红黑树 RBTree
查看>>
sqlite驱动下载
查看>>
让IE6/IE7/IE8浏览器支持CSS3属性
查看>>
队列实现霍夫曼树
查看>>
【Java】图片高质量缩放类
查看>>
详解定位与定位应用
查看>>