1942 words
10 minutes
Java并发编程-线程池的使用
NOTE

本篇笔记基于《Java并发编程实战》第8章 - 线程池的使用

1. 配置ThreadPoolExecutor#

ThreadPoolExecutorExecutor提供了一些基本的实现,但是在大多数情况下,我们都会使用自定义的线程池来完成任务,其通用构造函数如下所示:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

参数说明#

  1. corePoolSize :核心线程数,线程池中始终保持的线程数量。
  2. maximumPoolSize :最大线程数,线程池能够容纳的最大线程数量。
  3. keepAliveTime :非核心线程的存活时间,当线程池中的线程超过核心线程数时,多余的线程在空闲时会在此时间后被终止。
  4. unit :keepAliveTime 的时间单位,可以是 TimeUnit.SECONDS, TimeUnit.MILLISECONDS 等。
  5. workQueue :用于存放待执行任务的阻塞队列。
    • newFixedThreadPoolnewSingleThreadExecutor默认情况下使用无界的LinkedBlockingQueue,这就导致当任务快速抵达时,队列将无限制的增加。
    • 更稳妥的方法是使用有界的工作队列,例如ArrayBlockingQueue有界的LinkedBlockingQueuePriorityBlockingQueue
    • 对于非常庞大或无线的线程池,可以使用SynchronousQueue来避免任务排队,这并不是一个真正的队列,而是一种转交机制。若目前如果没有可用的线程处理任务且小于最大上限,线程池会创建新线程来处理。否则就会依据饱和策略直接拒绝这个任务。
  6. threadFactory :线程工厂,用于创建新线程的工厂。可以自定义线程的名称、优先级等属性。
    • 默认的线程工程方法将创建一个新的,非守护线程
    • 后文中介绍了如何自定义线程工厂
  7. handler :拒绝策略(饱和策略),用于处理当线程池和工作队列都满时的新任务。常用的拒绝策略包括:
    • AbortPolicy: 抛出异常,默认策略。
    • CallerRunsPolicy: 调用者运行,将任务回退到调用者。
    • DiscardPolicy: 丢弃任务。
    • DiscardOldestPolicy: 丢弃最旧的任务。
TIP

即使在构造完成之后,仍可以通过Setter来修改其参数

2. 拓展ThreadPoolExecutor#

2.1 自定义饱和策略阻塞execute#

当工作队列被填充满之后,并没有预定义的饱和策略来阻塞execute。但是,我们可以通过使用Semaphore信号量来实现这个功能。由于不能限制队列大小和任务到达率,因此此处采用了一个无界队列,并通过信号量来控制正在执行和等待执行的任务数量:

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            exec.execute(() -> {
                try {
                    command.run();
                } finally {
                    semaphore.release();
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
        }
    }
}

2.2 自定义线程工厂#

想要自定义线程工程其实很简单,因为ThreadFactory中只定义了一个newThread,每当线程池需要创建一个新线程时就会调用这个。

public class MyThreadFactory implements ThreadFactory {
    private final String poolName;
    public MyThreadFactory(String poolName) {
        this.poolName = poolName;
    }
    @Override
    public Thread newThread(Runnable r) {
        return new MyAppThread(r, poolName);
    }
}

MyAppThread中,我们还可以自定义其他行为,例如:为线程指定名字,设置自定义UncaughtExceptionhandlerLogger中写入信息,维护一些统计信息等等。

public class MyAppThread extends Thread {
    public static final String DEFAULT_NAME = "MyAppThread";
    private static volatile boolean debugLifecycle = false;
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();

    public MyAppThread(Runnable r) {
        this(r, DEFAULT_NAME);
    }

