Java多线程系列-线程池


概要


前面分别介绍了"Java多线程基础"、"JUC原子类"和"JUC锁"。本章介绍JUC的最后一部分的内容——线程池。内容包括:
线程池架构图
线程池示例

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3509903.html

线程池架构图


线程池的架构图如下:

1. Executor

它是"执行者"接口,它是来执行任务的。准确的说,Executor提供了execute()接口来执行已提交的 Runnable 任务的对象。Executor存在的目的是提供一种将"任务提交"与"任务如何运行"分离开来的机制。
它只包含一个函数接口:

void execute(Runnable command)


2. ExecutorService

ExecutorService继承于Executor。它是"执行者服务"接口,它是为"执行者接口Executor"服务而存在的;准确的话,ExecutorService提供了"将任务提交给执行者的接口(submit方法)","让执行者执行任务(invokeAll, invokeAny方法)"的接口等等。

ExecutorService的函数列表


 1 // 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
 2 boolean awaitTermination(long timeout, TimeUnit unit) 3 // 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。
 4 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 5 // 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
 6 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 7 // 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。
 8 <T> T invokeAny(Collection<? extends Callable<T>> tasks) 9 // 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。
10 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)11 // 如果此执行程序已关闭,则返回 true。
12 boolean isShutdown()13 // 如果关闭后所有任务都已完成,则返回 true。
14 boolean isTerminated()15 // 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
16 void shutdown()17 // 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
18 List<Runnable> shutdownNow()19 // 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
20 <T> Future<T> submit(Callable<T> task)21 // 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
22 Future<?> submit(Runnable task)23 // 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
24 <T> Future<T> submit(Runnable task, T result)



3. AbstractExecutorService

AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。
AbstractExecutorService存在的目的是为ExecutorService中的函数接口提供了默认实现。

AbstractExecutorService函数列表
由于它的函数列表和ExecutorService一样,这里就不再重复列举了。

4. ThreadPoolExecutor

ThreadPoolExecutor就是大名鼎鼎的"线程池"。它继承于AbstractExecutorService抽象类。

ThreadPoolExecutor函数列表


 1 // 用给定的初始参数和默认的线程工厂及被拒绝的执行处理程序创建新的 ThreadPoolExecutor。
 2 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 3 // 用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。
 4 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 5 // 用给定的初始参数和默认被拒绝的执行处理程序创建新的 ThreadPoolExecutor。
 6 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) 7 // 用给定的初始参数创建新的 ThreadPoolExecutor。
 8 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 9 
10 // 基于完成执行给定 Runnable 所调用的方法。
11 protected void afterExecute(Runnable r, Throwable t)12 // 如果在保持活动时间内没有任务到达,新任务到达时正在替换(如果需要),则设置控制核心线程是超时还是终止的策略。
13 void allowCoreThreadTimeOut(boolean value)14 // 如果此池允许核心线程超时和终止,如果在 keepAlive 时间内没有任务到达,新任务到达时正在替换(如果需要),则返回 true。
15 boolean allowsCoreThreadTimeOut()16 // 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
17 boolean awaitTermination(long timeout, TimeUnit unit)18 // 在执行给定线程中的给定 Runnable 之前调用的方法。
19 protected void beforeExecute(Thread t, Runnable r)20 // 在将来某个时间执行给定任务。
21 void execute(Runnable command)22 // 当不再引用此执行程序时,调用 shutdown。
23 protected void finalize()24 // 返回主动执行任务的近似线程数。
25 int getActiveCount()26 // 返回已完成执行的近似任务总数。
27 long getCompletedTaskCount()28 // 返回核心线程数。
29 int getCorePoolSize()30 // 返回线程保持活动的时间,该时间就是超过核心池大小的线程可以在终止前保持空闲的时间值。
31 long getKeepAliveTime(TimeUnit unit)32 // 返回曾经同时位于池中的最大线程数。
33 int getLargestPoolSize()34 // 返回允许的最大线程数。
35 int getMaximumPoolSize()36 // 返回池中的当前线程数。
37 int getPoolSize()38 // 返回此执行程序使用的任务队列。
39 BlockingQueue<Runnable> getQueue()40 // 返回用于未执行任务的当前处理程序。
41 RejectedExecutionHandler getRejectedExecutionHandler()42 // 返回曾计划执行的近似任务总数。
43 long getTaskCount()44 // 返回用于创建新线程的线程工厂。
45 ThreadFactory getThreadFactory()46 // 如果此执行程序已关闭,则返回 true。
47 boolean isShutdown()48 // 如果关闭后所有任务都已完成,则返回 true。
49 boolean isTerminated()50 // 如果此执行程序处于在 shutdown 或 shutdownNow 之后正在终止但尚未完全终止的过程中,则返回 true。
51 boolean isTerminating()52 // 启动所有核心线程,使其处于等待工作的空闲状态。
53 int prestartAllCoreThreads()54 // 启动核心线程,使其处于等待工作的空闲状态。
55 boolean prestartCoreThread()56 // 尝试从工作队列移除所有已取消的 Future 任务。
57 void purge()58 // 从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。
59 boolean remove(Runnable task)60 // 设置核心线程数。
61 void setCorePoolSize(int corePoolSize)62 // 设置线程在终止前可以保持空闲的时间限制。
63 void setKeepAliveTime(long time, TimeUnit unit)64 // 设置允许的最大线程数。
65 void setMaximumPoolSize(int maximumPoolSize)66 // 设置用于未执行任务的新处理程序。
67 void setRejectedExecutionHandler(RejectedExecutionHandler handler)68 // 设置用于创建新线程的线程工厂。
69 void setThreadFactory(ThreadFactory threadFactory)70 // 按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。
71 void shutdown()72 // 尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。
73 List<Runnable> shutdownNow()74 // 当 Executor 已经终止时调用的方法。
75 protected void terminated()



