前言 今天就开始分析JAVA中的队列了,先从LinkedBlockingQueue开始吧。不同于LinkedList,它是 基于链表实现的线程安全的队列实现,可以用来在多线程环境中安全地传递数据,总结来说,它具有以下特点:
链表实现 :内部使用单向链表来存储元素,这使得它在插入和移除操作时的性能表现较好。
线程安全 :它可以在多个线程之间安全地进行操作,而不需要额外的同步手段。
阻塞操作 :它支持阻塞操作,包括阻塞式的插入和移除操作。当队列为空时,试图从队列中获取元素的操作会被阻塞,直到队列中有可用元素为止;当队列已满时,试图向队列中插入元素的操作会被阻塞,直到队列有足够的空间为止。
容量可选 :可以选择是否限制队列的容量。如果指定了容量限制,那么队列的大小将受限于该容量,当队列达到容量上限时,插入操作将被阻塞,直到有元素被移除为止。如果不指定容量限制,则队列的大小理论上可以无限增长(Integer.MAX_VALUE)。
先进先出 :保证了元素的插入和移除顺序是先进先出的。
类图如下:
关键属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private final int capacity;private final AtomicInteger count = new AtomicInteger ();transient Node<E> head;private transient Node<E> last;private final ReentrantLock takeLock = new ReentrantLock ();@SuppressWarnings("serial") private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock ();@SuppressWarnings("serial") private final Condition notFull = putLock.newCondition();static class Node <E> { E item; Node<E> next; Node(E x) { item = x; } }
构造器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public LinkedBlockingQueue () { this (Integer.MAX_VALUE); } public LinkedBlockingQueue (int capacity) { if (capacity <= 0 ) throw new IllegalArgumentException (); this .capacity = capacity; last = head = new Node <E>(null ); } public LinkedBlockingQueue (Collection<? extends E> c) { this (Integer.MAX_VALUE); final ReentrantLock putLock = this .putLock; putLock.lock(); try { int n = 0 ; for (E e : c) { if (e == null ) throw new NullPointerException (); if (n == capacity) throw new IllegalStateException ("Queue full" ); enqueue(new Node <E>(e)); ++n; } count.set(n);, } finally { putLock.unlock(); } }
常用方法 添加方法 offer(E e) 将e插入队列的尾部,插入成功返回true,否则返回false。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public boolean offer (E e) { if (e == null ) throw new NullPointerException (); final AtomicInteger count = this .count; if (count.get() == capacity) return false ; final int c; final Node<E> node = new Node <E>(e); final ReentrantLock putLock = this .putLock; putLock.lock(); try { if (count.get() == capacity) return false ; enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); return true ; }
看下入队的方法,就是将last节点的next指针指向node节点,然后将这个node赋值给last对象。
1 2 3 4 5 private void enqueue (Node<E> node) { last = last.next = node; }
add(E e) add方法也是将一个数据添加到队列的尾部,可以看到它底层是调用了offer()方法,只是它会返回添加成功与否。
1 2 3 4 5 6 public boolean add (E e) { if (offer(e)) return true ; else throw new IllegalStateException ("Queue full" ); }
访问方法 peek() 获取头部的元素的value值,但是并不会异常。可以看到获取元素也是要加锁的。
1 2 3 4 5 6 7 8 9 10 11 12 public E peek () { final AtomicInteger count = this .count; if (count.get() == 0 ) return null ; final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { return (count.get() > 0 ) ? head.next.item : null ; } finally { takeLock.unlock(); } }
take() 返回头部元素的value值,并移除这个头部节点。如果当前队列为空,就进入阻塞状态,直到唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public E take () throws InterruptedException { final E x; final int c; final AtomicInteger count = this .count; final ReentrantLock takeLock = this .takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0 ) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } private E dequeue () { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null ; return x; }
poll() 以非阻塞的放出返回头部元素的值,并移除头部元素。它与take方法的区别就是它不会在线程被终止的时候抛出异常,也不会在队列为空的时候阻塞当前线程,而是返回null。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public E poll () { final AtomicInteger count = this .count; if (count.get() == 0 ) return null ; final E x; final int c; final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { if (count.get() == 0 ) return null ; x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
移除方法 remove() 返回并移除头部元素,可以看到底层调用的是poll()方法,要注意点是如果队列为空时会抛出NoSuchElementException异常。
1 2 3 4 5 6 7 public E remove () { E x = poll(); if (x != null ) return x; else throw new NoSuchElementException (); }
remove(E e) 从队列中移除包含指定内容的节点,这种操作时需要加putLock和takeLock,阻止读写操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public boolean remove (Object o) { if (o == null ) return false ; fullyLock(); try { for (Node<E> pred = head, p = pred.next; p != null ; pred = p, p = p.next) { if (o.equals(p.item)) { unlink(p, pred); return true ; } } return false ; } finally { fullyUnlock(); } } void fullyLock () { putLock.lock(); takeLock.lock(); }
计数方法 计数就是size()方法了。从上面代码可以看出我们在操作队列时都会计算一个count值,这个值是并发安全的,所以队列的数量就是这个值的内容。
1 2 3 public int size () { return count.get(); }