博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java高并发设计(十)--java安全集合BlockingQueue
阅读量:5902 次
发布时间:2019-06-19

本文共 8720 字,大约阅读时间需要 29 分钟。

hot3.png

接着上个章节我们继续讲解java安全集合中的队列内容,这里只对常用的容器做详细的介绍,其他的有个概念,真正碰到使用场景再好好研究一下,上面提到过java安全队列中的主要实现如下:

    ArrayBlockingQueue 数组有界的队列

    LinkedBlockingQueue 列表结构的队列

    DelayQueue 延迟队列

    PriorityBlockingQueue 优先级别的队列

    SynchronousQueue 同步队列,容量为1

ArrayBlockingQueue:基于数组的有界阻塞队列,内部实现将对象放入到一个数组中进行操作。并且它是有界的队列,初始化时必须指定大小,后期无法修改,执行的规则是先进先出的规则。

ArrayBlockingQueue
queue = new ArrayBlockingQueue<>(20); queue.put("wang"); queue.put("wang1"); queue.put("wang2"); //单次输出永远是wang System.out.println(queue.take());

ArrayBlockingQueue的底层实现我们可以看下源码的构造:

public class ArrayBlockingQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { //我主要调几个重要的方法来说明下底层实现,如果想了解的更多,请看下源代码的实现 private static final long serialVersionUID = -817911632652898426L; //内部数据的存储对象,数组 final Object[] items; //外部调用take时取得数组中的下标位置数据 int takeIndex; //外部调用put时放置数据的下标位置数据 int putIndex; //当前容器的实际数据大小 int count; //全局锁 final ReentrantLock lock; //判断是否为空时产生的阻塞 private final Condition notEmpty; //判断是否已经达到边界时的阻塞 private final Condition notFull; final int inc(int i) { return (++i == items.length) ? 0 : i; } final int dec(int i) { return ((i == 0) ? items.length : i) - 1; } //构造函数,必须指定容量 public ArrayBlockingQueue(int capacity) { this(capacity, false); } //实际的构造函数,该函数会初始化内部变量 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } //核心的put操作,验证不能为空,并且使用锁机制,然后验证是否超边界值,然后是插入数据,最后释放锁 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } //insert很简单,只是放置数据,并且对影响的内部变量进行修改 private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); } //核心取数据的操作,主要实现是extract的操作 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; E x = this.
cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }}

LinkedBlockingQueue:内部是以链式结构存储的数据对象,该对象的初始化可以指定边界值,如果没有指定默认是很大的,Integer.MAX_VALUE。我们直接看源码解释:

public class LinkedBlockingQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; //链表的数据结构, static class Node
{ E item; Node
next; Node(E x) { item = x; } } //容器的边界 private final int capacity; //当前变量的数量 private final AtomicInteger count = new AtomicInteger(0); //链表中的头部数据 private transient Node
head; //链表中的尾部数据 private transient Node
last; //取数据锁 private final ReentrantLock takeLock = new ReentrantLock(); //取是非空的状态 private final Condition notEmpty = takeLock.newCondition(); //放置数据的锁 private final ReentrantLock putLock = new ReentrantLock(); //放置数据是不能超过边界的状态 private final Condition notFull = putLock.newCondition(); //全局操作非空的安全机制 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } //全局操作非满边界的机制 private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } //构造行数,空参的是默认最大值 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node
(null); } //当前容器的实际数据量大小 public int size() { return count.get(); } //放置数据的核心实现 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; //初始化当前链表结构的当前节点数据 Node
node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //获得放置锁 putLock.lockInterruptibly(); try { //放置数据前提是判断是否到达边界 while (count.get() == capacity) { notFull.await(); } //放置数据的底层实现 enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } //放置在最后的数据 private void enqueue(Node
node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } //取数据的核心实现 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //获得取锁 takeLock.lockInterruptibly(); try { //首先要确定容器有数据 while (count.get() == 0) { notEmpty.await(); } //取数据 x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //取得头部数据的底层实现 private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node
h = head; Node
first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }}

DelayQueue:对元素进行持有直到一个特定的延迟到期.DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。需要注意的是该延迟的对象是Delayed接口的实现对象。

public class DelayQueue
extends AbstractQueue
implements BlockingQueue
{ private transient final ReentrantLock lock = new ReentrantLock(); private final PriorityQueue
q = new PriorityQueue
(); //该容器应用场景较少,后期有时间了再和朋友一起看看}public interface Delayed extends Comparable
{ //Comparable的接口是可比较的顶层设计 long getDelay(TimeUnit unit);}

PriorityBlockingQueue:基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

SynchronousQueue:一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。

        声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:

  如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
  但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理

public class SynchronousQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { private static final long serialVersionUID = -3223113410248163686L; //内部类,主要实现Shared internal API for dual stacks and queues abstract static class Transferer { abstract Object transfer(Object e, boolean timed, long nanos); } //cpu数量 static final int NCPUS = Runtime.getRuntime().availableProcessors(); static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; static final int maxUntimedSpins = maxTimedSpins * 16; static final long spinForTimeoutThreshold = 1000L; //Dual stack static final class TransferStack extends Transferer { //省略内部实现 } /** Dual Queue */ static final class TransferQueue extends Transferer { //省略内部实现 } private transient volatile Transferer transferer; //空参构造,默认是false,非公平锁机制 public SynchronousQueue() { this(false); } //指定类型的锁机制,两种实现方式的截然不同的 public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack(); } //放置对象的设置,主要体现在内部类的初始化上,会根据情况自动获得相应的操作 public void put(E o) throws InterruptedException { if (o == null) throw new NullPointerException(); if (transferer.transfer(o, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } //取数据的核心实现 public E take() throws InterruptedException { Object e = transferer.transfer(null, false, 0); if (e != null) return (E)e; Thread.interrupted(); throw new InterruptedException(); }}

 

转载于:https://my.oschina.net/wangshuaixin/blog/827125

你可能感兴趣的文章
安装gulp及相关插件
查看>>
如何在Linux用chmod来修改所有子目录中的文件属性?
查看>>
Hyper-V 2016 系列教程30 机房温度远程监控方案
查看>>
笔记:认识.NET平台
查看>>
SCCM 2016 配置管理系列(Part8)
查看>>
我的友情链接
查看>>
python基础教程_学习笔记19:标准库:一些最爱——集合、堆和双端队列
查看>>
js replace,正则截取字符串内容
查看>>
javascript继承方式详解
查看>>
lnmp环境搭建
查看>>
自定义session扫描器精确控制session销毁时间--学习笔记
查看>>
PHP队列的实现
查看>>
单点登录加验证码例子
查看>>
[T-SQL]从变量与数据类型说起
查看>>
occActiveX - ActiveX with OpenCASCADE
查看>>
BeanUtils\DBUtils
查看>>
python模块--os模块
查看>>
Java 数组在内存中的结构
查看>>
《关爱码农成长计划》第一期报告
查看>>
学习进度表 04
查看>>