5. ScheduledExecutorService

ScheduledExecutorService是一个接口,它继承于于ExecutorService。它相当于提供了"延时"和"周期执行"功能的ExecutorService。
ScheduledExecutorService提供了相应的函数接口,可以安排任务在给定的延迟后执行,也可以让任务周期的执行。

ScheduledExecutorService函数列表


1 // 创建并执行在给定延迟后启用的 ScheduledFuture。
2 <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)3 // 创建并执行在给定延迟后启用的一次性操作。
4 ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)5 // 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
6 ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)7 // 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
8 ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)



6. ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承于ThreadPoolExecutor,并且实现了ScheduledExecutorService接口。它相当于提供了"延时"和"周期执行"功能的ScheduledExecutorService。
ScheduledThreadPoolExecutor类似于Timer,但是在高并发程序中,ScheduledThreadPoolExecutor的性能要优于Timer。

ScheduledThreadPoolExecutor函数列表


 1 // 使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor。
 2 ScheduledThreadPoolExecutor(int corePoolSize) 3 // 使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。
 4 ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) 5 // 使用给定的初始参数创建一个新 ScheduledThreadPoolExecutor。
 6 ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) 7 // 使用给定初始参数创建一个新 ScheduledThreadPoolExecutor。
 8 ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) 9 
10 // 修改或替换用于执行 callable 的任务。
11 protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task)12 // 修改或替换用于执行 runnable 的任务。
13 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task)14 // 使用所要求的零延迟执行命令。
15 void execute(Runnable command)16 // 获取有关在此执行程序已 shutdown 的情况下、是否继续执行现有定期任务的策略。
17 boolean getContinueExistingPeriodicTasksAfterShutdownPolicy()18 // 获取有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略。
19 boolean getExecuteExistingDelayedTasksAfterShutdownPolicy()20 // 返回此执行程序使用的任务队列。
21 BlockingQueue<Runnable> getQueue()22 // 从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。
23 boolean remove(Runnable task)24 // 创建并执行在给定延迟后启用的 ScheduledFuture。
25 <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)26 // 创建并执行在给定延迟后启用的一次性操作。
27 ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)28 // 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
29 ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)30 // 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
31 ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)32 // 设置有关在此执行程序已 shutdown 的情况下是否继续执行现有定期任务的策略。
33 void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)34 // 设置有关在此执行程序已 shutdown 的情况下是否继续执行现有延迟任务的策略。
35 void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)36 // 在以前已提交任务的执行中发起一个有序的关闭,但是不接受新任务。
37 void shutdown()38 // 尝试停止所有正在执行的任务、暂停等待任务的处理,并返回等待执行的任务列表。
39 List<Runnable> shutdownNow()40 // 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
41 <T> Future<T> submit(Callable<T> task)42 // 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
43 Future<?> submit(Runnable task)44 // 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
45 <T> Future<T> submit(Runnable task, T result)



7. Executors

Executors是个静态工厂类。它通过静态工厂方法返回ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等类的对象。

Executors函数列表


 1 // 返回 Callable 对象,调用它时可运行给定特权的操作并返回其结果。
 2 static Callable<Object> callable(PrivilegedAction<?> action) 3 // 返回 Callable 对象,调用它时可运行给定特权的异常操作并返回其结果。
 4 static Callable<Object> callable(PrivilegedExceptionAction<?> action) 5 // 返回 Callable 对象,调用它时可运行给定的任务并返回 null。
 6 static Callable<Object> callable(Runnable task) 7 // 返回 Callable 对象,调用它时可运行给定的任务并返回给定的结果。
 8 static <T> Callable<T> callable(Runnable task, T result) 9 // 返回用于创建新线程的默认线程工厂。
