NOTE本篇笔记基于《Java并发编程实战》第7章 - 取消与关闭
1. 任务取消
如果一段代码能够在某个操作完成之前将其变为”完成“状态,那么这个操作就被称为可取消的。例如GUI中的取消按钮,对于计时任务的超时处理等等。但是在Java中,并没有一种安全的抢占式方式来停止线程,而只有一些协作式的机制。例如在下面这段代码中,就介绍了通过设置”已请求取消“标志,来使任务提前结束。在这段代码中,当我们需要停止这个任务时,只需要在外部调用cancel()
方法。但是假如在循环中调用了某些阻塞方法,很可能导致任务永远不会检查标志位,导致其无法结束。
public class Example1 implements Runnable{
// 为了确保可靠性,标志必须为volatile
private volatile boolean cancelled = false;
@Override
public void run() {
while (!cancelled){
doSomething();
}
}
public void cancel(){
cancelled = true;
}
}
中断
为了应对这种情况,我们可以采用线程中断的方式来取消任务。在第五章中曾介绍过,线程中断是一种协作机制,可以通知另一个线程在合适的情况下停止当前的工作。在每个线程中,都有一个boolean
类型的中断状态,当中断线程时,这个状态将被设置为true
。
在Thread
中,设置中断与查询中断状态的方法如下所示:
Thread.interrupt()
:用于请求中断一个线程的执行。调用该方法并不会立即停止线程,而是设置线程的中断标志。Thread.interrupted()
:静态方法,检测当前线程是否已经被中断,并且在调用后会清除该线程的中断标志 。Thread.isInterrupted()
:检测指定线程的中断标志是否被设置为true
,但不会清除该线程的中断标志。
那么借助中断,我们就可以在调用了阻塞方法时,仍然实现任务的取消。在这个过程中,首先主线程调用interrupt()
方法,向目标线程发送中断信号,设置其中断标志。当目标线程在阻塞时收到中断信号,会抛出InterruptedException
,同时清除中断标志。最后进入处理环节,此处选择打印提示信息并继续执行后续代码,当然也可以根据实际情况,选择直接结束任务。
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(4);
AtomicInteger i = new AtomicInteger(0);
Thread thread = new Thread(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
queue.put(i.getAndIncrement());
System.out.println("Produced: " + i.get());
}
} catch (InterruptedException e) {
System.out.println("Interrupted");
}
System.out.println("Finish");
});
thread.start();
// 等待队列填充
thread.interrupt();
}
输出结果如下:
Produced: 1
Produced: 2
Produced: 3
Produced: 4
Interrupted
Finish
响应中断
当面对InterruptedException
时,我们有两种常用的策略进行处理:
传递异常: 只需借助
throws
将异常抛给调用者,这样该方法也会变成可中断的阻塞方法恢复中断状态: 如果不想或无法传递异常时,需要通过另一种方法保存中断请求。标准的做法是再次调用
interrupt()
方法恢复中断状态。如果不恢复中断状态,外部的代码无法得知线程是否已被中断。在这种情况下,应该在返回前恢复中断而非捕获时恢复。如果过早的设置中断状态,可能会导致无限循环,因为大多数的阻塞方法都会在入口检查中断状态。例如下面这个例子:public Task getNextTask(BlockingQueue<Task> queue) { Boolean interrupted = false; try { while (true) { try { return queue.take(); } catch (InterruptedException e) { interrupted = true; // 重新尝试 } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } }
2.停止基于线程的服务
应用程序往往会创建有着多个线程的服务,由于无法通过抢占式的方式来停止线程,因此仍然需要有合适的方式来关闭拥有线程的服务。
2.1 示例:日志服务
在下面的代码中,给出了一个简单的日志服务示例。在这个服务中,日志信息通过BlockingQueue
提交给专门的日志线程,并由日志线程完成写入操作。但是想要将这个日志服务投入使用,还需要实现一种终止日志服务的方法,从而避免使JVM无法正常关闭。如果只是停止服务,那么使用中断就可以做到,关键在于这样直接关闭会丢失那些待写入的日志信息。此外,其他线程再调用log时还会被阻塞,因为队列是满的。因此我们可以得知,在取消生产者-消费者操作时,要同时取消生产者和消费者。但是在这个示例中,生产者并非单独的线程,想要取消十分困难。
另一种关闭LogWriter
的方式则是设置”已请求关闭“的标识,以避免进一步向log提交消息。但是这本质上还是一种先判断后运行的代码序列,仍然有可能会发生故障。
public class LogWriter{
private final BlockingQueue<String> queue;
private final LoggerThread logger;
public LogWriter(Writer writer){
this.queue = new LinkedBlockingQueue<>(CAPACITY);
this.logger = new LoggerThread(writer);
}
public void start(){
logger.start();
}
public void log(String msg) throws InterruptedException{
queue.put(msg);
}
private class LoggerThread extends Thread{
private final PrintWriter writer;
public LoggerThread(Writer writer){
this.writer = new PrintWriter(writer);
}
public void run(){
try{
while(true){
writer.println(queue.take());
}
}catch(InterruptedException ignored){
}finally{
writer.close();
}
}
}
}
为LogWriter
提供可靠关闭操作的方法是解决竞态条件,因而要使日志消息的提交成为原子操作。然而,我们不希望消息加入队列时持有一个锁,因为put()
方法本身就是阻塞的。因此,我们可以通过原子的方式来检查关闭请求,并且有条件地递增一个计数器来”保持“提交消息的权力。
public class LogService{
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
private boolean isShutdown;
private int reservations;
public LogService(Writer writer){
this.queue = new LinkedBlockingQueue<>();
this.loggerThread = new LoggerThread();
this.writer = new PrintWriter(writer);
}
public void start(){
loggerThread.start();
}
public void stop(){
synchronized (this){
isShutdown = true;
}
loggerThread.interrupt();
}
public void log(String msg) throws InterruptedException {
synchronized (this){
if(isShutdown){
throw new IllegalStateException("Logger is shut down");
}
++reservations;
}
queue.put(msg);
}
private class LoggerThread extends Thread{
public void run(){
try{
while(true){
try{
synchronized (LogService.this){
if(isShutdown && reservations == 0){
break;
}
}
String msg = queue.take();
synchronized (LogService.this){
--reservations;
}
writer.println(msg);
}catch (InterruptedException e){
// retry
}
}
}finally {
writer.close();
}
}
}
}
2.2 “毒丸”对象
另一种关闭生产者-消费者服务的方式就是使用“毒丸“对象。”毒丸”是指一个放在队列上的对象,其含义是:当得到这个对象后,立即停止。在FIFO队列中,“毒丸”对象将确保消费者在关闭之前首先完成队列中的所有工作,而生产者提供了“毒丸”之后,将不会再提交任何新的工作。下面一份代码介绍了如何在生产者与消费者中使用”毒丸“对象:
// 消费者线程
class Consumer implements Runnable {
private final BlockingQueue<Object> queue;
public Consumer(BlockingQueue<Object> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Object item = queue.take();
if (item == PoisonPill.INSTANCE) {
System.out.println("Consumer received Poison Pill. Exiting...");
break;
}
// 处理普通任务
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Consumer interrupted.");
}
}
}
// 生产者线程
class Producer implements Runnable {
private final BlockingQueue<Object> queue;
public Producer(BlockingQueue<Object> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
System.out.println("Produced: " + i);
queue.put(i); // 生产任务
}
queue.put(PoisonPill.INSTANCE); // 生产毒丸
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Producer interrupted.");
}
}
}
2.3 关闭ExecutorService
ExecutorService
提供了两种主要的关闭方法:shutdown()
和shutdownNow()
。调用shutdown()
后,线程池将不再接受新任务,但会继续执行已提交的任务,包括在队列中等待的任务。而调用shutdownNow()
后,线程池会尝试停止所有正在执行的任务,并返回未开始执行的任务列表。
虽然shutdownNow()
会返回所有未开始执行的任务列表,但是对于那些已经开始执行但未完成的任务,我们就无法再获取到。因此,我们需要对ExecutorService
进行进一步的封装,以确保我们能够获得到那些开始了但未完成的任务。如下面代码所示,通过结合任务返回时的线程中断状态,我们就可以得知该任务是否是在进行过程中被取消:
public class TrackingExecutor extends AbstractExecutorService {
private final ExecutorService exec;
private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>());
public List<Runnable> getCancelledTasks() {
if (!exec.isTerminated()) {
throw new IllegalStateException("Not terminated");
}
return new ArrayList<Runnable>(tasksCancelledAtShutdown);
}
public void execute(final Runnable runnable) {
exec.execute(() -> {
try {
runnable.run();
} finally {
if (isShutdown() && Thread.currentThread().isInterrupted()) {
tasksCancelledAtShutdown.add(runnable);
}
}
});
}
}
3.JVM关闭
3.1 关闭钩子(Shutdown Hook)
关闭钩子是一种特殊的线程,当JVM正常关闭 时会被调用,用于在JVM退出前执行一些清理工作,例如释放资源、保存状态等。可以通过 Runtime.getRuntime().addShutdownHook(Thread hook)
方法向JVM注册一个关闭钩子线程。
- 关闭钩子的执行顺序不保证。
- 应避免在关闭钩子中执行耗时操作,否则可能延长关闭时间。
- 多个钩子线程可能同时运行,因此需要处理线程安全问题。
下面是一份使用示例:
public class ShutdownHookExample {
public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutdown Hook is running...");
// 清理资源的逻辑
}));
System.out.println("Application is running...");
try {
Thread.sleep(3000); // 模拟运行
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Exiting application...");
}
}
3.2 守护线程(Daemon Thread)
当我们希望创建一个线程来执行一些辅助性工作时,但又不希望这个线程阻碍JVM关闭,此时就要用到守护线程。当所有非守护线程退出后,JVM会自动结束所有守护线程并关闭。在JVM启动时创建的所有线程中,除了主线程外,其他的线程都是守护线程(如JVM的垃圾回收线程和后台任务调度器)。
- 守护线程的生命周期: 与JVM生命周期绑定。当所有用户线程结束时,守护线程立即终止,无论是否完成任务。
- 慎用守护线程: 如果守护线程中有未完成的重要任务(如文件写入、数据库操作),在用户线程结束后这些任务可能无法完成。
TIP当创建一个新线程时,新线程将继承创建它的线程的守护状态。因此在默认情况下,主线程创建的所有线程都是普通线程。