
前言

PriorityBlockingQueue 优先级队列,阻塞线程安全(添加、队列读取都进行了加锁)、码分无界、阻塞读阻塞的队列队列,底层采用的码分堆结构实现(二叉树),默认是阻塞小根堆,最小的队列或者最大的元素会一直置顶,每次获取都取最顶端的码分数据
队列创建
小根堆
PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(); 大根堆
PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(10, new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { return o2 - o1; } }); 应用场景
有任务要执行,可以对任务加一个优先级的阻塞权重,这样队列会识别出来,队列对该任务优先进行出队。码分
我们来看一个具体例子,阻塞例子中定义了一个将要放入“优先阻塞队列”的队列任务类,并且定义了一个任务工场类和一个任务执行类,码分在任务工场类中产生了各种不同优先级的任务,将其添加到队列中,在任务执行类中,任务被一个个取出并执行。
package com.niuh.queue.priority; import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; /** * <p> * PriorityBlockingQueue使用示例 * </p> */ public class PriorityBlockingQueueDemo { public static void main(String[] args) throws Exception { Random random = new Random(47); ExecutorService exec = Executors.newCachedThreadPool(); PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>(); exec.execute(new PrioritizedTaskProducer(queue, exec)); // 这里需要注意,往PriorityBlockingQueue中添加任务和取出任务的 exec.execute(new PrioritizedTaskConsumer(queue)); // 步骤是同时进行的,因而输出结果并不一定是亿华云计算有序的 } } class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> { private Random random = new Random(47); private static int counter = 0; private final int id = counter++; private final int priority; protected static List<PrioritizedTask> sequence = new ArrayList<>(); public PrioritizedTask(int priority) { this.priority = priority; sequence.add(this); } @Override public int compareTo(PrioritizedTask o) { return priority < o.priority ? 1 : (priority > o.priority ? -1 : 0); // 定义优先级计算方式 } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(random.nextInt(250)); } catch (InterruptedException e) { } System.out.println(this); } @Override public String toString() { return String.format("[%1$-3d]", priority) + " Task " + id; } public String summary() { return "(" + id + ": " + priority + ")"; } public static class EndSentinel extends PrioritizedTask { private ExecutorService exec; public EndSentinel(ExecutorService exec) { super(-1); this.exec = exec; } @Override public void run() { int count = 0; for (PrioritizedTask pt : sequence) { System.out.print(pt.summary()); if (++count % 5 == 0) { System.out.println(); } } System.out.println(); System.out.println(this + " Calling shutdownNow()"); exec.shutdownNow(); } } } class PrioritizedTaskProducer implements Runnable { private Random random = new Random(47); private Queue<Runnable> queue; private ExecutorService exec; public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) { this.queue = queue; this.exec = exec; } @Override public void run() { for (int i = 0; i < 20; i++) { queue.add(new PrioritizedTask(random.nextInt(10))); // 往PriorityBlockingQueue中添加随机优先级的任务 Thread.yield(); } try { for (int i = 0; i < 10; i++) { TimeUnit.MILLISECONDS.sleep(250); queue.add(new PrioritizedTask(10)); // 往PriorityBlockingQueue中添加优先级为10的任务 } for (int i = 0; i < 10; i++) { queue.add(new PrioritizedTask(i));// 往PriorityBlockingQueue中添加优先级为1-10的任务 } queue.add(new PrioritizedTask.EndSentinel(exec)); } catch (InterruptedException e) { } System.out.println("Finished PrioritizedTaskProducer"); } } class PrioritizedTaskConsumer implements Runnable { private PriorityBlockingQueue<Runnable> queue; public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) { this.queue = queue; } @Override public void run() { try { while (!Thread.interrupted()) { queue.take().run(); // 任务的消费者,从PriorityBlockingQueue中取出任务执行 } } catch (InterruptedException e) { } System.out.println("Finished PrioritizedTaskConsumer"); } } 工作原理
PriorityBlockingQueue 是 JDK1.5 的时候出来的一个阻塞队列。但是该队列入队的时候是不会阻塞的,永远会加到队尾。下面我们介绍下它的几个特点:
PriorityBlockingQueue 和 ArrayBlockingQueue 一样是基于数组实现的,但后者在初始化时需要指定长度,前者默认长度是 11。 该队列可以说是真正的无界队列,它在队列满的时候会进行扩容,而前面说的无界阻塞队列其实都有有界,只是界限太大可以忽略(最大值是 2147483647) 该队列属于权重队列,源码下载可以理解为它可以进行排序,但是排序不是从小到大排或从大到小排,是基于数组的堆结构(具体如何排下面会进行分析) 出队方式和前面的也不同,是根据权重来进行出队,和前面所说队列中那种先进先出或者先进后出方式不同。 其存入的元素必须实现Comparator,或者在创建队列的时候自定义Comparator。 注意:
堆结构实际上是一种完全二叉树。关于二叉树可以查看 《树、二叉树、二叉搜索树的实现和特性》 堆又分为大顶堆和小顶堆 。大顶堆中第一个元素肯定是所有元素中最大的,小顶堆中第一个元素是所有元素中最小的。关于二叉堆可以查看《堆和二叉堆的实现和特性》 源码分析
定义
PriorityBlockingQueue的类继承关系如下:

其包含的方法定义如下:

成员属性
从下面的字段我们可以知道,该队列可以排序,云南idc服务商使用显示锁来保证操作的原子性,在空队列时,出队线程会堵塞等。
/** * 默认数组长度 */ private static final int DEFAULT_INITIAL_CAPACITY = 11; /** * 最大达容量,分配时超出可能会出现 OutOfMemoryError 异常 */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** * 队列,存储我们的元素 */ private transient Object[] queue; /** * 队列长度 */ private transient int size; /** * 比较器,入队进行权重的比较 */ private transient Comparator<? super E> comparator; /** * 显示锁 */ private final ReentrantLock lock; /** * 空队列时进行线程阻塞的 Condition 对象 */ private final Condition notEmpty; 构造函数
/** * 默认构造,使用长度为 11 的数组,比较器为空 */ public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } /** * 自定义数据长度构造,比较器为空 */ public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } /** * 自定义数组长度,可以自定义比较器 */ public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } /** * 构造函数,带有初始内容的队列 */ public PriorityBlockingQueue(Collection<? extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue<?>) { PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; // If c.toArray incorrectly doesnt return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; if (heapify) heapify(); } 入队方法
入队方法,下面可以看到 put 方法最终会调用 offer 方法,所以我们只看 offer 方法即可。
offer(E e)
public void put(E e) { offer(e); // never need to block } public boolean offer(E e) { //判断是否为空 if (e == null) throw new NullPointerException(); //显示锁 final ReentrantLock lock = this.lock; lock.lock(); //定义临时对象 int n, cap; Object[] array; //判断数组是否满了 while ((n = size) >= (cap = (array = queue).length)) //数组扩容 tryGrow(array, cap); try { //拿到比较器 Comparator<? super E> cmp = comparator; //判断是否有自定义比较器 if (cmp == null) //堆上浮 siftUpComparable(n, e, array); else //使用自定义比较器进行堆上浮 siftUpUsingComparator(n, e, array, cmp); //队列长度 +1 size = n + 1; //唤醒休眠的出队线程 notEmpty.signal(); } finally { //释放锁 lock.unlock(); } return true; } siftUpComparable(int k, T x, Object[] array)
上浮调整比较器方法的实现
private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { //无符号向左移,目的是找到放入位置的父节点 int parent = (k - 1) >>> 1; //拿到父节点的值 Object e = array[parent]; //比较是否大于该元素,不大于就没比较交换 if (key.compareTo((T) e) >= 0) break; //以下都是元素位置交换 array[k] = e; k = parent; } array[k] = key; } 根据上面的代码,可以看出这是完全二叉树在进行上浮调整。调整入队的元素,找出最小的,将元素排列有序化。简单理解就是:父节点元素值一定要比它的子节点得小,如果父节点大于子节点了,那就两者位置进行交换。
入队图解
例子:85 添加到二叉堆中(大顶堆)
package com.niuh.queue.priority; import java.util.Comparator; import java.util.concurrent.PriorityBlockingQueue; /** * <p> * PriorityBlockingQueue 简单演示 demo * </p> */ public class TestPriorityBlockingQueue { public static void main(String[] args) throws InterruptedException { // 大顶堆 PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(10, new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { return o2 - o1; } }); concurrentLinkedQueue.offer(90); concurrentLinkedQueue.offer(80); concurrentLinkedQueue.offer(70); concurrentLinkedQueue.offer(60); concurrentLinkedQueue.offer(40); concurrentLinkedQueue.offer(30); concurrentLinkedQueue.offer(20); concurrentLinkedQueue.offer(10); concurrentLinkedQueue.offer(50); concurrentLinkedQueue.offer(85); //输出元素排列 concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+" ")); //取出元素 Integer take = concurrentLinkedQueue.take(); System.out.println(); concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+" ")); } } 
操作的细节分为两步:
第一步:首先把新元素插入到堆的尾部再说;(新的元素可能是特别大或者特别小,那么要做的一件事情就是重新维护一下堆的所有元素,把新元素挪到这个堆的相应的位置) 第二步:依次向上调整整个堆的结构,就叫 HeapifyUp 
85 按照上面讲的先插入到堆的尾部,也就是一维数组的尾部,一维数组的尾部的话就上图的位置,因为这是一个完全二叉树,所以它的尾部就是50后面这个结点。插进来之后这个时候就破坏了堆,它的每一个结点都要大于它的儿子的这种属性了,接下来要做的事情就是要把 85 依次地向上浮动,怎么浮动?就是 85 大于它的父亲结点,那么就和父亲结点进行交换,直到走到根如果大于根的话,就和根也进行交换。

85 再继续往前走之后,它要和 80 再进行比较,同理可得:也就是说这个结点每次和它的父亲比,如果它大于它的父亲的话就交换,直到它不再大于它的父亲。

出队方法
入队列的方法说完后,我们来说说出队列的方法。PriorityBlockingQueue提供了多种出队操作的实现来满足不同情况下的需求,如下:
E take(); E poll(); E poll(long timeout, TimeUnit unit); E peek() poll 和 peek 与上面类似,这里不做说明
take()
出队方法,该方法会阻塞
public E take() throws InterruptedException { //显示锁 final ReentrantLock lock = this.lock; //可中断锁 lock.lockInterruptibly(); //结果接收对象 E result; try { //判断队列是否为空 while ( (result = dequeue()) == null) //线程阻塞 notEmpty.await(); } finally { lock.unlock(); } return result; } dequeue()
我们再来看看具体出队方法的实现,dequeue方法
private E dequeue() { //长度减少 1 int n = size - 1; //判断队列中是否有元素 if (n < 0) return null; else { //队列对象 Object[] array = queue; //取出第一个元素 E result = (E) array[0]; //拿出最后一个元素 E x = (E) array[n]; //置空 array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) //下沉调整 siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); //成功则减少队列中的元素数量 size = n; return result; } 总体就是找到父节点与两个子节点中最小的一个节点,然后进行交换位置,不断重复,由上而下的交换。
siftDownComparable(int k, T x, Object[] array, int n)
再来看看下沉比较器方法的实现
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { //判断队列长度 if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; //找到队列最后一个元素的父节点的索引。 int half = n >>> 1; // loop while a non-leaf while (k < half) { //拿到 k 节点下的左子节点 int child = (k << 1) + 1; // assume left child is least //取得子节点对应的值 Object c = array[child]; //取得 k 右子节点的索引 int right = child + 1; //比较右节点的索引是否小于队列长度和左右子节点的值进行比较 if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; //比较父节点值是否大于子节点 if (key.compareTo((T) c) <= 0) break; //下面都是元素替换 array[k] = c; k = child; } array[k] = key; } } 出队图解
将堆尾元素替换到顶部(即堆顶被替代删除掉)
依次从根部向下调整整个堆的结构(一直到堆尾即可) HeapifyDown
例子:90 从二叉堆中删除(大顶堆)

总结
PriorityBlockingQueue 真的是个神奇的队列,可以实现优先出队。最特别的是它只有一个锁,入队操作永远成功,而出队只有在空队列的时候才会进行线程阻塞。可以说有一定的应用场景吧,比如:有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。