中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

如何理解tcpServer中的IOLoop方法

本篇文章為大家展示了如何理解tcpServer 中的IOLoop方法,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。

晉安網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)公司!從網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、APP開(kāi)發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)公司等網(wǎng)站項(xiàng)目制作,到程序開(kāi)發(fā),運(yùn)營(yíng)維護(hù)。創(chuàng)新互聯(lián)公司自2013年起到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來(lái)保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)公司。

今天我們就分析一下IOLoop這個(gè)方法

廢話不多說(shuō),直接上代碼吧(代碼位于nsq/nsqlookupd/lookup_protocol_v1.go這個(gè)文件中)

//這段代碼位于nsq/nsqlookupd/client_v1.go這個(gè)文件中
type ClientV1 struct {
	net.Conn //組合net.Conn接口
	peerInfo *PeerInfo //client的信息也就是前面所講的product的信息
}
//初始化一個(gè)ClientV1
func NewClientV1(conn net.Conn) *ClientV1 {
	return &ClientV1{
		Conn: conn,
	}
}

//實(shí)現(xiàn)String接口
func (c *ClientV1) String() string {
	return c.RemoteAddr().String()
}

//定義ClientV1結(jié)束




type LookupProtocolV1 struct {
	ctx *Context //一直貫穿整個(gè)代碼的Context,具體可翻看前面章節(jié)
}

func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
	var err error
	var line string

	client := NewClientV1(conn) //新建一個(gè)client版本為V1
	err = nil
	reader := bufio.NewReader(client) //由client 創(chuàng)建一個(gè) 帶有buffer 的Reader 默認(rèn) buffer size 為4096,這里的NewReader參數(shù)為io.Reader 接口,為何net.Conn接口也能作為參數(shù)呢?因?yàn)閚et.Conn 接口其實(shí)也是實(shí)現(xiàn)了io.Reader接口了,具體概念大家可翻看golang的教程
	for {
		line, err = reader.ReadString('\n') //按行讀取
		if err != nil {
			break
		}
		line = strings.TrimSpace(line) //去掉這行兩頭的空格符
		
		params := strings.Split(line, " ") //字符串按一個(gè)空格字符串分割,并獲取相應(yīng)的Commad 以及 該command 的相應(yīng)的params

		response, err := p.Exec(client, reader, params) //執(zhí)行相應(yīng)的Command
		if err != nil {
			ctx := ""
			if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
				ctx = " - " + parentErr.Error()
			}
			p.ctx.nsqlookupd.logf("ERROR: [%s] - %s%s", client, err, ctx)

			_, err = protocol.SendResponse(client, []byte(err.Error())) //返回錯(cuò)誤給client
			if err != nil {
				break
			}

			// errors of type FatalClientErr should forceably close the connection
			if _, ok := err.(*protocol.FatalClientErr); ok { 
				break
			}
			continue
		}

		if response != nil {
			_, err = protocol.SendResponse(client, response) //返回命令處理結(jié)果給client
			if err != nil {
				break
			}
		}
	}

	//for 循環(huán)結(jié)束了 說(shuō)明程序要退出了,調(diào)用RegistrationDB 中的 RemoveProducer從producer 的注冊(cè)數(shù)據(jù)中刪除 producer信息
	//這里的RegistrationDB下章再具體講解
	p.ctx.nsqlookupd.logf("CLIENT(%s): closing", client)
	if client.peerInfo != nil {
		registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id)
		for _, r := range registrations {
			if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed {
				p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
					client, r.Category, r.Key, r.SubKey)
			}
		}
	}
	return err
}

//這個(gè)方法就是執(zhí)行相應(yīng)的命令動(dòng)作 有 PING IDENTIFY REGISTER UNREGISTER 這四個(gè)類型
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	switch params[0] {
	case "PING": //用于client的心跳處理
		return p.PING(client, params)
	case "IDENTIFY": //用于client端的信息注冊(cè),執(zhí)行PING REGISTER UNREGISTER 命令前必須先執(zhí)行此命令
		return p.IDENTIFY(client, reader, params[1:])
	case "REGISTER": //用于client端注冊(cè)topic以及channel的命令
		return p.REGISTER(client, reader, params[1:])
	case "UNREGISTER": //執(zhí)行與REGISTER命令相反的邏輯
		return p.UNREGISTER(client, reader, params[1:])
	}
	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

