Jtoss Jtoss
首页
  • 数据结构与算法

    • 数据结构与算法 - 概述
    • 数据结构与算法 - 复杂度分析
    • 数据结构 - 线性表
    • 算法 - 常见排序算法
  • 代码规范

    • 代码简洁之道
    • 阿里巴巴开发手册
    • 谷歌Java编程风格指南
  • 设计模式

    • 编写高质量代码概述
    • 面向对象
    • 设计原则
    • 设计模式-创建型
    • 设计模式-结构型
    • 设计模式-行为型(上)
    • 设计模式-行为型(下)
    • 浅析框架源码中的设计模式
    • 业务框架实战案例
  • MySQL 基础

    • MySQL - 数据库设计规范
    • MySQL - 必知必会
  • MySQL 进阶

    • MySQL - 基础架构
    • MySQL - InnoDB存储引擎
    • MySQL - InnoDB缓冲池
    • MySQL - 事务与锁
    • MySQL - 索引
    • MySQL - 查询执行计划
    • MySQL - 性能优化
  • Redis 系列

    • Redis入门 - 基础相关
    • Redis进阶 - 数据结构
    • Redis进阶 - 持久化RDB和AOF
    • Redis进阶 - 事件机制
    • Redis进阶 - 事务
    • Redis进阶 - 高可用高可扩展
    • Redis进阶 - 缓存问题
    • Redis进阶 - 性能调优
  • Java 基础

    • Java 基础 - 知识点
    • Java 基础 - 面向对象
    • Java 基础 - Q/A
  • Java 进阶 - 集合框架

    • Java 集合框架详解
  • Java 进阶 - 多线程与并发

    • Java 并发 - 理论基础
    • Java 并发 - 线程基础
    • Java 并发 - 各种锁
    • Java 并发 - 关键字 volatile
    • Java 并发 - 关键字 synchronized
    • JUC - CAS与原子操作
    • JUC - 锁核心类AQS
    • JUC - 锁接口和类简介
    • JUC - 并发容器简介
    • JUC - 通信工具类
    • JUC - Fork-Join框架
    • JUC - 线程池
  • Java 进阶 - JVM

    • JVM - 概述
    • JVM - 类加载机制
    • JVM - 内存结构
    • JVM - 垃圾回收机制
    • JVM - 性能调优
  • Maven系列

    • Maven基础知识
    • Maven项目构建
    • Maven多模块配置
  • Spring 框架

    • Spring 框架 - 框架介绍
    • Spring 框架 - IOC详解
    • Spring 框架 - AOP详解
    • Spring 框架 - SpringMVC详解
  • Spring Boot 系列

    • Spring Boot - 开发入门
    • Spring Boot - 接口相关
  • Spring Cloud 系列
  • Mybatis 系列

    • Mybatis - 总体框架设计
    • Mybatis - 初始化基本过程
    • Mybatis - sqlSession执行过程
    • Mybatis - 插件机制
    • Mybatis - 事务管理机制
    • Mybatis - 缓存机制
  • 业务常见问题

    • Java 业务开发常见错误(一)
    • Java 业务开发常见错误(二)
    • Java 业务开发常见错误(三)
    • Java 业务开发常见错误(四)
    • Java 业务开发常见错误(五)
    • Java 业务开发常见错误(六)
  • IDEA系列

    • IDEA 2021开发环境配置
    • IDEA 快捷键
  • Git系列

    • git status中文乱码
  • 其他

    • Typora+Picgo 自动上传图片
    • hsdis 和 jitwatch
  • 实用技巧
  • 收藏
  • 摄影
  • 学习
  • 标签
  • 归档

Jason Huang

