博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java-阻塞队列
阅读量:4207 次
发布时间:2019-05-26

本文共 12285 字,大约阅读时间需要 40 分钟。

概要

1.什么是阻塞队列2.几种主要的阻塞队列3.阻塞队列中的方法 VS 非阻塞队列中的方法4.阻塞队列的实现原理5.示例和使用场景

1.什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:    1.在队列为空时,获取元素的线程会等待队列变为非空。    2.当队列满时,存储元素的线程会等待队列可用。     阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

1.1简单的阻塞队列代码
import java.util.PriorityQueue;public class Test2 {
private int queueSize = 10; private PriorityQueue
queue = new PriorityQueue
(queueSize); public static void main(String[] args) { Test2 test = new Test2(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread{
@Override public void run() { consume(); } private void consume() { while(true){ synchronized (queue) { while(queue.size() == 0){ try { System.out.println("队列空,等待数据"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.poll(); //每次移走队首元素 queue.notify(); System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素"); } } } } class Producer extends Thread{
@Override public void run() { produce(); } private void produce() { while(true){ synchronized (queue) { while(queue.size() == queueSize){ try { System.out.println("队列满,等待有空余空间"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.offer(1); //每次插入一个元素 queue.notify(); System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size())); } } } }}

2.几种主要的阻塞队列

1. ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。2. LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。3. PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。4. DelayQueue:一个使用优先级队列实现的无界阻塞队列。5. SynchronousQueue:一个不存储元素的阻塞队列。6. LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。7. LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

3.阻塞队列中的方法 VS 非阻塞队列中的方法

这里写图片描述

1、往队列中添加元素的方法有4钟,分别为:add(e)/offer(e)/put(e)/offer(e,time,unit)2、往队列中取元素的方法有4种,分别为:remove()/poll()/take()/poll(long,TimeUnit).3、检查队列中的元素有2种,分别为:element()/peek().文档:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html

4.阻塞队列的实现原理

4.1 ArrayBlockingQueue
4.1.1 ArrayBlockingQueue继承体系结构
public class ArrayBlockingQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable {
4.1.2 ArrayBlockingQueue的相关属性
final Object[] items;    int takeIndex;    int putIndex;    int count;    final ReentrantLock lock;    private final Condition notEmpty;    private final Condition notFull;
ArrayBlockingQueue是基于数组,所以有一个数组items来保存元素。关键字synchronized与wait()和notify()/notifyAll()方法相结合可以实现等待/通知模式。但是Condition可以实现多路通知,有选择性地进行线程通知。
4.1.3 ArrayBlockingQueue的构造方法
//创建一个指定大小的队列对象。    public ArrayBlockingQueue(int capacity) {        this(capacity, false);    }    public ArrayBlockingQueue(int capacity, boolean fair) {        if (capacity <= 0)            throw new IllegalArgumentException();        this.items = new Object[capacity];        lock = new ReentrantLock(fair);//NonfairSync(悲观锁)        notEmpty = lock.newCondition();        notFull =  lock.newCondition();    }
ReentrantLock.lock函数中,会调用到NonfairSync.lock方法,首先会通过CAS方法,尝试将当前的AQS中的State字段改成从0改成1,如果修改成功的话,说明原来的状态是0,并没有线程占用锁,而且成功的获取了锁,只需要调用setExclusiveOwnerThread函将当前线程设置成持有锁的线程即可。否则,CAS操作失败之后,和普通锁一样,调用acquire(1)函数尝试获取锁。
4.1.4 ArrayBlockingQueue类中的put方法
public void put(E e) throws InterruptedException {        checkNotNull(e);        final ReentrantLock lock = this.lock;//获取锁        lock.lockInterruptibly();//如果当前线程未被中断,则获取锁定,如果已经被中断则出现异常。还有lock/tryLock。        try {            //检查是否已满,如果已满,则调用Condition的await方法等待并释放锁            while (count == items.length)                notFull.await();            enqueue(e);//如果没满,则直接加入到队列中        } finally {            lock.unlock();//最后释放锁        }    }    private static void checkNotNull(Object v) {        if (v == null)            throw new NullPointerException();    }    private void enqueue(E x) {        final Object[] items = this.items;        items[putIndex] = x;        if (++putIndex == items.length)            putIndex = 0;        count++;        //唤醒一个消费者        notEmpty.signal();    }
如果满了调用notFull.await(),如果有增加元素,唤醒notEmpty.signal()。take()和put()在这方面相反。
4.1.5 ArrayBlockingQueue类中的take方法
public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == 0)                notEmpty.await();//如果队列中存储的元素为空,则等待直至队列中非空,put()中被唤醒。            return dequeue();        } finally {            lock.unlock();        }    }    private E dequeue() {        final Object[] items = this.items;        @SuppressWarnings("unchecked")        E x = (E) items[takeIndex];        items[takeIndex] = null;        if (++takeIndex == items.length)            takeIndex = 0;        count--;        if (itrs != null)            itrs.elementDequeued();        notFull.signal();//唤醒put()中等待的生产者        return x;    }
4.2 LinkedBlockingQueue
4.2.1 LinkedBlockingQueue继承体系结构
public class LinkedBlockingQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable
4.2.2 ArrayBlockingQueue的相关属性
static class Node
{ E item; Node
next; Node(E x) { item = x; } } private final int capacity;//容量,默认Integer.MAX_VALUEprivate final AtomicInteger count = new AtomicInteger(); 当前队列中存储元素的数量 transient Node
head; private transient Node
last; private final ReentrantLock takeLock = new ReentrantLock(); private final ReentrantLock putLock = new ReentrantLock();//take和put的锁,ArrayBlockingQueue是通用一个,因为数组不能同时put和take,但是链表可以。 private final Condition notEmpty = takeLock.newCondition(); private final Condition notFull = putLock.newCondition();//同样的2个锁和Condition。
ArrayBlockingQueue是共用一个,而LinkedBlockingQueue采用了两把锁,一个是take锁,一个是put锁,这就说明,put和take操作是可以同时进行的。
4.2.3 LinkedBlockingQueue的构造方法
public LinkedBlockingQueue() {        this(Integer.MAX_VALUE);    }    public LinkedBlockingQueue(int capacity) {        if (capacity <= 0) throw new IllegalArgumentException();        this.capacity = capacity;        last = head = new Node
(null);//头尾结点指向一个节点,存储的元素为null }
4.2.4 LinkedBlockingQueue的put方法介绍
public void put(E e) throws InterruptedException {        if (e == null) throw new NullPointerException();        int c = -1;//注意:用此局部变量持有一个负数来指示CAS操作是否操作成功。 c = count.getAndIncrement();//利用原子性加一        Node
node = new Node
(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) {
//如果已到达最大容量,则等待直到有空间来进行存储才会被唤醒 notFull.await(); } enqueue(node); c = count.getAndIncrement();//利用原子性加一 if (c + 1 < capacity)//如果当前存储的元素个数小于容量,则唤醒正在等待的生产者的线程。 notFull.signal(); } finally { putLock.unlock(); } if (c == 0)//第一次添加了一个元素,因此需要唤醒一个消费者线程 signalNotEmpty(); } /* * 将节点加入到链表的末尾 */ private void enqueue(Node
node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } //唤醒一个消费者线程 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
4.2.5 LinkedBlockingQueue的take方法介绍
public E take() throws InterruptedException {        E x;        int c = -1;        final AtomicInteger count = this.count;        final ReentrantLock takeLock = this.takeLock;        takeLock.lockInterruptibly();//加take锁        try {            //如果当前队列为空,则一直等待直到队列非空            while (count.get() == 0) {                notEmpty.await();            }            x = dequeue();//将头结点取出来            c = count.getAndDecrement();//利用原子性进行进行减一操作            //如果此时的容量是大于1的,则说明还有其它元素可以被消费线程获取,因此唤醒其他消费者线程()            if (c > 1)                notEmpty.signal();        } finally {            takeLock.unlock();//释放锁        }        //如果c的容量是等于capacity,又被消费了一个,因此可以通知生产者线程来进行生产        if (c == capacity)            signalNotFull();        return x;    }    private E dequeue() {        Node
h = head; Node
first = h.next; h.next = h; head = first; E x = first.item; first.item = null; return x; } /** * 唤醒一个生产者线程 */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
4.3 PriorityBlockingQueue
一个支持优先级排序的无界阻塞队列。PriorityBlockingQueue需要插入其中的元素类型提供一个Comparator,PriorityBlockingQueue使用这个Comparator来确定元素之间的优先级关系,底层是基于一个二叉堆的实现。
4.4 PriorityQueue
支持延时获取元素的无界阻塞队列,即可以指定多久才能从队列中获取当前元素。常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。
4.5 SynchronousQueue
不存储元素的阻塞队列,每一个put必须等待一个take操作,否则不能继续添加元素。

5.示例和使用场景

5.1示例
public class Test{
private int queueSize = 10; private ArrayBlockingQueue
queue = new ArrayBlockingQueue
(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread {
@Override public void run() { consume(); } private void consume() { while (true) { try { queue.take(); System.out.println("从队列取走一个元素,队列剩余" + queue.size() + "个元素"); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread {
@Override public void run() { produce(); } private void produce() { while (true) { try { queue.put(1); System.out.println("向队列取中插入一个元素,队列剩余空间:"+ (queueSize - queue.size())); } catch (InterruptedException e) { e.printStackTrace(); } } } }}
5.2应用场景
5.2.1 FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue
()); }

这里写图片描述

1.它是一种固定大小的线程池;2.corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;3.keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;4.但这里keepAliveTime无效;阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;5.由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;6.由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。
5.2.2 FixedThreadPool
public static ExecutorService newCachedThreadPool() {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue
()); }

这里写图片描述

它是一个可以无限扩大的线程池;它比较适合处理执行时间比较小的任务;corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

参考资料:

你可能感兴趣的文章
面试刷题20:并发包中的线程池有哪些?
查看>>
面试刷题21:java并发工具中的队列有哪些?
查看>>
面试刷题22:CAS和AQS是什么?
查看>>
面试刷题23:类加载过程和双亲委派机制?
查看>>
面试刷题24:介绍一枚 JAVA妹妹?
查看>>
面试刷题25:jvm的垃圾收集算法?
查看>>
面试刷题mysql1:一条sql语句是如何经过mysql的体系结构的?
查看>>
mysql之日志
查看>>
面试刷题26:新冠攻击人类?什么攻击java平台?
查看>>
面试刷题27:程序员如何防护java界的新冠肺炎?
查看>>
面试刷题28:如何写出安全的java代码?
查看>>
面试刷题29:mysql事务隔离实现原理?
查看>>
面试刷题30:SpringBean的生命周期?
查看>>
面试刷题31:分布式ID设计方案
查看>>
面试刷题32:你对tomcat做了哪些性能调优?
查看>>
面试刷题33:如何应对高并发?
查看>>
面试刷题34:说一下分布式架构中的缓存使用场景?
查看>>
面试刷题35:负载均衡有哪几种实现方式?
查看>>
面试刷题36:线程池的原理和使用方法?
查看>>
面试刷题36:线程池的原理和使用方法?
查看>>