//INDENTIFY命令處理邏輯
//該命令用于注冊(cè)client的producer信息,并返回nsqlookupd的TCP 以及 HTTP 端口信息給client
//大致的報(bào)文如下
//  V1 INDENTIFY\n   注意了這里前面提過(guò)的V1前面是兩個(gè)空格字符
//123\n  這里是后面json數(shù)據(jù)(producer 信息的json數(shù)據(jù)的字節(jié)長(zhǎng)度)
//{....}\n 一串表示producer信息的json數(shù)據(jù),具體的可參考 nsq/nsqlookupd/registration_db.go文件中的PeerInfo struct
func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	var err error
	if client.peerInfo != nil { //如果有該client 的PeerInfo數(shù)據(jù)則返回錯(cuò)誤
		return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again")
	}

	var bodyLen int32
	err = binary.Read(reader, binary.BigEndian, &bodyLen) //獲取producer PeerInfo json數(shù)據(jù)的字節(jié)長(zhǎng)度 大端二進(jìn)制格式
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size")
	}
	body := make([]byte, bodyLen) //初始化一個(gè)producer PeerInfo json數(shù)據(jù)長(zhǎng)度的bytes
	_, err = io.ReadFull(reader, body) //讀取所有的json數(shù)據(jù)
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body")
	}
	
	// body is a json structure with producer information
	peerInfo := PeerInfo{id: client.RemoteAddr().String()} //實(shí)例化一個(gè)PeerInfo
	err = json.Unmarshal(body, &peerInfo) //解析producer PeerInfo json數(shù)據(jù)到peerInfo
	if err != nil {
		//json 數(shù)據(jù)解析失敗 返回錯(cuò)誤
		return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body")
	}

	peerInfo.RemoteAddress = client.RemoteAddr().String() //獲取PeerInfo remote address

	// require all fields
	
	//校驗(yàn)獲取的PeerInfo 數(shù)據(jù)
	if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.HTTPPort == 0 || peerInfo.Version == "" {
		return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields")
	}
	//將當(dāng)前系統(tǒng)時(shí)間(納秒)更新到PeerInfo 中的lastUpdate中 用于 client的心跳判斷
	atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano())

	p.ctx.nsqlookupd.logf("CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s",
		client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTTPPort, peerInfo.Version)

	client.peerInfo = &peerInfo
	//注冊(cè)producer PeerInfo 信息到 RegistrationDB中 其中Registration的 Category 為client Key 和 SubKey為空
	if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) {
		p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "")
	}

	// build a response
	//將nsqlookupd的TCP端口,HTTP端口信息,版本信息,broadcast address信息,以及host name信息 已json數(shù)據(jù)格式返回給client
	data := make(map[string]interface{})
	data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Port
	data["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Port
	data["version"] = version.Binary
	hostname, err := os.Hostname()
	if err != nil {
		log.Fatalf("ERROR: unable to get hostname %s", err)
	}
	data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddress
	data["hostname"] = hostname

	response, err := json.Marshal(data)
	if err != nil {
		p.ctx.nsqlookupd.logf("ERROR: marshaling %v", data)
		return []byte("OK"), nil
	}
	return response, nil
}

//獲取params中的topic 以及 channel
func getTopicChan(command string, params []string) (string, string, error) {
	if len(params) == 0 {
		return "", "", protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("%s insufficient number of params", command))
	}

	topicName := params[0]
	var channelName string
	if len(params) >= 2 {
		channelName = params[1]
	}
	
	//校驗(yàn)是否是topic
	if !protocol.IsValidTopicName(topicName) {
		return "", "", protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("%s topic name '%s' is not valid", command, topicName))
	}

	//校驗(yàn)是否是channel
	if channelName != "" && !protocol.IsValidChannelName(channelName) {
		return "", "", protocol.NewFatalClientErr(nil, "E_BAD_CHANNEL", fmt.Sprintf("%s channel name '%s' is not valid", command, channelName))
	}

	return topicName, channelName, nil
}

//REGISTER 命令 用于注冊(cè)client的topic 以及 channel信息
//一個(gè)topic下可以有多個(gè)channel
//一個(gè)消費(fèi)者訂閱的是一個(gè)topic 那么 生成者給這個(gè)topic 下的channel的信息 這個(gè)消費(fèi)者也能接收得到這個(gè)信息,如果消費(fèi)者訂閱的不是這個(gè)channel的信息,那么這個(gè)消費(fèi)者則接受不到這個(gè)信息
//nsq topic 與channel的關(guān)系,大家可以多搜索下資料,我這里感覺(jué)講的也不太清晰,請(qǐng)大家諒解一下
//REGISTER 命令必須在INDENTIFY 之后才能調(diào)用
//具體協(xié)議報(bào)文如下
//REGISTER topic1\n 這個(gè)只創(chuàng)建一個(gè)名字為topic1的topic
//或如下報(bào)文
//REGISTER topic1 channel1\n 這個(gè)只創(chuàng)建一個(gè)名字為topic1的topic 且topic1下面創(chuàng)建一個(gè)名字為channel1的channel
func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	if client.peerInfo == nil {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
	}
	
	//獲取REGISTER 命令時(shí)的topic 以及channel名字
	topic, channel, err := getTopicChan("REGISTER", params) 	if err != nil {
		return nil, err
	}
	//如果有channel
	if channel != "" {
		//添加channel信息到RegistrationDB中其中Registration的 Category 為"channel"字符串,Key為topic,SubKey為channel
		key := Registration{"channel", topic, channel}
		if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) {
			p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s",
				client, "channel", topic, channel)
		}
	}
	//添加topic信息到RegistrationDB中其中Registration的 Category 為"topic"字符串,Key為topic,SubKey為空
	key := Registration{"topic", topic, ""}
	if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) {
		p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s",
			client, "topic", topic, "")
	}
	//返回OK
	return []byte("OK"), nil
}

