2646 words
13 minutes
Java并发编程-取消与关闭
NOTE

本篇笔记基于《Java并发编程实战》第7章 - 取消与关闭

1. 任务取消#

如果一段代码能够在某个操作完成之前将其变为”完成“状态,那么这个操作就被称为可取消的。例如GUI中的取消按钮,对于计时任务的超时处理等等。但是在Java中,并没有一种安全的抢占式方式来停止线程,而只有一些协作式的机制。例如在下面这段代码中,就介绍了通过设置”已请求取消“标志,来使任务提前结束。在这段代码中,当我们需要停止这个任务时,只需要在外部调用cancel()方法。但是假如在循环中调用了某些阻塞方法,很可能导致任务永远不会检查标志位,导致其无法结束。

public class Example1 implements Runnable{
    // 为了确保可靠性,标志必须为volatile
    private volatile boolean cancelled = false;
    @Override
    public void run() {
        while (!cancelled){
            doSomething();
        }
    }
    public void cancel(){
        cancelled = true;
    }
}

中断#

为了应对这种情况,我们可以采用线程中断的方式来取消任务。在第五章中曾介绍过,线程中断是一种协作机制,可以通知另一个线程在合适的情况下停止当前的工作。在每个线程中,都有一个boolean类型的中断状态,当中断线程时,这个状态将被设置为true

Thread中,设置中断与查询中断状态的方法如下所示:

  • Thread.interrupt():用于请求中断一个线程的执行。调用该方法并不会立即停止线程,而是设置线程的中断标志。
  • Thread.interrupted():静态方法,检测当前线程是否已经被中断,并且在调用后会清除该线程的中断标志
  • Thread.isInterrupted():检测指定线程的中断标志是否被设置为 true,但不会清除该线程的中断标志。

那么借助中断,我们就可以在调用了阻塞方法时,仍然实现任务的取消。在这个过程中,首先主线程调用interrupt()方法,向目标线程发送中断信号,设置其中断标志。当目标线程在阻塞时收到中断信号,会抛出InterruptedException,同时清除中断标志。最后进入处理环节,此处选择打印提示信息并继续执行后续代码,当然也可以根据实际情况,选择直接结束任务。

public static void main(String[] args) {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(4);
    AtomicInteger i = new AtomicInteger(0);
    Thread thread = new Thread(() -> {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                queue.put(i.getAndIncrement());
                System.out.println("Produced: " + i.get());
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
        }
        System.out.println("Finish");
    });
    thread.start();

    // 等待队列填充

    thread.interrupt();
}

输出结果如下:

Produced: 1
Produced: 2
Produced: 3
Produced: 4
Interrupted
Finish

响应中断#

