中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

[RabbitMQ]RabbitMQ簡介及Java示例

RabbitMQ簡介

目前RabbitMQ是AMQP 0-9-1(高級(jí)消息隊(duì)列協(xié)議)的一個(gè)實(shí)現(xiàn),使用Erlang語言編寫,利用了Erlang的分布式特性。

十多年的萬榮網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。成都營銷網(wǎng)站建設(shè)的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整萬榮建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。成都創(chuàng)新互聯(lián)公司從事“萬榮網(wǎng)站設(shè)計(jì)”,“萬榮網(wǎng)站推廣”以來,每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。

概念介紹: Broker:簡單來說就是消息隊(duì)列服務(wù)器實(shí)體。 Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。 Queue:消息隊(duì)列載體,每個(gè)消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列。 Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。 Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。 vhost:虛擬主機(jī),一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不同用戶的權(quán)限分離。 producer:消息生產(chǎn)者,就是投遞消息的程序。 consumer:消息消費(fèi)者,就是接受消息的程序。 channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù)。

使用流程

AMQP模型中,消息在producer中產(chǎn)生,發(fā)送到MQ的exchange上,exchange根據(jù)配置的路由方式發(fā)到相應(yīng)的Queue上,Queue又將消息發(fā)送給consumer,消息從queue到consumer有push和pull兩種方式。 消息隊(duì)列的使用過程大概如下:

客戶端連接到消息隊(duì)列服務(wù)器,打開一個(gè)channel。 客戶端聲明一個(gè)exchange,并設(shè)置相關(guān)屬性。 客戶端聲明一個(gè)queue,并設(shè)置相關(guān)屬性。 客戶端使用routing key,在exchange和queue之間建立好綁定關(guān)系。 客戶端投遞消息到exchange。

exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設(shè)置的binding,進(jìn)行消息路由,將消息投遞到一個(gè)或多個(gè)隊(duì)列里。 exchange也有幾個(gè)類型,完全根據(jù)key進(jìn)行投遞的叫做Direct交換機(jī),例如,綁定時(shí)設(shè)置了routing key為”abc”,那么客戶端提交的消息,只有設(shè)置了key為”abc”的才會(huì)投遞到隊(duì)列。

Exchange類型

Exchange路由消息的集中類型:

名稱

默認(rèn)的預(yù)先定義exchange名字

作用描述

Direct exchange

(Empty string) and amq.direct

根據(jù)Binding指定的Routing Key,將符合Key的消息發(fā)送到Binding的Queue

Fanout exchange

amq.fanout

將同一個(gè)message發(fā)送到所有同該Exchange bingding的queue

Topic exchange

amq.topic

根據(jù)Binding指定的Routing Key,Exchange對(duì)key進(jìn)行模式匹配后路由到相應(yīng)的Queue,模式匹配時(shí)符號(hào)”#”匹配一個(gè)或多個(gè)詞,符號(hào)”*”匹配正好一個(gè)詞。

Headers exchange

amq.match (and amq.headers in RabbitMQ)

同direct exchange類似,不同之處是不再使用Routing Key路由,而是使用headers(message attributes)進(jìn)行匹配路由到指定Queue。

參考:

http://www.choudan.net/2013/07/25/OpenStack-RabbitMQ(%E4%B8%80).html

http://stephansun.iteye.com/blog/1452853

http://www.diggerplus.org/archives/3110

http://backend.blog.163.com/blog/static/202294126201322563245975/

http://lynnkong.iteye.com/blog/1699684

特性: broker的持久化:exchange和queue聲明為durable時(shí),exchange和queue的配置會(huì)在服務(wù)端磁盤保存起來,這樣在服務(wù)停掉重啟后,exchange和queue以及其相應(yīng)的binding等配置不會(huì)丟失; message的持久化:當(dāng)message的deliver mode attribute(message properties)設(shè)置為2時(shí),每個(gè)未被消費(fèi)的message將被保存在磁盤中,在服務(wù)重啟后仍能保存。
message在文件中的保存參考:http://my.oschina.net/hncscwc/blog/182083 cluster:RabbitMQ支持多個(gè)nodes(每個(gè)nodes是一個(gè)RabbitMQ實(shí)例)組成一個(gè)cluster,訪問cluster中的任意一個(gè)node的效果是相同的,也就是說任何一個(gè)message都可以在任意一個(gè)nodes上生產(chǎn)和消費(fèi)(生產(chǎn)或消費(fèi)的message會(huì)在nodes間中轉(zhuǎn))。 mirrored-queue:RabbitMQ在cluster的基礎(chǔ)上,支持同一個(gè)queue的message同時(shí)存儲(chǔ)在多個(gè)nodes上,這樣當(dāng)部分節(jié)點(diǎn)失效時(shí),可保證message和broker的配置不丟失。
安裝與配置 1.安裝erlang

sudo apt-get install tk tcl unixODBC erlang sudo vim /etc/profile 添加export PATH=$PATH:/usr/lib/erlang/bin/ 2.安裝rabbitmq

sudo apt-get install rabbitmq-server sudo vim /etc/profile 添加export PATH=$PATH:/usr/lib/rabbitmq/bin source /etc/profile

rabbitmq的基本配置(端口等)參考:http://my.oschina.net/hncscwc/blog/302339

3.用戶與權(quán)限

在正式應(yīng)用之前,我們先在RabbitMQ里創(chuàng)建一個(gè)vhost,加一個(gè)用戶,并設(shè)置該用戶的權(quán)限。使用rabbitmqctl客戶端工具,在根目錄下創(chuàng)建”/mq_test”這個(gè)vhost:

rabbitmqctl add_vhost /mq_test

創(chuàng)建一個(gè)用戶名”test”,設(shè)置密碼”test123″:

rabbitmqctl add_user test test123

設(shè)置pyh用戶對(duì)/pyhtest這個(gè)vhost擁有全部權(quán)限:

rabbitmqctl set_permissions -p /mq_test test “.*” “.*” “.*”、

后面三個(gè)”*”代表pyh用戶擁有對(duì)/pyhtest的配置、寫、讀全部權(quán)限

參考:http://my.oschina.net/hncscwc/blog/262246

4.配置開啟web管理插件

cat <<EOF>> /etc/rabbitmq/enabled_plugins [rabbitmq_management]. EOF

可以通過http://localhost:15672/ 查看運(yùn)行情況

5.啟動(dòng)

使用root權(quán)限運(yùn)行rabbitmq-server 或使用/etc/init.d/rabbitmq-server start|restart|stop

6.在一臺(tái)機(jī)器上啟動(dòng)多個(gè)節(jié)點(diǎn)(模擬集群)

vim start_rabbitmq_cluster.sh

添加以下內(nèi)容

#!/bin/bash if [[ $# != 1 ]] then echo "Usage: $0 process_num" exit 1 fi HOST_NAME=`hostname` START_NUM=0 PROCESS_NUM=$1 END_NUM=$(( START_NUM + PROCESS_NUM - 1)) RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME="rabbit" RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" rabbitmq-server -detached for (( i=$((START_NUM+1)); i<=$END_NUM; i++ )) do RABBITMQ_PROT=$(( i + 5672 )) MANAGE_PORT=$(( i + 15672 )) NODE_NAME="rabbit_$i" echo $RABBITMQ_PROT echo $MANAGE_PORT echo $NODE_NAME RABBITMQ_NODE_PORT=$RABBITMQ_PROT RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,$MANAGE_PORT}]" RABBITMQ_NODENAME=$NODE_NAME rabbitmq-server -detached sleep 3 rabbitmqctl -n $NODE_NAME stop_app rabbitmqctl -n $NODE_NAME reset echo "join cluster" rabbitmqctl -n $NODE_NAME join_cluster rabbit@$HOST_NAME rabbitmqctl -n $NODE_NAME start_app done rabbitmqctl cluster_status -n rabbit

運(yùn)行