后端程序猿
首页
  • 数据结构与算法

    • 数据结构与算法 - 概述
    • 数据结构与算法 - 复杂度分析
    • 数据结构 - 线性表
    • 算法 - 常见排序算法
  • 代码规范

    • 代码简洁之道
    • 阿里巴巴开发手册
    • 谷歌Java编程风格指南
  • 设计模式

    • 编写高质量代码概述
    • 面向对象
    • 设计原则
    • 设计模式-创建型
    • 设计模式-结构型
    • 设计模式-行为型(上)
    • 设计模式-行为型(下)
    • 浅析框架源码中的设计模式
    • 业务框架实战案例
  • MySQL 基础

    • MySQL - 数据库设计规范
    • MySQL - 必知必会
  • MySQL 进阶

    • MySQL - 基础架构
    • MySQL - InnoDB存储引擎
    • MySQL - InnoDB缓冲池
    • MySQL - 事务与锁
    • MySQL - 索引
    • MySQL - 查询执行计划
    • MySQL - 性能优化
  • Redis 系列

    • Redis入门 - 基础相关
    • Redis进阶 - 数据结构
    • Redis进阶 - 持久化RDB和AOF
    • Redis进阶 - 事件机制
    • Redis进阶 - 事务
    • Redis进阶 - 高可用高可扩展
    • Redis进阶 - 缓存问题
    • Redis进阶 - 性能调优
  • Java 基础

    • Java 基础 - 知识点
    • Java 基础 - 面向对象
    • Java 基础 - Q/A
  • Java 进阶 - 集合框架

    • Java 集合框架详解
  • Java 进阶 - 多线程与并发

    • Java 并发 - 理论基础
    • Java 并发 - 线程基础
    • Java 并发 - 各种锁
    • Java 并发 - 关键字 volatile
    • Java 并发 - 关键字 synchronized
    • JUC - CAS与原子操作
    • JUC - 锁核心类AQS
    • JUC - 锁接口和类简介
    • JUC - 并发容器简介
    • JUC - 通信工具类
    • JUC - Fork-Join框架
    • JUC - 线程池
  • Java 进阶 - JVM

    • JVM - 概述
    • JVM - 类加载机制
    • JVM - 内存结构
    • JVM - 垃圾回收机制
    • JVM - 性能调优
  • Maven系列

    • Maven基础知识
    • Maven项目构建
    • Maven多模块配置
  • Spring 框架

    • Spring 框架 - 框架介绍
    • Spring 框架 - IOC详解
    • Spring 框架 - AOP详解
    • Spring 框架 - SpringMVC详解
  • Spring Boot 系列

    • Spring Boot - 开发入门
    • Spring Boot - 接口相关
  • Spring Cloud 系列
  • Mybatis 系列

    • Mybatis - 总体框架设计
    • Mybatis - 初始化基本过程
    • Mybatis - sqlSession执行过程
    • Mybatis - 插件机制
    • Mybatis - 事务管理机制
    • Mybatis - 缓存机制
  • 业务常见问题

    • Java 业务开发常见错误(一)
    • Java 业务开发常见错误(二)
    • Java 业务开发常见错误(三)
    • Java 业务开发常见错误(四)
    • Java 业务开发常见错误(五)
    • Java 业务开发常见错误(六)
  • IDEA系列

    • IDEA 2021开发环境配置
    • IDEA 快捷键
  • Git系列

    • git status中文乱码
  • 其他

    • Typora+Picgo 自动上传图片
    • hsdis 和 jitwatch
  • 实用技巧
  • 收藏
  • 摄影
  • 学习
  • 标签
  • 归档
  • Java 基础

  • Java 进阶 - 集合框架

  • Java 进阶 - 多线程与并发

    • Java 并发 - 概述
    • Java 并发 - 理论基础
    • Java 并发 - 线程基础
    • Java 并发 - 各种锁
    • Java 并发 - JVM 锁优化
    • Java 并发 - 关键字 volatile
    • Java 并发 - 关键字 synchronized
    • Java 并发 - syschronized 应用及死锁问题
    • Java 并发 - 关键字 final
    • JUC - CAS与原子操作
    • JUC - 锁核心类AQS
    • JUC - 锁接口和类简介
    • JUC - 并发容器简介
    • JUC - 阻塞队列
      • 1. 阻塞队列的由来
      • 2. BlockingQueue 的操作方法
      • 3. BlockingQueue 的实现类
        • 3.1 ArrayBlockingQueue
        • 3.2 LinkedBlockingQueue
        • 3.3 DelayQueue
        • 3.4 PriorityBlockingQueue
        • 3.5 SynchronousQueue
      • 4. 阻塞队列的原理
      • 5. 示例和使用场景
        • 5.1 生产者-消费者模型
        • 5.2 线程池中使用阻塞队列
      • 参考
    • JUC - 通信工具类
    • JUC - Fork/Join框架
    • JUC - Stream并行计算原理
    • JUC - 线程池
  • Java 进阶 - JVM

  • Java 进阶 - 版本特性

  • Java
  • Java 进阶 - 多线程与并发
Jason
目录

JUC - 阻塞队列

# JUC - 阻塞队列

# 1. 阻塞队列的由来

我们假设一种场景,生产者一直生产资源,消费者一直消费资源,资源存储在一个缓冲池中,生产者将生产的资源存进缓冲池中,消费者从缓冲池中拿到资源进行消费,这就是大名鼎鼎的生产者-消费者模式。

该模式能够简化开发过程,一方面消除了生产者类与消费者类之间的代码依赖性,另一方面将生产数据的过程与使用数据的过程解耦简化负载。

我们自己 coding 实现这个模式的时候,因为需要让多个线程操作共享变量(即资源),所以很容易引发线程安全问题,造成重复消费和死锁,尤其是生产者和消费者存在多个的情况。另外,当缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者,这些个等待-唤醒逻辑都需要自己实现。(这块不明白的同学,可以看最下方结语部分的链接)

