中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

Go語言之并發(fā)示例(Runner)

這篇通過一個(gè)例子,演示使用通道來監(jiān)控程序的執(zhí)行時(shí)間,生命周期,甚至終止程序等。我們這個(gè)程序叫runner,我們可以稱之為執(zhí)行者,它可以在后臺(tái)執(zhí)行任何任務(wù),而且我們還可以控制這個(gè)執(zhí)行者,比如強(qiáng)制終止它等。

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對(duì)這個(gè)行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長期合作伙伴,公司提供的服務(wù)項(xiàng)目有:申請域名、網(wǎng)絡(luò)空間、營銷軟件、網(wǎng)站建設(shè)、休寧縣網(wǎng)站維護(hù)、網(wǎng)站推廣。


現(xiàn)在開始吧,運(yùn)用我們前面十幾篇連載的知識(shí),來構(gòu)建我們的Runner,使用一個(gè)結(jié)構(gòu)體類型就可以。


//一個(gè)執(zhí)行者,可以執(zhí)行任何任務(wù),但是這些任務(wù)是限制完成的,//該執(zhí)行者可以通過發(fā)送終止信號(hào)終止它
type Runner struct {
    tasks []func(int) //要執(zhí)行的任務(wù)
    complete chan error //用于通知任務(wù)全部完成
    timeout <-chan time.Time //這些任務(wù)在多久內(nèi)完成
    interrupt chan os.Signal //可以控制強(qiáng)制終止的信號(hào)
}


示例中,我們定義了一個(gè)結(jié)構(gòu)體類型Runner,這個(gè)Runner包含了要執(zhí)行哪些任務(wù)tasks,然后使用complete通知任務(wù)是否全部完成,不過這個(gè)執(zhí)行者是有時(shí)間限制的,這就是timeout,如果在限定的時(shí)間內(nèi)沒有完成,就會(huì)接收到超時(shí)的通知,如果完成了就會(huì)接收到完成的通知。注意這里的timeout是單向通道,只能接收。


complete定義為error類型的通道,是為了當(dāng)執(zhí)行任務(wù)出現(xiàn)問題時(shí)返回錯(cuò)誤的原因,如果沒有出現(xiàn)錯(cuò)誤,返回的是nil。


此外,我們還定義了一個(gè)中斷的信號(hào),讓我們可以隨時(shí)的終止執(zhí)行者。


有了結(jié)構(gòu)體,我們接著再定義一個(gè)工廠函數(shù)New,用于返回我們需要的Runner。


func New(tm time.Duration) *Runner {
    return &Runner{
        complete:make(chan error),
        timeout:time.After(tm),
        interrupt:make(chan os.Signal,1),
    }
}


這個(gè)New函數(shù)非常簡潔,可以幫我們很快的初始化一個(gè)Runnner,它只有一個(gè)參數(shù),用來設(shè)置這個(gè)執(zhí)行者的超時(shí)時(shí)間。這個(gè)超時(shí)時(shí)間被我們傳遞給了time.After函數(shù),這個(gè)函數(shù)可以在tm時(shí)間后,會(huì)同伙一個(gè)time.Time類型的只能接收的單向通道,來告訴我們已經(jīng)到時(shí)間了。


complete是一個(gè)無緩沖通道,也就是同步通道,因?yàn)槲覀円褂盟鼇砜刂莆覀冋麄€(gè)程序是否終止,所以它必須是同步通道,要讓main goroutine等待,一致要任務(wù)完成或者被強(qiáng)制終止。


interrupt是一個(gè)有緩沖的通道,這樣做是因?yàn)?,我們可以至少接收到一個(gè)操作系統(tǒng)的中斷信息,這樣Go runtime在發(fā)送這個(gè)信號(hào)的時(shí)候不會(huì)被阻塞,如果是無緩沖的通道就會(huì)阻塞了。


系統(tǒng)信號(hào)是什么意思呢,比如我們在程序執(zhí)行的時(shí)候按下Ctrl + C,這就是一個(gè)中斷的信號(hào),告訴程序可以強(qiáng)制終止了。


