接着上个章节我们继续讲解java安全集合中的队列内容,这里只对常用的容器做详细的介绍,其他的有个概念,真正碰到使用场景再好好研究一下,上面提到过java安全队列中的主要实现如下:
ArrayBlockingQueue 数组有界的队列
LinkedBlockingQueue 列表结构的队列
DelayQueue 延迟队列
PriorityBlockingQueue 优先级别的队列
SynchronousQueue 同步队列,容量为1
ArrayBlockingQueue:基于数组的有界阻塞队列,内部实现将对象放入到一个数组中进行操作。并且它是有界的队列,初始化时必须指定大小,后期无法修改,执行的规则是先进先出的规则。
ArrayBlockingQueuequeue = new ArrayBlockingQueue<>(20); queue.put("wang"); queue.put("wang1"); queue.put("wang2"); //单次输出永远是wang System.out.println(queue.take());
ArrayBlockingQueue的底层实现我们可以看下源码的构造:
public class ArrayBlockingQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable { //我主要调几个重要的方法来说明下底层实现,如果想了解的更多,请看下源代码的实现 private static final long serialVersionUID = -817911632652898426L; //内部数据的存储对象,数组 final Object[] items; //外部调用take时取得数组中的下标位置数据 int takeIndex; //外部调用put时放置数据的下标位置数据 int putIndex; //当前容器的实际数据大小 int count; //全局锁 final ReentrantLock lock; //判断是否为空时产生的阻塞 private final Condition notEmpty; //判断是否已经达到边界时的阻塞 private final Condition notFull; final int inc(int i) { return (++i == items.length) ? 0 : i; } final int dec(int i) { return ((i == 0) ? items.length : i) - 1; } //构造函数,必须指定容量 public ArrayBlockingQueue(int capacity) { this(capacity, false); } //实际的构造函数,该函数会初始化内部变量 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } //核心的put操作,验证不能为空,并且使用锁机制,然后验证是否超边界值,然后是插入数据,最后释放锁 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } //insert很简单,只是放置数据,并且对影响的内部变量进行修改 private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); } //核心取数据的操作,主要实现是extract的操作 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; E x = this. cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }}
LinkedBlockingQueue:内部是以链式结构存储的数据对象,该对象的初始化可以指定边界值,如果没有指定默认是很大的,Integer.MAX_VALUE。我们直接看源码解释:
public class LinkedBlockingQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; //链表的数据结构, static class Node { E item; Node next; Node(E x) { item = x; } } //容器的边界 private final int capacity; //当前变量的数量 private final AtomicInteger count = new AtomicInteger(0); //链表中的头部数据 private transient Node head; //链表中的尾部数据 private transient Node last; //取数据锁 private final ReentrantLock takeLock = new ReentrantLock(); //取是非空的状态 private final Condition notEmpty = takeLock.newCondition(); //放置数据的锁 private final ReentrantLock putLock = new ReentrantLock(); //放置数据是不能超过边界的状态 private final Condition notFull = putLock.newCondition(); //全局操作非空的安全机制 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } //全局操作非满边界的机制 private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } //构造行数,空参的是默认最大值 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node (null); } //当前容器的实际数据量大小 public int size() { return count.get(); } //放置数据的核心实现 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; //初始化当前链表结构的当前节点数据 Node node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //获得放置锁 putLock.lockInterruptibly(); try { //放置数据前提是判断是否到达边界 while (count.get() == capacity) { notFull.await(); } //放置数据的底层实现 enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } //放置在最后的数据 private void enqueue(Node node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } //取数据的核心实现 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //获得取锁 takeLock.lockInterruptibly(); try { //首先要确定容器有数据 while (count.get() == 0) { notEmpty.await(); } //取数据 x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //取得头部数据的底层实现 private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node h = head; Node first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }}
DelayQueue:对元素进行持有直到一个特定的延迟到期.DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。需要注意的是该延迟的对象是Delayed接口的实现对象。
public class DelayQueueextends AbstractQueue implements BlockingQueue { private transient final ReentrantLock lock = new ReentrantLock(); private final PriorityQueue q = new PriorityQueue (); //该容器应用场景较少,后期有时间了再和朋友一起看看}public interface Delayed extends Comparable { //Comparable的接口是可比较的顶层设计 long getDelay(TimeUnit unit);}
PriorityBlockingQueue:基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
SynchronousQueue:一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。
声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略; 但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理public class SynchronousQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable { private static final long serialVersionUID = -3223113410248163686L; //内部类,主要实现Shared internal API for dual stacks and queues abstract static class Transferer { abstract Object transfer(Object e, boolean timed, long nanos); } //cpu数量 static final int NCPUS = Runtime.getRuntime().availableProcessors(); static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; static final int maxUntimedSpins = maxTimedSpins * 16; static final long spinForTimeoutThreshold = 1000L; //Dual stack static final class TransferStack extends Transferer { //省略内部实现 } /** Dual Queue */ static final class TransferQueue extends Transferer { //省略内部实现 } private transient volatile Transferer transferer; //空参构造,默认是false,非公平锁机制 public SynchronousQueue() { this(false); } //指定类型的锁机制,两种实现方式的截然不同的 public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack(); } //放置对象的设置,主要体现在内部类的初始化上,会根据情况自动获得相应的操作 public void put(E o) throws InterruptedException { if (o == null) throw new NullPointerException(); if (transferer.transfer(o, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } //取数据的核心实现 public E take() throws InterruptedException { Object e = transferer.transfer(null, false, 0); if (e != null) return (E)e; Thread.interrupted(); throw new InterruptedException(); }}