NOTE本篇笔记基于《Java并发编程实战》第6章 - 任务执行
1. 在线程中执行任务
在并发应用程序中,大多都是围绕着“任务执行”来进行构建的。而在执行任务的过程中,往往会有不同的调度策略可供选择。下面将简单介绍两种调度策略,以探究如何更好的构建并发性良好的策略。
策略一:串行地执行任务
最简单的策略就是在单个线程中串行地执行各项任务,在下面这个例子中,Web服务按序对请求进行处理。这种方式理论上是正确的,但是在实际生产环境中,它的并发性极为糟糕。由于同时只能处理一个请求,因此非常容易造成阻塞和资源浪费,因此在大多数的应用中都不会采取这种策略。
TIP在某些情况下,串行处理方式能够带来简单性和安全性,大多数GUI框架都通过单一线程来处理任务,后续第9章中将进一步探讨串行模型。
class singleThreadWebServer{
public static void main(String[] args) {
ServerSocket serverSocket = new ServerSocket(80);
while (true){
Socket connection = serverSocket.accept();
handleRequest(connection);
}
}
}
策略二:显式地为任务创建线程
通过为每一个请求都创建一个新的线程来提供服务,可以有效提高响应性。但是当需要创建大量线程时,这种方法也有严重的问题。由于线程生命周期的开销非常高,并且活跃的线程会消耗系统资源,这就导致当创建过多线程时,性能反而下降。此外,由于可创建的线程数量存在一定的上限,如果创建过多的线程,还可能会导致系统抛出OutOfMemoryError
异常,将严重损害系统稳定性。
class ThreadPerTaskWebServer {
public static void main(String[] args) {
ServerSocket serverSocket = new ServerSocket(80);
while (true) {
final Socket connection = serverSocket.accept();
new Thread(() -> {
handleRequest(connection);
}).start();
}
}
}
2. Executor框架
Executor
框架是一套高级的工具,用于管理线程池、任务调度和并发执行。它抽象出了任务的提交和任务的执行,将它们解耦。开发者只需关注提交任务,而不必处理线程的生命周期和复杂的同步问题。
Executor
接口定义了一个单方法:
public interface Executor {
void execute(Runnable command);
}
- 设计思想:任务提交给
Executor
对象,Executor
负责执行任务,而不是开发者直接管理线程,通过这种方式就实现了解耦。
下面就给出基于Executor
实现的Web服务,通过创建一个线程池,可以有效解决前面两种策略所带来的问题
class ThreadPreTaskWebServer() {
Executor executor = Executors.newFixedThreadPool(100);
try (ServerSocket socket = new ServerSocket(80)) {
while (true) {
Socket connection = socket.accept();
executor.execute(() -> handleRequest(connection));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
2.1 线程池
线程池是一个预先创建并维护固定数量线程的组件,用来执行多任务。任务提交到线程池后,由线程池中的线程按需取出执行。线程池通过复用线程来减少创建和销毁线程的开销,提供了一种高效的任务调度方式。
在Java中,java.util.concurrent.Executors
提供了一些静态工厂方法,用于创建常见的线程池实例。但是在实际的开发过程中应该尽可能避免(最好是禁止)使用Executors
返回的线程池对象,因为这些返回的线程池默认请求队列或允许创建线程的最大值为Integer.MAX_VALUE
,这会导致请求或线程的大量堆积,从而导致OOM。
因此,最好使用ThreadPoolExecutor
或第三方开源类库来构建线程池。
public ThreadPoolExecutor(int corePoolSize, //核心线程数
int maximumPoolSize, //线程池最大线程数
long keepAliveTime, //线程空闲时间
TimeUnit unit, //时间单位
BlockingQueue<Runnable> workQueue,//任务队列
ThreadFactory threadFactory, //线程工厂
RejectedExecutionHandler handler) //任务拒绝策略
2.2 Executor的生命周期
为了解决执行服务的生命周期问题,Executor
扩展了ExecutorService
接口,添加了一些用于管理生命周期的方法,如下所示:
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// ... 其他用于任务提交的方法
}
ExecutorService
的生命周期有三种状态:运行、关闭和已终止。如果ExecutorService
没有被正确关闭,它可能会导致资源泄漏(如线程占用、内存泄漏等)。因此,在应用程序结束时,必须确保调用shutdown()
或shutdownNow()
来关闭线程池。
shutdown()
:执行完所有已经提交的任务后,线程池将不再接收新的任务,并在当前任务执行完毕后关闭线程池。这是一个优雅的关闭方式,它不会立刻停止正在运行的任务,而是等待任务完成后再退出。shutdownNow()
:尝试停止当前正在执行的任务并返回尚未执行的任务列表。该方法试图中断当前正在执行的任务,但不能保证它会立即停止执行。这是一个粗暴的关闭方式,通常用于程序出现异常或无法正常关闭时的应急处理。
在调用shutdown()
或shutdownNow()
后,可以通过awaitTermination()
方法来等待线程池关闭完成。awaitTermination()
会一直阻塞,直到线程池关闭完成或者超时。
2.3 延迟任务与周期任务
在Java中Timer
类负责管理延迟任务和周期任务,但是Timer
在执行所有定时任务时只会创建一个线程,如果某个任务执行时间过长,就会破坏其他定时任务的时间精确性。例如某个任务需要10ms执行一次,但是该任务的执行时间超过了10ms,这就会导致任务的堆积或是丢失。此外,当Timer
抛出一个未检查的异常时,其将终止定时线程。在这种情况下,它不会恢复线程的执行,而是错误地认为整个任务线程都被取消了。
因此,我们可以使用ScheduledThreadPoolExecutor
来替代Timer
,它支持按给定延迟执行任务或按固定间隔周期执行任务,更加灵活和高效,特别是在需要并发执行多个定时任务时。
3. 找出可利用的并行性
在本小节中,将以构造一个网页渲染器。在下面这段代码中,给出了一套串行执行的方法。在这个过程中,文本和图像的渲染按顺序执行,但是由于图像的下载过程中大部分时间都是在等待I/O操作的完成,在此期间CPU资源被浪费,因此我们需要想办法进行改进,以提高资源利用效率。
public class SingleThreadRenderer {
void renderPage(CharSequence source) {
renderText(source);
List<ImageData> imageData = new ArrayList<ImageData>();
for (ImageInfo imageInfo : scanForImageInfo(source)) {
imageData.add(imageInfo.downloadImage());
}
for (ImageData data : imageData) {
renderImage(data);
}
}
}
首先根据前文的内容,我们很容易想到,将渲染文本和下载图像分为两个任务,在不同线程中同时进行,这样就能更加充分的利用资源。但是事实上,由于这两个任务的任务量相差很大(渲染文本要远远快于图像下载),因此这种协调所带来的性能提升可能还不如额外的开销,只有大量互相独立且同构的任务可以并发处理时,才能带来明显的性能提升。
顺着这个思路,我们可以想到,为每一张图片的下载都创建一个单独的任务,并在线程池中执行它们,从而将串行的下载过程转换为并行过程,有效地减少下载所有图像的总时间。但这种方法还有一个潜在的问题,图片需要按序插入,而多线程下载可能会打乱顺序,这时,我们就可以借助CompletionService
来完成。
在 Java 中,CompletionService
是一个用于管理并发任务并返回任务执行结果的接口。它结合了 ExecutorService
和 BlockingQueue
,通过提供一个队列来存储完成的任务结果,使得任务执行的结果可以以非阻塞的方式逐个获取。
那么最终的代码如下所示:
public class Render{
private final ExecutorService executor;
Render(ExecutorService executor){
this.executor = executor;
}
void renderPage(CharSequence source){
List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor);
for (ImageInfo imageInfo : info) {
completionService.submit(imageInfo::download);
}
renderText(source);
try {
for (int i = 0; i < info.size(); i++) {
Future<ImageData> take = completionService.take();
ImageData imageData = take.get();
renderImage(imageData);
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}