chmod a+x start_rabbitmq_cluster.sh start_rabbitmq_cluster.sh 3

啟動(dòng)后可以通過rabbitmqctl -n rabbit cluster_status查看集群節(jié)點(diǎn)配置情況,或者在web管理頁面中查看

7.在多臺(tái)機(jī)器上建立集群

首先在主節(jié)點(diǎn)上啟動(dòng)服務(wù)

然后將其他機(jī)器的rabbitmq加入集群

1.將主服務(wù)器的/var/log/rabbitmq/.erlang.cookie 拷貝到新節(jié)點(diǎn) 2.在新節(jié)點(diǎn)上將文件所有人更改為rabbitmq,注意保持文件權(quán)限為所有者只讀,其他人無權(quán)限 chown rabbitmq.rabbitmq /var/log/rabbitmq/.erlang.cookie 3.在新節(jié)點(diǎn)上加入集群、 /etc/init.d/rabbitmq-server start rabbitmqctl -n rabbit stop_app rabbitmqctl -n rabbit reset rabbitmqctl -n rabbit join_cluster rabbit@$MASTER_NODE rabbitmqctl -n rabbit start_app

8.配置network partion時(shí)的處理方式

cat<<EOF>> /usr/local/rabbitmq/rabbitmq_server-3.1.0/etc/rabbitmq/rabbitmq.conf [ {rabbit, [{cluster_partition_handling, pause_minority}]} ]. EOF

參考:http://my.oschina.net/hncscwc/blog/174417

Java代碼示例

首先在項(xiàng)目中添加maven依賴

<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.2.2</version> </dependency> </dependencies>

Producer

