前言

关于Java 的线程问题,我们上大学的时候,计算机专业的学生肯定会遇到这两个名词–线程和进程,老师和我们说一个进程里面可以有多个线程,这里也引出了多线程的概念。可以看一看下面这张图

提到Java中的线程,我们就会想到Thread以及Runnable,也了解了Thread的生命周期,请看下图:

相信大家也写过关于线程的小程序,比如卖火车票啊,生产者消费者模式等。我们使用到了synchronized加锁技术(或者使用Lock方法),也用到了关于线程通知,等待任务的notifyAll()和wait()方法。更多的使用大家找一本大书去看吧,比如《Java核心技术》什么的,Java基础不是本文的重点。下面我们谈谈线程池和多线程的原理和使用,以及整合springboot框架。

一、线程池

在多线程任务环境中,每次开启一个任务,我们把它提交到线程池中,交给线程池来管理,由线程池来调度任务。如果每次开启任务都要创建一个线程,那么对于大量任务的环境下,服务器资源将会很快耗尽,不仅对服务是一个灾难,对于用户体验来说也是一个灾难。此时,我们来谈谈Java中线程池技术。

1、线程池种类
(1)newSingleThreadExecutor

单线程的线程池,在这个线程池中的所有任务只有一个线程去执行,如果当前的线程因为异常结束,将会有新的线程来执行,保证任务按照顺序执行下去。

1
2
3
4
5
6
7
8
9
10
11
12
13
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPool {
public static void main(String[] args) {
ExecutorService pool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
pool.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
}
}
(2)newFixedThreadPool

创建一个固定线程数目的线程池,每次执行一个新的任务都会创建一个新的线程,直到达到我们设定的线程数。同样的,如果某个线程因为异常而结束,就会补充一个新的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPool {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
pool.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
}
}
(3)newCachedThreadPool

创建一个可缓存的线程池,如果线程池大小超过了任务所需的线程数目,线程池将会回收多余的线程,相反,任务所需线程增加,线程池也会增加线程,弹性修改线程池中的线程数目。

1
ExecutorService pool = Executors.newCachedThreadPool();
(4)newScheduledThreadPool

此线程支持周期性和定时任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadPool {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
for (int i = 0; i < 10; i++) {
pool.schedule(() -> {
System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
}, 10, TimeUnit.SECONDS);//延迟10秒执行任务
}
}
}

当我们有定时和周期性任务需求的时候,下面是每隔1秒执行一次

1
2
3
4
//pool.scheduleWithFixedDelay也可以
pool.scheduleAtFixedRate(() -> {
System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");
}, 1, 1, TimeUnit.SECONDS);
(5)newWorkStealingPool

jdk8才有的,会根据所给的并行层次来动态地开启关闭线程,通过使用多个队列减少竞争,底层使用ForkJoinPool来实现的。充分利用多CPU多核,将一个任务拆分位多个小任务,放到多个处理器中执行,等这些小任务完成之后,再次合并为这个完整的任务。

源码:参数parallelism应该是并行层次吧,参考无参的那个方法,这个parallelism应该是可用处理器数目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Creates a thread pool that maintains enough threads to support
* the given parallelism level, and may use multiple queues to
* reduce contention. The parallelism level corresponds to the
* maximum number of threads actively engaged in, or available to
* engage in, task processing. The actual number of threads may
* grow and shrink dynamically. A work-stealing pool makes no
* guarantees about the order in which submitted tasks are
* executed.
*
* @param parallelism the targeted parallelism level
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code parallelism <= 0}
* @since 1.8
*/
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

/**
* Creates a work-stealing thread pool using all
* {@link Runtime#availableProcessors available processors}
* as its target parallelism level.
* @return the newly created thread pool
* @see #newWorkStealingPool(int)
* @since 1.8
*/
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
//Runtime.getRuntime().availableProcessors()返回的是可用处理器数目
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
使用场景
  • newSingleThreadExecutor:一个单线程的线程池,可以用于需要保证顺序执行的场景,并且只有一个线程在执行。
  • newFixedThreadPool:一个固定大小的线程池,可以用于已知并发压力的情况下,对线程数做限制。
  • newCachedThreadPool:一个可以无限扩大的线程池,比较适合处理执行时间比较小的任务。
  • newScheduledThreadPool:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。
  • newWorkStealingPool:一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行。
2、线程池的拒绝策略

当任务过多,而且处理器无法处理额外的任务,我们所要要做的就是拒绝服务,java中提供了RejectedExecutionHandler接口,通过实现这个接口的rejectedExecution()方法实现不同的拒绝策略。下面是源码查看一波。

(1)AbortPolicy策略

该策略直接抛出异常,阻止系统正常运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }

/**
* 总是抛出异常
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
(2)CallerRunsPolicy 策略

只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }

/**
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
(3)DiscardOldestPolicy策略

该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {

public DiscardOldestPolicy() { }

/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//获取队列,并且将e插入到队列中,等待合适的时间去执行
e.getQueue().poll();
//执行任务
e.execute(r);
}
}
}
(4)DiscardPolicy 策略

这个策略默默丢弃无法处理的任务,并且不做任何处理

1
2
3
4
5
6
7
8
9
10
11
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//什么也不做
}
}
3、线程池关闭

shutdownNow:对未开始的任务全部取消执行,并且返回未执行任务的list集合。

shutdown:不接受新提交的任务,不影响已经提交的任务执行。

二、生产活动中使用

1、CountDownLatch 方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class StatsDemo {
final static SimpleDateFormat sdf = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");

final static String startTime = sdf.format(new Date());

/**
* IO密集型任务 = 一般为2*CPU核心数(常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等)
* CPU密集型任务 = 一般为CPU核心数+1(常出现于线程中:复杂算法)
* 混合型任务 = 视机器配置和复杂度自测而定
*/
private static int corePoolSize = Runtime.getRuntime().availableProcessors();
/**
* public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
* TimeUnit unit,BlockingQueue<Runnable> workQueue)
* corePoolSize用于指定核心线程数量
* maximumPoolSize指定最大线程数
* keepAliveTime和TimeUnit指定线程空闲后的最大存活时间
* workQueue则是线程池的缓冲队列,还未执行的线程会在队列中等待
* 监控队列长度,确保队列有界
* 不当的线程池大小会使得处理速度变慢,稳定性下降,并且导致内存泄露。如果配置的线程过少,则队列会持续变大,消耗过多内存。
* 而过多的线程又会 由于频繁的上下文切换导致整个系统的速度变缓——殊途而同归。队列的长度至关重要,它必须得是有界的,这样如果线程池不堪重负了它可以暂时拒绝掉新的请求。
* ExecutorService 默认的实现是一个无界的 LinkedBlockingQueue。
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000));

public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
//使用execute方法
executor.execute(new Stats("任务A", 1000, latch));
executor.execute(new Stats("任务B", 1000, latch));
executor.execute(new Stats("任务C", 1000, latch));
executor.execute(new Stats("任务D", 1000, latch));
executor.execute(new Stats("任务E", 1000, latch));
latch.await();// 等待所有人任务结束
System.out.println("所有的统计任务执行完成:" + sdf.format(new Date()));
}

static class Stats implements Runnable {
String statsName;
int runTime;
CountDownLatch latch;

public Stats(String statsName, int runTime, CountDownLatch latch) {
this.statsName = statsName;
this.runTime = runTime;
this.latch = latch;
}

public void run() {
try {
System.out.println(statsName+ " do stats begin at "+ startTime);
//模拟任务执行时间
Thread.sleep(runTime);
System.out.println(statsName + " do stats complete at "+ sdf.format(new Date()));
latch.countDown();//单次任务结束,计数器减一
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
结果

2、Future 方式

重点是和springboot整合,采用注解bean方式生成ThreadPoolTaskExecutor

@Bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//spring依赖包
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class GlobalConfig {

/**
* 默认线程池线程池
*
* @return Executor
*/
@Bean
public ThreadPoolTaskExecutor defaultThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程数目
executor.setCorePoolSize(16);
//指定最大线程数
executor.setMaxPoolSize(64);
//队列中最大的数目
executor.setQueueCapacity(16);
//线程名称前缀
executor.setThreadNamePrefix("defaultThreadPool_");
//rejection-policy:当pool已经达到max size的时候,如何处理新任务
//CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
//对拒绝task的处理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//线程空闲后的最大存活时间
executor.setKeepAliveSeconds(60);
//加载
executor.initialize();
return executor;
}
}
使用
1
2
3
//通过注解引入配置
@Resource(name = "defaultThreadPool")
private ThreadPoolTaskExecutor executor;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//使用Future方式执行多任务
//生成一个集合
List<Future> futures = new ArrayList<>();

//获取后台全部有效运营人员的集合
List<AdminUserMsgResponse> adminUserDOList = adminManagerService.GetUserToSentMsg(null);

for (AdminUserMsgResponse response : adminUserDOList) {
//并发处理
if (response.getMobile() != null) {
Future<?> future = executor.submit(() -> {
//发送短信
mobileMessageFacade.sendCustomerMessage(response.getMobile(), msgConfigById.getContent());
});
futures.add(future);
}
}

//查询任务执行的结果
for (Future<?> future : futureList) {
while (true) {
//CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询
if (future.isDone()&& !future.isCancelled()) {
//获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。

break;//当前future获取结果完毕,跳出while
} else {
Thread.sleep(1);//每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个
}
}
}