今天就跟大家聊聊有關(guān)web開發(fā)中怎樣優(yōu)雅地實(shí)現(xiàn)并發(fā)編排任務(wù),可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
秀英網(wǎng)站制作公司哪家好,找成都創(chuàng)新互聯(lián)公司!從網(wǎng)頁設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)公司等網(wǎng)站項(xiàng)目制作,到程序開發(fā),運(yùn)營維護(hù)。成都創(chuàng)新互聯(lián)公司自2013年創(chuàng)立以來到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選成都創(chuàng)新互聯(lián)公司。
在做任務(wù)開發(fā)的時候,你們一定會碰到以下場景:
場景1:調(diào)用第三方接口的時候, 一個需求你需要調(diào)用不同的接口,做數(shù)據(jù)組裝。
場景2:一個應(yīng)用首頁可能依托于很多服務(wù)。那就涉及到在加載頁面時需要同時請求多個服務(wù)的接口。這一步往往是由后端統(tǒng)一調(diào)用組裝數(shù)據(jù)再返回給前端,也就是所謂的 BFF(Backend For Frontend) 層。
針對以上兩種場景,假設(shè)在沒有強(qiáng)依賴關(guān)系下,選擇串行調(diào)用,那么總耗時即:
time=s1+s2+....sn
按照當(dāng)代秒入百萬的有為青年,這么長時間早就把你祖宗十八代問候了一遍。
為了偉大的KPI,我們往往會選擇并發(fā)地調(diào)用這些依賴接口。那么總耗時就是:
time=max(s1,s2,s3.....,sn)
當(dāng)然開始堆業(yè)務(wù)的時候可以先串行化,等到上面的人著急的時候,亮出絕招。
這樣,年底 PPT 就可以加上濃重的一筆流水賬:為業(yè)務(wù)某個接口提高百分之XXX性能,間接產(chǎn)生XXX價值。
當(dāng)然這一切的前提是,做老板不懂技術(shù),做技術(shù)”懂”你。
言歸正傳,如果修改成并發(fā)調(diào)用,你可能會這么寫,
package main import ( "fmt" "sync" "time" ) func main() { var wg sync.WaitGroup wg.Add(2) var userInfo *User var productList []Product go func() { defer wg.Done() userInfo, _ = getUser() }() go func() { defer wg.Done() productList, _ = getProductList() }() wg.Wait() fmt.Printf("用戶信息:%+v\n", userInfo) fmt.Printf("商品信息:%+v\n", productList) } /********用戶服務(wù)**********/ type User struct { Name string Age uint8 } func getUser() (*User, error) { time.Sleep(500 * time.Millisecond) var u User u.Name = "wuqinqiang" u.Age = 18 return &u, nil } /********商品服務(wù)**********/ type Product struct { Title string Price uint32 } func getProductList() ([]Product, error) { time.Sleep(400 * time.Millisecond) var list []Product list = append(list, Product{ Title: "SHib", Price: 10, }) return list, nil }
從實(shí)現(xiàn)上來說,需要多少服務(wù),會開多少個 G,利用 sync.WaitGroup 的特性,
實(shí)現(xiàn)并發(fā)編排任務(wù)的效果。
好像,問題不大。
但是隨著代號 996 業(yè)務(wù)場景的增加,你會發(fā)現(xiàn),好多模塊都有相似的功能,只是對應(yīng)的業(yè)務(wù)場景不同而已。
那么我們能不能抽像出一套針對此業(yè)務(wù)場景的工具,而把具體業(yè)務(wù)實(shí)現(xiàn)交給業(yè)務(wù)方。
本著不重復(fù)造輪子的原則,去搜了下開源項(xiàng)目,最終看上了 go-zero 里面的一個工具 mapreduce。
可以自行 Google 這個名詞。
使用很簡單。我們通過它改造一下上面的代碼:
package main import ( "fmt" "github.com/tal-tech/go-zero/core/mr" "time" ) func main() { var userInfo *User var productList []Product _ = mr.Finish(func() (err error) { userInfo, err = getUser() return err }, func() (err error) { productList, err = getProductList() return err }) fmt.Printf("用戶信息:%+v\n", userInfo) fmt.Printf("商品信息:%+v\n", productList) } //打印 用戶信息:&{Name:wuqinqiang Age:18} 商品信息:[{Title:SHib Price:10}]
是不是舒服多了。
但是這里還需要注意一點(diǎn),假設(shè)你調(diào)用的其中一個服務(wù)錯誤,并且你 return err 對應(yīng)的錯誤,那么其他調(diào)用的服務(wù)會被取消。
比如我們修改 getProductList 直接響應(yīng)錯誤。
func getProductList() ([]Product, error) { return nil, errors.New("test error") } //打印 // 用戶信息:<nil> // 商品信息:[]
那么最終打印的時候連用戶信息都會為空,因?yàn)槌霈F(xiàn)一個服務(wù)錯誤,用戶服務(wù)請求被取消了。
一般情況下,在請求服務(wù)錯誤的時候我們會有保底操作,一個服務(wù)錯誤不能影響其他請求的結(jié)果。
所以在使用的時候具體處理取決于業(yè)務(wù)場景。
既然用了,那么就追下源碼吧。
func Finish(fns ...func() error) error { if len(fns) == 0 { return nil } return MapReduceVoid(func(source chan<- interface{}) { for _, fn := range fns { source <- fn } }, func(item interface{}, writer Writer, cancel func(error)) { fn := item.(func() error) if err := fn(); err != nil { cancel(err) } }, func(pipe <-chan interface{}, cancel func(error)) { drain(pipe) }, WithWorkers(len(fns))) }
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { reducer(input, cancel) drain(input) // We need to write a placeholder to let MapReduce to continue on reducer done, // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce. writer.Write(lang.Placeholder) }, opts...) return err }
對于 MapReduceVoid函數(shù),主要查看三個閉包參數(shù)。
第一個 GenerateFunc 用于生產(chǎn)數(shù)據(jù)。
MapperFunc 讀取生產(chǎn)出的數(shù)據(jù),進(jìn)行處理。
VoidReducerFunc 這里表示不對 mapper 后的數(shù)據(jù)做聚合返回。所以這個閉包在此操作幾乎0作用。
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) { source := buildSource(generate) return MapReduceWithSource(source, mapper, reducer, opts...) } func buildSource(generate GenerateFunc) chan interface{} { source := make(chan interface{})// 創(chuàng)建無緩沖通道 threading.GoSafe(func() { defer close(source) generate(source) //開始生產(chǎn)數(shù)據(jù) }) return source //返回?zé)o緩沖通道 }
buildSource函數(shù)中,返回一個無緩沖的通道。并開啟一個 G 運(yùn)行 generate(source),往無緩沖通道塞數(shù)據(jù)。這個generate(source) 不就是一開始 Finish 傳遞的第一個閉包參數(shù)。
return MapReduceVoid(func(source chan<- interface{}) { // 就這個 for _, fn := range fns { source <- fn } })
然后查看 MapReduceWithSource 函數(shù),
func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) { options := buildOptions(opts...) //任務(wù)執(zhí)行結(jié)束通知信號 output := make(chan interface{}) //將mapper處理完的數(shù)據(jù)寫入collector collector := make(chan interface{}, options.workers) // 取消操作信號 done := syncx.NewDoneChan() writer := newGuardedWriter(output, done.Done()) var closeOnce sync.Once var retErr errorx.AtomicError finish := func() { closeOnce.Do(func() { done.Close() close(output) }) } cancel := once(func(err error) { if err != nil { retErr.Set(err) } else { retErr.Set(ErrCancelWithNil) } drain(source) finish() }) go func() { defer func() { if r := recover(); r != nil { cancel(fmt.Errorf("%v", r)) } else { finish() } }() reducer(collector, writer, cancel) drain(collector) }() // 真正從生成器通道取數(shù)據(jù)執(zhí)行Mapper go executeMappers(func(item interface{}, w Writer) { mapper(item, w, cancel) }, source, collector, done.Done(), options.workers) value, ok := <-output if err := retErr.Load(); err != nil { return nil, err } else if ok { return value, nil } else { return nil, ErrReduceNoOutput } }
這段代碼挺長的,我們說下核心的點(diǎn)。這里使用一個G 調(diào)用 executeMappers 方法。
go executeMappers(func(item interface{}, w Writer) { mapper(item, w, cancel) }, source, collector, done.Done(), options.workers)
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{}, done <-chan lang.PlaceholderType, workers int) { var wg sync.WaitGroup defer func() { // 等待所有任務(wù)全部執(zhí)行完畢 wg.Wait() // 關(guān)閉通道 close(collector) }() //根據(jù)指定數(shù)量創(chuàng)建 worker池 pool := make(chan lang.PlaceholderType, workers) writer := newGuardedWriter(collector, done) for { select { case <-done: return case pool <- lang.Placeholder: // 從buildSource() 返回的無緩沖通道取數(shù)據(jù) item, ok := <-input // 當(dāng)通道關(guān)閉,結(jié)束 if !ok { <-pool return } wg.Add(1) // better to safely run caller defined method threading.GoSafe(func() { defer func() { wg.Done() <-pool }() //真正運(yùn)行閉包函數(shù)的地方 // func(item interface{}, w Writer) { // mapper(item, w, cancel) // } mapper(item, writer) }) } } }
具體的邏輯已備注,代碼很容易懂。
一旦 executeMappers 函數(shù)返回,關(guān)閉 collector 通道,那么執(zhí)行 reducer 不再阻塞。
go func() { defer func() { if r := recover(); r != nil { cancel(fmt.Errorf("%v", r)) } else { finish() } }() reducer(collector, writer, cancel) //這里 drain(collector) }()
這里的 reducer(collector, writer, cancel) 其實(shí)就是從 MapReduceVoid 傳遞的第三個閉包函數(shù)。
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) { reducer(input, cancel) //這里 drain(input) // We need to write a placeholder to let MapReduce to continue on reducer done, // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce. writer.Write(lang.Placeholder) }, opts...) return err }
然后這個閉包函數(shù)又執(zhí)行了 reducer(input, cancel),這里的 reducer 就是我們一開始解釋過的 VoidReducerFunc,從 Finish() 而來。
等等,看到上面三個地方的 drain(input)了嗎?
// drain drains the channel. func drain(channel <-chan interface{}) { // drain the channel for range channel { } }
其實(shí)就是一個排空 channel 的操作,但是三個地方都對同一個 channel做同樣的操作,也是讓我費(fèi)解。
還有更重要的一點(diǎn)。
go func() { defer func() { if r := recover(); r != nil { cancel(fmt.Errorf("%v", r)) } else { finish() } }() reducer(collector, writer, cancel) drain(collector) }()
上面的代碼,假如執(zhí)行 reducer,writer 寫入引發(fā) panic,那么drain(collector) 將沒有機(jī)會執(zhí)行。
不過作者已經(jīng)修復(fù)了這個問題,直接把 drain(collector) 放入到 defer。
具體 issues[1]。
到這里,關(guān)于 Finish 的源碼也就結(jié)束了。感興趣的可以看看其他源碼。
很喜歡 go-zero 里的一些工具,但是工具往往并不獨(dú)立,依賴于其他文件包,導(dǎo)致明明只想使用其中一個工具卻需要安裝整個包。
所以最終的結(jié)果就是扒源碼,創(chuàng)建無依賴庫工具集,遵循 MIT 即可。
看完上述內(nèi)容,你們對web開發(fā)中怎樣優(yōu)雅地實(shí)現(xiàn)并發(fā)編排任務(wù)有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。
分享題目:web開發(fā)中怎樣優(yōu)雅地實(shí)現(xiàn)并發(fā)編排任務(wù)
分享鏈接:http://www.rwnh.cn/article26/ihjicg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供營銷型網(wǎng)站建設(shè)、定制開發(fā)、響應(yīng)式網(wǎng)站、企業(yè)建站、外貿(mào)建站、Google
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)