import com.rabbitmq.client.*; import com.sun.deploy.util.StringUtils; import java.io.IOException; import java.lang.String; import java.lang.System; import java.util.HashMap; import java.util.Map; import java.util.Scanner; public class Producer { //exchange type public enum XT { DEFAULT, DIRECT, TOPIC, HEADERS, FANOUT } private static final String QUEUE_NAME = "log2"; public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //使用默認(rèn)端口連接本地rabbitmq服務(wù)器 Connection connection = factory.newConnection(); //聲明一個(gè)連接 Channel channel = connection.createChannel(); //聲明消息通道 //exchange類型 參考:http://stephansun.iteye.com/blog/1452853 XT xt = XT.HEADERS; switch (xt) { case DEFAULT: //默認(rèn),向指定的隊(duì)列發(fā)送消息,消息只會(huì)被一個(gè)consumer處理,多個(gè)消費(fèi)者消息會(huì)輪訓(xùn)處理,消息發(fā)送時(shí)如果沒有consumer,消息不會(huì)丟失 //為消息通道綁定一個(gè)隊(duì)列 //隊(duì)列的相關(guān)參數(shù)需要與第一次定義該隊(duì)列時(shí)相同,否則會(huì)出錯(cuò) //參數(shù)1:隊(duì)列名稱 //參數(shù)2:為true時(shí)server重啟隊(duì)列不會(huì)消失 //參數(shù)3:隊(duì)列是否是獨(dú)占的,如果為true只能被一個(gè)connection使用,其他連接建立時(shí)會(huì)拋出異常 //參數(shù)4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除(沒有連接,并且沒有未處理的消息) //參數(shù)5:建立隊(duì)列時(shí)的其他參數(shù) channel.queueDeclare(QUEUE_NAME, true, false, true, null); while (GetInputString()) { //向server發(fā)布一條消息 //參數(shù)1:exchange名字,若為空則使用默認(rèn)的exchange //參數(shù)2:routing key //參數(shù)3:其他的屬性 //參數(shù)4:消息體 //RabbitMQ默認(rèn)有一個(gè)exchange,叫default exchange,它用一個(gè)空字符串表示,它是direct exchange類型, //任何發(fā)往這個(gè)exchange的消息都會(huì)被路由到routing key的名字對(duì)應(yīng)的隊(duì)列上,如果沒有對(duì)應(yīng)的隊(duì)列,則消息會(huì)被丟棄 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); //設(shè)置消息為持久化,服務(wù)器重啟不會(huì)丟失 System.out.println("Send " + message); } break; case FANOUT: //廣播給所有隊(duì)列 接收方也必須通過fanout交換機(jī)獲取消息,所有連接到該交換機(jī)的consumer均可獲取消息 //如果producer在發(fā)布消息時(shí)沒有consumer在監(jiān)聽,消息將被丟棄 //定義一個(gè)交換機(jī) //參數(shù)1:交換機(jī)名稱 //參數(shù)2:交換機(jī)類型 //參數(shù)3:交換機(jī)持久性,如果為true則服務(wù)器重啟時(shí)不會(huì)丟失 //參數(shù)4:交換機(jī)在不被使用時(shí)是否刪除 //參數(shù)5:交換機(jī)的其他屬性 channel.exchangeDeclare(XCHG_NAME, "fanout", true, true, null); while (GetInputString()) { //發(fā)送一條廣播消息,參數(shù)2此時(shí)無意義 channel.basicPublish(XCHG_NAME, "", null, message.getBytes()); System.out.println("Send " + message); } break; case DIRECT: //向所有綁定了相應(yīng)routing key的隊(duì)列發(fā)送消息 //如果producer在發(fā)布消息時(shí)沒有consumer在監(jiān)聽,消息將被丟棄 //如果有多個(gè)consumer監(jiān)聽了相同的routing key 則他們都會(huì)受到消息 channel.exchangeDeclare(XCHG_NAME, "direct", true, true, null); while (GetInputString()) { //input like : info message String[] temp = StringUtils.splitString(message, " "); channel.basicPublish(XCHG_NAME, temp[0], null, temp[1].getBytes()); System.out.println("Send " + message); } break; case TOPIC: //與direct模式有類似之處,都使用routing key作為路由 //不同之處在于direct模式只能指定固定的字符串,而topic可以指定一個(gè)字符串模式 channel.exchangeDeclare(XCHG_NAME, "topic", true, true, null); while (GetInputString()) { //input like : topic message String[] temp = StringUtils.splitString(message, " "); channel.basicPublish(XCHG_NAME, temp[0], null, temp[1].getBytes()); System.out.println("Send " + message); } break; case HEADERS: //與topic和direct有一定相似之處,但不是通過routing key來路由消息 //通過headers中詞的匹配來進(jìn)行路由 channel.exchangeDeclare(XCHG_NAME, "headers", true, true, null); while (GetInputString()) { //input like : headers message String[] temp = StringUtils.splitString(message, " "); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("name", temp[0]); //定義headers headers.put("sex", temp[1]); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish(XCHG_NAME, "", builder.build(), temp[2].getBytes()); //根據(jù)headers路由到相應(yīng)的consumer System.out.println("Send " + message); } break; } channel.close(); connection.close(); } private static boolean GetInputString() { message = scanner.nextLine(); if (message.length() == 0) return false; return true; } private static Scanner scanner = new Scanner(System.in); private static String message = ""; public static String XCHG_NAME = "xchg3"; }

Consumer

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class Consumer { private static final String QUEUE_NAME = "log2"; public static void main(String[] args) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queueName = QUEUE_NAME; Producer.XT xt = Producer.XT.HEADERS; switch (xt) { case DEFAULT: //隊(duì)列的相關(guān)參數(shù)需要與第一次定義該隊(duì)列時(shí)相同,否則會(huì)出錯(cuò),使用channel.queueDeclarePassive()可只被動(dòng)綁定已有隊(duì)列,而不創(chuàng)建 channel.queueDeclare(queueName, true, false, true, null); break; case FANOUT: //接收端也聲明一個(gè)fanout交換機(jī) channel.exchangeDeclare(Producer.XCHG_NAME, "fanout", true, true, null); //channel.exchangeDeclarePassive() 可以使用該函數(shù)使用一個(gè)已經(jīng)建立的exchange //聲明一個(gè)臨時(shí)隊(duì)列,該隊(duì)列會(huì)在使用完比后自動(dòng)銷毀 queueName = channel.queueDeclare().getQueue(); //將隊(duì)列綁定到交換機(jī),參數(shù)3無意義此時(shí) channel.queueBind(queueName, Producer.XCHG_NAME, ""); break; case DIRECT: channel.exchangeDeclare(Producer.XCHG_NAME, "direct", true, true, null); queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, Producer.XCHG_NAME, "info"); //綁定一個(gè)routing key,可以綁定多個(gè) channel.queueBind(queueName, Producer.XCHG_NAME, "warning"); break; case TOPIC: channel.exchangeDeclare(Producer.XCHG_NAME, "topic", true, true, null); queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, Producer.XCHG_NAME, "warning.#"); //監(jiān)聽兩種模式 #匹配一個(gè)或多個(gè)單詞 *匹配一個(gè)單詞 channel.queueBind(queueName, Producer.XCHG_NAME, "*.blue"); break; case HEADERS: channel.exchangeDeclare(Producer.XCHG_NAME, "headers", true, true, null); queueName = channel.queueDeclare().getQueue(); Map<String, Object> headers = new HashMap<String, Object>() {{ put("name", "test"); put("sex", "male"); put("x-match", "any");//all==匹配所有條件,any==匹配任意條件 }}; channel.queueBind(queueName, Producer.XCHG_NAME, "", headers); break; } // 在同一時(shí)間不要給一個(gè)worker一個(gè)以上的消息。 // 不要將一個(gè)新的消息分發(fā)給worker知道它處理完了并且返回了前一個(gè)消息的通知標(biāo)志(acknowledged) // 替代的,消息將會(huì)分發(fā)給下一個(gè)不忙的worker。 channel.basicQos(1); //server push消息時(shí)的隊(duì)列長度 //用來緩存服務(wù)器推送過來的消息 QueueingConsumer consumer = new QueueingConsumer(channel); //為channel聲明一個(gè)consumer,服務(wù)器會(huì)推送消息 //參數(shù)1:隊(duì)列名稱 //參數(shù)2:是否發(fā)送ack包,不發(fā)送ack消息會(huì)持續(xù)在服務(wù)端保存,直到收到ack。 可以通過channel.basicAck手動(dòng)回復(fù)ack //參數(shù)3:消費(fèi)者 channel.basicConsume(queueName, false, consumer); //channel.basicGet() //使用該函數(shù)主動(dòng)去服務(wù)器檢索是否有新消息,而不是等待服務(wù)器推送 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); System.out.println("Received " + new String(delivery.getBody())); //回復(fù)ack包,如果不回復(fù),消息不會(huì)在服務(wù)器刪除 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //channel.basicReject(); channel.basicNack(); //可以通過這兩個(gè)函數(shù)拒絕消息,可以指定消息在服務(wù)器刪除還是繼續(xù)投遞給其他消費(fèi)者 } } }

本文題目:[RabbitMQ]RabbitMQ簡介及Java示例
本文來源:http://www.rwnh.cn/article32/cpgepc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)網(wǎng)站制作、網(wǎng)站制作網(wǎng)站改版、網(wǎng)站設(shè)計(jì)公司、響應(yīng)式網(wǎng)站、定制開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

營銷型網(wǎng)站建設(shè)
龙川县| 沅陵县| 香格里拉县| 丹江口市| 东乡县| 江陵县| 宿州市| 柞水县| 隆林| 方正县| 五家渠市| 云霄县| 夏津县| 大姚县| 吉水县| 普陀区| 大田县| 万山特区| 仙桃市| 珠海市| 铁力市| 凤台县| 乌海市| 青冈县| 丹江口市| 五寨县| 南康市| 清河县| 延吉市| 延长县| 昌江| 濮阳县| 广宗县| 海兴县| 安丘市| 临沂市| 莆田市| 永州市| 嘉黎县| 广元市| 中江县|