10 static ThreadFactory defaultThreadFactory()11 // 创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
12 static ExecutorService newCachedThreadPool()13 // 创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们,并在需要时使用提供的 ThreadFactory 创建新线程。
14 static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)15 // 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
16 static ExecutorService newFixedThreadPool(int nThreads)17 // 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程。
18 static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)19 // 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
20 static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)21 // 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
22 static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)23 // 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
24 static ExecutorService newSingleThreadExecutor()25 // 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程,并在需要时使用提供的 ThreadFactory 创建新线程。
26 static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)27 // 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
28 static ScheduledExecutorService newSingleThreadScheduledExecutor()29 // 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
30 static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)31 // 返回 Callable 对象,调用它时可在当前的访问控制上下文中执行给定的 callable 对象。
32 static <T> Callable<T> privilegedCallable(Callable<T> callable)33 // 返回 Callable 对象,调用它时可在当前的访问控制上下文中,使用当前上下文类加载器作为上下文类加载器来执行给定的 callable 对象。
34 static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable)35 // 返回用于创建新线程的线程工厂,这些新线程与当前线程具有相同的权限。
36 static ThreadFactory privilegedThreadFactory()37 // 返回一个将所有已定义的 ExecutorService 方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法。
38 static ExecutorService unconfigurableExecutorService(ExecutorService executor)39 // 返回一个将所有已定义的 ExecutorService 方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法。
40 static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor)



线程池示例


下面通过示例来对线程池的使用做简单演示。

 1 import java.util.concurrent.Executors; 2 import java.util.concurrent.ExecutorService; 3 
 4 public class ThreadPoolDemo1 { 5 
 6     public static void main(String[] args) { 7         // 创建一个可重用固定线程数的线程池
 8         ExecutorService pool = Executors.newFixedThreadPool(2); 9         // 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
10         Thread ta = new MyThread();11         Thread tb = new MyThread();12         Thread tc = new MyThread();13         Thread td = new MyThread();14         Thread te = new MyThread();15         // 将线程放入池中进行执行
16         pool.execute(ta);17         pool.execute(tb);18         pool.execute(tc);19         pool.execute(td);20         pool.execute(te);21         // 关闭线程池
22         pool.shutdown();23     }24 }25 
26 class MyThread extends Thread {27 
28     @Override29     public void run() {30         System.out.println(Thread.currentThread().getName()+ " is running.");31     }32 }


运行结果

pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.


结果说明
主线程中创建了线程池pool,线程池的容量是2。即,线程池中最多能同时运行2个线程。
紧接着,将ta,tb,tc,td,te这3个线程添加到线程池中运行。
最后,通过shutdown()关闭线程池。





概要


在上一章"Java多线程系列--“JUC线程池”01之 线程池架构"中,我们了解了线程池的架构。线程池的实现类是ThreadPoolExecutor类。本章,我们通过分析ThreadPoolExecutor类,来了解线程池的原理。内容包括:
ThreadPoolExecutor简介
ThreadPoolExecutor数据结构
线程池调度

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3509941.html

ThreadPoolExecutor简介


ThreadPoolExecutor是线程池类。对于线程池,可以通俗的将它理解为"存放一定数量线程的一个线程集合。线程池允许若个线程同时允许,允许同时运行的线程数量就是线程池的容量;当添加的到线程池中的线程超过它的容量时,会有一部分线程阻塞等待。线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。"

ThreadPoolExecutor数据结构


ThreadPoolExecutor的数据结构如下图所示:


各个数据在ThreadPoolExecutor.java中的定义如下:

// 阻塞队列。
private final BlockingQueue<Runnable> workQueue;// 互斥锁
private final ReentrantLock mainLock = new ReentrantLock();// 线程集合。一个Worker对应一个线程。
private final HashSet<Worker> workers = new HashSet<Worker>();// “终止条件”,与“mainLock”绑定。
private final Condition termination = mainLock.newCondition();// 线程池中线程数量曾经达到过的最大值。
private int largestPoolSize;// 已完成任务数量
private long completedTaskCount;// ThreadFactory对象,用于创建线程。
private volatile ThreadFactory threadFactory;// 拒绝策略的处理句柄。
private volatile RejectedExecutionHandler handler;// 保持线程存活时间。
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;// 核心池大小
private volatile int corePoolSize;// 最大池大小
private volatile int maximumPoolSize;


1. workers
    workers是HashSet<Work>类型,即它是一个Worker集合。而一个Worker对应一个线程,也就是说线程池通过workers包含了"一个线程集合"。当Worker对应的线程池启动时,它会执行线程池中的任务;当执行完一个任务后,它会从线程池的阻塞队列中取出一个阻塞的任务来继续运行。
    wokers的作用是,线程池通过它实现了"允许多个线程同时运行"。

2. workQueue
    workQueue是BlockingQueue类型,即它是一个阻塞队列。当线程池中的线程数超过它的容量的时候,线程会进入阻塞队列进行阻塞等待。
    通过workQueue,线程池实现了阻塞功能。

3. mainLock
    mainLock是互斥锁,通过mainLock实现了对线程池的互斥访问。

4. corePoolSize和maximumPoolSize
    corePoolSize是"核心池大小",maximumPoolSize是"最大池大小"。它们的作用是调整"线程池中实际运行的线程的数量"。
    例如,当新任务提交给线程池时(通过execute方法)。
          -- 如果此时,线程池中运行的线程数量< corePoolSize,则创建新线程来处理请求。
          -- 如果此时,线程池中运行的线程数量> corePoolSize,但是却< maximumPoolSize;则仅当阻塞队列满时才创建新线程。
          如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心池大小和最大池大小的值是在创建线程池设置的;但是,也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。

