2197 words
11 minutes
Java并发编程-任务执行
NOTE

本篇笔记基于《Java并发编程实战》第6章 - 任务执行

1. 在线程中执行任务#

在并发应用程序中,大多都是围绕着“任务执行”来进行构建的。而在执行任务的过程中,往往会有不同的调度策略可供选择。下面将简单介绍两种调度策略,以探究如何更好的构建并发性良好的策略。

策略一:串行地执行任务

最简单的策略就是在单个线程中串行地执行各项任务,在下面这个例子中,Web服务按序对请求进行处理。这种方式理论上是正确的,但是在实际生产环境中,它的并发性极为糟糕。由于同时只能处理一个请求,因此非常容易造成阻塞和资源浪费,因此在大多数的应用中都不会采取这种策略。

TIP

在某些情况下,串行处理方式能够带来简单性和安全性,大多数GUI框架都通过单一线程来处理任务,后续第9章中将进一步探讨串行模型。

class singleThreadWebServer{
    public static void main(String[] args) {
        ServerSocket serverSocket = new ServerSocket(80);
        while (true){
            Socket connection = serverSocket.accept();
            handleRequest(connection);
        }
    }
}

策略二:显式地为任务创建线程

通过为每一个请求都创建一个新的线程来提供服务,可以有效提高响应性。但是当需要创建大量线程时,这种方法也有严重的问题。由于线程生命周期的开销非常高,并且活跃的线程会消耗系统资源,这就导致当创建过多线程时,性能反而下降。此外,由于可创建的线程数量存在一定的上限,如果创建过多的线程,还可能会导致系统抛出OutOfMemoryError异常,将严重损害系统稳定性。

class ThreadPerTaskWebServer {
    public static void main(String[] args) {
        ServerSocket serverSocket = new ServerSocket(80);
        while (true) {
            final Socket connection = serverSocket.accept();
            new Thread(() -> {
                handleRequest(connection);
            }).start();
        }
    }
}

2. Executor框架#

Executor框架是一套高级的工具,用于管理线程池、任务调度和并发执行。它抽象出了任务的提交和任务的执行,将它们解耦。开发者只需关注提交任务,而不必处理线程的生命周期和复杂的同步问题。

Executor接口定义了一个单方法:

public interface Executor {
    void execute(Runnable command);
}
  • 设计思想:任务提交给Executor对象,Executor负责执行任务,而不是开发者直接管理线程,通过这种方式就实现了解耦。

下面就给出基于Executor实现的Web服务,通过创建一个线程池,可以有效解决前面两种策略所带来的问题

