本文共 12285 字,大约阅读时间需要 40 分钟。
1.什么是阻塞队列2.几种主要的阻塞队列3.阻塞队列中的方法 VS 非阻塞队列中的方法4.阻塞队列的实现原理5.示例和使用场景
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是: 1.在队列为空时,获取元素的线程会等待队列变为非空。 2.当队列满时,存储元素的线程会等待队列可用。 阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
import java.util.PriorityQueue;public class Test2 { private int queueSize = 10; private PriorityQueuequeue = new PriorityQueue (queueSize); public static void main(String[] args) { Test2 test = new Test2(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread{ @Override public void run() { consume(); } private void consume() { while(true){ synchronized (queue) { while(queue.size() == 0){ try { System.out.println("队列空,等待数据"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.poll(); //每次移走队首元素 queue.notify(); System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素"); } } } } class Producer extends Thread{ @Override public void run() { produce(); } private void produce() { while(true){ synchronized (queue) { while(queue.size() == queueSize){ try { System.out.println("队列满,等待有空余空间"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.offer(1); //每次插入一个元素 queue.notify(); System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size())); } } } }}
1. ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。2. LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。3. PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。4. DelayQueue:一个使用优先级队列实现的无界阻塞队列。5. SynchronousQueue:一个不存储元素的阻塞队列。6. LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。7. LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
1、往队列中添加元素的方法有4钟,分别为:add(e)/offer(e)/put(e)/offer(e,time,unit)2、往队列中取元素的方法有4种,分别为:remove()/poll()/take()/poll(long,TimeUnit).3、检查队列中的元素有2种,分别为:element()/peek().文档:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html
public class ArrayBlockingQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable {
final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
ArrayBlockingQueue是基于数组,所以有一个数组items来保存元素。关键字synchronized与wait()和notify()/notifyAll()方法相结合可以实现等待/通知模式。但是Condition可以实现多路通知,有选择性地进行线程通知。
//创建一个指定大小的队列对象。 public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair);//NonfairSync(悲观锁) notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
ReentrantLock.lock函数中,会调用到NonfairSync.lock方法,首先会通过CAS方法,尝试将当前的AQS中的State字段改成从0改成1,如果修改成功的话,说明原来的状态是0,并没有线程占用锁,而且成功的获取了锁,只需要调用setExclusiveOwnerThread函将当前线程设置成持有锁的线程即可。否则,CAS操作失败之后,和普通锁一样,调用acquire(1)函数尝试获取锁。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock;//获取锁 lock.lockInterruptibly();//如果当前线程未被中断,则获取锁定,如果已经被中断则出现异常。还有lock/tryLock。 try { //检查是否已满,如果已满,则调用Condition的await方法等待并释放锁 while (count == items.length) notFull.await(); enqueue(e);//如果没满,则直接加入到队列中 } finally { lock.unlock();//最后释放锁 } } private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; //唤醒一个消费者 notEmpty.signal(); }
如果满了调用notFull.await(),如果有增加元素,唤醒notEmpty.signal()。take()和put()在这方面相反。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await();//如果队列中存储的元素为空,则等待直至队列中非空,put()中被唤醒。 return dequeue(); } finally { lock.unlock(); } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal();//唤醒put()中等待的生产者 return x; }
public class LinkedBlockingQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable
static class Node{ E item; Node next; Node(E x) { item = x; } } private final int capacity;//容量,默认Integer.MAX_VALUEprivate final AtomicInteger count = new AtomicInteger(); 当前队列中存储元素的数量 transient Node head; private transient Node last; private final ReentrantLock takeLock = new ReentrantLock(); private final ReentrantLock putLock = new ReentrantLock();//take和put的锁,ArrayBlockingQueue是通用一个,因为数组不能同时put和take,但是链表可以。 private final Condition notEmpty = takeLock.newCondition(); private final Condition notFull = putLock.newCondition();//同样的2个锁和Condition。
ArrayBlockingQueue是共用一个,而LinkedBlockingQueue采用了两把锁,一个是take锁,一个是put锁,这就说明,put和take操作是可以同时进行的。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node(null);//头尾结点指向一个节点,存储的元素为null }
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1;//注意:用此局部变量持有一个负数来指示CAS操作是否操作成功。 c = count.getAndIncrement();//利用原子性加一 Nodenode = new Node (e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { //如果已到达最大容量,则等待直到有空间来进行存储才会被唤醒 notFull.await(); } enqueue(node); c = count.getAndIncrement();//利用原子性加一 if (c + 1 < capacity)//如果当前存储的元素个数小于容量,则唤醒正在等待的生产者的线程。 notFull.signal(); } finally { putLock.unlock(); } if (c == 0)//第一次添加了一个元素,因此需要唤醒一个消费者线程 signalNotEmpty(); } /* * 将节点加入到链表的末尾 */ private void enqueue(Node node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } //唤醒一个消费者线程 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//加take锁 try { //如果当前队列为空,则一直等待直到队列非空 while (count.get() == 0) { notEmpty.await(); } x = dequeue();//将头结点取出来 c = count.getAndDecrement();//利用原子性进行进行减一操作 //如果此时的容量是大于1的,则说明还有其它元素可以被消费线程获取,因此唤醒其他消费者线程() if (c > 1) notEmpty.signal(); } finally { takeLock.unlock();//释放锁 } //如果c的容量是等于capacity,又被消费了一个,因此可以通知生产者线程来进行生产 if (c == capacity) signalNotFull(); return x; } private E dequeue() { Nodeh = head; Node first = h.next; h.next = h; head = first; E x = first.item; first.item = null; return x; } /** * 唤醒一个生产者线程 */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
一个支持优先级排序的无界阻塞队列。PriorityBlockingQueue需要插入其中的元素类型提供一个Comparator,PriorityBlockingQueue使用这个Comparator来确定元素之间的优先级关系,底层是基于一个二叉堆的实现。
支持延时获取元素的无界阻塞队列,即可以指定多久才能从队列中获取当前元素。常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。
不存储元素的阻塞队列,每一个put必须等待一个take操作,否则不能继续添加元素。
public class Test{ private int queueSize = 10; private ArrayBlockingQueuequeue = new ArrayBlockingQueue (queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread { @Override public void run() { consume(); } private void consume() { while (true) { try { queue.take(); System.out.println("从队列取走一个元素,队列剩余" + queue.size() + "个元素"); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread { @Override public void run() { produce(); } private void produce() { while (true) { try { queue.put(1); System.out.println("向队列取中插入一个元素,队列剩余空间:"+ (queueSize - queue.size())); } catch (InterruptedException e) { e.printStackTrace(); } } } }}
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
1.它是一种固定大小的线程池;2.corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;3.keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;4.但这里keepAliveTime无效;阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;5.由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;6.由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
它是一个可以无限扩大的线程池;它比较适合处理执行时间比较小的任务;corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
参考资料: