内射老阿姨1区2区3区4区_久久精品人人做人人爽电影蜜月_久久国产精品亚洲77777_99精品又大又爽又粗少妇毛片

rabbitmq的三種隊列以及使用方式(go)-創(chuàng)新互聯(lián)

rabbitmq的三種隊列以及使用方式(beego)

創(chuàng)新互聯(lián)建站堅持“要么做到,要么別承諾”的工作理念,服務領(lǐng)域包括:做網(wǎng)站、網(wǎng)站設(shè)計、企業(yè)官網(wǎng)、英文網(wǎng)站、手機端網(wǎng)站、網(wǎng)站推廣等服務,滿足客戶于互聯(lián)網(wǎng)時代的博白網(wǎng)站設(shè)計、移動媒體設(shè)計的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡建設(shè)合作伙伴!

提示:最上面右調(diào)用的統(tǒng)一調(diào)用,最下面有消費的代碼注:消費需根據(jù)條件修改并另起一個mian.go


前言

面試官問這個問題,肯定是想知道你們公司有一個什么場景需要使用到這個Mq,這個場景有一個什么技術(shù)挑戰(zhàn)導致必須要用這個mq,用了這個mq之后有什么好處。mq經(jīng)典的使用場景有解耦,異步,削鋒。
而rabbitmq是如何進行使用的他的使用發(fā)放是什么呢?


提示:先是統(tǒng)一的引入跟實例化

import (
	"bytes"
	"fmt"
	"github.com/streadway/amqp"
)

type Callback func(str1 string)

//Connect RabbitMQ連接函數(shù)
func Connect() (conn *amqp.Connection, err error) {//連接mq
	conn, err = amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
	return conn, err
}


func BytesToString(b *[]byte) *string {s := bytes.NewBuffer(*b)
	r := s.String()
	return &r
}
一、rabbitmq的普通隊列(路由模式) 1.生成者

示例:這是最普通的rabbitmq的生成者

//Publish 發(fā)送端函數(shù)
//exchange交換機名稱
//queueName隊列名稱
//body發(fā)送內(nèi)容
func Publish(exchange string, queueName string, body string) error {//建立連接
	conn, err := Connect()
	if err != nil {return err
	}
	defer conn.Close()

	//創(chuàng)建通道channel
	channel, err := conn.Channel()
	if err != nil {return err
	}
	defer channel.Close()

	//創(chuàng)建隊列
	q, err := channel.QueueDeclare(
		queueName, //隊列名稱
		true,      //持久化
		false,
		false,
		false,
		nil,
	)
	if err != nil {return err
	}
	//發(fā)送消息
	err = channel.Publish(exchange, q.Name, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,
		ContentType:  "text/plain",
		Body:         []byte(body),
	})
	return err
}
2.消費者

實例:這是普通隊列的消費