这么容易出错的事情,JDK 当然帮我们做啦,这就是阻塞队列 (BlockingQueue),你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。

BlockingQueue 是 Java util.concurrent 包下重要的数据结构,区别于普通的队列,BlockingQueue 提供了线程安全的队列访问方式,并发包下很多高级同步类的实现都是基于 BlockingQueue 实现的。

BlockingQueue 一般用于生产者-消费者模式,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。BlockingQueue 就是存放元素的容器。

# 2. BlockingQueue 的操作方法

阻塞队列提供了四组不同的方法用于插入、移除、检查元素:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() - -
  • 抛出异常:如果试图的操作无法立即执行,抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”) 异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException 异常 。
  • 返回特殊值:如果试图的操作无法立即执行,返回一个特殊值,通常是 true / false。
  • 一直阻塞:如果试图的操作无法立即执行,则一直阻塞或者响应中断。
  • 超时退出:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功,通常是 true / false。

注意之处

  • 不能往阻塞队列中插入 null,会抛出空指针异常。
  • 可以访问阻塞队列中的任意元素,调用 remove(o) 可以将队列之中的特定对象移除,但并不高效,尽量避免使用。

# 3. BlockingQueue 的实现类

# 3.1 ArrayBlockingQueue

由数组结构组成的有界阻塞队列。内部结构是数组,故具有数组的特性。

public ArrayBlockingQueue(int capacity, boolean fair){
    //..省略代码
}
1
2
3

可以初始化队列大小, 且一旦初始化不能改变。构造方法中的fair表示控制对象的内部锁是否采用公平锁,默认是非公平锁。

# 3.2 LinkedBlockingQueue

由链表结构组成的有界阻塞队列。内部结构是链表,具有链表的特性。默认队列的大小是Integer.MAX_VALUE,也可以指定大小。此队列按照先进先出的原则对元素进行排序。

# 3.3 DelayQueue

该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。

DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

# 3.4 PriorityBlockingQueue

基于优先级的无界阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来决定),内部控制线程同步的锁采用的是非公平锁。

网上大部分博客上 PriorityBlockingQueue 为公平锁,其实是不对的,查阅源码(感谢github:ambition0802同学的指出):

public PriorityBlockingQueue(int initialCapacity,
                                  Comparator<? super E> comparator) {
         this.lock = new ReentrantLock(); //默认构造方法-非公平锁
         ...//其余代码略
     }
1
2
3
4
5

# 3.5 SynchronousQueue

这个队列比较特殊,没有任何内部容量,甚至连一个队列的容量都没有。并且每个 put 必须等待一个 take,反之亦然。

需要区别容量为1的ArrayBlockingQueue、LinkedBlockingQueue。

以下方法的返回值,可以帮助理解这个队列:

  • iterator() 永远返回空,因为里面没有东西
  • peek() 永远返回 null
  • put() 往queue放进去一个 element 以后就一直wait直到有其他 thread 进来把这个 element 取走。
  • offer() 往 queue 里放一个 element 后立即返回,如果碰巧这个 element 被另一个 thread 取走了,offer 方法返回true,认为 offer 成功;否则返回 false。
  • take() 取出并且 remove 掉 queue 里的 element,取不到东西他会一直等。
  • poll() 取出并且 remove 掉 queue 里的 element,只有到碰巧另外一个线程正在往 queue 里 offer 数据或者 put数据的时候,该方法才会取到东西。否则立即返回 null。
  • isEmpty() 永远返回 true
  • remove()&removeAll() 永远返回 false

注意

PriorityBlockingQueue 不会阻塞数据生产者(因为队列是无界的),而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,**生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。**对于使用默认大小的 LinkedBlockingQueue 也是一样的。

# 4. 阻塞队列的原理

阻塞队列的原理很简单,利用了 Lock 锁的多条件(Condition)阻塞控制。接下来我们分析 ArrayBlockingQueue JDK 1.8 的源码。

首先是构造器,除了初始化队列的大小和是否是公平锁之外,还对同一个锁(lock)初始化了两个监视器,分别是notEmpty 和 notFull。这两个监视器的作用目前可以简单理解为标记分组,当该线程是 put 操作时,给他加上监视器 notFull,标记这个线程是一个生产者;当线程是 take 操作时,给他加上监视器 notEmpty,标记这个线程是消费者。

//数据元素数组
final Object[] items;
//下一个待取出元素索引
int takeIndex;
//下一个待添加元素索引
int putIndex;
//元素个数
int count;
//内部锁
final ReentrantLock lock;
//消费者监视器
private final Condition notEmpty;
//生产者监视器
private final Condition notFull;  