我們這里初始化了結(jié)構(gòu)體的三個(gè)字段,而執(zhí)行的任務(wù)tasks沒有初始化,默認(rèn)就是零值nil,因?yàn)樗且粋€(gè)切片。但是我們的執(zhí)行者Runner不能沒有任務(wù)啊,既然初始化Runner的時(shí)候沒有,那我們就定義一個(gè)方法,通過方法給執(zhí)行者添加需要執(zhí)行的任務(wù)。


//將需要執(zhí)行的任務(wù),添加到Runner里
func (r *Runner) Add(tasks ...func(int)){
    r.tasks = append(r.tasks,tasks...)
}


這個(gè)沒有太多可以說明的,r.tasks就是一個(gè)切片,來存儲(chǔ)需要執(zhí)行的任務(wù)。通過內(nèi)置的append函數(shù)就可以追加任務(wù)了。這里使用了可變參數(shù),可以靈活的添加一個(gè),甚至同時(shí)多個(gè)任務(wù),比較方便。


到了這里我們需要的執(zhí)行者Runner,如何添加任務(wù),如何獲取一個(gè)執(zhí)行者,都有了,下面就開始執(zhí)行者如何運(yùn)行任務(wù)?如何在運(yùn)行的時(shí)候強(qiáng)制中斷任務(wù)?在這些處理之前,我們先來定義兩個(gè)我們的兩個(gè)錯(cuò)誤變量,以便在接下來的代碼實(shí)例中使用。


var ErrTimeOut = errors.New("執(zhí)行者執(zhí)行超時(shí)")
var ErrInterrupt = errors.New("執(zhí)行者被中斷")


兩種錯(cuò)誤類型,一個(gè)表示因?yàn)槌瑫r(shí)錯(cuò)誤,一個(gè)表示因?yàn)楸恢袛噱e(cuò)誤。下面我們就看看如何執(zhí)行一個(gè)個(gè)任務(wù)。


//執(zhí)行任務(wù),執(zhí)行的過程中接收到中斷信號(hào)時(shí),返回中斷錯(cuò)誤//如果任務(wù)全部執(zhí)行完,還沒有接收到中斷信號(hào),則返回nil
func (r *Runner) run() error {
    for id, task := range r.tasks {
        if r.isInterrupt() {
               return ErrInterrupt
        }
        task(id)
     }    
    return nil}//檢查是否接收到了中斷信號(hào)
func (r *Runner) isInterrupt() bool {
    select {
    case <-r.interrupt:
        signal.Stop(r.interrupt)        
        return true
    default:       
         return false
    }
}


新增的run方法也很簡單,會(huì)使用for循環(huán),不停的運(yùn)行任務(wù),在運(yùn)行的每個(gè)任務(wù)之前,都會(huì)檢測是否收到了中斷信號(hào),如果沒有收到,則繼續(xù)執(zhí)行,一直到執(zhí)行完畢,返回nil;如果收到了中斷信號(hào),則直接返回中斷錯(cuò)誤類型,任務(wù)執(zhí)行終止。


這里注意isInterrupt函數(shù),它在實(shí)現(xiàn)的時(shí)候,使用了基于select的多路復(fù)用,select和switch很像,只不過它的每個(gè)case都是一個(gè)通信操作。那么到底選擇哪個(gè)case塊執(zhí)行呢?原則就是哪個(gè)case的通信操作可以執(zhí)行就執(zhí)行哪個(gè),如果同時(shí)有多個(gè)可以執(zhí)行的case,那么就隨機(jī)選擇一個(gè)執(zhí)行。


針對(duì)我們方法中,如果r.interrupt中接受不到值,就會(huì)執(zhí)行default語句塊,返回false,一旦r.interrupt中可以接收值,就會(huì)通知Go Runtime停止接收中斷信號(hào),然后返回true。