5. poolSize
    poolSize是当前线程池的实际大小,即线程池中任务的数量。

6. allowCoreThreadTimeOut和keepAliveTime
    allowCoreThreadTimeOut表示是否允许"线程在空闲状态时,仍然能够存活";而keepAliveTime是当线程池处于空闲状态的时候,超过keepAliveTime时间之后,空闲的线程会被终止。

7. threadFactory
    threadFactory是ThreadFactory对象。它是一个线程工厂类,"线程池通过ThreadFactory创建线程"。

8. handler
    handler是RejectedExecutionHandler类型。它是"线程池拒绝策略"的句柄,也就是说"当某任务添加到线程池中,而线程池拒绝该任务时,线程池会通过handler进行相应的处理"。



综上所说,线程池通过workers来管理"线程集合",每个线程在启动后,会执行线程池中的任务;当一个任务执行完后,它会从线程池的阻塞队列中取出任务来继续运行。阻塞队列是管理线程池任务的队列,当添加到线程池中的任务超过线程池的容量时,该任务就会进入阻塞队列进行等待。


线程池调度


我们通过下面的图看看下面线程池中任务的调度策略,加深对线程池的理解。

图-01:





图-02:





说明
    在"图-01"中,线程池中有N个任务。"任务1", "任务2", "任务3"这3个任务在执行,而"任务3"到"任务N"在阻塞队列中等待。正在执行的任务,在workers集合中,workers集合包含3个Worker,每一个Worker对应一个Thread线程,Thread线程每次处理一个任务。
    当workers集合中处理完某一个任务之后,会从阻塞队列中取出一个任务来继续执行,如图-02所示。图-02表示"任务1"处理完毕之后,线程池将"任务4"从阻塞队列中取出,放到workers中进行处理。









概要


在前面一章"Java多线程系列--“JUC线程池”02之 线程池原理(一)"中介绍了线程池的数据结构,本章会通过分析线程池的源码,对线程池进行说明。内容包括:
线程池示例
参考代码(基于JDK1.7.0_40)
线程池源码分析
    (一) 创建“线程池”
    (二) 添加任务到“线程池”
    (三) 关闭“线程池”

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3509954.html


线程池示例


在分析线程池之前,先看一个简单的线程池示例。

 1 import java.util.concurrent.Executors; 2 import java.util.concurrent.ExecutorService; 3 
 4 public class ThreadPoolDemo1 { 5 
 6     public static void main(String[] args) { 7         // 创建一个可重用固定线程数的线程池
 8         ExecutorService pool = Executors.newFixedThreadPool(2); 9         // 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
10         Thread ta = new MyThread();11         Thread tb = new MyThread();12         Thread tc = new MyThread();13         Thread td = new MyThread();14         Thread te = new MyThread();15         // 将线程放入池中进行执行
16         pool.execute(ta);17         pool.execute(tb);18         pool.execute(tc);19         pool.execute(td);20         pool.execute(te);21         // 关闭线程池
22         pool.shutdown();23     }24 }25 
26 class MyThread extends Thread {27 
28     @Override29     public void run() {30         System.out.println(Thread.currentThread().getName()+ " is running.");31     }32 }


运行结果

pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.


示例中,包括了线程池的创建,将任务添加到线程池中,关闭线程池这3个主要的步骤。稍后,我们会从这3个方面来分析ThreadPoolExecutor。


参考代码(基于JDK1.7.0_40)


Executors完整源码

src

ThreadPoolExecutor完整源码

src

线程池源码分析


(一) 创建“线程池”

下面以newFixedThreadPool()介绍线程池的创建过程。

1. newFixedThreadPool()

newFixedThreadPool()在Executors.java中定义,源码如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}


说明:newFixedThreadPool(int nThreads)的作用是创建一个线程池,线程池的容量是nThreads。
         newFixedThreadPool()在调用ThreadPoolExecutor()时,会传递一个LinkedBlockingQueue()对象,而LinkedBlockingQueue是单向链表实现的阻塞队列。在线程池中,就是通过该阻塞队列来实现"当线程池中任务数量超过允许的任务数量时,部分任务会阻塞等待"。
关于LinkedBlockingQueue的实现细节,读者可以参考"Java多线程系列--“JUC集合”08之 LinkedBlockingQueue"。



2. ThreadPoolExecutor()

ThreadPoolExecutor()在ThreadPoolExecutor.java中定义,源码如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}


说明:该函数实际上是调用ThreadPoolExecutor的另外一个构造函数。该函数的源码如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    // 核心池大小
    this.corePoolSize = corePoolSize;
    // 最大池大小
    this.maximumPoolSize = maximumPoolSize;
    // 线程池的等待队列
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    // 线程工厂对象
    this.threadFactory = threadFactory;
    // 拒绝策略的句柄
    this.handler = handler;
}


