线程的基本状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public enum State {
/**
* Thread state for a thread which has not yet started.
* 初始状态,线程被构建,但是还没有调用start()方法
*/
NEW,

/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
* 运行状态,Java线程将操作系统中的就绪和运行状态笼统的称作:运行中
*
*/
RUNNABLE,

/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
* 阻塞状态,表示线程阻塞于锁
*/
BLOCKED,

/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
* 等待状态,表示线程进入等待状态,进入该状态表示当前线程需要等待其它线程做出一些特定的动作
*(通知或中断)
*/
WAITING,

/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
* 超时等待状态,该状态不同于WAITING,它是可以在指定时间自行返回的
*
*/
TIMED_WAITING,

/**
* Thread state for a terminated thread.
* The thread has completed execution.
* 终止状态,表示当前线程已经执行完毕
*/
TERMINATED;
}

线程并不是始终固定于某一个状态而是随着代码的执行在不同状态之间进行切换的。


线程的创建方式

继承Thread类

继承 Thread 类,并重写 run ⽅法;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TestThread {

public static void main(String[] args) {

Thread thread = new Thread(new ThreadDemo());
thread.start();
}

static class ThreadDemo extends Thread{
@Override
public void run() {
System.out.println("MyThread");
}
}
}

我们在程序里面调用了start()方法后,虚拟机会先为我们创建⼀个线程,然后等到这个线程第⼀次得到时间片时再调⽤run()方法。 注意不可多次调用start()方法。在第⼀次调start()方法后,再次调用start() 方法会抛出异常


实现Runnable接口

接着我们来看⼀下 Runnable 接⼝(JDK 1.8 +):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@FunctionalInterface
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}

一个经典的函数式接口,这意味着我们可以使用Java 8 的函数式编程来简化代码。

1
2
3
4
5
6
7
8
public class ImplRunnable {

public static void main(String[] args) {
//静态代理的方式实现线程
new Thread(()-> System.out.println("Impl Runnable Thread")).start();
}

}

new Thread(Runnable target)本质上是使用静态代理的方式实现的。

Thread类构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}

public Thread(Runnable target, String name) {
init(null, target, name, 0);
}

// ⽚段1 - init⽅法
private void init(ThreadGroup g, Runnable target, String name,long stackSize,AccessControlContext acc,boolean inheritThreadLocals)

// ⽚段2 - 构造函数调⽤init⽅法
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}

// ⽚段3 - 使⽤在init⽅法⾥初始化AccessControlContext类型的私有属性
this.inheritedAccessControlContext = acc != null ? acc : AccessController.getContext();

// ⽚段4 - 两个对⽤于⽀持ThreadLocal的私有属性
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

Thread类的几个常用的方法
  • currentThread():静态方法,返回对当前正在执行的线程对象的引用;
  • start():开始执行线程的方法,java虚拟机会调⽤线程内的run()方法;
  • yield():yield在英语里有放弃的意思,同样,这里的yield()指的是当前线程愿意让出对当前处理器的占用。这里需要注意的是,就算当前线程调⽤了yield()方法,程序在调度的时候,也还有可能继续运行这个线程的;
  • sleep():静态方法,使当前线程睡眠⼀段时间;
  • join():使当前线程等待另⼀个线程执行完毕之后再继续执行,内部调用的是Object类的wait方法实现的;
Thread类与Runnable接口的比较

实现⼀个自定义的线程类,可以有继承 Thread 类或者实现 Runnable 接⼝这两种方式,它们之间有什么优劣呢?

  • 由于Java“单继承,多实现”的特性,Runnable接口使⽤起来比Thread更灵活。
  • Runnable接口出现更符合面向对象,将线程单独进行对象的封装。
  • Runnable接口出现,降低了线程对象和线程任务的耦合性。
  • 如果使用线程时不需要使⽤Thread类的诸多⽅法,显然使用Runnable接口更为轻量。

所以,我们通常优先使用“实现 Runnable 接口”这种方式来自定义线程类


Callable接口、Future接口

使用以上两种方式创建的线程是无返回值的。而有时候我们希望开启线程去执行某项任务时能动态的返回执行完毕后的结果。

JDK提供了 Callable 接口与 Future 类为我们解决这个问题,这也是所谓的“异步”模型。

Callable接口
1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

Callable接口一般是配合线程池工具ExecutorService来使用的。通过submit()方法让一个Callable接口执行。它会返回一个Future。通过Futureget方法可以得到执行结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ImplCallable implements Callable<String> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
@Override
public String call() throws Exception {
return "Impl Callable";
}

public static void main(String[] args) {
//这里是演示使用该方法,正常情况下是不允许的
ExecutorService executorService = Executors.newCachedThreadPool();
ImplCallable implCallable = new ImplCallable();
Future<String> submit = executorService.submit(implCallable);
System.out.println(submit.get());
executorService.shutdown();
}
}

/**
* 运行结果:
* Impl Callable
*/

注意:阿里巴巴开发手册:线程池不允许使用 Executors 去创建后文会提到。

