内射老阿姨1区2区3区4区_久久精品人人做人人爽电影蜜月_久久国产精品亚洲77777_99精品又大又爽又粗少妇毛片

死磕java集合之LinkedTransferQueue源碼分析

問題

(1)LinkedTransferQueue是什么東東?

為西峽等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計制作服務(wù),及西峽網(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為網(wǎng)站制作、網(wǎng)站設(shè)計、西峽網(wǎng)站設(shè)計,以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達到每一位用戶的要求,就會得到認(rèn)可,從而選擇與我們長期合作。這樣,我們也可以走得更遠!

(2)LinkedTransferQueue是怎么實現(xiàn)阻塞隊列的?

(3)LinkedTransferQueue是怎么控制并發(fā)安全的?

(4)LinkedTransferQueue與SynchronousQueue有什么異同?

簡介

LinkedTransferQueue是LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合體,它綜合了這三者的方法,并且提供了更加高效的實現(xiàn)方式。

繼承體系

死磕 java集合之LinkedTransferQueue源碼分析

LinkedTransferQueue實現(xiàn)了TransferQueue接口,而TransferQueue接口是繼承自BlockingQueue的,所以LinkedTransferQueue也是一個阻塞隊列。

TransferQueue接口中定義了以下幾個方法:

// 嘗試移交元素
boolean tryTransfer(E e);
// 移交元素
void transfer(E e) throws InterruptedException;
// 嘗試移交元素(有超時時間)
boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException;
// 判斷是否有消費者
boolean hasWaitingConsumer();
// 查看消費者的數(shù)量
int getWaitingConsumerCount();

主要是定義了三個移交元素的方法,有阻塞的,有不阻塞的,有超時的。

存儲結(jié)構(gòu)

LinkedTransferQueue使用了一個叫做dual data structure的數(shù)據(jù)結(jié)構(gòu),或者叫做dual queue,譯為雙重數(shù)據(jù)結(jié)構(gòu)或者雙重隊列。

雙重隊列是什么意思呢?

放取元素使用同一個隊列,隊列中的節(jié)點具有兩種模式,一種是數(shù)據(jù)節(jié)點,一種是非數(shù)據(jù)節(jié)點。

放元素時先跟隊列頭節(jié)點對比,如果頭節(jié)點是非數(shù)據(jù)節(jié)點,就讓他們匹配,如果頭節(jié)點是數(shù)據(jù)節(jié)點,就生成一個數(shù)據(jù)節(jié)點放在隊列尾端(入隊)。

取元素時也是先跟隊列頭節(jié)點對比,如果頭節(jié)點是數(shù)據(jù)節(jié)點,就讓他們匹配,如果頭節(jié)點是非數(shù)據(jù)節(jié)點,就生成一個非數(shù)據(jù)節(jié)點放在隊列尾端(入隊)。

用圖形來表示就是下面這樣:

死磕 java集合之LinkedTransferQueue源碼分析

不管是放元素還是取元素,都先跟頭節(jié)點對比,如果二者模式不一樣就匹配它們,如果二者模式一樣,就入隊。

源碼分析

主要屬性

// 頭節(jié)點
transient volatile Node head;
// 尾節(jié)點
private transient volatile Node tail;
// 放取元素的幾種方式:
// 立即返回,用于非超時的poll()和tryTransfer()方法中
private static final int NOW   = 0; // for untimed poll, tryTransfer
// 異步,不會阻塞,用于放元素時,因為內(nèi)部使用×××單鏈表存儲元素,不會阻塞放元素的過程
private static final int ASYNC = 1; // for offer, put, add
// 同步,調(diào)用的時候如果沒有匹配到會阻塞直到匹配到為止
private static final int SYNC  = 2; // for transfer, take
// 超時,用于有超時的poll()和tryTransfer()方法中
private static final int TIMED = 3; // for timed poll, tryTransfer

主要內(nèi)部類

static final class Node {
    // 是否是數(shù)據(jù)節(jié)點(也就標(biāo)識了是生產(chǎn)者還是消費者)
    final boolean isData;   // false if this is a request node
    // 元素的值
    volatile Object item;   // initially non-null if isData; CASed to match
    // 下一個節(jié)點
    volatile Node next;
    // 持有元素的線程
    volatile Thread waiter; // null until waiting
}

典型的單鏈表結(jié)構(gòu),內(nèi)部除了存儲元素的值和下一個節(jié)點的指針外,還包含了是否為數(shù)據(jù)節(jié)點和持有元素的線程。

內(nèi)部通過isData區(qū)分是生產(chǎn)者還是消費者。

主要構(gòu)造方法

public LinkedTransferQueue() {
}

public LinkedTransferQueue(Collection<? extends E> c) {
    this();
    addAll(c);
}

只有這兩個構(gòu)造方法,且沒有初始容量,所以是×××的一個阻塞隊列。

入隊

四個方法都是一樣的,使用異步的方式調(diào)用xfer()方法,傳入的參數(shù)都一模一樣。

public void put(E e) {
    // 異步模式,不會阻塞,不會超時
    // 因為是放元素,單鏈表存儲,會一直往后加
    xfer(e, true, ASYNC, 0);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

public boolean add(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

xfer(E e, boolean haveData, int how, long nanos)的參數(shù)分別是:

(1)e表示元素;

(2)haveData表示是否是數(shù)據(jù)節(jié)點,

(3)how表示放取元素的方式,上面提到的四種,NOW、ASYNC、SYNC、TIMED;

(4)nanos表示超時時間;

出隊

出隊的四個方法也是直接或間接的調(diào)用xfer()方法,放取元素的方式和超時規(guī)則略微不同,本質(zhì)沒有大的區(qū)別。

public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
public E take() throws InterruptedException {
    // 同步模式,會阻塞直到取到元素
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 有超時時間
    E e = xfer(null, false, TIMED, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

public E poll() {
    // 立即返回,沒取到元素返回null
    return xfer(null, false, NOW, 0);
}

取元素就各有各的玩法了,有同步的,有超時的,有立即返回的。

移交元素的方法

public boolean tryTransfer(E e) {
    // 立即返回
    return xfer(e, true, NOW, 0) == null;
}

public void transfer(E e) throws InterruptedException {
    // 同步模式
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}

public boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    // 有超時時間
    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

請注意第二個參數(shù),都是true,也就是這三個方法其實也是放元素的方法。

這里xfer()方法的幾種模式到底有什么區(qū)別呢?請看下面的分析。

神奇的xfer()方法

private E xfer(E e, boolean haveData, int how, long nanos) {
    // 不允許放入空元素
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed
    // 外層循環(huán),自旋,失敗就重試
    retry:
    for (;;) {                            // restart on append race

        // 下面這個for循環(huán)用于控制匹配的過程
        // 同一時刻隊列中只會存儲一種類型的節(jié)點
        // 從頭節(jié)點開始嘗試匹配,如果頭節(jié)點被其它線程先一步匹配了
        // 就再嘗試其下一個,直到匹配到為止,或者到隊列中沒有元素為止

        for (Node h = head, p = h; p != null;) { // find & match first node
            // p節(jié)點的模式
            boolean isData = p.isData;
            // p節(jié)點的值
            Object item = p.item;
            // p沒有被匹配到
            if (item != p && (item != null) == isData) { // unmatched
                // 如果兩者模式一樣,則不能匹配,跳出循環(huán)后嘗試入隊
                if (isData == haveData)   // can't match
                    break;
                // 如果兩者模式不一樣,則嘗試匹配
                // 把p的值設(shè)置為e(如果是取元素則e是null,如果是放元素則e是元素值)
                if (p.casItem(item, e)) { // match
                    // 匹配成功
                    // for里面的邏輯比較復(fù)雜,用于控制多線程同時放取元素時出現(xiàn)競爭的情況的
                    // 看不懂可以直接跳過
                    for (Node q = p; q != h;) {
                        // 進入到這里可能是頭節(jié)點已經(jīng)被匹配,然后p會變成h的下一個節(jié)點 
                        Node n = q.next;  // update by 2 unless singleton
                        // 如果head還沒變,就把它更新成新的節(jié)點
                        // 并把它刪除(forgetNext()會把它的next設(shè)為自己,也就是從單鏈表中刪除了)
                        // 這時為什么要把head設(shè)為n呢?因為到這里了,肯定head本身已經(jīng)被匹配掉了
                        // 而上面的p.casItem()又成功了,說明p也被當(dāng)前這個元素給匹配掉了
                        // 所以需要把它們倆都出隊列,讓其它線程可以從真正的頭開始,不用重復(fù)檢查了
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        // 如果新的頭節(jié)點為空,或者其next為空,或者其next未匹配,就重試
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    // 喚醒p中等待的線程
                    LockSupport.unpark(p.waiter);
                    // 并返回匹配到的元素
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            // p已經(jīng)被匹配了或者嘗試匹配的時候失敗了
            // 也就是其它線程先一步匹配了p
            // 這時候又分兩種情況,p的next還沒來得及修改,p的next指向了自己
            // 如果p的next已經(jīng)指向了自己,就重新取head重試,否則就取其next重試
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }

        // 到這里肯定是隊列中存儲的節(jié)點類型和自己一樣
        // 或者隊列中沒有元素了
        // 就入隊(不管放元素還是取元素都得入隊)
        // 入隊又分成四種情況:
        // NOW,立即返回,沒有匹配到立即返回,不做入隊操作
        // ASYNC,異步,元素入隊但當(dāng)前線程不會阻塞(相當(dāng)于×××LinkedBlockingQueue的元素入隊)
        // SYNC,同步,元素入隊后當(dāng)前線程阻塞,等待被匹配到
        // TIMED,有超時,元素入隊后等待一段時間被匹配,時間到了還沒匹配到就返回元素本身

        // 如果不是立即返回
        if (how != NOW) {                 // No matches available
            // 新建s節(jié)點
            if (s == null)
                s = new Node(e, haveData);
            // 嘗試入隊
            Node pred = tryAppend(s, haveData);
            // 入隊失敗,重試
            if (pred == null)
                continue retry;           // lost race vs opposite mode
            // 如果不是異步(同步或者有超時)
            // 就等待被匹配
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

private Node tryAppend(Node s, boolean haveData) {
    // 從tail開始遍歷,把s放到鏈表尾端
    for (Node t = tail, p = t;;) {        // move p to last node and append
        Node n, u;                        // temps for reads of next & tail
        // 如果首尾都是null,說明鏈表中還沒有元素
        if (p == null && (p = head) == null) {
            // 就讓首節(jié)點指向s
            // 注意,這里插入第一個元素的時候tail指針并沒有指向s
            if (casHead(null, s))
                return s;                 // initialize
        }
        else if (p.cannotPrecede(haveData))
            // 如果p無法處理,則返回null
            // 這里無法處理的意思是,p和s節(jié)點的類型不一樣,不允許s入隊
            // 比如,其它線程先入隊了一個數(shù)據(jù)節(jié)點,這時候要入隊一個非數(shù)據(jù)節(jié)點,就不允許,
            // 隊列中所有的元素都要保證是同一種類型的節(jié)點
            // 返回null后外面的方法會重新嘗試匹配重新入隊等
            return null;                  // lost race vs opposite mode
        else if ((n = p.next) != null)    // not last; keep traversing
            // 如果p的next不為空,說明不是最后一個節(jié)點
            // 則讓p重新指向最后一個節(jié)點
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                (p != n) ? n : null;      // restart if off list
        else if (!p.casNext(null, s))
            // 如果CAS更新s為p的next失敗
            // 則說明有其它線程先一步更新到p的next了
            // 就讓p指向p的next,重新嘗試讓s入隊
            p = p.next;                   // re-read on CAS failure
        else {
            // 到這里說明s成功入隊了
            // 如果p不等于t,就更新tail指針
            // 還記得上面插入第一個元素時tail指針并沒有指向新元素嗎?
            // 這里就是用來更新tail指針的
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&
                       (t = tail)   != null &&
                       (s = t.next) != null && // advance and retry
                       (s = s.next) != null && s != t);
            }
            // 返回p,即s的前一個元素
            return p;
        }
    }
}

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    // 如果是有超時的,計算其超時時間
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 當(dāng)前線程
    Thread w = Thread.currentThread();
    // 自旋次數(shù)
    int spins = -1; // initialized after first item and cancel checks
    // 隨機數(shù),隨機讓一些自旋的線程讓出CPU
    ThreadLocalRandom randomYields = null; // bound if needed

    for (;;) {
        Object item = s.item;
        // 如果s元素的值不等于e,說明它被匹配到了
        if (item != e) {                  // matched
            // assert item != s;
            // 把s的item更新為s本身
            // 并把s中的waiter置為空
            s.forgetContents();           // avoid garbage
            // 返回匹配到的元素
            return LinkedTransferQueue.<E>cast(item);
        }
        // 如果當(dāng)前線程中斷了,或者有超時的到期了
        // 就更新s的元素值指向s本身
        if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                s.casItem(e, s)) {        // cancel
            // 嘗試解除s與其前一個節(jié)點的關(guān)系
            // 也就是刪除s節(jié)點
            unsplice(pred, s);
            // 返回元素的值本身,說明沒匹配到
            return e;
        }

        // 如果自旋次數(shù)小于0,就計算自旋次數(shù)
        if (spins < 0) {                  // establish spins at/near front
            // spinsFor()計算自旋次數(shù)
            // 如果前面有節(jié)點未被匹配就返回0
            // 如果前面有節(jié)點且正在匹配中就返回一定的次數(shù),等待
            if ((spins = spinsFor(pred, s.isData)) > 0)
                // 初始化隨機數(shù)
                randomYields = ThreadLocalRandom.current();
        }
        else if (spins > 0) {             // spin
            // 還有自旋次數(shù)就減1
            --spins;
            // 并隨機讓出CPU
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();           // occasionally yield
        }
        else if (s.waiter == null) {
            // 更新s的waiter為當(dāng)前線程
            s.waiter = w;                 // request unpark then recheck
        }
        else if (timed) {
            // 如果有超時,計算超時時間,并阻塞一定時間
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        }
        else {
            // 不是超時的,直接阻塞,等待被喚醒
            // 喚醒后進入下一次循環(huán),走第一個if的邏輯就返回匹配的元素了
            LockSupport.park(this);
        }
    }
}

這三個方法里的內(nèi)容特別復(fù)雜,很大一部分代碼都是在控制線程安全,各種CAS,我們這里簡單描述一下大致的邏輯:

(1)來了一個元素,我們先查看隊列頭的節(jié)點,是否與這個元素的模式一樣;

(2)如果模式不一樣,就嘗試讓他們匹配,如果頭節(jié)點被別的線程先匹配走了,就嘗試與頭節(jié)點的下一個節(jié)點匹配,如此一直往后,直到匹配到或到鏈表尾為止;

(3)如果模式一樣,或者到鏈表尾了,就嘗試入隊;

(4)入隊的時候有可能鏈表尾修改了,那就尾指針后移,再重新嘗試入隊,依此往復(fù);

(5)入隊成功了,就自旋或阻塞,阻塞了就等待被其它線程匹配到并喚醒;

(6)喚醒之后進入下一次循環(huán)就匹配到元素了,返回匹配到的元素;

(7)是否需要入隊及阻塞有四種情況:

a)NOW,立即返回,沒有匹配到立即返回,不做入隊操作

    對應(yīng)的方法有:poll()、tryTransfer(e)

b)ASYNC,異步,元素入隊但當(dāng)前線程不會阻塞(相當(dāng)于×××LinkedBlockingQueue的元素入隊)

    對應(yīng)的方法有:add(e)、offer(e)、put(e)、offer(e, timeout, unit)

c)SYNC,同步,元素入隊后當(dāng)前線程阻塞,等待被匹配到

    對應(yīng)的方法有:take()、transfer(e)

d)TIMED,有超時,元素入隊后等待一段時間被匹配,時間到了還沒匹配到就返回元素本身

    對應(yīng)的方法有:poll(timeout, unit)、tryTransfer(e, timeout, unit)

總結(jié)

(1)LinkedTransferQueue可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合體;

(2)LinkedTransferQueue的實現(xiàn)方式是使用一種叫做雙重隊列的數(shù)據(jù)結(jié)構(gòu);

(3)不管是取元素還是放元素都會入隊;

(4)先嘗試跟頭節(jié)點比較,如果二者模式不一樣,就匹配它們,組成CP,然后返回對方的值;

(5)如果二者模式一樣,就入隊,并自旋或阻塞等待被喚醒;

(6)至于是否入隊及阻塞有四種模式,NOW、ASYNC、SYNC、TIMED;

(7)LinkedTransferQueue全程都沒有使用synchronized、重入鎖等比較重的鎖,基本是通過 自旋+CAS 實現(xiàn);

(8)對于入隊之后,先自旋一定次數(shù)后再調(diào)用LockSupport.park()或LockSupport.parkNanos阻塞;

彩蛋

LinkedTransferQueue與SynchronousQueue(公平模式)有什么異同呢?

(1)在java8中兩者的實現(xiàn)方式基本一致,都是使用的雙重隊列;

(2)前者完全實現(xiàn)了后者,但比后者更靈活;

(3)后者不管放元素還是取元素,如果沒有可匹配的元素,所在的線程都會阻塞;

(4)前者可以自己控制放元素是否需要阻塞線程,比如使用四個添加元素的方法就不會阻塞線程,只入隊元素,使用transfer()會阻塞線程;

(5)取元素兩者基本一樣,都會阻塞等待有新的元素進入被匹配到;


歡迎關(guān)注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

死磕 java集合之LinkedTransferQueue源碼分析

分享標(biāo)題:死磕java集合之LinkedTransferQueue源碼分析
網(wǎng)站URL:http://www.rwnh.cn/article40/jjsdho.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供服務(wù)器托管關(guān)鍵詞優(yōu)化、響應(yīng)式網(wǎng)站、定制開發(fā)、域名注冊、建站公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)
宁武县| 积石山| 温宿县| 惠水县| 平湖市| 高台县| 出国| 乌拉特前旗| 颍上县| 乌鲁木齐市| 宁夏| 抚宁县| 金昌市| 临海市| 三门峡市| 习水县| 沙坪坝区| 公主岭市| 长宁区| 华容县| 绥中县| 合肥市| 自治县| 岗巴县| 达拉特旗| 长兴县| 页游| 宁安市| 民勤县| 万源市| 武穴市| 琼中| 安国市| 蒙阴县| 从江县| 怀宁县| 延庆县| 葫芦岛市| 莱阳市| 南宁市| 呼玛县|