说明:在ThreadPoolExecutor()的构造函数中,进行的是初始化工作。
corePoolSize, maximumPoolSize, unit, keepAliveTime和workQueue这些变量的值是已知的,它们都是通过newFixedThreadPool()传递而来。下面看看threadFactory和handler对象。

2.1 ThreadFactory

线程池中的ThreadFactory是一个线程工厂,线程池创建线程都是通过线程工厂对象(threadFactory)来完成的。
上面所说的threadFactory对象,是通过 Executors.defaultThreadFactory()返回的。Executors.java中的defaultThreadFactory()源码如下:

public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}


defaultThreadFactory()返回DefaultThreadFactory对象。Executors.java中的DefaultThreadFactory()源码如下:

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    // 提供创建线程的API。
    public Thread newThread(Runnable r) {
        // 线程对应的任务是Runnable对象r
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        // 设为“非守护线程”
        if (t.isDaemon())
            t.setDaemon(false);
        // 将优先级设为“Thread.NORM_PRIORITY”
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}


说明:ThreadFactory的作用就是提供创建线程的功能的线程工厂。
         它是通过newThread()提供创建线程功能的,下面简单说说newThread()。newThread()创建的线程对应的任务是Runnable对象,它创建的线程都是“非守护线程”而且“线程优先级都是Thread.NORM_PRIORITY”。

2.2 RejectedExecutionHandler

handler是ThreadPoolExecutor中拒绝策略的处理句柄。所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。
线程池默认会采用的是defaultHandler策略,即AbortPolicy策略。在AbortPolicy策略中,线程池拒绝任务时会抛出异常!
defaultHandler的定义如下:

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();


AbortPolicy的源码如下:

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }

    // 抛出异常
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}


(二) 添加任务到“线程池”

1. execute()

execute()定义在ThreadPoolExecutor.java中,源码如下:

public void execute(Runnable command) {
    // 如果任务为null,则抛出异常。
    if (command == null)
        throw new NullPointerException();
    // 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息
    int c = ctl.get();
    // 当线程池中的任务数量 < "核心池大小"时,即线程池中少于corePoolSize个任务。
    // 则通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 当线程池中的任务数量 >= "核心池大小"时,
    // 而且,"线程池处于允许状态"时,则尝试将任务添加到阻塞队列中。
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次确认“线程池状态”,若线程池异常终止了,则删除任务;然后通过reject()执行相应的拒绝策略的内容。
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 否则,如果"线程池中任务数量"为0,则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    // 如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
    else if (!addWorker(command, false))
        reject(command);
}


说明:execute()的作用是将任务添加到线程池中执行。它会分为3种情况进行处理:
        情况1 -- 如果"线程池中任务数量" < "核心池大小"时,即线程池中少于corePoolSize个任务;此时就新建一个线程,并将该任务添加到线程中进行执行。
        情况2 -- 如果"线程池中任务数量" >= "核心池大小",并且"线程池是允许状态";此时,则将任务添加到阻塞队列中阻塞等待。在该情况下,会再次确认"线程池的状态",如果"第2次读到的线程池状态"和"第1次读到的线程池状态"不同,则从阻塞队列中删除该任务。
        情况3 -- 非以上两种情况。在这种情况下,尝试新建一个线程,并将该任务添加到线程中进行执行。如果执行失败,则通过reject()拒绝该任务。

2. addWorker()

addWorker()的源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 更新"线程池状态和计数"标记,即更新ctl。
    for (;;) {
        // 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息
        int c = ctl.get();
        // 获取线程池状态。
        int rs = runStateOf(c);

        // 有效性检查
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 获取线程池中任务的数量。
            int wc = workerCountOf(c);
            // 如果"线程池中任务的数量"超过限制,则返回false。
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 通过CAS函数将c的值+1。操作失败的话,则退出循环。
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // 检查"线程池状态",如果与之前的状态不同,则从retry重新开始。
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    // 添加任务到线程池,并启动任务所在的线程。
    try {
        final ReentrantLock mainLock = this.mainLock;
        // 新建Worker,并且指定firstTask为Worker的第一个任务。
        w = new Worker(firstTask);
        // 获取Worker对应的线程。
        final Thread t = w.thread;
        if (t != null) {
            // 获取锁
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);

                // 再次确认"线程池状态"
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将Worker对象(w)添加到"线程池的Worker集合(workers)"中
                    workers.add(w);
                    // 更新largestPoolSize
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                // 释放锁
                mainLock.unlock();
            }
            // 如果"成功将任务添加到线程池"中,则启动任务所在的线程。 
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回任务是否启动。
    return workerStarted;
}


说明
    addWorker(Runnable firstTask, boolean core) 的作用是将任务(firstTask)添加到线程池中,并启动该任务。
    core为true的话,则以corePoolSize为界限,若"线程池中已有任务数量>=corePoolSize",则返回false;core为false的话,则以maximumPoolSize为界限,若"线程池中已有任务数量>=maximumPoolSize",则返回false。
    addWorker()会先通过for循环不断尝试更新ctl状态,ctl记录了"线程池中任务数量和线程池状态"。
    更新成功之后,再通过try模块来将任务添加到线程池中,并启动任务所在的线程。

    从addWorker()中,我们能清晰的发现:线程池在添加任务时,会创建任务对应的Worker对象;而一个Workder对象包含一个Thread对象。(01) 通过将Worker对象添加到"线程的workers集合"中,从而实现将任务添加到线程池中。 (02) 通过启动Worker对应的Thread线程,则执行该任务。



