菜单

注册免费送38元体验金闲话并发

2019年3月24日 - 注册免费送38元体验金

3. 绿灯队列的兑现原理

聊天并发(七)——Java中的阻塞队列

注册免费送38元体验金 1

作者
方腾飞
发布于 2013年12月18日 |
ArchSummit全世界架构师高峰会议(新加坡站)二〇一六年二月02-0三日设立,打听更加多详情!
2 讨论

1. 哪些是阻塞队列?

堵塞队列(BlockingQueue)是三个帮衬八个叠加操作的体系。那三个叠加的操作是:在队列为空时,获取成分的线程会等待队列变为非空。当队列满时,存款和储蓄成分的线程会等待队列可用。阻塞队列常用于生产者和消费者的景色,生产者是往队列里添美元素的线程,消费者是从队列里拿成分的线程。阻塞队列正是劳动者存放成分的容器,而消费者也只从容器里拿成分。

闭塞队列提供了各类处理办法:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

2. Java里的封堵队列

相关厂商内容

跟技术大腕,侃侃容器那多少个事情!

至于红包、SSD云盘等主题技术集锦!

大数额的新玩法

58到家技术架构神速规划与出生

UCloud UCan技术夜,与“老开车员”共侃云服务技术实施与风向

JDK7提供了九个闭塞队列。分别是

ArrayBlockingQueue是三个用数组达成的有界阻塞队列。此行列遵照先进先出(FIFO)的规范对成分实行排序。暗许情形下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的兼具生产者线程或顾客线程,当队列可用时,能够依据阻塞的先后顺序访问队列,即先阻塞的生产者线程,能够先往队列里插入元素,先堵塞的买主线程,能够先从队列里取得成分。平日情形下为了保障公平性会下落吞吐量。我们得以应用以下代码制造3位己一视的短路队列:

ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);

访问者的公平性是利用可重入锁落成的,代码如下:

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
}

LinkedBlockingQueue是3个用链表达成的有界阻塞队列。此行列的暗许和最大尺寸为Integer.MAX_VALUE。此行列依照先进先出的口径对成分进行排序。

PriorityBlockingQueue是3个支撑先行级的无界队列。暗许意况下成分选用自然顺序排列,也得以透过相比较器comparator来内定成分的排序规则。成分根据升序排列。

DelayQueue是四个协理延时获取成分的无界阻塞队列。队列使用PriorityQueue来达成。队列中的成分必须完成Delayed接口,在创建成分时得以钦点多短期才能从队列中获得当前因素。唯有在延迟期满时才能从队列中领取成分。大家得以将DelayQueue运用在偏下应用场景:

队列中的Delayed必须达成compareTo来钦命成分的逐条。比如让延时时光最长的位于队列的末梢。实现代码如下:

public int compareTo(Delayed other) {
           if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask x = (ScheduledFutureTask)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
       else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
        }

什么贯彻Delayed接口

作者们能够参照ScheduledThreadPoolExecutor里ScheduledFutureTask类。那一个类实现了Delayed接口。首先:在指标创造的时候,使用time记录前对象如曾几何时候能够应用,代码如下:

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
}

下一场利用getDelay能够查询当前因素还索要延时多长期,代码如下:

public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }

经过构造函数可以看看延迟时间参数ns的单位是飞秒,本人统筹的时候最棒应用皮秒,因为getDelay时能够内定任意单位,一旦以阿秒作为单位,而延时的岁月又准确不到阿秒就麻烦了。使用时请注意当time小于当前时刻时,getDelay会再次回到负数。

怎么样落到实处延时队列

延时队列的落到实处很简短,当顾客从队列里获得成分时,假使成分没有高达延时时间,就短路当前线程。

long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    else if (leader != null)
                        available.await();

SynchronousQueue是2个不存款和储蓄成分的围堵队列。每二个put操作必须等待二个take操作,不然无法接二连三添法郎素。SynchronousQueue能够用作是3个传球手,负责把劳动者线程处理的数据直接传送给消费者线程。队列自身并不存款和储蓄任何因素,相当适合于传递性场景,比如在一个线程中应用的数码,传递给其余1个线程使用,SynchronousQueue的吞吐量高于LinkedBlockingQueue
和 ArrayBlockingQueue。

LinkedTransferQueue是三个由链表结构构成的无界阻塞TransferQueue队列。相对于别的阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

transfer方法。假如当前有顾客正在等待接受成分(消费者选取take()方法或带时限的poll()方法时),transfer方法能够把劳动者传入的要素登时transfer(传输)给消费者。尽管没有消费者在伺机接受成分,transfer方法会将成分存放在队列的tail节点,并等到该因素被消费者消费了才回来。transfer方法的主要代码如下:

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

先是行代码是总计把存放在当前成分的s节点作为tail节点。第2行代码是让CPU自旋等待买主消费成分。因为自旋会消耗CPU,所以自旋一定的次数后使用Thread.yield()方法来刹车当前正值执行的线程,并执行其它线程。

tryTransfer方法。则是用来试探下生产者传入的因素是不是能平素传给消费者。若是没有顾客等待接受成分,则赶回false。和transfer方法的界别是tryTransfer方法无论消费者是还是不是接到,方法立刻再次来到。而transfer方法是必须等到消费者消费了才回来。

