前言

今天就开始分析JAVA中的队列了,先从LinkedBlockingQueue开始吧。不同于LinkedList,它是 基于链表实现的线程安全的队列实现,可以用来在多线程环境中安全地传递数据,总结来说,它具有以下特点:

  1. 链表实现:内部使用单向链表来存储元素,这使得它在插入和移除操作时的性能表现较好。
  2. 线程安全:它可以在多个线程之间安全地进行操作,而不需要额外的同步手段。
  3. 阻塞操作:它支持阻塞操作,包括阻塞式的插入和移除操作。当队列为空时,试图从队列中获取元素的操作会被阻塞,直到队列中有可用元素为止;当队列已满时,试图向队列中插入元素的操作会被阻塞,直到队列有足够的空间为止。
  4. 容量可选:可以选择是否限制队列的容量。如果指定了容量限制,那么队列的大小将受限于该容量,当队列达到容量上限时,插入操作将被阻塞,直到有元素被移除为止。如果不指定容量限制,则队列的大小理论上可以无限增长(Integer.MAX_VALUE)。
  5. 先进先出:保证了元素的插入和移除顺序是先进先出的。

类图如下:

image-20240401125844469

关键属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 这是队列的容量,如果在构造时没有指定,那么默认值为Integer.MAX_VALUE。这个值决定了队列可以容纳的元素数量
private final int capacity;

// 用于跟踪队列中元素的数量并支持相关的并发操作。
private final AtomicInteger count = new AtomicInteger();

// 这是队列的头节点,也就是队列中最早进入的元素。这个节点的item字段总是null,真正的队列头元素是head.next
transient Node<E> head;

// 这是队列的尾节点,也就是队列中最后进入的元素。这个节点的next字段总是null。
private transient Node<E> last;

// 用于在执行取元素操作时进行同步。它主要用于保护 head 节点和相关的操作,确保线程安全。
private final ReentrantLock takeLock = new ReentrantLock();

// 与takeLock配合使用。当队列为空时,获取元素的操作会等待在这个条件上,直到队列不为空
@SuppressWarnings("serial") // Classes implementing Condition may be serializable.
private final Condition notEmpty = takeLock.newCondition();

// 用于在执行放置元素操作时进行同步。它主要用于保护 tail 节点和相关的操作,确保线程安全。
private final ReentrantLock putLock = new ReentrantLock();

// 与putLock配合使用。当队列已满时,插入元素的操作会等待在这个条件上,直到队列中有空余位置。
@SuppressWarnings("serial") // Classes implementing Condition may be serializable.
private final Condition notFull = putLock.newCondition();

// Node的结构
static class Node<E> {
E item;

/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;

Node(E x) { item = x; }
}

构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 构造一个默认容量的阻塞队列,默认容量为Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 构造一个指定容量大阻塞队列,同时维护好last和head指针
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

// 构造一个容量为Integer.MAX_VALUE的队列,并将指定集合中的所有元素添加到队列中。元素将按照集合迭代器的遍历顺序添加。
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);,
} finally {
putLock.unlock();
}
}

常用方法

添加方法

offer(E e)

将e插入队列的尾部,插入成功返回true,否则返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean offer(E e) {
if (e == null) throw new NullPointerException(); // 会抛出空指针
final AtomicInteger count = this.count;
if (count.get() == capacity) // 如果队列已满,直接返回false
return false;
final int c;
final Node<E> node = new Node<E>(e); // 构造一个新的节点,并将item赋值e
final ReentrantLock putLock = this.putLock;
putLock.lock(); // 开始获取插入锁
try {
if (count.get() == capacity) // 再次判断容量是否已满
return false;
enqueue(node); // // 将新节点插入到队列尾部
c = count.getAndIncrement(); // 累计count值
if (c + 1 < capacity) // 说明容量未满
notFull.signal(); // 唤醒等待在 notFull 条件变量上的线程
} finally {
putLock.unlock(); // 释放锁
}
if (c == 0) // 如果队列之前为空,则唤醒等待在 notEmpty 条件变量上的线程
signalNotEmpty();
return true;
}

看下入队的方法,就是将last节点的next指针指向node节点,然后将这个node赋值给last对象。

1
2
3
4
5
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}

add(E e)

add方法也是将一个数据添加到队列的尾部,可以看到它底层是调用了offer()方法,只是它会返回添加成功与否。

1
2
3
4
5
6
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

访问方法

peek()

获取头部的元素的value值,但是并不会异常。可以看到获取元素也是要加锁的。

1
2
3
4
5
6
7
8
9
10
11
12
public E peek() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
return (count.get() > 0) ? head.next.item : null;
} finally {
takeLock.unlock();
}
}

take()

返回头部元素的value值,并移除这个头部节点。如果当前队列为空,就进入阻塞状态,直到唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public E take() throws InterruptedException {
final E x;
final int c;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); // 获取锁,如果被中断则会抛出异常
try {
while (count.get() == 0) { // 如果队列是空,就阻塞掉
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1) // 如果取出元素后队列中还有元素,则唤醒其他等待取元素的线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity) // 如果取出元素前队列是满的,则唤醒等待插入元素的线程
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

poll()

以非阻塞的放出返回头部元素的值,并移除头部元素。它与take方法的区别就是它不会在线程被终止的时候抛出异常,也不会在队列为空的时候阻塞当前线程,而是返回null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
final E x;
final int c;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() == 0) // 如果队列是空,就返回null
return null;
x = dequeue();
c = count.getAndDecrement();
if (c > 1) // 如果取出元素后队列中还有元素,则唤醒其他等待取元素的线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity) // 如果取出元素前队列是满的,则唤醒等待插入元素的线程
signalNotFull();
return x;
}

移除方法

remove()

返回并移除头部元素,可以看到底层调用的是poll()方法,要注意点是如果队列为空时会抛出NoSuchElementException异常。

1
2
3
4
5
6
7
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

remove(E e)

从队列中移除包含指定内容的节点,这种操作时需要加putLock和takeLock,阻止读写操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> pred = head, p = pred.next;
p != null;
pred = p, p = p.next) {// 遍历队列
if (o.equals(p.item)) { // 如果找到内容一致的节点
unlink(p, pred); // 删除节点,并介绍count值
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}

计数方法

计数就是size()方法了。从上面代码可以看出我们在操作队列时都会计算一个count值,这个值是并发安全的,所以队列的数量就是这个值的内容。

1
2
3
public int size() {
return count.get();
}