3. submit()

补充说明一点,submit()实际上也是通过调用execute()实现的,源码如下:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}


(三) 关闭“线程池”

shutdown()的源码如下:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    // 获取锁
    mainLock.lock();
    try {
        // 检查终止线程池的“线程”是否有权限。
        checkShutdownAccess();
        // 设置线程池的状态为关闭状态。
        advanceRunState(SHUTDOWN);
        // 中断线程池中空闲的线程。
        interruptIdleWorkers();
        // 钩子函数,在ThreadPoolExecutor中没有任何动作。
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        // 释放锁
        mainLock.unlock();
    }
    // 尝试终止线程池
    tryTerminate();
}


说明:shutdown()的作用是关闭线程池。








转载请注明出处:http://www.cnblogs.com/skywang12345/p/3509960.html



本章介绍线程池的生命周期。在"Java多线程系列--“基础篇”01之 基本概念"中,我们介绍过,线程有5种状态:新建状态就绪状态运行状态阻塞状态死亡状态。线程池也有5种状态;然而,线程池不同于线程,线程池的5种状态是:Running, SHUTDOWN, STOP, TIDYING, TERMINATED

线程池状态定义代码如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;private static int ctlOf(int rs, int wc) { return rs | wc; }


说明
ctl是一个AtomicInteger类型的原子对象。ctl记录了"线程池中的任务数量"和"线程池状态"2个信息。
ctl共包括32位。其中,高3位表示"线程池状态",低29位表示"线程池中的任务数量"。

RUNNING    -- 对应的高3位值是111。
SHUTDOWN   -- 对应的高3位值是000。
STOP       -- 对应的高3位值是001。
TIDYING    -- 对应的高3位值是010。
TERMINATED -- 对应的高3位值是011。


线程池各个状态之间的切换如下图所示:



1. RUNNING

(01) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态!
道理很简单,在ctl的初始化代码中(如下),就将它初始化为RUNNING状态,并且"任务数量"初始化为0。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));


2. SHUTDOWN

(01) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(02) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

3. STOP

(01) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
(02) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。



4. TIDYING
(01) 状态说明:当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
(02) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

5. TERMINATED
(01) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(02) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。









概要


本章介绍线程池的拒绝策略。内容包括:
拒绝策略介绍
拒绝策略对比和示例

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3512947.html

拒绝策略介绍


线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施。
当任务添加到线程池中之所以被拒绝,可能是由于:第一,线程池异常关闭。第二,任务数量超过线程池的最大限制。

线程池共包括4种拒绝策略,它们分别是:AbortPolicy, CallerRunsPolicy, DiscardOldestPolicyDiscardPolicy

AbortPolicy         -- 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
CallerRunsPolicy    -- 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。
DiscardOldestPolicy -- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
DiscardPolicy       -- 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。


线程池默认的处理策略是AbortPolicy!


拒绝策略对比和示例


下面通过示例,分别演示线程池的4种拒绝策略。
1. DiscardPolicy 示例
2. DiscardOldestPolicy 示例
3. AbortPolicy 示例
4. CallerRunsPolicy 示例

1. DiscardPolicy 示例

 1 import java.lang.reflect.Field; 2 import java.util.concurrent.ArrayBlockingQueue; 3 import java.util.concurrent.ThreadPoolExecutor; 4 import java.util.concurrent.TimeUnit; 5 import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy; 6 
 7 public class DiscardPolicyDemo { 8 
 9     private static final int THREADS_SIZE = 1;10     private static final int CAPACITY = 1;11 
12     public static void main(String[] args) throws Exception {13 
14         // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
15         ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,16                 new ArrayBlockingQueue<Runnable>(CAPACITY));17         // 设置线程池的拒绝策略为"丢弃"
18         pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());19 
20         // 新建10个任务,并将它们添加到线程池中。
21         for (int i = 0; i < 10; i++) {22             Runnable myrun = new MyRunnable("task-"+i);23             pool.execute(myrun);24         }25         // 关闭线程池
26         pool.shutdown();27     }28 }29 
30 class MyRunnable implements Runnable {31     private String name;32     public MyRunnable(String name) {33         this.name = name;34     }35     @Override36     public void run() {37         try {38             System.out.println(this.name + " is running.");39             Thread.sleep(100);40         } catch (Exception e) {41             e.printStackTrace();42         }43     }44 }


运行结果

task-0 is running.
task-1 is running.