Future接口
1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Future<V> {

//试图取消一个线程的执行,返回是否取消成功
boolean cancel(boolean mayInterruptIfRunning);
//判断当前方法是否取消
boolean isCancelled();
//判断当前方法是否完成
boolean isDone();
//当任务结束后返回一个结果,如果调用时,工作还没有结束,则会阻塞线程,直到任务执行完毕
V get() throws InterruptedException, ExecutionException;
//多等待timeout的时间就会返回结果
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

一般Future是和线程池搭配使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

/**
* Future和ExecutorService搭配使用
* 运行程序的主机是12线程的,因此启动了12个线程,每次会同时执行12个task。
*
*/
public class TestFuture {

public static void main(String[] args) throws InterruptedException, ExecutionException {
//匿名内部类声明
Callable<Long> callable = new Callable<Long>() {
@Override
public Long call() throws Exception {

long start = System.currentTimeMillis();
Thread.sleep(100);
long end = System.currentTimeMillis();

long seed = end - start;
System.out.println(Thread.currentThread().getName()+" seed=" + seed);

return seed;
}
};
List<Callable<Long>> tasks = new ArrayList<>();
//循环添加24次
for(int i = 0; i < 24; i++) {
tasks.add(callable);
}
//判断计算机的线程
int poolSize = Runtime.getRuntime().availableProcessors();
System.out.println("poolSize=" + poolSize);
ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
List<Future<Long>> futures = executorService.invokeAll(tasks);

long result = 0;
for (Future<Long> future : futures) {
result += future.get();
}
System.out.println("result=" + result);

executorService.shutdown();
}
}
运行结果:
    poolSize=12
    pool-1-thread-8 seed=100
    pool-1-thread-4 seed=100
    pool-1-thread-2 seed=100
    pool-1-thread-3 seed=100
    pool-1-thread-7 seed=100
    pool-1-thread-5 seed=100
    pool-1-thread-1 seed=100
    pool-1-thread-6 seed=100
    pool-1-thread-9 seed=100
    pool-1-thread-10 seed=100
    pool-1-thread-11 seed=100
    pool-1-thread-12 seed=100
    pool-1-thread-9 seed=101
    pool-1-thread-1 seed=101
    pool-1-thread-3 seed=101
    pool-1-thread-10 seed=101
    pool-1-thread-12 seed=101
    pool-1-thread-11 seed=101
    pool-1-thread-8 seed=101
    pool-1-thread-4 seed=101
    pool-1-thread-2 seed=101
    pool-1-thread-7 seed=101
    pool-1-thread-5 seed=101
    pool-1-thread-6 seed=101
    result=2412

基于线程池的方式

1
池化技术相信大家已经屡见不鲜了,线程池、数据库连接池、 Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
使用线程池的好处
  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
如何创建线程池

《阿里巴巴Java开发手册》中强制线程池不允许使用Executors 去创建,而是通过ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险 。

《阿里巴巴Java开发手册-嵩山版》

ThreadPoolExecutor 类提供了4个构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory);

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler);

//⽤给定的初始参数创建⼀个新的ThreadPoolExecutor。以上3个构造方法都是建立在这个基础上
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);

ThreadPoolExecutor构造函数重要参数分析
ThreadPoolExecutor 3个最重要的参数:

  • corePoolSize: 核心线程数线程数定义了最小可以同时运行的线程数量。

核心线程:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会⼀直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会被销毁(临时工)。

  • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。

该值等于核心线程数量 + 非核心线程数量。

  • workQueue :阻塞队列, 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他参数:

  • keepAliveTime: 非核心线程闲置超时时长。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime才会被回收销毁;

  • unit: 上面参数的时间单位。

    unit是⼀个枚举类型 ,包括以下属性:

    • NANOSECONDS : 1微毫秒 = 1微秒 / 1000
    • MICROSECONDS : 1微秒 = 1毫秒 / 1000
    • MILLISECONDS : 1毫秒 = 1秒 /1000
    • SECONDS : 秒
    • MINUTES : 分
    • HOURS : 小时
    • DAYS : 天
    • threadFactory: 创建线程的⼯⼚ ,⽤于批量创建线程,统⼀在创建线程时设置⼀些参数,如是
      否守护线程、线程的优先级等。如果不指定,会新建⼀个默认的线程工厂
  • handler:饱和策略。

饱和策略:

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor定义一些策略:

RejectedExecutionHandler具体的实现类有4种:

RejectedExecutionHandler实现类

可以参考这4个实体类的源码,饱和后的操作都是实现rejectedExecution(Runnable r,ThreadPoolExecutor executor)方法 ,它们都是ThreadPoolExecutor 的静态内部类:

  1. ThreadPoolExecutor.AbortPolicy :抛出 RejectedExecutionException 来拒绝新任
    务的处理 。
  2. ThreadPoolExecutor.CallerRunsPolicy :简单点说就是后面排队的线程就在那儿等着。调用执行当前传入的线程进行运行。但是这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果应用程序可以承受此延迟并且不能丢弃任何⼀个任务请求的话,你可以选择这个策略。
  3. ThreadPoolExecutor.DiscardPolicy : 不处理新任务,直接丢弃掉。
  4. ThreadPoolExecutor.DiscardOldestPolicy : 此策略将丢弃最早的未处理的任务请求。