//UNREGISTER命令用于取消注冊(cè)topic 或取消注冊(cè)某個(gè)topic下的某一個(gè)channel
//報(bào)文格式如下
//UNREGISTER topic1 channel1\n 這個(gè)報(bào)文表示取消注冊(cè)名字為topic1的topic下的名字為channel1的channel,這個(gè)名字 只取消注冊(cè)這個(gè)channel1,不取消注冊(cè)topic1下的其他channel 以及這個(gè)topic1本身
//或這個(gè)格式的報(bào)文
//UNREGISTER topic1\n 這個(gè)報(bào)文表示取消注冊(cè)名字為topic1的topic,這個(gè)時(shí)候topic1以及topic1下的所有channel都取消注冊(cè)了
func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
	if client.peerInfo == nil {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY")
	}

	topic, channel, err := getTopicChan("UNREGISTER", params) //獲取topic 以及channel 的名字
	if err != nil {
		return nil, err
	}

	if channel != "" {
		//如果有channel 則只取消注冊(cè)這個(gè)topic下的這個(gè)channel
		key := Registration{"channel", topic, channel}
		removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id)
		if removed {
			p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
				client, "channel", topic, channel)
		}
		// for ephemeral channels, remove the channel as well if it has no producers
		if left == 0 && strings.HasSuffix(channel, "#ephemeral") {
			p.ctx.nsqlookupd.DB.RemoveRegistration(key)
		}
	} else {
		// no channel was specified so this is a topic unregistration
		// remove all of the channel registrations...
		// normally this shouldn't happen which is why we print a warning message
		// if anything is actually removed
		//如果沒(méi)有channel 這個(gè)取消注冊(cè)這個(gè)topic 以及這個(gè)topic下的所有channel
		registrations := p.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*")
		for _, r := range registrations {
			if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed {
				p.ctx.nsqlookupd.logf("WARNING: client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s",
					client, "channel", topic, r.SubKey)
			}
		}

		key := Registration{"topic", topic, ""}
		if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id); removed {
			p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s",
				client, "topic", topic, "")
		}
	}
	//返回OK
	return []byte("OK"), nil
}

//PING 用于client中的心跳處理命令
func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, error) {
	if client.peerInfo != nil {
		// we could get a PING before other commands on the same client connection
		//獲取上一次心跳的時(shí)間
		cur := time.Unix(0, atomic.LoadInt64(&client.peerInfo.lastUpdate))
		//獲取當(dāng)前時(shí)間
		now := time.Now()
		//這里日志輸出兩次心跳之間的間隔時(shí)間
		p.ctx.nsqlookupd.logf("CLIENT(%s): pinged (last ping %s)", client.peerInfo.id,
			now.Sub(cur))
		//更新PeerInfo中的lastUpdate時(shí)間為當(dāng)前時(shí)間
		atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano())
	}
	//返回OK
	return []byte("OK"), nil
}

nsqlookupd中的tcpServer中的主要協(xié)議處理的IOLoop這里基本講解完了。

上述內(nèi)容就是如何理解tcpServer 中的IOLoop方法,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

網(wǎng)站題目:如何理解tcpServer中的IOLoop方法
當(dāng)前地址:http://www.rwnh.cn/article8/gcgeip.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設(shè)計(jì)、品牌網(wǎng)站制作、用戶體驗(yàn)、ChatGPT、網(wǎng)頁(yè)設(shè)計(jì)公司、外貿(mào)網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作
五原县| 吉林省| 霍州市| 玛曲县| 新兴县| 金门县| 丽江市| 同仁县| 云梦县| 湖北省| 年辖:市辖区| 衢州市| 德安县| 安国市| 巴中市| 旌德县| 连江县| 紫金县| 资兴市| 连州市| 东光县| 嘉黎县| 安岳县| 千阳县| 宜丰县| 筠连县| 江北区| 铅山县| 梅河口市| 佛冈县| 固阳县| 彩票| 宾阳县| 汽车| 苍山县| 余干县| 德庆县| 武山县| 独山县| 凉城县| 株洲市|