结果说明线程池pool的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),这意味着"线程池能同时运行的任务数量最大只能是1"。
线程池pool的阻塞队列是ArrayBlockingQueue,ArrayBlockingQueue是一个有界的阻塞队列,ArrayBlockingQueue的容量为1。这也意味着线程池的阻塞队列只能有一个线程池阻塞等待。
根据""中分析的execute()代码可知:线程池中共运行了2个任务。第1个任务直接放到Worker中,通过线程去执行;第2个任务放到阻塞队列中等待。其他的任务都被丢弃了!

2. DiscardOldestPolicy 示例


 1 import java.lang.reflect.Field; 2 import java.util.concurrent.ArrayBlockingQueue; 3 import java.util.concurrent.ThreadPoolExecutor; 4 import java.util.concurrent.TimeUnit; 5 import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy; 6 
 7 public class DiscardOldestPolicyDemo { 8 
 9     private static final int THREADS_SIZE = 1;10     private static final int CAPACITY = 1;11 
12     public static void main(String[] args) throws Exception {13 
14         // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
15         ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,16                 new ArrayBlockingQueue<Runnable>(CAPACITY));17         // 设置线程池的拒绝策略为"DiscardOldestPolicy"
18         pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());19 
20         // 新建10个任务,并将它们添加到线程池中。
21         for (int i = 0; i < 10; i++) {22             Runnable myrun = new MyRunnable("task-"+i);23             pool.execute(myrun);24         }25         // 关闭线程池
26         pool.shutdown();27     }28 }29 
30 class MyRunnable implements Runnable {31     private String name;32     public MyRunnable(String name) {33         this.name = name;34     }35     @Override36     public void run() {37         try {38             System.out.println(this.name + " is running.");39             Thread.sleep(200);40         } catch (Exception e) {41             e.printStackTrace();42         }43     }44 }


运行结果

task-0 is running.
task-9 is running.


结果说明将"线程池的拒绝策略"由DiscardPolicy修改为DiscardOldestPolicy之后,当有任务添加到线程池被拒绝时,线程池会丢弃阻塞队列中末尾的任务,然后将被拒绝的任务添加到末尾。

3. AbortPolicy 示例


 1 import java.lang.reflect.Field; 2 import java.util.concurrent.ArrayBlockingQueue; 3 import java.util.concurrent.ThreadPoolExecutor; 4 import java.util.concurrent.TimeUnit; 5 import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; 6 import java.util.concurrent.RejectedExecutionException; 7 
 8 public class AbortPolicyDemo { 9 
10     private static final int THREADS_SIZE = 1;11     private static final int CAPACITY = 1;12 
13     public static void main(String[] args) throws Exception {14 
15         // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
16         ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,17                 new ArrayBlockingQueue<Runnable>(CAPACITY));18         // 设置线程池的拒绝策略为"抛出异常"
19         pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());20 
21         try {22 
23             // 新建10个任务,并将它们添加到线程池中。
24             for (int i = 0; i < 10; i++) {25                 Runnable myrun = new MyRunnable("task-"+i);26                 pool.execute(myrun);27             }28         } catch (RejectedExecutionException e) {29             e.printStackTrace();30             // 关闭线程池
31             pool.shutdown();32         }33     }34 }35 
36 class MyRunnable implements Runnable {37     private String name;38     public MyRunnable(String name) {39         this.name = name;40     }41     @Override42     public void run() {43         try {44             System.out.println(this.name + " is running.");45             Thread.sleep(200);46         } catch (Exception e) {47             e.printStackTrace();48         }49     }50 }


(某一次)运行结果

java.util.concurrent.RejectedExecutionException
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)
    at AbortPolicyDemo.main(AbortPolicyDemo.java:27)
task-0 is running.
task-1 is running.


结果说明将"线程池的拒绝策略"由DiscardPolicy修改为AbortPolicy之后,当有任务添加到线程池被拒绝时,会抛出RejectedExecutionException。

4. CallerRunsPolicy 示例


 1 import java.lang.reflect.Field; 2 import java.util.concurrent.ArrayBlockingQueue; 3 import java.util.concurrent.ThreadPoolExecutor; 4 import java.util.concurrent.TimeUnit; 5 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; 6 
 7 public class CallerRunsPolicyDemo { 8 
 9     private static final int THREADS_SIZE = 1;10     private static final int CAPACITY = 1;11 
12     public static void main(String[] args) throws Exception {13 
14         // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
15         ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,16                 new ArrayBlockingQueue<Runnable>(CAPACITY));17         // 设置线程池的拒绝策略为"CallerRunsPolicy"
18         pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());19 
20         // 新建10个任务,并将它们添加到线程池中。
21         for (int i = 0; i < 10; i++) {22             Runnable myrun = new MyRunnable("task-"+i);23             pool.execute(myrun);24         }25 
26         // 关闭线程池
27         pool.shutdown();28     }29 }30 
31 class MyRunnable implements Runnable {32     private String name;33     public MyRunnable(String name) {34         this.name = name;35     }36     @Override37     public void run() {38         try {39             System.out.println(this.name + " is running.");40             Thread.sleep(100);41         } catch (Exception e) {42             e.printStackTrace();43         }44     }45 }


(某一次)运行结果

