您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

DelayQueue 源码剖析-细化深入带你理解

时间:2022-10-05 13:51:19  来源:今日头条  作者:小心程序猿QAQ

JAVA中的DelayQueue

DelayQueue类是Java集合框架中的成员。在java.util.concurrent包下,实现了BlockingQueue接口。 他属于一种优先级队列,根据元素的延迟时间进行排序,这就意味着,你只能从队列里取出时间已经到期的元素。 如果没有延迟过期,则轮询将返回 null。 另外,DelayQueue 仅接受属于延迟类型的类或实现
java.util.concurrent.Delayed的那些元素。DelayQueue 在内部阻塞元素,直到某个元素延迟到期。DelayQueue 通过实现 getDelay(TimeUnit.NANOSECONDS) 方法返回剩余延迟时间。传递给 getDelay() 方法的 TimeUnit 实例是一个 Enum,它告诉应以哪个时间单位返回延迟。 TimeUnit 枚举可以采用 DAYS、HOURS、MINUTES、SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。此队列不允许空元素。此类及其迭代器实现了Collection和Iterator接口的所有可选方法。方法iterator()中提供的 Iterator 不能保证以任何特定顺序遍历 DelayQueue 的元素。 因为前面说了,他是一种优先级队列。

DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E>
复制代码

DelayQueue的层次结构

 

可以看到 DelayQueue 实现了 Iterable、Collection、BlockingQueue、Queue接口。

DelayQueue的成员变量

private final transient ReentrantLock lock = new ReentrantLock();
    
private final PriorityQueue<E> q = new PriorityQueue<E>();

private Thread leader = null;

private final Condition avAIlable = lock.newCondition();

复制代码

从四个属性中可以看出,它本质上就是一个优先级队列! 并且一个线程变量作为 “leader” , 还有个 lock ,都是什么用的呢?我们来分析一下

源码剖析

首先我们看到它有基本的添加元素、修改元素等实现,那么我们从 添加元素 方法看起

add、put、offer

public boolean add(E e) {
    // 调用offer方法
    return offer(e);
}

public void put(E e) {
    // 调用offer方法
    offer(e);
}
    
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

复制代码
  1. 首先使用属性中定义的锁
  2. 将元素添加到优先级队列中
  3. 判断如果peek出来的就是添加的元素,则leader线程置为null,同时唤醒一个正在等待的线程
  4. 返回成功
  5. 最后解锁

看到添加元素的代码比较简单,那么我们拿取的时候,它是如何决定元素的拿取时机的呢?是否是阻塞的呢?我们看下拿去元素的方法,poll和take

poll

/**
 * 检索并移除队列的头元素,如果队列中没有元素达到延迟时间则返回null
 */
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}
复制代码

poll() 方法同样是加锁进行拿取,只不过 peek() 出来的元素如果getDelay(NANOSECONDS) 发现当前时间没有达到延迟时间的话,那么不进行返回元素,如果达到了,则把元素 poll() 出来。可以看到,他增加了一个 first.getDelay(NANOSECONDS) > 0 的判断,这就是为什么所有的元素需要实现
java.util.concurrent.Delayed。重写getDelay(TimeUnit unit) 方法,不然它怎么知道怎么才算延迟。

take

take() 方法就有些复杂点了

/**
 * 检索检索并移除此队列的头,如有必要,等待,
 * 直到有元素达到延迟时间。
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 获取可中断锁
    lock.lockInterruptibly();
    try {
        // 开始进行轮询
        for (;;) {
            // 获取队列头的元素(最早过期的)
            E first = q.peek();
            if (first == null)
                // 如果队列头元素为null,说明队列中没有元素,当前线程进入等待状态
                available.await();
            else {
                // 这里和上面获取过期元素的判断是一样的
                // 获取头元素的过期时间
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // 对比如果过期,则取出元素
                    return q.poll();
                // 如果没有过期,将头元素置为null
                first = null;
                if (leader != null)
                    // 如果有线程在争抢leader线程,进入等待并让出锁
                    available.await();
                else {
                    // 没有争抢发生则将自己set进去
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 并且等待 剩余等待的时间
                        available.awaitNanos(delay);
                    } finally {
                        // 最终释放自己
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果当前没有leader线程在工作的话,并且队列内部有元素,那么唤醒那些正在等待的线程
        if (leader == null && q.peek() != null)
            available.signal();
        // 解锁
        lock.unlock();
    }
}
复制代码
  1. 首先看一下队列中是否含有元素
  2. 如果没有元素,那就take操作就会处于等待状态
  3. 如果有元素,并且元素已达到过期时间,就返回该元素(元素从队列移出)
  4. 如果元素没有达到过期时间,将first元素置为null,此时再次走向分支条件:如果当前已经有leader线程了,那么就进入等待,直到被唤醒,否则就把自己设为leader线程,因为先前元素还没到过期时间,那么就等待剩余过期时间的时间,最后释放leader字段。
  5. 最后如果leader为null并且队列中含有元素,那么唤醒其他正在等待的线程
  6. 解锁

poll timeout

// 检索并删除此队列的头,等待指定的等待时间(如有必要)使元素变为可用
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 获取可中断锁
    lock.lockInterruptibly();
    try {
        // 开始进行轮询
        for (;;) {
            // 获取队列头的元素(最早过期的)
            E first = q.peek();
            if (first == null) {
                // 如果队列头元素为null,说明队列中没有元素,
                // 如果设置的TimeUnit 小于等于0,说明不等待直接返回 null
                if (nanos <= 0)
                    return null;
                else
                    // 如果队列头元素为null,说明队列中没有元素
                    // 当前线程进入等待状态
                    // 并重新赋值 nanos
                    nanos = available.awaitNanos(nanos);
            } else {
                // 如果 队列中有元素存在的话
                // 那么就检查一下头元素的过期时间
                // 已经到期了的话,就弹出该元素
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                // 如果 nanos 比 delay 小,说明等待的太短了,再等待一下吧
                // 如果 nanos 比 delay 大,但是 leader 已经有线程占了,再等待一下吧
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    // 进入这里的条件是 nanos 比 delay 大,并且 leader 没有线程占用
                    Thread thisThread = Thread.currentThread();
                    // 把自己设置为 leader
                    leader = thisThread;
                    try {
                        // 等待 delay 时间
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果当前没有leader线程在工作的话,并且队列内部有元素,那么唤醒那些正在等待的线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}
复制代码

take()方法相比poll()方法最大的区别就是当延迟未过期时,take 方法会一直等待,而 poll 方法则会返回空,或者只等待你指定的时间。

思考

1. 为什么设置了一个leader字段

先看一段官方解释
Thread designated to wait for the element at the head of the queue. 
This variant of the Leader-Follower pattern (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to minimize unnecessary timed waiting. 
When a thread becomes the leader, it waits only for the next delay to elapse, 
but other threads await indefinitely. 
The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim. 
Whenever the head of the queue is replaced with an element with an earlier expiration time, 
the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled. 
So waiting threads must be prepared to acquire and lose leadership while waiting.
复制代码

说了,这个线程被用来等待队列中头部元素。 可以从如下代码体现

    Thread thisThread = Thread.currentThread();
    leader = thisThread;
    try {
        // 并且等待 剩余等待的时间
        available.awaitNanos(delay);
    } finally {
        // 最终释放自己
        if (leader == thisThread)
            leader = null;
    }
复制代码

Leader-Follower 可以最小化不必要的等待时间,我理解的leader-follower模式就是:选取一个线程作为leader线程,leader线程负责监听网络请求,其它线程为follower线程并且处于waiting状态,当leader线程接受到一个请求后,会释放自己给其他follower机会,众多的follower中会产生一个新的leader,剩下没有抢到机会的继续做follower。

而leadler在delayqueue里目的就是为了避免不必要的唤醒和睡眠。如果所有线程都可用,那么它们就会同时被调用,并且只会有一个线程可以真正从队列里获取元素,其他的线程会再度进行睡眠,浪费cpu资源。

2. 多个线程进行take操作,不同线程处于什么状态

打个比方

同一时刻只有一个人上前去领盒饭,而其他人因为已经有人在窗口等饭了就会处于等待状态,并不是一直在窗口问食堂阿姨问个不停,队列就是食堂,队列里的元素就是一个一个的盒饭,每个盒饭都有自己的制作时间,最快做好的盒饭就会优先放在前面的流水线上,时间一到,上去领盒饭的人领到自己的盒饭,就下去了( leader == null ),然后一看,食堂还有饭!就冲着下面那些人喊,还有饭,快上来拿( available.signal() )! 然后下面的人纷纷上来(争抢锁),那么抢到锁的线程就会持有锁,有机会询问阿姨是否还有饭( peek() ),这时,阿姨有可能说

“有,但还没做好”

“啥时候好呀?”

"再等五分钟吧,别着急"

然后这时,当前线程就会设置自己为leader,并等待五分钟( available.awaitNanos(delay) ),这时!锁被释放,然后就会有人上来问阿姨,但是一看,这leader有人把持着呢,那再等等吧……( if (leader != null) )再然后呢,刚才那个等五分钟的人判断自己是leader,会释放leader。

直到饭好了,有人领到盒饭。 那么等五分钟那老哥会不会领到他刚才问的饭呢?答案是不一定,毕竟follower并没有排队。

3. 为什么first会置为null

看了上面的分析不知道有没有理解, 那么我们看一下简略代码是这样的:

    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    for (;;) {
        E first = q.peek();
        long delay = first.getDelay(NANOSECONDS);
        if (delay <= 0) {
            first = null;
            available.await();
        }
    }
复制代码

for循环里首先peek出来队列中的头元素,如果这个元素没有过期,假设这里不置为null的话,当前线程就持有一个first,然后 await,那么下次再 进入循环还会 peek 一次,而之前的peek依然在内存中,所以没必要多持有一个,所以如果不满足条件(没有到期)则置为null,然后再 await。就像注释里说的 don't retain ref while waiting

原文链接:
https://juejin.cn/post/7150517172987494413



Tags:DelayQueue   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
DelayQueue 源码剖析-细化深入带你理解
Java中的DelayQueueDelayQueue类是Java集合框架中的成员。在java.util.concurrent包下,实现了BlockingQueue接口。 他属于一种优先级队列,根据元素的延迟时间进行排序,这就意味...【详细内容】
2022-10-05  Search: DelayQueue  点击:(334)  评论:(0)  加入收藏
▌简易百科推荐
Netflix 是如何管理 2.38 亿会员的
作者 | Surabhi Diwan译者 | 明知山策划 | TinaNetflix 高级软件工程师 Surabhi Diwan 在 2023 年旧金山 QCon 大会上发表了题为管理 Netflix 的 2.38 亿会员 的演讲。她在...【详细内容】
2024-04-08    InfoQ  Tags:Netflix   点击:(2)  评论:(0)  加入收藏
即将过时的 5 种软件开发技能!
作者 | Eran Yahav编译 | 言征出品 | 51CTO技术栈(微信号:blog51cto) 时至今日,AI编码工具已经进化到足够强大了吗?这未必好回答,但从2023 年 Stack Overflow 上的调查数据来看,44%...【详细内容】
2024-04-03    51CTO  Tags:软件开发   点击:(7)  评论:(0)  加入收藏
跳转链接代码怎么写?
在网页开发中,跳转链接是一项常见的功能。然而,对于非技术人员来说,编写跳转链接代码可能会显得有些困难。不用担心!我们可以借助外链平台来简化操作,即使没有编程经验,也能轻松实...【详细内容】
2024-03-27  蓝色天纪    Tags:跳转链接   点击:(13)  评论:(0)  加入收藏
中台亡了,问题到底出在哪里?
曾几何时,中台一度被当做“变革灵药”,嫁接在“前台作战单元”和“后台资源部门”之间,实现企业各业务线的“打通”和全域业务能力集成,提高开发和服务效率。但在中台如火如荼之...【详细内容】
2024-03-27  dbaplus社群    Tags:中台   点击:(9)  评论:(0)  加入收藏
员工写了个比删库更可怕的Bug!
想必大家都听说过删库跑路吧,我之前一直把它当一个段子来看。可万万没想到,就在昨天,我们公司的某位员工,竟然写了一个比删库更可怕的 Bug!给大家分享一下(不是公开处刑),希望朋友们...【详细内容】
2024-03-26  dbaplus社群    Tags:Bug   点击:(5)  评论:(0)  加入收藏
我们一起聊聊什么是正向代理和反向代理
从字面意思上看,代理就是代替处理的意思,一个对象有能力代替另一个对象处理某一件事。代理,这个词在我们的日常生活中也不陌生,比如在购物、旅游等场景中,我们经常会委托别人代替...【详细内容】
2024-03-26  萤火架构  微信公众号  Tags:正向代理   点击:(11)  评论:(0)  加入收藏
看一遍就理解:IO模型详解
前言大家好,我是程序员田螺。今天我们一起来学习IO模型。在本文开始前呢,先问问大家几个问题哈~什么是IO呢?什么是阻塞非阻塞IO?什么是同步异步IO?什么是IO多路复用?select/epoll...【详细内容】
2024-03-26  捡田螺的小男孩  微信公众号  Tags:IO模型   点击:(9)  评论:(0)  加入收藏
为什么都说 HashMap 是线程不安全的?
做Java开发的人,应该都用过 HashMap 这种集合。今天就和大家来聊聊,为什么 HashMap 是线程不安全的。1.HashMap 数据结构简单来说,HashMap 基于哈希表实现。它使用键的哈希码来...【详细内容】
2024-03-22  Java技术指北  微信公众号  Tags:HashMap   点击:(11)  评论:(0)  加入收藏
如何从头开始编写LoRA代码,这有一份教程
选自 lightning.ai作者:Sebastian Raschka机器之心编译编辑:陈萍作者表示:在各种有效的 LLM 微调方法中,LoRA 仍然是他的首选。LoRA(Low-Rank Adaptation)作为一种用于微调 LLM(大...【详细内容】
2024-03-21  机器之心Pro    Tags:LoRA   点击:(12)  评论:(0)  加入收藏
这样搭建日志中心,传统的ELK就扔了吧!
最近客户有个新需求,就是想查看网站的访问情况。由于网站没有做google的统计和百度的统计,所以访问情况,只能通过日志查看,通过脚本的形式给客户导出也不太实际,给客户写个简单的...【详细内容】
2024-03-20  dbaplus社群    Tags:日志   点击:(4)  评论:(0)  加入收藏
相关文章
    无相关信息
站内最新
站内热门
站内头条