对于富含时限的tryTransfer(E e, long timeout, TimeUnit
unit)方法,则是总计把劳动者传入的元素直接传给消费者,但是借使没有消费者消费该成分则等待钦定的时刻再回去,要是超时还没消费成分,则赶回false,若是在逾期时间内消费了成分,则赶回true。

LinkedBlockingDeque是三个由链表结构构成的双向阻塞队列。所谓双向队列指的你可以从队列的两岸插入和移出成分。双端队列因为多了四个操作队列的输入,在八线程同时入队时,也就减弱了大体上的竞争。相比较别的的鸿沟队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等办法,以First单词结尾的艺术,表示插入,获取(peek)或移除双端队列的率先个要素。以Last单词结尾的格局,表示插入,获取或移除双端队列的终极多个因素。其它插入方法add等同于addLast,移除方法remove等效于removeFirst。可是take方法却一如既往takeFirst,不了解是否Jdk的bug,使用时依旧用含有First和Last后缀的点子更掌握。

在早先化LinkedBlockingDeque时得以设置体量制止其联网膨胀。此外双向阻塞队列能够使用在“工作窃取”格局中。

3. 封堵队列的落到实处原理

一经队列是空的,消费者会直接等候,当生产者添美成分时候,消费者是怎么晓稳妥前队列有成分的吧?借使让你来统一筹划阻塞队列你会什么安顿,让劳动者和顾客可以高功用的进展报导呢?让大家先来看望JDK是什么贯彻的。

利用布告情势完成。所谓文告方式,便是当生产者往满的系列里添比索素时会阻塞住生产者,当顾客消费了七个行列中的成分后,会通报劳动者当前队列可用。通过查看JDK源码发现ArrayBlockingQueue使用了Condition来兑现,代码如下:

private final Condition notFull;
private final Condition notEmpty;

public ArrayBlockingQueue(int capacity, boolean fair) {
        //省略其他代码
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
}

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
  } finally {
            lock.unlock();
        }
}

private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

当大家往队列里插入叁个要素时,借使队列不可用,阻塞生产者重要通过LockSupport.park(this);来贯彻

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)

reportInterruptAfterWait(interruptMode);
        }

接轨进入源码,发现调用setBlocker先保存下就要阻塞的线程,然后调用unsafe.park阻塞当前线程。

public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
    }

unsafe.park是个native方法,代码如下:

public native void park(boolean isAbsolute, long time);

park这么些方法会阻塞当前线程,只有以下各样情状中的一种发生时,该方式才会回到。

咱俩后续看一下JVM是如何促成park方法的,park在差别的操作系统使用差别的法子贯彻,在linux下是运用的是系统方法pthread_cond_wait完结。达成代码在JVM源码路径src/os/linux/vm/os_linux.cpp里的
os::Platform伊夫nt::park方法,代码如下:

void os::PlatformEvent::park() {      
             int v ;
         for (;;) {
        v = _Event ;
         if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
         }
         guarantee (v >= 0, "invariant") ;
         if (v == 0) {
         // Do this the hard way by blocking ...
         int status = pthread_mutex_lock(_mutex);
         assert_status(status == 0, status, "mutex_lock");
         guarantee (_nParked == 0, "invariant") ;
         ++ _nParked ;
         while (_Event < 0) {
         status = pthread_cond_wait(_cond, _mutex);
         // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
         // Treat this the same as if the wait was interrupted
         if (status == ETIME) { status = EINTR; }
         assert_status(status == 0 || status == EINTR, status, "cond_wait");
         }
         -- _nParked ;

         // In theory we could move the ST of 0 into _Event past the unlock(),
         // but then we'd need a MEMBAR after the ST.
         _Event = 0 ;
         status = pthread_mutex_unlock(_mutex);
         assert_status(status == 0, status, "mutex_unlock");
         }
         guarantee (_Event >= 0, "invariant") ;
         }

     }

pthread_cond_wait是三个十二线程的尺度变量函数,cond是condition的缩写,字面意思可以明白为线程在伺机一个准绳发出,这些原则是2个全局变量。这么些艺术接收三个参数,3个共享变量_cond,3个互斥量_mutex。而unpark方法在linux下是应用pthread_cond_signal完结的。park
在windows下则是选择WaitForSingleObject完毕的。

当队列满时,生产者往阻塞队列里插入一个成分,生产者线程会进去WAITING
(parking)状态。我们得以选取jstack dump阻塞的生产者线程看到那一点:

"main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000140559fe8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324)
        at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:11)

4. 参考资料

5. 小编介绍

方腾飞,花名清英,并发编制程序网站站长。方今在阿里巴巴(Alibaba)微贷事业部工作。并发编制程序网:http://ifeve.com,个人博客园:http://weibo.com/kirals,欢迎通过自笔者的天涯论坛进行技术调换。

注册免费送38元体验金,感谢张龙对本文的审校。

给InfoQ中文站投稿只怕参加内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎我们经过腾讯网和讯(@InfoQ)恐怕腾讯新浪(@InfoQ)关切大家,并与大家的编辑和任何读者对象调换。

【ArchSummit东京2015】“双11”过后,各电商巨头对技术展现如何计算?机器学习大热,业务与之组成会擦出如何的火舌?摄像直播与音信资源信息,新工作须要哪些的新技巧与之配备?业务相对,技术不错分歧,在年初的技能擂台上,各家将呈现怎样的技巧玫瑰花锏…ArchSummit开幕倒计时,当即查看详情>>

相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图