//Consumer 接受方法
func Consumer(exchange string, queueName string, callback Callback) {//建立連接
	conn, err := Connect()
	defer conn.Close()
	if err != nil {fmt.Println(err)
		return
	}
	//創(chuàng)建通道channel
	channel, err := conn.Channel()
	defer channel.Close()

	if err != nil {fmt.Println(err)
		return
	}

	//創(chuàng)建queue
	q, err := channel.QueueDeclare(
		queueName,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {fmt.Println(err)
		return
	}
	//輸出
	msgs, err := channel.Consume(
		q.Name,
		"",
		false, //手動應答
		false,
		false,
		false,
		nil,
	)
	if err != nil {fmt.Println(err)
		return
	}
	forever := make(chan bool)
	go func() {for d := range msgs {	s := BytesToString(&(d.Body))
			callback(*s)
			d.Ack(false)
		}
	}()
	fmt.Printf("Waiting for messages")
	<-forever
}

func BytesToString(b *[]byte) *string {s := bytes.NewBuffer(*b)
	r := s.String()
	return &r
}
func callback(s string) {fmt.Printf("msg:%s", s)
	return
}
二、rabbitmq的并發(fā)隊列(主題模式) 1.生產(chǎn)者

代碼如下(示例):

func PublishEx(exchange string, types string, routingKey string, body string) error {//建立連接
	conn, err := Connect()
	defer conn.Close()
	if err != nil {return err
	}
	//創(chuàng)建channel
	channel, err := conn.Channel()
	defer channel.Close()
	if err != nil {return err
	}

	//創(chuàng)建交換機
	err = channel.ExchangeDeclare(
		exchange,
		types,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {return err
	}

	err = channel.Publish(exchange, routingKey, false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,
		ContentType:  "text/plain",
		Body:         []byte(body),
	})
	return err
}
2.消費者

代碼如下(示例):

func ConsumerEx(exchange string, types string, routingKey string, callback Callback) {//建立連接
	conn, err := Connect()
	defer conn.Close()
	if err != nil {fmt.Println(err)
		return
	}
	//創(chuàng)建通道channel
	channel, err := conn.Channel()
	defer channel.Close()
	if err != nil {fmt.Println(err)
		return
	}

	//創(chuàng)建交換機
	err = channel.ExchangeDeclare(
		exchange,
		types,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {fmt.Println(err)
		return
	}

	//創(chuàng)建隊列
	q, err := channel.QueueDeclare(
		"",
		false,
		false,
		true,
		false,
		nil,
	)
	if err != nil {fmt.Println(err)
		return
	}

	//綁定
	err = channel.QueueBind(
		q.Name,
		routingKey,
		exchange,
		false,
		nil,
	)
	if err != nil {fmt.Println(err)
		return
	}

	msgs, err := channel.Consume(q.Name, "", false, false, false, false, nil)
	if err != nil {fmt.Println(err)
		return
	}

	forever := make(chan bool)
	go func() {for d := range msgs {	s := BytesToString(&(d.Body))
			callback(*s)
			d.Ack(false)
		}
	}()
	fmt.Printf("Waiting for messages\n")
	<-forever
}
   
三、rabbitmq的雙隊列(死信隊列) 1.生產(chǎn)者

代碼如下(示例):

func PublishDlx(exchangeA string, body string) error {//建立連接
	conn, err := Connect()
	if err != nil {return err
	}
	defer conn.Close()

	//創(chuàng)建一個Channel
	channel, err := conn.Channel()
	if err != nil {return err
	}
	defer channel.Close()

	//消息發(fā)送到A交換機
	err = channel.Publish(exchangeA, "", false, false, amqp.Publishing{DeliveryMode: amqp.Persistent,
		ContentType:  "text/plain",
		Body:         []byte(body),
	})

	return err
}
2.消費者

代碼如下(示例):

func ConsumerDlx(exchangeA string, queueAName string, exchangeB string, queueBName string, ttl int, callback Callback) {//建立連接
	conn, err := Connect()
	if err != nil {fmt.Println(err)
		return
	}
	defer conn.Close()

	//創(chuàng)建一個Channel
	channel, err := conn.Channel()
	if err != nil {fmt.Println(err)
		return
	}
	defer channel.Close()

	//創(chuàng)建A交換機
	//創(chuàng)建A隊列
	//A交換機和A隊列綁定
	err = channel.ExchangeDeclare(
		exchangeA, // name
		"fanout",  // type
		true,      // durable
		false,     // auto-deleted
		false,     // internal
		false,     // no-wait
		nil,       // arguments
	)
	if err != nil {fmt.Println(err)
		return
	}

	//創(chuàng)建一個queue,指定消息過期時間,并且綁定過期以后發(fā)送到那個交換機
	queueA, err := channel.QueueDeclare(
		queueAName, // name
		true,       // durable
		false,      // delete when usused
		false,      // exclusive
		false,      // no-wait
		amqp.Table{	// 當消息過期時把消息發(fā)送到 exchangeB
			"x-dead-letter-exchange": exchangeB,
			"x-message-ttl":          ttl,
			//"x-dead-letter-queue" : queueBName,
			//"x-dead-letter-routing-key" :
		},
	)
	if err != nil {fmt.Println(err)
		return
	}

	//A交換機和A隊列綁定
	err = channel.QueueBind(
		queueA.Name, // queue name
		"",          // routing key
		exchangeA,   // exchange
		false,
		nil,
	)
	if err != nil {fmt.Println(err)
		return
	}
	//創(chuàng)建B交換機
	//創(chuàng)建B隊列
	//B交換機和B隊列綁定
	err = channel.ExchangeDeclare(
		exchangeB, // name
		"fanout",  // type
		true,      // durable
		false,     // auto-deleted
		false,     // internal
		false,     // no-wait
		nil,       // arguments
	)
	if err != nil {fmt.Println(err)
		return
	}

	//創(chuàng)建一個queue
	queueB, err := channel.QueueDeclare(
		queueBName, // name
		true,       // durable
		false,      // delete when usused
		false,      // exclusive
		false,      // no-wait
		nil,        // arguments
	)
	if err != nil {fmt.Println(err)
		return
	}

	//B交換機和B隊列綁定
	err = channel.QueueBind(
		queueB.Name, // queue name
		"",          // routing key
		exchangeB,   // exchange
		false,
		nil,
	)
	if err != nil {fmt.Println(err)
		return
	}

	msgs, err := channel.Consume(queueB.Name, "", false, false, false, false, nil)
	if err != nil {fmt.Println(err)
		return
	}

	forever := make(chan bool)
	go func() {for d := range msgs {	s := BytesToString(&(d.Body))
			callback(*s)
			d.Ack(false)
		}
	}()

	fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
	<-forever
}
   

剩下的統(tǒng)一調(diào)用消費者模板

package main

	import (
		"encoding/json"
		"fmt"
		"github.com/beego/beego/v2/client/orm"
		beego "github.com/beego/beego/v2/server/web"
		"github.com/garyburd/redigo/redis"
		"goApi/models"
		_ "goApi/routers"
		redisClient "goApi/services"
		"goApi/services/mq"
		"strconv"
	)
	
	func main() {beego.LoadAppConfig("ini", "../../conf/app.conf")
		//err := orm.RegisterDataBase("default", "mysql", "fukw:ipx4JtpXR6sCxmKt@tcp(127.0.0.1)/fukw?charset=utf8")
		err := orm.RegisterDataBase("default", "mysql", "root:root@tcp(127.0.0.1)/fukw?charset=utf8")
		if err != nil {	fmt.Println("連接數(shù)據(jù)庫失敗")
		}
	
		c, err := redis.Dial("tcp", "127.0.0.1:6379")
		if err != nil {	fmt.Println("redis連接失敗")
		}
		defer c.Close()
	
		mq.Consumer("", "fyouku_top", callback)
		fmt.Printf("mian執(zhí)行成功")
		beego.Run()
	}
	
	func callback(s string) {type Data struct {	VideoId int
		}
		var data Data
		err := json.Unmarshal([]byte(s), &data)
		videoInfo, err := models.GetVideoInfo(data.VideoId)
		if err == nil {	conn := redisClient.RedisConnect()
			defer conn.Close()
			//更新排行榜
			//執(zhí)行的代碼我這里是排行榜
			redisChannelKey := "video:top:channel:channelId:" + strconv.Itoa(videoInfo.ChannelId)
			redisTypeKey := "video:top:type:typeId:" + strconv.Itoa(videoInfo.TypeId)
			conn.Do("zincrby", redisChannelKey, 1, data.VideoId)
			conn.Do("zincrby", redisTypeKey, 1, data.VideoId)
		}
		fmt.Printf("msg is :%s\n", s)
	}

總結(jié)

這就是rabbitmq的3種隊列的的書寫形式

你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

當前名稱:rabbitmq的三種隊列以及使用方式(go)-創(chuàng)新互聯(lián)
文章源于:http://www.rwnh.cn/article18/csjigp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制開發(fā)網(wǎng)站改版、全網(wǎng)營銷推廣網(wǎng)站制作、網(wǎng)站內(nèi)鏈、品牌網(wǎng)站設(shè)計

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站優(yōu)化排名
宣城市| 观塘区| 兰考县| 汕头市| 剑河县| 化德县| 马公市| 张家界市| 贵溪市| 拉孜县| 搜索| 大英县| 仪征市| 息烽县| 湖州市| 丹棱县| 加查县| 涿州市| 那曲县| 木里| 绿春县| 云阳县| 托克托县| 宜兴市| 类乌齐县| 汝阳县| 报价| 大庆市| 广东省| 宁国市| 涿鹿县| 建湖县| 鹰潭市| 葫芦岛市| 桂阳县| 汉源县| 托克托县| 中西区| 南溪县| 宜兴市| 龙江县|