(一)基本概念
成都創(chuàng)新互聯(lián)公司專注于瀏陽網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠為您提供瀏陽營銷型網(wǎng)站建設(shè),瀏陽網(wǎng)站制作、瀏陽網(wǎng)頁設(shè)計(jì)、瀏陽網(wǎng)站官網(wǎng)定制、重慶小程序開發(fā)公司服務(wù),打造瀏陽網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供瀏陽網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。RabbitMQ 是流行的開源消息隊(duì)列系統(tǒng),用 erlang 語言開發(fā)。我曾經(jīng)對這門語言挺有興趣,學(xué)過一段時(shí)間,后來沒堅(jiān)持。RabbitMQ 是 AMQP(高級消息隊(duì)列協(xié)議)的標(biāo)準(zhǔn)實(shí)現(xiàn)。如果不熟悉 AMQP,直接看 RabbitMQ 的文檔會比較困難。不過它也只有幾個(gè)關(guān)鍵概念,這里簡單介紹。
RabbitMQ 的結(jié)構(gòu)圖如下:
幾個(gè)概念說明:
Broker:簡單來說就是消息隊(duì)列服務(wù)器實(shí)體。
Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。
Queue:消息隊(duì)列載體,每個(gè)消息都會被投入到一個(gè)或多個(gè)隊(duì)列。
Binding:綁定,它的作用就是把 exchange 和 queue 按照路由規(guī)則綁定起來。
Routing Key:路由關(guān)鍵字,exchange 根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。
vhost:虛擬主機(jī),一個(gè) broker 里可以開設(shè)多個(gè) vhost,用作不同用戶的權(quán)限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費(fèi)者,就是接受消息的程序。
channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè) channel,每個(gè) channel 代表一個(gè)會話任務(wù)。
消息隊(duì)列的使用過程大概如下:
(1)客戶端連接到消息隊(duì)列服務(wù)器,打開一個(gè) channel 。
(2)客戶端聲明一個(gè) exchange,并設(shè)置相關(guān)屬性。
(3)客戶端聲明一個(gè) queue,并設(shè)置相關(guān)屬性。
(4)客戶端使用 routing key(本質(zhì)上是 binding key,后文已做相應(yīng)糾正),在 exchange 和 queue 之間建立好綁定關(guān)系。
(5)客戶端投遞消息到 exchange 。
exchange 接收到消息后,就根據(jù)消息的 routing key 和已經(jīng)設(shè)置的 binding 關(guān)系,進(jìn)行消息路由,將消息投遞到一個(gè)或多個(gè)隊(duì)列里。
exchange 也有幾個(gè)類型:
完全根據(jù) routing key 進(jìn)行投遞的叫做 direct 交換機(jī)。例如,綁定時(shí)設(shè)置了 binding key 為 "abc",那么客戶端提交的消息,只有設(shè)置了routing key 為 "abc" 的才會被投遞到該隊(duì)列。 對 routing key 進(jìn)行模式匹配后進(jìn)行投遞的叫做 topic 交換機(jī)。符號 "#" 匹配一個(gè)或多個(gè)詞,符號 "*" 匹配正好一個(gè)詞。例如 "abc.#" 可以匹配 "abc.def.ghi" ,"abc.*" 只能匹配 "abc.def" 。 還有一種不需要 routing key 的,叫做 fanout 交換機(jī)。它采取廣播模式,一個(gè)消息進(jìn)來時(shí),投遞到與該交換機(jī)綁定的所有隊(duì)列。RabbitMQ 支持消息的持久化,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮,我想大多數(shù)用戶都會選擇持久化。消息隊(duì)列持久化包括 3 個(gè)部分:
(1)exchange 持久化,在聲明時(shí)指定 durable => 1
(2)queue 持久化,在聲明時(shí)指定 durable => 1
(3)消息持久化,在投遞時(shí)指定 delivery_mode => 2(1 是非持久化)
如果 exchange 和 queue 都是持久化的,那么它們之間的 binding 也是持久化的。
(二)應(yīng)用實(shí)際
使用 Linux 服務(wù)器(ubuntu 9.10 64位),安裝 RabbitMQ 非常方便。
先運(yùn)行如下命令安裝 erlang:
# apt-get install erlang-nox
再到rabbitmq.com上下載RabbitMQ的安裝包,如下安裝:
# dpkg -i rabbitmq-server_2.6.1-1_all.deb
安裝完后,使用
# /etc/init.d/rabbitmq-server start|stop|restart
來啟動、停止、重啟 rabbitmq。
在正式應(yīng)用之前,我們先在 RabbitMQ 里創(chuàng)建一個(gè) vhost ,加一個(gè)用戶,并設(shè)置該用戶的權(quán)限。
使用 rabbitmqctl 客戶端工具,在根目錄下創(chuàng)建 "/pyhtest" 這個(gè) vhost :
# rabbitmqctl add_vhost /pyhtest
創(chuàng)建一個(gè)用戶名 "pyh" ,設(shè)置密碼 "pyh1234" :
# rabbitmqctl add_user pyh pyh1234
設(shè)置pyh用戶對/pyhtest這個(gè)vhost擁有全部權(quán)限:
# rabbitmqctl set_permissions -p /pyhtest pyh “.*” “.*” “.*”
后面三個(gè)”*”代表pyh用戶擁有對/pyhtest的配置、寫、讀全部權(quán)限
設(shè)置好后,開始編程,我用 Perl 寫一個(gè)消息投遞程序(producer):
#!/usr/bin/perl use strict; use Net::RabbitMQ; use UUID::Tiny; my $channel = 1000; # channel ID,可以隨意指定,只要不沖突 my $queuename = “pyh_queue”; # 隊(duì)列名 my $exchange = “pyh_exchange”; # 交換機(jī)名 my $routing_key = “test”; # routing key my $mq = Net::RabbitMQ->new(); # 創(chuàng)建一個(gè)RabbitMQ對象 $mq->connect(“localhost”, { vhost => “/pyhtest”, user => “pyh”, password => “pyh1234″ }); # 建立連接 $mq->channel_open($channel); # 打開一個(gè)channel $mq->exchange_declare($channel, $exchange, {durable => 1}); # 聲明一個(gè)持久化的交換機(jī) $mq->queue_declare($channel, $queuename, {durable => 1}); # 聲明一個(gè)持久化的隊(duì)列 $mq->queue_bind($channel, $queuename, $exchange, $routing_key); # 使用routing key在交換機(jī)和隊(duì)列間建立綁定 for (my $i=0;$i<10000000;$i++) { # 循環(huán)1000萬次 my $string = create_UUID_as_string(UUID_V1); # 產(chǎn)生一條UUID作為消息主體 $mq->publish($channel, $routing_key, $string, { exchange => $exchange }, { delivery_mode => 2 }); # 將消息結(jié)合key以持久化模式投遞到交換機(jī) } $mq->disconnect(); # 斷開連接
消息接受程序(consumer)大概如下:
#!/usr/bin/perl use strict; use Net::RabbitMQ; my $channel = 1001; my $queuename = “pyh_queue”; my $mq = Net::RabbitMQ->new(); $mq->connect(“localhost”, { vhost=>”/pyhtest”, user => “pyh”, password => “pyh1234″ }); $mq->channel_open($channel); while (1) { my $hashref = $mq->get($channel, $queuename); last unless defined $hashref; print $hashref->{message_count}, “: “, $hashref->{body},”n”; } $mq->disconnect();
consumer 連接后只要指定隊(duì)列就可獲取到消息。
上述程序共投遞 1000 萬條消息,每條消息 36 字節(jié)(UUID),打開持久化,共耗時(shí) 17 分多鐘(包括產(chǎn)生 UUID 的時(shí)間),每秒投遞消息約 9500 條。測試機(jī)器是 8G 內(nèi)存、8 核志強(qiáng) CPU 。
投遞完后,在 /var/lib/rabbitmq/mnesia/rabbit@${hostname}/msg_store_persistent 目錄,產(chǎn)生 2G 多的持久化消息數(shù)據(jù)。在運(yùn)行 consumer 程序后,這些數(shù)據(jù)都會消失,因?yàn)橄⒁呀?jīng)被消費(fèi)了。
分享題目:【轉(zhuǎn)載】消息隊(duì)列RabbitMQ入門介紹
標(biāo)題路徑:http://www.rwnh.cn/article34/cjphse.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供靜態(tài)網(wǎng)站、企業(yè)網(wǎng)站制作、定制開發(fā)、搜索引擎優(yōu)化、小程序開發(fā)、企業(yè)建站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)