task-2 is running.
task-3 is running.
task-4 is running.
task-5 is running.
task-6 is running.
task-7 is running.
task-8 is running.
task-9 is running.
task-0 is running.
task-1 is running.



结果说明将"线程池的拒绝策略"由DiscardPolicy修改为CallerRunsPolicy之后,当有任务添加到线程池被拒绝时,线程池会将被拒绝的任务添加到"线程池正在运行的线程"中取运行。







概要


本章介绍线程池中的Callable和Future。
Callable 和 Future 简介
示例和源码分析(基于JDK1.7.0_40)

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3544116.html

Callable 和 Future 简介


  Callable 和 Future 是比较有趣的一对组合。当我们需要获取线程的执行结果时,就需要用到它们。Callable用于产生结果,Future用于获取结果。

1. Callable

Callable 是一个接口,它只包含一个call()方法。Callable是一个返回结果并且可能抛出异常的任务。

为了便于理解,我们可以将Callable比作一个Runnable接口,而Callable的call()方法则类似于Runnable的run()方法。

Callable的源码如下:

public interface Callable<V> {
    V call() throws Exception;
}


说明:从中我们可以看出Callable支持泛型。



2. Future

Future 是一个接口。它用于表示异步计算的结果。提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。

Future的源码如下:

public interface Future<V> {
    // 试图取消对此任务的执行。
    boolean     cancel(boolean mayInterruptIfRunning)

    // 如果在任务正常完成前将其取消,则返回 true。
    boolean     isCancelled()

    // 如果任务已完成,则返回 true。
    boolean     isDone()

    // 如有必要,等待计算完成,然后获取其结果。
    V           get() throws InterruptedException, ExecutionException;

    // 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
    V             get(long timeout, TimeUnit unit)
          throws InterruptedException, ExecutionException, TimeoutException;
}


说明: Future用于表示异步计算的结果。它的实现类是FutureTask,在讲解FutureTask之前,我们先看看Callable, Future, FutureTask它们之间的关系图,如下:


说明
(01) RunnableFuture是一个接口,它继承了Runnable和Future这两个接口。RunnableFuture的源码如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}


(02) FutureTask实现了RunnableFuture接口。所以,我们也说它实现了Future接口。


示例和源码分析(基于JDK1.7.0_40)


我们先通过一个示例看看Callable和Future的基本用法,然后再分析示例的实现原理。

 1 import java.util.concurrent.Callable; 2 import java.util.concurrent.Future; 3 import java.util.concurrent.Executors; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.ExecutionException; 6 
 7 class MyCallable implements Callable { 8 
 9     @Override 10     public Integer call() throws Exception {11         int sum    = 0;12         // 执行任务
13         for (int i=0; i<100; i++)14             sum += i;15         //return sum; 
16         return Integer.valueOf(sum);17     } 18 }19 
20 public class CallableTest1 {21 
22     public static void main(String[] args) 23         throws ExecutionException, InterruptedException{24         //创建一个线程池
25         ExecutorService pool = Executors.newSingleThreadExecutor();26         //创建有返回值的任务
27         Callable c1 = new MyCallable();28         //执行任务并获取Future对象 
29         Future f1 = pool.submit(c1);30         // 输出结果
31         System.out.println(f1.get()); 32         //关闭线程池 
33         pool.shutdown(); 34     }35 }


运行结果

4950


结果说明
  在主线程main中,通过newSingleThreadExecutor()新建一个线程池。接着创建Callable对象c1,然后再通过pool.submit(c1)将c1提交到线程池中进行处理,并且将返回的结果保存到Future对象f1中。然后,我们通过f1.get()获取Callable中保存的结果;最后通过pool.shutdown()关闭线程池。



1. submit()

submit()在java/util/concurrent/AbstractExecutorService.java中实现,它的源码如下:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    // 创建一个RunnableFuture对象
    RunnableFuture<T> ftask = newTaskFor(task);
    // 执行“任务ftask”
    execute(ftask);
    // 返回“ftask”
    return ftask;
}


说明:submit()通过newTaskFor(task)创建了RunnableFuture对象ftask。它的源码如下:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}




2. FutureTask的构造函数

FutureTask的构造函数如下:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    // callable是一个Callable对象
    this.callable = callable;
    // state记录FutureTask的状态
    this.state = NEW;       // ensure visibility of callable
}



3. FutureTask的run()方法

我们继续回到submit()的源码中。
在newTaskFor()新建一个ftask对象之后,会通过execute(ftask)执行该任务。此时ftask被当作一个Runnable对象进行执行,最终会调用到它的run()方法;ftask的run()方法在java/util/concurrent/FutureTask.java中实现,源码如下:

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        // 将callable对象赋值给c。
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 执行Callable的call()方法,并保存结果到result中。
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            // 如果运行成功,则将result保存
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        // 设置“state状态标记”
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}


说明run()中会执行Callable对象的call()方法,并且最终将结果保存到result中,并通过set(result)将result保存。
      之后调用FutureTask的get()方法,返回的就是通过set(result)保存的值。