class ThreadPreTaskWebServer() {
    Executor executor = Executors.newFixedThreadPool(100);
    try (ServerSocket socket = new ServerSocket(80)) {
        while (true) {
            Socket connection = socket.accept();
            executor.execute(() -> handleRequest(connection));
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

2.1 线程池#

线程池是一个预先创建并维护固定数量线程的组件,用来执行多任务。任务提交到线程池后,由线程池中的线程按需取出执行。线程池通过复用线程来减少创建和销毁线程的开销,提供了一种高效的任务调度方式。

在Java中,java.util.concurrent.Executors 提供了一些静态工厂方法,用于创建常见的线程池实例。但是在实际的开发过程中应该尽可能避免(最好是禁止)使用Executors返回的线程池对象,因为这些返回的线程池默认请求队列或允许创建线程的最大值为Integer.MAX_VALUE,这会导致请求或线程的大量堆积,从而导致OOM。

因此,最好使用ThreadPoolExecutor或第三方开源类库来构建线程池。

public ThreadPoolExecutor(int corePoolSize,					//核心线程数
                          int maximumPoolSize,				//线程池最大线程数
                          long keepAliveTime,				//线程空闲时间
                          TimeUnit unit,					//时间单位
                          BlockingQueue<Runnable> workQueue,//任务队列
                          ThreadFactory threadFactory,		//线程工厂
                          RejectedExecutionHandler handler)	//任务拒绝策略

2.2 Executor的生命周期#

为了解决执行服务的生命周期问题,Executor扩展了ExecutorService接口,添加了一些用于管理生命周期的方法,如下所示:

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    // ... 其他用于任务提交的方法
}

ExecutorService的生命周期有三种状态:运行、关闭和已终止。如果ExecutorService没有被正确关闭,它可能会导致资源泄漏(如线程占用、内存泄漏等)。因此,在应用程序结束时,必须确保调用shutdown()shutdownNow()来关闭线程池。

  • shutdown():执行完所有已经提交的任务后,线程池将不再接收新的任务,并在当前任务执行完毕后关闭线程池。这是一个优雅的关闭方式,它不会立刻停止正在运行的任务,而是等待任务完成后再退出。
  • shutdownNow():尝试停止当前正在执行的任务并返回尚未执行的任务列表。该方法试图中断当前正在执行的任务,但不能保证它会立即停止执行。这是一个粗暴的关闭方式,通常用于程序出现异常或无法正常关闭时的应急处理。

在调用shutdown()shutdownNow()后,可以通过awaitTermination()方法来等待线程池关闭完成。awaitTermination()会一直阻塞,直到线程池关闭完成或者超时。

2.3 延迟任务与周期任务#

在Java中Timer类负责管理延迟任务和周期任务,但是Timer在执行所有定时任务时只会创建一个线程,如果某个任务执行时间过长,就会破坏其他定时任务的时间精确性。例如某个任务需要10ms执行一次,但是该任务的执行时间超过了10ms,这就会导致任务的堆积或是丢失。此外,当Timer抛出一个未检查的异常时,其将终止定时线程。在这种情况下,它不会恢复线程的执行,而是错误地认为整个任务线程都被取消了。

因此,我们可以使用ScheduledThreadPoolExecutor来替代Timer,它支持按给定延迟执行任务或按固定间隔周期执行任务,更加灵活和高效,特别是在需要并发执行多个定时任务时。

3. 找出可利用的并行性#

在本小节中,将以构造一个网页渲染器。在下面这段代码中,给出了一套串行执行的方法。在这个过程中,文本和图像的渲染按顺序执行,但是由于图像的下载过程中大部分时间都是在等待I/O操作的完成,在此期间CPU资源被浪费,因此我们需要想办法进行改进,以提高资源利用效率。

public class SingleThreadRenderer {
    void renderPage(CharSequence source) {
        renderText(source);
        List<ImageData> imageData = new ArrayList<ImageData>();
        for (ImageInfo imageInfo : scanForImageInfo(source)) {
            imageData.add(imageInfo.downloadImage());
        }
        for (ImageData data : imageData) {
            renderImage(data);
        }
    }
}

首先根据前文的内容,我们很容易想到,将渲染文本和下载图像分为两个任务,在不同线程中同时进行,这样就能更加充分的利用资源。但是事实上,由于这两个任务的任务量相差很大(渲染文本要远远快于图像下载),因此这种协调所带来的性能提升可能还不如额外的开销,只有大量互相独立且同构的任务可以并发处理时,才能带来明显的性能提升。

顺着这个思路,我们可以想到,为每一张图片的下载都创建一个单独的任务,并在线程池中执行它们,从而将串行的下载过程转换为并行过程,有效地减少下载所有图像的总时间。但这种方法还有一个潜在的问题,图片需要按序插入,而多线程下载可能会打乱顺序,这时,我们就可以借助CompletionService来完成。

在 Java 中,CompletionService 是一个用于管理并发任务并返回任务执行结果的接口。它结合了 ExecutorServiceBlockingQueue,通过提供一个队列来存储完成的任务结果,使得任务执行的结果可以以非阻塞的方式逐个获取。

那么最终的代码如下所示:

public class Render{
    private final ExecutorService executor;
    Render(ExecutorService executor){
        this.executor = executor;
    }
    void renderPage(CharSequence source){
        List<ImageInfo> info = scanForImageInfo(source);
        CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor);
        for (ImageInfo imageInfo : info) {
            completionService.submit(imageInfo::download);
        }
        renderText(source);
        try {
            for (int i = 0; i < info.size(); i++) {
                Future<ImageData> take = completionService.take();
                ImageData imageData = take.get();
                renderImage(imageData);
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
Java并发编程-任务执行
https://mj3622.github.io/posts/学习笔记/java并发编程/任务执行/
Author
Minjer
Published at
2024-11-10