public ArrayBlockingQueue(int capacity, boolean fair) {
    //..省略其他代码
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

put操作的源码

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 1.自旋拿锁
    lock.lockInterruptibly();
    try {
        // 2.判断队列是否满了
        while (count == items.length)
            // 2.1如果满了,阻塞该线程,并标记为notFull线程,
            // 等待notFull的唤醒,唤醒之后继续执行while循环。
            notFull.await();
        // 3.如果没有满,则进入队列
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 4 唤醒一个等待的线程
    notEmpty.signal();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

总结 put 的流程:

  1. 所有执行 put 操作的线程竞争 lock 锁,拿到了 lock 锁的线程进入下一步,没有拿到 lock 锁的线程自旋竞争锁。
  2. 判断阻塞队列是否满了,如果满了,则调用 await 方法阻塞这个线程,并标记为 notFull(生产者)线程,同时释放 lock 锁,等待被消费者线程唤醒。
  3. 如果没有满,则调用 enqueue 方法将元素 put 进阻塞队列。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了 lock 锁的线程。
  4. 唤醒一个标记为 notEmpty(消费者)的线程。

take 操作的源码

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

take 操作和 put 操作的流程是类似的,总结一下 take 操作的流程:

  1. 所有执行 take 操作的线程竞争 lock 锁,拿到了 lock 锁的线程进入下一步,没有拿到 lock 锁的线程自旋竞争锁。
  2. 判断阻塞队列是否为空,如果是空,则调用 await 方法阻塞这个线程,并标记为 notEmpty(消费者)线程,同时释放 lock 锁,等待被生产者线程唤醒。
  3. 如果没有空,则调用 dequeue 方法。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock 锁的线程。
  4. 唤醒一个标记为 notFull(生产者)的线程。

注意

  1. put 和 take 操作都需要先获取锁,没有获取到锁的线程会被挡在第一道大门之外自旋拿锁,直到获取到锁。
  2. 就算拿到锁了之后,也不一定会顺利进行 put/take 操作,需要判断队列是否可用(是否满/空),如果不可用,则会被阻塞,并释放锁。
  3. 在第 2 点被阻塞的线程会被唤醒,但是在唤醒之后,依然需要拿到锁才能继续往下执行,否则,自旋拿锁,拿到锁了再 while 判断队列是否可用(这也是为什么不用if判断,而使用 while 判断的原因)。

# 5. 示例和使用场景

# 5.1 生产者-消费者模型

public class Test {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);

    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();

        producer.start();
        consumer.start();
    }

    class Consumer extends Thread{
        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while(true){
                try {
                    queue.take();
                    System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Producer extends Thread{
        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while(true){
                try {
                    queue.put(1);
                    System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

下面是这个例子的输出片段:

从队列取走一个元素,队列剩余0个元素
从队列取走一个元素,队列剩余0个元素
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:8
向队列取中插入一个元素,队列剩余空间:7
向队列取中插入一个元素,队列剩余空间:6
向队列取中插入一个元素,队列剩余空间:5
向队列取中插入一个元素,队列剩余空间:4
向队列取中插入一个元素,队列剩余空间:3
向队列取中插入一个元素,队列剩余空间:2
向队列取中插入一个元素,队列剩余空间:1
向队列取中插入一个元素,队列剩余空间:0
从队列取走一个元素,队列剩余1个元素
从队列取走一个元素,队列剩余9个元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

注意,这个例子中的输出结果看起来可能有问题,比如有几行在插入一个元素之后,队列的剩余空间不变。这是由于put操作和System.out.println语句这两个操作没有上锁。考虑到这样的情况:线程1在执行完put/take操作后立即失去CPU时间片,然后切换到线程2执行put/take操作,执行完毕后回到线程1的System.out.println语句并输出,发现这个时候阻塞队列的size已经被线程2改变了,所以这个时候输出的size并不是当时线程1执行完put/take操作之后阻塞队列的size,但可以确保的是size不会超过10个。实际上使用阻塞队列是没有问题的。

# 5.2 线程池中使用阻塞队列

 public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}
1
2
3
4
5
6
7
8

Java 中的线程池就是使用阻塞队列实现的,我们在了解阻塞队列之后,无论是使用 Executors 类中已经提供的线程池,还是自己通过 ThreadPoolExecutor 实现线程池,都会更加得心应手。

# 参考

  • 转载自:http://concurrent.redspider.group/article/03/13.html
#多线程与并发#阻塞队列
上次更新: 2023-03-21
JUC - 并发容器简介
JUC - 通信工具类

← JUC - 并发容器简介 JUC - 通信工具类→

最近更新
01
开始
01-09
02
AI工具分享
01-09
03
AI 导读
01-07
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Jason Huang | 闽ICP备2025088096号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式