(1)DelayQueue是阻塞隊(duì)列嗎?
創(chuàng)新互聯(lián)長(zhǎng)期為近千家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開(kāi)放共贏平臺(tái),與合作伙伴共同營(yíng)造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為承留企業(yè)提供專業(yè)的成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè),承留網(wǎng)站改版等技術(shù)服務(wù)。擁有10年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開(kāi)發(fā)。(2)DelayQueue的實(shí)現(xiàn)方式?
(3)DelayQueue主要用于什么場(chǎng)景?
DelayQueue是java并發(fā)包下的延時(shí)阻塞隊(duì)列,常用于實(shí)現(xiàn)定時(shí)任務(wù)。
從繼承體系可以看到,DelayQueue實(shí)現(xiàn)了BlockingQueue,所以它是一個(gè)阻塞隊(duì)列。
另外,DelayQueue還組合了一個(gè)叫做Delayed的接口,DelayQueue中存儲(chǔ)的所有元素必須實(shí)現(xiàn)Delayed接口。
那么,Delayed是什么呢?
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
Delayed是一個(gè)繼承自Comparable的接口,并且定義了一個(gè)getDelay()方法,用于表示還有多少時(shí)間到期,到期了應(yīng)返回小于等于0的數(shù)值。
// 用于控制并發(fā)的鎖
private final transient ReentrantLock lock = new ReentrantLock();
// 優(yōu)先級(jí)隊(duì)列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于標(biāo)記當(dāng)前是否有線程在排隊(duì)(僅用于取元素時(shí))
private Thread leader = null;
// 條件,用于表示現(xiàn)在是否有可取的元素
private final Condition available = lock.newCondition();
從屬性我們可以知道,延時(shí)隊(duì)列主要使用優(yōu)先級(jí)隊(duì)列來(lái)實(shí)現(xiàn),并輔以重入鎖和條件來(lái)控制并發(fā)安全。
因?yàn)閮?yōu)先級(jí)隊(duì)列是×××的,所以這里只需要一個(gè)條件就可以了。
還記得優(yōu)先級(jí)隊(duì)列嗎?點(diǎn)擊鏈接直達(dá)【死磕 java集合之PriorityQueue源碼分析】
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
構(gòu)造方法比較簡(jiǎn)單,一個(gè)默認(rèn)構(gòu)造方法,一個(gè)初始化添加集合c中所有元素的構(gòu)造方法。
因?yàn)镈elayQueue是阻塞隊(duì)列,且優(yōu)先級(jí)隊(duì)列是×××的,所以入隊(duì)不會(huì)阻塞不會(huì)超時(shí),因此它的四個(gè)入隊(duì)方法是一樣的。
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
入隊(duì)方法比較簡(jiǎn)單:
(1)加鎖;
(2)添加元素到優(yōu)先級(jí)隊(duì)列中;
(3)如果添加的元素是堆頂元素,就把leader置為空,并喚醒等待在條件available上的線程;
(4)解鎖;
因?yàn)镈elayQueue是阻塞隊(duì)列,所以它的出隊(duì)有四個(gè)不同的方法,有拋出異常的,有阻塞的,有不阻塞的,有超時(shí)的。
我們這里主要分析兩個(gè),poll()和take()方法。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
poll()方法比較簡(jiǎn)單:
(1)加鎖;
(2)檢查第一個(gè)元素,如果為空或者還沒(méi)到期,就返回null;
(3)如果第一個(gè)元素到期了就調(diào)用優(yōu)先級(jí)隊(duì)列的poll()彈出第一個(gè)元素;
(4)解鎖。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 堆頂元素
E first = q.peek();
// 如果堆頂元素為空,說(shuō)明隊(duì)列中還沒(méi)有元素,直接阻塞等待
if (first == null)
available.await();
else {
// 堆頂元素的到期時(shí)間
long delay = first.getDelay(NANOSECONDS);
// 如果小于0說(shuō)明已到期,直接調(diào)用poll()方法彈出堆頂元素
if (delay <= 0)
return q.poll();
// 如果delay大于0 ,則下面要阻塞了
// 將first置為空方便gc,因?yàn)橛锌赡芷渌貜棾隽诉@個(gè)元素
// 這里還持有著引用不會(huì)被清理
first = null; // don't retain ref while waiting
// 如果前面有其它線程在等待,直接進(jìn)入等待
if (leader != null)
available.await();
else {
// 如果leader為null,把當(dāng)前線程賦值給它
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待delay時(shí)間后自動(dòng)醒過(guò)來(lái)
// 醒過(guò)來(lái)后把leader置空并重新進(jìn)入循環(huán)判斷堆頂元素是否到期
// 這里即使醒過(guò)來(lái)后也不一定能獲取到元素
// 因?yàn)橛锌赡芷渌€程先一步獲取了鎖并彈出了堆頂元素
// 條件鎖的喚醒分成兩步,先從Condition的隊(duì)列里出隊(duì)
// 再入隊(duì)到AQS的隊(duì)列中,當(dāng)其它線程調(diào)用LockSupport.unpark(t)的時(shí)候才會(huì)真正喚醒
// 關(guān)于AQS我們后面會(huì)講的^^
available.awaitNanos(delay);
} finally {
// 如果leader還是當(dāng)前線程就把它置為空,讓其它線程有機(jī)會(huì)獲取元素
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 成功出隊(duì)后,如果leader為空且堆頂還有元素,就喚醒下一個(gè)等待的線程
if (leader == null && q.peek() != null)
// signal()只是把等待的線程放到AQS的隊(duì)列里面,并不是真正的喚醒
available.signal();
// 解鎖,這才是真正的喚醒
lock.unlock();
}
}
take()方法稍微要復(fù)雜一些:
(1)加鎖;
(2)判斷堆頂元素是否為空,為空的話直接阻塞等待;
(3)判斷堆頂元素是否到期,到期了直接調(diào)用優(yōu)先級(jí)隊(duì)列的poll()彈出元素;
(4)沒(méi)到期,再判斷前面是否有其它線程在等待,有則直接等待;
(5)前面沒(méi)有其它線程在等待,則把自己當(dāng)作第一個(gè)線程等待delay時(shí)間后喚醒,再嘗試獲取元素;
(6)獲取到元素之后再喚醒下一個(gè)等待的線程;
(7)解鎖;
說(shuō)了那么多,是不是還是不知道怎么用呢?那怎么能行,請(qǐng)看下面的案例:
public class DelayQueueTest {
public static void main(String[] args) {
DelayQueue<Message> queue = new DelayQueue<>();
long now = System.currentTimeMillis();
// 啟動(dòng)一個(gè)線程從隊(duì)列中取元素
new Thread(()->{
while (true) {
try {
// 將依次打印1000,2000,5000,7000,8000
System.out.println(queue.take().deadline - now);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 添加5個(gè)元素到隊(duì)列中
queue.add(new Message(now + 5000));
queue.add(new Message(now + 8000));
queue.add(new Message(now + 2000));
queue.add(new Message(now + 1000));
queue.add(new Message(now + 7000));
}
}
class Message implements Delayed {
long deadline;
public Message(long deadline) {
this.deadline = deadline;
}
@Override
public long getDelay(TimeUnit unit) {
return deadline - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return String.valueOf(deadline);
}
}
是不是很簡(jiǎn)單,越早到期的元素越先出隊(duì)。
(1)DelayQueue是阻塞隊(duì)列;
(2)DelayQueue內(nèi)部存儲(chǔ)結(jié)構(gòu)使用優(yōu)先級(jí)隊(duì)列;
(3)DelayQueue使用重入鎖和條件來(lái)控制并發(fā)安全;
(4)DelayQueue常用于定時(shí)任務(wù);
java中的線程池實(shí)現(xiàn)定時(shí)任務(wù)是直接用的DelayQueue嗎?
當(dāng)然不是,ScheduledThreadPoolExecutor中使用的是它自己定義的內(nèi)部類DelayedWorkQueue,其實(shí)里面的實(shí)現(xiàn)邏輯基本都是一樣的,只不過(guò)DelayedWorkQueue里面沒(méi)有使用現(xiàn)成的PriorityQueue,而是使用數(shù)組又實(shí)現(xiàn)了一遍優(yōu)先級(jí)隊(duì)列,本質(zhì)上沒(méi)有什么區(qū)別。
歡迎關(guān)注我的公眾號(hào)“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。
本文題目:死磕java集合之DelayQueue源碼分析-創(chuàng)新互聯(lián)
鏈接分享:http://www.rwnh.cn/article24/dcddce.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供ChatGPT、網(wǎng)站收錄、App開(kāi)發(fā)、品牌網(wǎng)站設(shè)計(jì)、企業(yè)網(wǎng)站制作、動(dòng)態(tài)網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容