Java并发未解决问题汇总

Java并发未解决问题汇总

  1. 什么是再入锁?
  2. 并发实践讲了啥?
    • 基础,并发性,线程安全性,基础类库提供的并发能力构建线程安全类
    • 利用线程提高并发程序的性能
    • 测试
    • 底层原理

线程池

  1. Executors 提供的四个线程池的缺点:(尽管它们内部都是通过创建 ThreadPoolExecutor 来实现的,但是本身不利于资源的合理利用)
    1. Executors.newFixedThreadPool() 和 Executors.newSingleThreadExecutor()
      • 主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
      • 因为创建的线程池使用的是 new LinkedBlockingQueue<Runnable>() 无界队列。
    2. Executors.newCachedThreadPool() 和 Executors.newScheduledThreadPool()
      • 主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
      • 同时 newCachedThreadPool() 线程池的 corePoolSize 为 0,表示进来的任务进到队列后就必须等待到队列满,然后通过<maximumPoolSize 来不停的创建新线程。
  2. 阿里巴巴编码规约中提到,线程池不允许使用 Executors创建,应该通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,避免资源耗尽的风险。

TheadPoolExecutor 创建线程池

是线程池的核心实现。

参数解释:

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,  
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

corePoolSize & maximumPoolSize

  • corePoolSize 核心线程数,

  • maximumPoolSize 最大线程数

关系如下:

  1. 当新任务被提交到线程池中,如果当前运行的线程数 < corePoolSize 核心线程数,即使当前有空闲线程,也会新建一个线程来处理新提交的任务;
  2. 如果当前运行的线程数 > corePoolSize 核心线程数,但是 < maximumPoolSize 最大线程数,只有当等待队列已满的情况下才会创建新线程。

keepAliveTime & unit

  • keepAliveTime 为超过 corePoolSize 线程数的线程的最大空闲时间。超过这个时间如果线程依然空闲,且数量超过 corePoolSize 的数量,空闲线程就会被销毁。
  • unit 为时间单位。提供了一个枚举类型。

等待队列

参数类型为 BlockingQueue 接口,任何阻塞队列(BlockingQueue) 都可以用来缓存提交的队列。

线程池的大小和阻塞队列,共同约束线程池的能力:任务提交给线程池后,各种处理策略如下。

  1. 如果运行的线程数 < corePoolSize 核心线程数,提交新的任务后就会新建一个线程来运行;
  2. 如果运行的线程数 > corePoolSize 核心线程数,新提交的任务就会进入队列,等待被执行;
  3. 如果队列已满,且运行线程数 < maximumPoolSize 最大线程数,则新建一个线程来运行该任务;
  4. 如果队列已满,且运行线程数 > maximumPoolSize 最大线程数,新提交的任务将会根据 拒绝策略 来处理。

线程池中的线程不是一开始就全都创建启动的,而是根据任务量开始慢慢增加 的,这就算是一种懒加载,到用的时候再创建线程,节省资源

三种通用的队列处理方式

  • SynchronousQueue –这个不太熟悉,查一下
  • LinkedBlockingQueue,属于无界队列
    • 特点,当所有核心线程都在运行时,新增任务会进入队列等待,因为是无界队列,所以不会有大于 corePoolSize 的线程被创建,此时 maximumPoolSize 参数失去了作用。
    • 缺点,如果新增任务速度 > 可处理速度,可能导致等待队列无线增长,耗费很多内存,甚至OOM。
    • 适合场景,可以使得瞬间爆发的高频请求变得平滑。
  • ArrayBlockingQueue,属于有界队列
    • 队列有最大值,可以防止资源被耗尽;
    • 难点,需要平衡队列的大小与线程数 –这点需要再查资料

无界队列与有界队列有多种实现方式,上面列举的仅是举例

拒绝策略

当线程池已经关闭,或者达到饱和(达到最大线程数,且队列已满)状态时,新提交的任务会被拒绝。

TheadPoolExecutor 定义了4中拒绝策略:

  • AbortPolicy , 默认策略,拒绝(丢弃)并抛出 RejectedExecutionException
  • CallerRunsPolicy , 直接在 execute 方法的调用线程中运行被拒绝的任务,如果线程池已经关闭,则任务被丢弃;
  • DiscardPolicy , 直接丢弃任务,不报错;
  • DiscardOldestPolicy , 丢弃队列中等待时间最长的任务,并执行当前提交的任务。如果线程池已经关闭,则丢弃任务。

还可以自定义拒绝策略,实现 RejectedExecutionHandler 接口即可。

代码测试:

实现特点及部分提问:

  1. 为什么线程池中,队列使用 Runnable 而不是 Thread ?

  2. Thread 中的 start() 与 run() 有什么区别?

  3. 此时线程数小于核心线程数,并且线程都处于空闲状态,现在提交一个任务,是新起一个线程还是给之前创建的线程运行?

    • 答:execute方法的注释中写道,If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. 从注释可以知道,此时线程池会新起一个线程来运行新任务,不管老线程是否空闲。
    • 扩展,从注释我们可以知道,虽然默认线程池是懒加载,但它实际是想快速拥有核心线程数的线程数量。
    • 核心线程数代表着线程池中承载日常工作的中坚力量,也就是线程池希望可以尽快创建出这些线程来处理任务,所以是在懒中又急着创建核心线程数。
    • 而最大线程数 maximumPoolSize , 其实是为了应对突发状况,突发问题解决后,就把多出来的线程回收掉,只维持核心线程数量的线程。
  4. 线程池中,核心线程和最大线程是否有特殊标记?

    • 答:没有,无论是核心线程还是非核心线程,在线程池中都一样,当淘汰的时候,不关心是哪些线程,只需要留下核心线程数量个线程即可。
  5. 你是怎样理解 KeepAliveTime 的?

    • 答:线程池只想要核心线程数个线程,maximumPoolSize 中对出来的是为了预留一些来应对突发状态的,当突发状态过去后,线程池只希望维持核心线程数的线程,所以有了 KeepAliveTime,当线程数大于核心线程数之后,如果线程空闲时间超过 KeepAliveTime,就回收线程,直到数量与核心线程数持平。
  6. workQueue 有什么作用?

    • 答:缓存任务作用,需要注意限制工作队列大小,太长任务等待就会变长,同时缓存过多 可能导致资源耗尽系统崩溃。具体队列长度,需要结合线程数,任务执行时长,能承受的等待时间等决定。
  7. 拒绝策略的作用?

    • 答:来应对过载任务的。线程池会遇到都满且队列也满的情况,这时就需要一种策略来处理 后序提交的任务。这里默认有四种策略,还可以自定义,具体看上面拒绝策略内容。
  8. ThreadPoolExecutor 源码中的 ctl 是干嘛的?

    • ctl 是一个涵盖了两个概念的原子整数类,它将工作线程数( workerCount )和线程池状态( runState),结合在一起维护,这样通过一个字段来维护多个值,形成一个原子操作,更容易维护多个值之间的一致性,也算是一种极简主义。(并发包中有很多这种实现)
      • 低 29 位存放 workerCount ,所以有效的线程数为 (2 ^ 29) - 1 (大约 5亿个)
      • 高 3 位存放 runState
  9. 线程池有几种状态?

    • 看一下源码注解即可,在 ctl字段的注释中。
    • RUNNING : 能接受新任务,并处理阻塞队列中的任务;
    • SHUTDOWN :不接受新任务,但是可以处理阻塞队列中的任务;
    • STOP :不接受新任务,也不处理阻塞队列中的任务,并且还打断正在运行任务的线程,抛异常,就是不干了;
    • TIDYING :所有任务都终止,且工作线程也为 0,处于关闭之间的状态;
    • TERMINATED :已关闭。
  10. 线程池的状态是如何变迁的?

    • ThreadPoolExecutor 源码的 ctl字段的注释中写的很清楚。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      * RUNNING -> SHUTDOWN
      * On invocation of shutdown(), perhaps implicitly in finalize()
      * (RUNNING or SHUTDOWN) -> STOP
      * On invocation of shutdownNow()
      * SHUTDOWN -> TIDYING
      * When both queue and pool are empty
      * STOP -> TIDYING
      * When pool is empty
      * TIDYING -> TERMINATED
      * When the terminated() hook method has completed

      问题,如何判断状态从 shutdown -> tidying ? 通过判断 workCount 是否为0 。

    • 可以画个图

  11. 原生线程池的核心线程一定伴随着任务慢慢创建的吗?

    • 并不是,虽然线程池的默认策略是执行新任务时才启动一个核心线程,但是它也提供了两个预启动的方法来覆盖默认策略。创建线程池后可以调用这两个方法。
    • prestartCoreThread 当前线程数小于核心线程数时创建一个
    • prestartAllCoreThreads 创建所有核心线程
  12. 为什么要把任务先放在任务队列里面,而不是把线程先拉满到最大线程数?

    • 答:
      1. 首先我个人觉得没有为什么,就是这样设计的,而且设计的也很合理。
      2. 其次,从ctl 上面的注释可知,原线程池的设计本意是希望让核心线程工作,最大线程数是一个应急方案,新建线程帮助消化过多的任务。所以你可以把线程池的 corePoolSize 设置为你想要线程池工作的线程数,任务队列 workQueue 起到一个缓冲的作用。
      3. 如果一定要扯一扯 CPU 密集型和 I/O 密集型,原版线程池的实现可以认为是偏向 CPU 密集型的,我们知道处理 CPU 密集型任务时,线程太多反而会由于线程频繁切换的开销得不偿失,所以优先堆积任务而不是创建新线程。原版实现也是如此,当任务过多时,先通过任务队列缓存任务,让核心线程去消化。只有当核心线程和队列都满了,才利用最大线程来辅助消化。
      4. 对于像 Tomcat 这样的业务场景,大部分情况下是需要大量 I/O 处理的情况就做了一些定制,修改了 jdk 中原生线程池的实现,使得在队列没满的时候,可以创建线程至最大线程数。
  13. 如何修改原生线程池,可以先拉满最大线程数再入任务队列排队?

    答:关键点在队列的 offer 方法。

    1
    2
    3
    4
    5
    6
    public void execute(Runnable command) {
    ...
    if (isRunning(c) && workQueue.offer(command)) {
    ...
    }
    else if (!addWorker(command, false))

    从上面的 execute 方法(给线程池提交任务的方法)我们可以知道,在这个方法中只要在 offer 方法内部判断此时线程数小于最大线程数的时候返回 false,即可走下面 else if 中的 addWorker() (新增线程)的逻辑,如果数量已经达到最大线程数,直接入队即可。具体可以看一下 Tomcat 中的定制线程。

不懂的地方:

1. runWorker() 方法中是获取 Runnable task 执行其 run() 方法

答: 根据 Thread 的 start() 和 run() 的关系可知,一个线程调用 start() 即启动后,只要 run() 方法不执行结束,线程就不会终止(遇到异常或调用 stop() 方法主动停止除外)。那么复用线程的过程就是在其 run() 方法中执行循环的过程。所以这也是为什么 runWorker() 方法中是一个 while 循环获取并执行 Runnable task.run() 的过程,这里显示调用 run() 方法其实就是在线程中进行普通的方法调用。

2. 为什么 Integer 的低 29 位存放 workerCount ,有效的线程数为 (2 ^ 29) - 1 ?

答:首先 Java 中的 Integer 是4个字节,32bit。低29位能表示的最大值就是29个1,同时我们知道最低位上 1 = 2^0,那么 2^29其实表示二进制中第30位为1,其他低位为0的情况,再减去1就表示低29位全为1的情况了,即(2^29) -1 。

涉及知识点:

1. 为什么线程 Thread 要先调用 start() 再调用 run() ,而不是直接调用 run() ?

也是问 Thead 中 start() 方法与 run() 方法的关系。

答:

结论:因为 JVM 内存机制规定

  1. run() 就是 Thread 中的一个普通的方法,直接调用 run() 是在当前线程中执行而不是新增线程,程序执行路径还是只有一条,需要等 run() 执行完才能执行后面的内容,无法达到多线程执行的目的;
  2. start() 是启动 Thread 的方法,执行后,JVM 会新建一个线程,然后由这个新线程调用 run() ,这样 run() 就不在主线程中执行了。此时程序主线程无需等待 run() 方法执行完毕,而直接执行后面的代码。run() 在 Thread 中称为线程体,它包含了线程要执行的内容,通过调用 Thread 的 start() 启动线程后,线程处于就绪状态(可运行),并没有执行,在排队等待获取 CPU 时间片,就执行 run() 。run() 执行结束后,此线程即终止。(或者调用 stop() ,线程也会终止。)(在 main 方法中执行时主线程为 main)。

Java 线程的说明:

  • Java 的线程是通过 java.lang.Thread 类来实现的。VM 启动时会有一个由主方法所定义的线程。可以通过创建Thread的实例来创建新的线程。每个线程都是通过某个特定Thread对象所对应的方法run() 来完成其操作的,方法run()称为线程体。通过调用Thread类的 start() 方法来启动一个线程。

实现并启动线程的方法,2个:

  • 方法1,写一个类继承自 Thread 类,重写 run() 方法。创建该类实例后,调用 start() 方法启动线程;
  • 方法2,写一个类实现Runnable接口,实现 run() 方法。用 new Thread(Runnable target).start() 方法来启动;

Java 中 Thread 的 start() 中的基本流程:

1
2
3
4
5
6
7
8
9
10
11
Thread#start() --> native start0() --> JVM_StartThread --> thread_entry --> Thread#run()


说明:
start方法中调用了本地方法start0,native即是本地方法(也是底层方法)。怎么看start0方法,可以去访问openjdk源码。
start0映射的方法就是JVM_StartThread,再看一下jvm.cpp文件中JVM_StartThread方法,里面有段代码
native_thread = new JavaThread(&thread_entry,sz);

再看 thread_entry 方法
JavaCalls::call_virtual(&result,obj,KlassHandle(THREAD, SystemDictionary::Thread_klass()),vmSymbols::run_method_name(),vmSymbols::void_mehtod_signature(),THREAD);
// 这里 vmSymbols::run_method_name() 就是调用Thread 的 run() 方法。

遇到问题:

看一下 TheadPoolExecutor 是如何处理阻塞队列中的任务的?如何复用线程的?

  • run() 方法中循环调用提交的 run()方法,只要线程自身的 run() 方法不执行完,该线程就不会终止

20210827

下面使用 IntStream.forEach() 代码写的有问题,列出来纠正一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    IntStream.rangeClosed(1, 8).forEach(
(i) -> {
try {
poolExecutor.execute(() -> {
System.out.println("我是线程: " + i);

// 模拟线程执行时间,10s
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

});
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
// TODO(SLi): 不能在for循环中添加这种 线程池的shutdown 方法。
// finally {
// poolExecutor.shutdown();
// }
}
);

原因:因为上面的代码写的有问题,try...finally... 代码块在forEach() 方法体中,即在for循环中,每次循环都将走完方法体,应该把finally部分挪到forEach的外面才对。
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2022-2023 ligongzhao
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信