线程池主要的任务处理流程 (重要)

线程池执行示意图

当线程池接收到一个任务时,主要是通过源码中的execute()方法去执行的;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 1.当前线程数⼩于corePoolSize,则调⽤addWorker创建核⼼线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果不⼩于corePoolSize,则将任务添加到workQueue队列。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.1 如果isRunning返回false(状态检查),则remove这个任务,然后执行饱和策略。
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.2 线程池处于running状态,但是没有线程,则创建线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.如果放⼊workQueue失败,则创建非核⼼线程执⾏任务,
// 如果这时创建非核⼼线程失败(当前线程总数不⼩于maximumPoolSize时),就会执行饱和策略。
else if (!addWorker(command, false))
reject(command);
}

主要分为3步:

1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

实例演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class ThreadPoolExecutorDemo {

// 最大核心线程数
public static final int CORE_POOL_SIZE = 5;
// 最大线程数
public static final int MAXINUM_POOL_SIZE = 10;
// 最大队列长度
public static final int QUEUE_CAPACITY = 100;
// 非核心线程闲置超时时间
public static final Long KEEP_ALIVE_TIME = 1L;

public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXINUM_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());

//1. 线程总数量 < corePoolSize,无论线程是否空闲,都会新建一个核心线程执行
//任务(让核心线程数量快速达到corePoolSize,在核心线程数量 <corePoolSize时)。
// 注意,这一步需要获得全局锁。

//2. 线程总数量 >= corePoolSize时,新来的线程任务会进入任务队列中等待,然
//后空闲的核心线程会依次去缓存队列中取任务来执行(体现了线程复用)。

//3. 当缓存队列满了,说明这个时候任务已经多到爆棚,需要一些“临时工”来执行
//这些任务了。于是会创建非核心线程去执行这个任务。注意,这一步需要获得
//全局锁。

//4. 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取上面提到的
//饱和策略进行处理。
//最大任务数
int maxTask = 1000;
for (int i = 0; i < maxTask; i++) {
Runnable task = ()->{
System.out.println(Thread.currentThread().getName() + " Start.Time = " +new SimpleDateFormat("HH:mm:ss").format(new Date()));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " End.Time = " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
};

// 执行Runnable
executor.execute(task);
}
executor.shutdown();
}
}
1. 当maxTask小于缓存队列的长度时(如:maxTask = 10),线程池不需要创建非核心线程来处理任务
    运行结果:
    pool-1-thread-2 Start.Time = 09:25:19
    pool-1-thread-3 Start.Time = 09:25:19
    pool-1-thread-5 Start.Time = 09:25:19
    pool-1-thread-4 Start.Time = 09:25:19
    pool-1-thread-1 Start.Time = 09:25:19
    pool-1-thread-1 End.Time = 09:25:21
    pool-1-thread-2 End.Time = 09:25:21
    pool-1-thread-4 End.Time = 09:25:21
    pool-1-thread-3 End.Time = 09:25:21
    pool-1-thread-5 End.Time = 09:25:21
    pool-1-thread-4 Start.Time = 09:25:21
    pool-1-thread-3 Start.Time = 09:25:21
    pool-1-thread-2 Start.Time = 09:25:21
    pool-1-thread-1 Start.Time = 09:25:21
    pool-1-thread-5 Start.Time = 09:25:21
    pool-1-thread-3 End.Time = 09:25:23
    pool-1-thread-5 End.Time = 09:25:23
    pool-1-thread-1 End.Time = 09:25:23
    pool-1-thread-2 End.Time = 09:25:23
    pool-1-thread-4 End.Time = 09:25:23

    Process finished with exit code 0

2. 当maxTask大于缓存队列的长度时(如:maxTask = 210),线程池不需要创建非核心线程来处理任务
    运行结果为:
    pool-1-thread-10 Start.Time = 09:26:56
    pool-1-thread-7 Start.Time = 09:26:56
    pool-1-thread-1 Start.Time = 09:26:56
    pool-1-thread-9 Start.Time = 09:26:56
    pool-1-thread-8 Start.Time = 09:26:56
    pool-1-thread-2 Start.Time = 09:26:56
    pool-1-thread-4 Start.Time = 09:26:56
    pool-1-thread-6 Start.Time = 09:26:56
    main Start.Time = 09:26:56
    pool-1-thread-3 Start.Time = 09:26:56
    pool-1-thread-5 Start.Time = 09:26:56
    pool-1-thread-5 End.Time = 09:26:58
    pool-1-thread-3 End.Time = 09:26:58
    pool-1-thread-1 End.Time = 09:26:58
    pool-1-thread-6 End.Time = 09:26:58
    pool-1-thread-9 End.Time = 09:26:58
    ......

评论