    public MyAppThread(Runnable r, String name) {
        super(r, name + "-" + created.incrementAndGet());
        setUncaughtExceptionHandler(
                (t, e) -> log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e));
    }

    @Override
    public void run() {
        // 复制debug标志以确保一致的值
        boolean debug = debugLifecycle;
        if (debug) log.log(Level.FINE, "Created " + getName());
        try {
            alive.incrementAndGet();
            super.run();
        } finally {
            alive.decrementAndGet();
            if (debug) log.log(Level.FINE, "Exiting " + getName());
        }
    }

    public static int getThreadsCreated() {
        return created.get();
    }

    public static int getThreadsAlive() {
        return alive.get();
    }

    public static boolean getDebug() {
        return debugLifecycle;
    }

    public static void setDebug(boolean b) {
        debugLifecycle = b;
    }
}

3. 线程池的潜在问题#

3.1 线程饥饿死锁#

线程池中的线程饥饿死锁(Thread starvation deadlock)是一种常见的并发问题,通常发生在线程池使用不当的情况下。在这种情况下,由于线程池中的线程资源被耗尽,导致任务之间相互依赖而无法继续执行,从而陷入死锁状态。线程饥饿死锁通常出现的场景是:

  1. 线程池的线程数量有限 :比如线程池大小设置得过小,没有足够的线程来处理所有任务。
  2. 任务之间存在依赖关系 :线程池中的某些任务在执行时需要等待其他任务完成,而这些被依赖的任务需要新的线程来执行。
  3. 线程池中的线程被阻塞 :由于线程池的线程被占满,而被占用的线程可能在等待其他任务完成,从而导致任务无法继续执行。

这种情况下,任务之间形成了循环等待,从而导致死锁。

除了在线程池大小上的显示限制外,还可能由于其他资源上的约束而存在一些隐式限制。如何应用程序使用一个包含10个连接池的JDBC连接池,并且每个任务都需要一个数据库连接,那么线程池就被隐式限制在了10个线程。

3.2 运行时间较长的任务#

当任务阻塞时间过长时,那么即使不出现死锁,线程池的表现也会十分糟糕。执行较长时间的任务不仅会导致线程池堵塞,甚至还会增加执行时间较短的任务的服务时间。

因此,我们可以通过限定任务等待资源的时间来缓解长任务的影响。在平台类库的大多可阻塞方法中,都同时定义了限时版本和无限时版本。如果等待超时,我么可以把任务标识为失败,然后中止任务或者将任务重新放回队列。这样,我们能确保任务总能持续执行下去,并将线程释放出来以执行一些更快完成的任务。

4. 递归算法的并行化#

递归算法是解决许多问题的重要工具,例如分治法、回溯法等,但其通常是单线程运行的。为了提高效率,可以通过线程池将递归算法并行化,将任务分发到多个线程中,从而利用多核 CPU 的计算能力来加速执行。

递归算法的并行化适用于以下情况:

  1. 分治法:
    • 例如归并排序、快速排序、矩阵乘法等问题。
    • 这些问题可以通过划分子问题并行计算,然后合并结果。
  2. 搜索算法:
    • 例如棋盘问题、路径搜索等,可以将不同分支的搜索任务分配到不同线程执行。
  3. 耗时计算问题:
    • 对于计算密集型问题(如斐波那契数列)或大规模数据处理,并行化可以显著提升效率。

例:并行化计算斐波那契数列

传递递归方法实现,当 n 较大时,递归深度增加,效率低下

public class Fibonacci {
    public static int fibonacci(int n) {
        if (n <= 1) {
            return n;
        }
        return fibonacci(n - 1) + fibonacci(n - 2);
    }
}

通过线程池,我们可以将 F(n-1)F(n-2) 的计算分配到不同的线程中,从而实现并行化

public class ParallelFibonacci {
    private static final ExecutorService executor = Executors.newFixedThreadPool(4);

    public static int parallelFibonacci(int n) throws ExecutionException, InterruptedException {
        if (n <= 1) {
            return n;
        }

        Future<Integer> f1 = executor.submit(() -> parallelFibonacci(n - 1));
        Future<Integer> f2 = executor.submit(() -> parallelFibonacci(n - 2));

        return f1.get() + f2.get();
    }
}
Java并发编程-线程池的使用
https://mj3622.github.io/posts/学习笔记/java并发编程/线程池的使用/
Author
Minjer
Published at
2024-11-22