這里如果沒有default的話,select是會(huì)阻塞的,直到r.interrupt可以接收值為止,因?yàn)槲覀兝又械倪壿嬕蟛荒茏枞晕覀兪褂昧薲efault。


好了,基礎(chǔ)工作都做好了,現(xiàn)在開始執(zhí)行我們所有的任務(wù),并且時(shí)刻監(jiān)視著任務(wù)的完成,執(zhí)行事件的超時(shí)。


//開始執(zhí)行所有任務(wù),并且監(jiān)視通道事件
func (r *Runner) Start() error {
    //希望接收哪些系統(tǒng)信號(hào)
    signal.Notify(r.interrupt, os.Interrupt)    
    go func() {
        r.complete <- r.run()
    }()    

    select {
        case err := <-r.complete:
                return err    
        case <-r.timeout:        
                return ErrTimeOut
    }
}


signal.Notify(r.interrupt, os.Interrupt),這個(gè)是表示,如果有系統(tǒng)中斷的信號(hào),發(fā)給r.interrupt即可。


任務(wù)的執(zhí)行,這里開啟了一個(gè)groutine,然后調(diào)用run方法,結(jié)果發(fā)送給通道r.complete。最后就是使用一個(gè)select多路復(fù)用,哪個(gè)通道可以操作,就返回哪個(gè)。


到了這時(shí)候,只有兩種情況了,要么任務(wù)完成;要么到時(shí)間了,任務(wù)執(zhí)行超時(shí)。從我們前面的代碼看,任務(wù)完成又分兩種情況,一種是沒有執(zhí)行完,但是收到了中斷信號(hào),中斷了,這時(shí)返回中斷錯(cuò)誤;一種是順利執(zhí)行完成,這時(shí)返回nil。


現(xiàn)在把這些代碼匯總一下,容易統(tǒng)一理解一下,所有代碼如下:


package commonimport (
    "errors"
    "os"
    "os/signal"
    "time")
var ErrTimeOut = errors.New("執(zhí)行者執(zhí)行超時(shí)")
var ErrInterrupt = errors.New("執(zhí)行者被中斷")

//一個(gè)執(zhí)行者,可以執(zhí)行任何任務(wù),但是這些任務(wù)是限制完成的,//該執(zhí)行者可以通過發(fā)送終止信號(hào)終止它
type Runner struct {
    tasks     []func(int)      //要執(zhí)行的任務(wù)
    complete  chan error       //用于通知任務(wù)全部完成
    timeout   <-chan time.Time //這些任務(wù)在多久內(nèi)完成
    interrupt chan os.Signal   //可以控制強(qiáng)制終止的信號(hào)
}
func New(tm time.Duration) *Runner {
    return &Runner{
        complete:  make(chan error),
        timeout:   time.After(tm),
        interrupt: make(chan os.Signal, 1),
    }
}
//將需要執(zhí)行的任務(wù),添加到Runner里
func (r *Runner) Add(tasks ...func(int)) {
    r.tasks = append(r.tasks, tasks...)
}
//執(zhí)行任務(wù),執(zhí)行的過程中接收到中斷信號(hào)時(shí),返回中斷錯(cuò)誤//如果任務(wù)全部執(zhí)行完,還沒有接收到中斷信號(hào),則返回nil
func (r *Runner) run() error {
    for id, task := range r.tasks {
            if r.isInterrupt() {
                        return ErrInterrupt
        }
        task(id)
     }
     return nil
}
//檢查是否接收到了中斷信號(hào)
func (r *Runner) isInterrupt() bool {
    select {
    case <-r.interrupt:
        signal.Stop(r.interrupt)        
        return true
    default:        
        return false
    }}//開始執(zhí)行所有任務(wù),并且監(jiān)視通道事件func (r *Runner) Start() error {    
    //希望接收哪些系統(tǒng)信號(hào)
    signal.Notify(r.interrupt, os.Interrupt)
    go func() {
        r.complete <- r.run()
    }()    
    select {
        case err := <-r.complete: 
               return err   
        case <-r.timeout:       
               return ErrTimeOut
    }
}