当面对InterruptedException时,我们有两种常用的策略进行处理:

  • 传递异常: 只需借助throws将异常抛给调用者,这样该方法也会变成可中断的阻塞方法

  • 恢复中断状态: 如果不想或无法传递异常时,需要通过另一种方法保存中断请求。标准的做法是再次调用interrupt()方法恢复中断状态。如果不恢复中断状态,外部的代码无法得知线程是否已被中断。在这种情况下,应该在返回前恢复中断而非捕获时恢复。如果过早的设置中断状态,可能会导致无限循环,因为大多数的阻塞方法都会在入口检查中断状态。例如下面这个例子:

    public Task getNextTask(BlockingQueue<Task> queue) {
        Boolean interrupted = false;
        try {
            while (true) {
                try {
                    return queue.take();
                } catch (InterruptedException e) {
                    interrupted = true;
                    // 重新尝试
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
    

2.停止基于线程的服务#

应用程序往往会创建有着多个线程的服务,由于无法通过抢占式的方式来停止线程,因此仍然需要有合适的方式来关闭拥有线程的服务。

2.1 示例:日志服务#

在下面的代码中,给出了一个简单的日志服务示例。在这个服务中,日志信息通过BlockingQueue提交给专门的日志线程,并由日志线程完成写入操作。但是想要将这个日志服务投入使用,还需要实现一种终止日志服务的方法,从而避免使JVM无法正常关闭。如果只是停止服务,那么使用中断就可以做到,关键在于这样直接关闭会丢失那些待写入的日志信息。此外,其他线程再调用log时还会被阻塞,因为队列是满的。因此我们可以得知,在取消生产者-消费者操作时,要同时取消生产者和消费者。但是在这个示例中,生产者并非单独的线程,想要取消十分困难。

另一种关闭LogWriter的方式则是设置”已请求关闭“的标识,以避免进一步向log提交消息。但是这本质上还是一种先判断后运行的代码序列,仍然有可能会发生故障。

public class LogWriter{
    private final BlockingQueue<String> queue;
    private final LoggerThread logger;
    
    public LogWriter(Writer writer){
        this.queue = new LinkedBlockingQueue<>(CAPACITY);
        this.logger = new LoggerThread(writer);
    }
    
    public void start(){
        logger.start();
    }
    
    public void log(String msg) throws InterruptedException{
        queue.put(msg);
    }
    
    private class LoggerThread extends Thread{
        private final PrintWriter writer;
        
        public LoggerThread(Writer writer){
            this.writer = new PrintWriter(writer);
        }
        
        public void run(){
            try{
                while(true){
                    writer.println(queue.take());
                }
            }catch(InterruptedException ignored){
            }finally{
                writer.close();
            }
        }
    }
}

LogWriter提供可靠关闭操作的方法是解决竞态条件,因而要使日志消息的提交成为原子操作。然而,我们不希望消息加入队列时持有一个锁,因为put()方法本身就是阻塞的。因此,我们可以通过原子的方式来检查关闭请求,并且有条件地递增一个计数器来”保持“提交消息的权力。

public class LogService{
    private final BlockingQueue<String> queue;
    private final LoggerThread loggerThread;
    private final PrintWriter writer;
    private boolean isShutdown;
    private int reservations;

    public LogService(Writer writer){
        this.queue = new LinkedBlockingQueue<>();
        this.loggerThread = new LoggerThread();
        this.writer = new PrintWriter(writer);
    }

    public void start(){
        loggerThread.start();
    }

    public void stop(){
        synchronized (this){
            isShutdown = true;
        }
        loggerThread.interrupt();
    }

    public void log(String msg) throws InterruptedException {
        synchronized (this){
            if(isShutdown){
                throw new IllegalStateException("Logger is shut down");
            }
            ++reservations;
        }
        queue.put(msg);
    }

    private class LoggerThread extends Thread{
        public void run(){
            try{
                while(true){
                    try{
                        synchronized (LogService.this){
                            if(isShutdown && reservations == 0){
                                break;
                            }
                        }
                        String msg = queue.take();
                        synchronized (LogService.this){
                            --reservations;
                        }
                        writer.println(msg);
                    }catch (InterruptedException e){
                        // retry
                    }
                }
            }finally {
                writer.close();
            }
        }
    }
}

2.2 “毒丸”对象#

另一种关闭生产者-消费者服务的方式就是使用“毒丸“对象。”毒丸”是指一个放在队列上的对象,其含义是:当得到这个对象后,立即停止。在FIFO队列中,“毒丸”对象将确保消费者在关闭之前首先完成队列中的所有工作,而生产者提供了“毒丸”之后,将不会再提交任何新的工作。下面一份代码介绍了如何在生产者与消费者中使用”毒丸“对象:

// 消费者线程
class Consumer implements Runnable {
    private final BlockingQueue<Object> queue;

    public Consumer(BlockingQueue<Object> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Object item = queue.take();
                if (item == PoisonPill.INSTANCE) {
                    System.out.println("Consumer received Poison Pill. Exiting...");
                    break;
                }
                // 处理普通任务
                System.out.println("Consumed: " + item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Consumer interrupted.");
        }
    }
}

// 生产者线程
class Producer implements Runnable {
    private final BlockingQueue<Object> queue;

    public Producer(BlockingQueue<Object> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                System.out.println("Produced: " + i);
                queue.put(i); // 生产任务
            }
            queue.put(PoisonPill.INSTANCE); // 生产毒丸
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Producer interrupted.");
        }
    }
}

2.3 关闭ExecutorService#

ExecutorService提供了两种主要的关闭方法:shutdown()shutdownNow()。调用shutdown()后,线程池将不再接受新任务,但会继续执行已提交的任务,包括在队列中等待的任务。而调用shutdownNow()后,线程池会尝试停止所有正在执行的任务,并返回未开始执行的任务列表。

虽然shutdownNow()会返回所有未开始执行的任务列表,但是对于那些已经开始执行但未完成的任务,我们就无法再获取到。因此,我们需要对ExecutorService进行进一步的封装,以确保我们能够获得到那些开始了但未完成的任务。如下面代码所示,通过结合任务返回时的线程中断状态,我们就可以得知该任务是否是在进行过程中被取消:

public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>());
    
    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated()) {
            throw new IllegalStateException("Not terminated");
        }
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }
    
    public void execute(final Runnable runnable) {
        exec.execute(() -> {
            try {
                runnable.run();
            } finally {
                if (isShutdown() && Thread.currentThread().isInterrupted()) {
                    tasksCancelledAtShutdown.add(runnable);
                }
            }
        });
    }
}

3.JVM关闭#

3.1 关闭钩子(Shutdown Hook)#

关闭钩子是一种特殊的线程,当JVM正常关闭 时会被调用,用于在JVM退出前执行一些清理工作,例如释放资源、保存状态等。可以通过 Runtime.getRuntime().addShutdownHook(Thread hook) 方法向JVM注册一个关闭钩子线程。

  • 关闭钩子的执行顺序不保证。
  • 应避免在关闭钩子中执行耗时操作,否则可能延长关闭时间。
  • 多个钩子线程可能同时运行,因此需要处理线程安全问题。

下面是一份使用示例:

public class ShutdownHookExample {
    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutdown Hook is running...");
            // 清理资源的逻辑
        }));

        System.out.println("Application is running...");
        try {
            Thread.sleep(3000); // 模拟运行
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Exiting application...");
    }
}

3.2 守护线程(Daemon Thread)#

当我们希望创建一个线程来执行一些辅助性工作时,但又不希望这个线程阻碍JVM关闭,此时就要用到守护线程。当所有非守护线程退出后,JVM会自动结束所有守护线程并关闭。在JVM启动时创建的所有线程中,除了主线程外,其他的线程都是守护线程(如JVM的垃圾回收线程和后台任务调度器)。

  • 守护线程的生命周期: 与JVM生命周期绑定。当所有用户线程结束时,守护线程立即终止,无论是否完成任务。
  • 慎用守护线程: 如果守护线程中有未完成的重要任务(如文件写入、数据库操作),在用户线程结束后这些任务可能无法完成。
TIP

当创建一个新线程时,新线程将继承创建它的线程的守护状态。因此在默认情况下,主线程创建的所有线程都是普通线程。

Java并发编程-取消与关闭
https://mj3622.github.io/posts/学习笔记/java并发编程/取消与关闭/
Author
Minjer
Published at
2024-11-16