ConcurrentLinkedQueue ConcurrentLinkedQueue 线程安全的无界非阻塞 队列,其底层数据结构使用单向链表实现,对于入队和出队操作使用 CAS 来实现线程安全。
类图
ConcurrentLinkedQueue 内部的队列使用单向链表的方式实现,其中有两个volatile 类型的Node 节点分别用来存放队列首、尾节点。
Node 内部则维护一个使用volatile 修饰的item,用来存放节点的值;next 用来存放链表的下一个节点;其内部使用UNSafe 工具类提供的CAS 算法来保证入队时操作链表的原子性。
实现原理 offer 操作 在队列末尾添加一个元素,如果传递的参数是null 则抛出NPE 异常,否则由于 ConcurrentLinkedQueue 是无界队列,该方法一直会返回true 。另外,由于使用CAS 无阻塞算法,因此方法不会阻塞挂起调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public boolean offer (E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null ) { if (p.casNext(null , newNode)) { if (p != t) casTail(t, newNode); return true ; } } else if (p == q) p = (t != (t = tail)) ? t : head; else p = (p != t && t != (t = tail)) ? t : q; } }
add 操作 add 操作是在链表末尾添一个元素,其实在内部调用的还是 offer 操作。
1 2 3 public boolean add (E e) { return offer(e); }
poll 操作 poll 操作是在队列头部获取并移除一个元素 如果队列为空则返回 null。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public E poll () { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null )) { if (p != h) updateHead(h, ((q = p.next) != null ) ? q : p); return item; } else if ((q = p.next) == null ) { updateHead(h, p); return null ; } else if (p == q) continue restartFromHead; else p = q; } } } final void updateHead (Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
peek 操作 获取队列头部一个元素(只获取不移除),如果队列为空则返回 null。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public E peek () { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null || (q = p.next) == null ) { updateHead(h, p); return item; } else if (p == q) continue restartFromHead; else p = q; } } }
size 操作 计算当前队列元素个数,在并发环境下不是很有用,因为CAS 没有加锁,所以从调用size 函数到返回结果期间有可能增删元素,导致统计的元素个数不精确。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public int size () { int count = 0 ; for (Node<E> p = first(); p != null ; p = succ(p)) if (p.item != null ) if (++count == Integer.MAX_VALUE) break ; return count; } Node<E> first () { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null ); if (hasItem || (q = p.next) == null ) { updateHead(h, p); return hasItem ? p : null ; } else if (p == q) continue restartFromHead; else p = q; } } } final Node<E> succ (Node<E> p) { Node<E> next = p.next; return (p == next) ? head : next; }
remove 操作 如果队列里面存在该元素则删除该元素,如果存在多个则删除第一个,并返回true,否则返回false。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public boolean remove (Object o) { if (o != null ) { Node<E> next, pred = null ; for (Node<E> p = first(); p != null ; pred = p, p = next) { boolean removed = false ; E item = p.item; if (item != null ) { if (!o.equals(item)) { next = succ(p); continue ; } removed = p.casItem(item, null ); } next = succ(p); if (pred != null && next != null ) pred.casNext(p, next); if (removed) return true ; } } return false ; }
contains操作 判断队列里面是否含有指定对象,由于是遍历整个队列,所以像size 操作一样结果也不是那么精确,有可能调用该方法时元素还在队列里面,但是遍历过程中其他线程才把该元素删除了,那么就会返回 false。
1 2 3 4 5 6 7 8 9 public boolean contains (Object o) { if (o == null ) return false ; for (Node<E> p = first(); p != null ; p = succ(p)) { E item = p.item; if (item != null && o.equals(item)) return true ; } return false ; }
LinkedBlockingQueue LinkedBlockingQueue是使用独占锁实现的无界(可指定有界)阻塞 队列。
类图
LinkedBlockingQueue 也是使用单向链表实现的,其也有两个Node ,分别用来存放首、尾节点,并且还有一个初始值为 0 的原子变量count ,用来记录队列元素个数。
还有两个ReentrantLock 的实例,分别用来控制元素入队和出队的原子性,其中takeLock 用来控制同时只有一个线程可以从队列头获取元素,其他线程必须等待。putLock 控制同时只能有一个线程可以获取锁,在队列尾部添加元素,其他线程必须等待。
另外,notEmpty 和notFull 是条件变量,它们内部都有一个条件队列用来存放进队和出队时被阻塞的线程,其实这是生产者-消费者模型。
从LinkedBlockingQueue 的构造函数可知,其默认容量是0x7fffffff,用户也可以自定义容量,所以从一定程度上可以说从LinkedBlockingQueue 是有界阻塞队列。
1 2 3 4 5 6 7 8 9 public LinkedBlockingQueue () { this (Integer.MAX_VALUE); } public LinkedBlockingQueue (int capacity) { if (capacity <= 0 ) throw new IllegalArgumentException(); this .capacity = capacity; last = head = new Node<E>(null ); }
实现原理 offer操作 向队列尾部插入一个元素,如果队列中有空闲则插入成功后返回true ,如果队列己满则丢弃当前元素然后返回false 。此方法是非阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public boolean offer (E e) { if (e == null ) throw new NullPointerException(); final AtomicInteger count = this .count; if (count.get() == capacity) return false ; int c = -1 ; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this .putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); return c >= 0 ; } private void signalNotEmpty () { final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
put操作 向队列尾部插入一个元素,如果队列中有空闲则插入后直接返回,如果队列已满则阻塞当前线程,直到队列有空闲插入成功后返回。如果在阻塞时被其他线程设置了中断标志,被阻塞线程会抛出InterruptedException 异常而返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException(); int c = -1 ; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); }
poll操作 从队列头部获取并移除一个元素 如果队列为空则返回null 。此方法是不阻塞的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public E poll () { final AtomicInteger count = this .count; if (count.get() == 0 ) return null ; E x = null ; int c = -1 ; final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { if (count.get() > 0 ) { x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } private E dequeue () { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null ; return x; }
peek操作 获取队列头部元素但是不从队列里面移除它,如果队列为空返回null 。此方法是不阻塞的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public E peek () { if (count.get() == 0 ) return null ; final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null ) return null ; else return first.item; } finally { takeLock.unlock(); } }
take操作 获取当前队列头部元素并从队列里移除它,如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志,则阻塞线程会抛出InterruptedException 异常而返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public E take () throws InterruptedException { E x; int c = -1 ; final AtomicInteger count = this .count; final ReentrantLock takeLock = this .takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0 ) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
remove操作 删除队列里面指定的元素,有则删除并返回true ,没有则返回 false。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public boolean remove (Object o) { if (o == null ) return false ; fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null ; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true ; } } return false ; } finally { fullyUnlock(); } } void unlink (Node<E> p, Node<E> trail) { p.item = null ; trail.next = p.next; if (last == p) last = trail; if (count.getAndDecrement() == capacity) notFull.signal(); } void fullyLock () { putLock.lock(); takeLock.lock(); } void fullyUnlock () { takeLock.unlock(); putLock.unlock(); }
size操作 获取当前队列元素个数。
1 2 3 public int size () { return count.get(); }
由于进行入队和出队操作时的count加了锁,所以结果比ConcurentLinkedQueue的size 方法准确。
ConcurentLinkedQueue中遍历链表获取size未使用原子变量保存是因为使用原子变量保存队列元素个数需要保证入队、出队和原子变量操作时原子性操作,而ConcurentLinkedQueue 使用的是CAS 无锁算法,所以无法实现。
ArrayBlockingQueue ArrayBlockingQueue 是用有界数组 方式实现的阻塞 队列。
类图
ArrayBlockingQueue 内部结构如下:
items: 数组,用来存放队列元素
putIndex:入队元素下标
takeIndex: 出队元素下标
count: 队列元素个数
lock: 独占锁,保证出、入操作的原子性
notEmpty: 出队条件变量
notFull:入队条件变量
构造函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 public ArrayBlockingQueue (int capacity) { this (capacity, false ); } public ArrayBlockingQueue (int capacity, boolean fair) { if (capacity <= 0 ) throw new IllegalArgumentException(); this .items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
ArrayBlockingQueue 是有界队列,构造函数必须传入队列大小参数。在默认情况下使用ReentrantLock 提供的非公平独占锁进行出、入队操作的同步。
实现原理 offer操作 向队列尾部插入一个非空元素,如果队列有空闲空间则插入成功后返回true ,如果队列已满则丢弃当前元素然后返回false 。此方法是不阻塞的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public boolean offer (E e) { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lock(); try { if (count == items.length) return false ; else { enqueue(e); return true ; } } finally { lock.unlock(); } } private void enqueue (E x) { final Object[] items = this .items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0 ; count++; notEmpty.signal(); }
put操作 向队列尾部插入一个非空元素,如果队列有空闲空间则插入后直接返回true ,如果队列已满则阻塞当前线程直到队列有空闲并插入成功后返回true ,如果在阻塞时被其他线程设置了中断标志, 则被阻塞线程会抛出InterruptedException 异常而返回。
1 2 3 4 5 6 7 8 9 10 11 12 public void put (E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
poll操作 从队列头部获取一个元素,如果队列为空则返回null。此方法是不阻塞的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public E poll () { final ReentrantLock lock = this .lock; lock.lock(); try { return (count == 0 ) ? null : dequeue(); } finally { lock.unlock(); } } private E dequeue () { final Object[] items = this .items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); notFull.signal(); return x; }
take操作 获取当前队列头部元素并从队列里面移除它。如果队列为空则阻塞当前线程直到队列不为空然后返回元素。此方法响应中断。
1 2 3 4 5 6 7 8 9 10 11 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
peek操作 获取队列头部元素但是不从队列里面移除它。
1 2 3 4 5 6 7 8 9 10 11 12 13 public E peek () { final ReentrantLock lock = this .lock; lock.lock(); try { return itemAt(takeIndex); } finally { lock.unlock(); } } final E itemAt (int i) { return (E) items[i]; }
size操作 计算当前队列元素个数(全局锁,结果精准)。
1 2 3 4 5 6 7 8 9 public int size () { final ReentrantLock lock = this .lock; lock.lock(); try { return count; } finally { lock.unlock(); } }
PriorityBlockingQueue PriorityBlockingQueue 是带优先级的无界阻塞 队列,每次出队都返回优先级最高或者最低的元素。其内部是使用平衡二叉树堆实现的,所以直接遍历队列元素不保证有序。默认使用对象的CompareTo 方法提供比较规则,如果需要自定义比较规则则可以自定义comparators。
类图
PriorityBlockingQueue 内部结构如下:
queue:数组,用来存放队列元素
size:队列元素个数
allocationSpinLock:自旋锁,使用CAS 操作保证只有一个线程可以进行扩容,0表示当前没有进行扩容,1表示正在扩容
lock: 独占锁,保证同时只有一个线程可以进行入队、出队操作
notEmpty:出队条件变量
构造函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private static final int DEFAULT_INITIAL_CAPACITY = 11 ;public PriorityBlockingQueue () { this (DEFAULT_INITIAL_CAPACITY, null ); } public PriorityBlockingQueue (int initialCapacity) { this (initialCapacity, null ); } public PriorityBlockingQueue (int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1 ) throw new IllegalArgumentException(); this .lock = new ReentrantLock(); this .notEmpty = lock.newCondition(); this .comparator = comparator; this .queue = new Object[initialCapacity]; }
PriorityBlockingQueue 默认队列容量为11,默认比较器为null,也就是使用元素的compareTo方法确认优先级(元素必须实现Comparable接口)。
实现原理 offer操作 在队列中插入一个元素,由于是无界队列所以一直返回true。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public boolean offer (E e) { if (e == null ) throw new NullPointerException(); final ReentrantLock lock = this .lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null ) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1 ; notEmpty.signal(); } finally { lock.unlock(); } return true ; } private void tryGrow (Object[] array, int oldCap) { lock.unlock(); Object[] newArray = null ; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this , allocationSpinLockOffset, 0 , 1 )) { try { int newCap = oldCap + ((oldCap < 64 ) ? (oldCap + 2 ) : (oldCap >> 1 )); if (newCap - MAX_ARRAY_SIZE > 0 ) { int minCap = oldCap + 1 ; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0 ; } } if (newArray == null ) Thread.yield(); lock.lock(); if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0 , newArray, 0 , oldCap); } } private static <T> void siftUpComparable (int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0 ) { int parent = (k - 1 ) >>> 1 ; Object e = array[parent]; if (key.compareTo((T) e) >= 0 ) break ; array[k] = e; k = parent; } array[k] = key; }
poll操作 获取队列内部堆树的根节点元素。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public E poll () { final ReentrantLock lock = this .lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } } private E dequeue () { int n = size - 1 ; if (n < 0 ) return null ; else { Object[] array = queue; E result = (E) array[0 ]; E x = (E) array[n]; array[n] = null ; Comparator<? super E> cmp = comparator; if (cmp == null ) siftDownComparable(0 , x, array, n); else siftDownUsingComparator(0 , x, array, n, cmp); size = n; return result; } }
put操作 put 操作内部调用的是offer 操作,由于是无界队列,所以不需要阻塞。
take操作 获取队列内部堆树的根节点元素,如果队列为空则阻塞,响应中断。
1 2 3 4 5 6 7 8 9 10 11 12 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null ) notEmpty.await(); } finally { lock.unlock(); } return result; }
size操作 计算队列元素个数。
1 2 3 4 5 6 7 8 9 public int size () { final ReentrantLock lock = this .lock; lock.lock(); try { return size; } finally { lock.unlock(); } }
DelayQueue DelayQueue 并发队列是一个无界阻塞延迟 队列,队列中的每个元素都有个过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最快要过期的元素。
类图
DelayQueue 内部使用PriorityQueue 存放数据,使用ReentrantLock 实现线程同步。队列中的元素需要实现Delayed 接口,实现比较接口。
1 2 3 4 public interface Delayed extends Comparable <Delayed > { long getDelay (TimeUnit unit) ; }
leader 变量的使用基于 Lead - Follower 模式的变体,用于尽量减少不必要的线程等待。当一个线程调用队列的take 方法变leader 线程后,它会调用条件变量available.awaitNanos(delay) 等待delay 时间,但是其他线程(follwer 线程)会调用available.await()进行无限等待。leader 线程延迟时间过期后,会退出take 方法,并通过调用available.signal()方法唤醒一个follwer 线程,被唤醒的follwer 线程被选举为新的leader 线程。
实现原理 offer操作 插入非空元素到队列,由于是无界队列所以一直返回true 。插入元素要实现Delayed 接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public boolean offer (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null ; available.signal(); } return true ; } finally { lock.unlock(); } }
take操作 获取并移除队列里面延迟时间过期的元素,如果队列里面没有过期元素则等待。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null ) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0 ) return q.poll(); first = null ; if (leader != null ) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null ; } } } } } finally { if (leader == null && q.peek() != null ) available.signal(); lock.unlock(); } }
poll操作 获取并移除队头过期元素。
1 2 3 4 5 6 7 8 9 10 11 12 13 public E poll () { final ReentrantLock lock = this .lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(NANOSECONDS) > 0 ) return null ; else return q.poll(); } finally { lock.unlock(); } }
size操作 计算队列元素个数,包含过期的和没有过期的。
1 2 3 4 5 6 7 8 9 public int size () { final ReentrantLock lock = this .lock; lock.lock(); try { return q.size(); } finally { lock.unlock(); } }