這個(gè)common包里的Runner我們已經(jīng)開發(fā)完了,現(xiàn)在我們寫個(gè)例子試試它。


package main
import (
    "flysnow.org/hello/common"
    "log"
    "time"
    "os")
func main() {
    log.Println("...開始執(zhí)行任務(wù)...")

    timeout := 3 * time.Second
    r := common.New(timeout)

    r.Add(createTask(), createTask(), createTask())
    if err:=r.Start();err!=nil{
            switch err { 
            case common.ErrTimeOut:
                    log.Println(err)
                    os.Exit(1)        
            case common.ErrInterrupt:
                   log.Println(err)
                   os.Exit(2)
        }
    }
    log.Println("...任務(wù)執(zhí)行結(jié)束...")
}
func createTask() func(int) {
    return func(id int) {
        log.Printf("正在執(zhí)行任務(wù)%d", id)
        time.Sleep(time.Duration(id)* time.Second)
    }
}


例子非常簡單,定義任務(wù)超時(shí)時(shí)間為 3 秒,添加 3 個(gè)生成的任務(wù),每個(gè)任務(wù)都是打印一個(gè)正在執(zhí)行哪個(gè)任務(wù),然后休眠一段時(shí)間。


調(diào)用r.Start()開始執(zhí)行任務(wù),如果一切都正常的話,返回nil,然后打印出...任務(wù)執(zhí)行結(jié)束...,不過我們例子中,因?yàn)槌瑫r(shí)時(shí)間和任務(wù)的設(shè)定,結(jié)果是執(zhí)行超時(shí)的。


2017/04/15 22:17:55 ...開始執(zhí)行任務(wù)...
2017/04/15 22:17:55 正在執(zhí)行任務(wù)0
2017/04/15 22:17:55 正在執(zhí)行任務(wù)1
2017/04/15 22:17:56 正在執(zhí)行任務(wù)2
2017/04/15 22:17:58 執(zhí)行者執(zhí)行超時(shí)


如果我們把超時(shí)時(shí)間改為 4 秒或者更多,就會(huì)打印...任務(wù)執(zhí)行結(jié)束...。這里我們還可以測試另外一種系統(tǒng)中斷情況,在終端里運(yùn)行程序后,快速不停地按Ctrl + C,就可以看到執(zhí)行者被中斷的打印輸出信息了。


到這里,這篇文章已經(jīng)要收尾了,這個(gè)例子中,我們演示使用通道通信、同步等待,監(jiān)控程序等。


此外這個(gè)執(zhí)行者也是一個(gè)很不錯(cuò)的模式,比如我們寫好之后,交給定時(shí)任務(wù)去執(zhí)行即可,比如cron,這個(gè)模式我們還可以擴(kuò)展,更高效率的并發(fā),更多靈活的控制程序的生命周期,更高效的監(jiān)控等,這個(gè)大家自己可以試試,基于自己的需求修改就可以了。

網(wǎng)頁名稱:Go語言之并發(fā)示例(Runner)
網(wǎng)頁地址:http://www.rwnh.cn/article42/ipcjec.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站收錄、微信公眾號(hào)網(wǎng)站建設(shè)、品牌網(wǎng)站制作、網(wǎng)站制作網(wǎng)站維護(hù)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都seo排名網(wǎng)站優(yōu)化
温宿县| 墨竹工卡县| 石河子市| 阜南县| 中宁县| 毕节市| 教育| 色达县| 师宗县| 奇台县| 怀仁县| 房产| 桓台县| 明光市| 恭城| 长寿区| 秦安县| 苏尼特左旗| 纳雍县| 门头沟区| 石屏县| 安岳县| 东兰县| 阆中市| 文登市| 乐平市| 正定县| 高平市| 封开县| 岳阳市| 宝坻区| 西丰县| 晋宁县| 库伦旗| 股票| 通化市| 普陀区| 广灵县| 新宁县| 江源县| 繁昌县|