Rabbit安装文档
田玲  |  2014-04-28  |   |  1619次阅读

32Windows平台安装

系统环境:

操作系统

32win7系统 SP1

RabbitMQ版本

rabbitmq-server-3.0.0

安装步骤:

根据官方网站:http://www.rabbitmq.com/的描述,在Windows下的安装需要先安装相应环境,再在其上安装RabbitMQServer

安装Erlang

运行Erlang安装程序(Erlang windows Binary File),这里我们是用的是rabbitmq-server-3.0.0Erlang会出现在开始菜单中,其安装目录会在C:\Program Files

Note在官方网站下载的ErlangR15B03中,Windows Binary FileWindows 64 Bit Binary File都是同一个下载链接(都是64位的安装程序),这应该是网站本身的链接出错了。64位的无法在32位机器上成功安装。所以我这里改为下载R15B0232位安装程序。

设置环境变量

官方文档中提到,如果当前已经安装了一个RabbitMQ,并且这个消息代理作为一个服务运行,那么在新安装Erlang之前,需要先卸载这个服务,然后更新ERLANG_HOME这个环境变量。

如果是第一次安装,需要设置这个系统环境变量ERLANG_HOME为当前安装Erlang的目录。这里我安装在C盘,路径是C:\Program Files\RabbitMQ Server\rabbitmq_server-3.0.0。(注意这里的路径需要是bin直接包含bin文件夹的路径),同时我还添加到系统路径PATH  %ERLANG_HOME%\bin;(官方文档没有提到这一步,但是在参考文献【@@@@@1】中提到了)

安装RabbitMQ程序

双击执行安装程序,这个过程很快就可以完成。

安装完RabbitMQ后,需要设置相应的环境变量。

配置环境变量 C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-2.8.0

 

      添加到PATH %RABBITMQ_SERVER%\sbin;

添加完成后可以测试RabbitMQ服务器。

 

现在已经完成RabbitMQ的安装。在测试过程中,出现了错误,Error unable to connect to node ‘rabbit@2012-20121026GU’:node down

考虑在安装Erlang的过程中,要求安装Visual 2010.Net框架,现在尝试。安装完后还是提示这个问题。

然后我重启了下系统,再照上面的步骤安装了一遍,就完成了,⊙﹏⊙b

启动RabbitMQ的管理程序

根据参考资料【@@@@@1】中的描述,需要安装plugin才能进入管理页面,对于windows系统,首先,启动管理插件:

rabbitmq-plugins enable rabbitmq_management

具体启动:

                                      mochiweb

                                      webmachine

                                      rabbitmq_mochiweb

                                      amqp_client

                                      rabbitmq_management_agent

                                      rabbitmq_management

查看安装:

                                      rabbitmq-plugins list

然后重启下服务(运行下列命令行):

rabbitmq-service.bat stop

rabbitmq-service.bat install(之前已经进行过这步的,可以不进行了)

rabbitmq-service.bat start

下面,我们就可以直接登录Web页面进行管理了。

RabbitMQ的使用

监控

浏览器访问localhost:55672  默认账号:guest  密码:guest

Rabbit安装文档

在登录框中输入用户名、密码(这里我用默认账户申请了一个自己的用户名和密码,可以管理自己对应的消息队列)

 

这里55672是默认端口号。

Rabbit安装文档

订阅/发布模型的客户端在接收到消息后,原本存在于Queue中的消息就消失了,类似于投递报纸一样。另外,当多个订阅方客户端与同一个发布方相连的话,多个订阅放也能正常接收到所有发布的消息。

 

在使用过发布方和订阅方的客户端程序以后,服务器这边的情况如下:

Rabbit安装文档

在交换机这一栏可以看到我们新创建的交换机ex1

Rabbit安装文档

下面是我们的队列

Rabbit安装文档

点击队列名进入该队列的管理界面,在这里可以手动添加以下消息。Rabbit安装文档

RabbitMQ C#使用

CSC使用

官方文档的建议是不要使用Visual环境,直接使用.net的编译器csc编译程序。

首先要找到csc.exe文件的路径,然后将其加入到环境变量的path里,不用重启。

    首先要安装好。NET Framwork SDK环境,一般安装了vs.net工具的 都已经可以了,路径:C\WINDOWS\Microsoft.NET\Framework\v2.0.50727

    1.然后在WINDOWS下的命令窗口中,进入到当前文件所在的路径下,使用csc命令编译一个或者多个源文件使之生成EXE文件,也可将一部分文件生成为EXE文件一部分生成为DLL文件。具体详见例子。

    2.如果按照原来的方法同时将两个文件编译进同一个exe时,便以一定会出错

    这时候我们再引入一个新的参数 /main(需要指定命名空间)

    在我们自己决定要使用哪个cs文件里的Main函数时 可以输入命令

    csc Program2.cs Person.cs /mainProgram2

    就是制定了以Program2里的Main函数为程序入口点了。

    3.接着我们再讲一个参数 /out 这个是给可执行文件命名的 比如:csc /outMyApp.exe Program2.cs Person.cs这样就可以生成一个叫做MyApp.exe的可执行文件。

    4.两个参数 /t /r

    /t 是指定编译生成dll还是exe,在第一种方法中的命令csc *.cs 其实是一个简写,完整的写法是csc /texe *.cs,由于/texe是默认的选项可以不写, 还要说一句 这里的exe文件都是控制台应用程序(其他的以后再说)

    /r 的作用可以看成是引用了一个dll文件,格式可以写成csc program.cs /r1.dll2.dll3.dll挂接多个dll,每个dll之间用分号 隔开。

/tlibrary 将源文件编译成DLL库例:csc .tlibrary student.cs /rstudent.dll 引入student联接库中的相关的类例:/rstudent.dll hello.cd

 

编译 File.cs 以产生File.exe csc File.cs 编译File.cs 以产生File.dll csc /target:library File.cs 编译File.cs 并创建My.exe csc /out:My.exe File.cs 通过使用优化和定义DEBUG 符号,编译当前目录中所有的C# 文件。输出为File2.exe csc /define:DEBUG /optimize /out:File2.exe *.cs 编译当前目录中所有的C# 文件,以产生File2.dll 的调试版本。不显示任何徽标和警告: csc /target:library /out:File2.dll /warn:0 /nologo /debug *.cs 将当前目录中所有的C# 文件编译为Something.xyz(一个DLL): csc /target:library /out:Something.xyz *.cs

编译 File.cs 以产生File.dllcsc /target:library File.cs这个就是我们使用最多的一个命令,其实可以简单的写成csc /t:library File.cs,另外的一个写法是 csc /out:mycodebehind.dll /t:library mycodebehind.cs,这个可以自己指定输出的文件名。 csc /out:mycodebehind.dll /t:library mycodebehind.cs mycodebehind2.cs,这个的作用是把两个cs文件装到一个.dll文件里,很有用哦。

使用.NET框架编写客户端程序

准备环境

这里使用的是rabbitmq-dotnet-client-2.4.1-dotnet-3.0.zip类库,解压该文件,可以得到如图的文件目录结构:

Rabbit安装文档

编写RabbitMQ的时候,需要将Bin目录下的RabbitMQ.Client.dllRabbitMQ.ServiceModel.dll复制到项目文件夹下,并在项目中添加相应的引用。

这部分库文件的接口已经在rabbitmq-dotnet-client-3.0.0-user-guide.pdfrabbitmq-dotnet-client-3.0.0-wcf-service-model.pdf(均可以在官方网站下载到)中描述了。接口描述在:http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v3.0.0/rabbitmq-dotnet-client-3.0.0-client-htmldoc/html/index.html

编程环境选择的是VS 2008(现在开始启用VS 2010),其他环境应该稍有不同。建立项目的话,采用C#控制台程序。

客户端程序

 

根据需要实现的RabbitMQ的模型,需要有两类客户端:发布方客户端和消费者客户端。发布方客户端负责发布消息到服务器中,消费者从服务器获取消息。 Rabbit安装文档

上图给出了RabbitMQ订阅/发布方通过exchange交付消息的模型。在RabbitMQ中,exchange支持三种方式的路由(参考网站:http://melin.iteye.com/blog/691265

Direct Exchange  处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog Rabbit安装文档

Fanout Exchange  不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

Rabbit安装文档

Topic Exchange  将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。我在RedHat的朋友做了一张不错的图,来表明topic交换机是如何工作的:

Rabbit安装文档

在这里的实验主要使用了DirectTopic两种方式。鉴于项目的需求,可能会考虑使用Direct的方式。

这里我将发布者客户端和消费者客户端创建在一个项目中(为了方便实验,后期可以分开进一步实验,现在已经分开实验),项目名称:RabbitMQTEst,添加之前说到的两个引用:

Rabbit安装文档

创建生产者客户端:ProducerMQ.cs

代码如下:

usingSystem;

usingSystem.Collections.Generic;

usingSystem.Linq;

usingSystem.Text;

usingRabbitMQ.ServiceModel;

usingRabbitMQ.Client;  //ConnectionFactory在这个里面定义的

usingRabbitMQ.Client.Content;

usingSystem.Collections;

 

namespaceRabbitMQTEst

{

    class ProducerMQ

    {

        public static void InitProducerMQ()

        {

            Uri uri = new Uri("amqp://127.0.0.1:5672/");

            string exchange = "ex1";

            string exchangeType = "direct";

            string routingKey = "m1";

            bool persistMode = true;

            ConnectionFactory cf = new ConnectionFactory();

 

            cf.UserName = "mazhiyuan";

            cf.Password = "19870305";

            cf.VirtualHost = "dnt_mq";

            //cf.RequestedHeartbeat = 0;

            cf.Endpoint = new AmqpTcpEndpoint(uri);

           

            Console.WriteLine(cf.Port);

            Console.WriteLine("ProducerMQ In 0nd using");

            using (IConnection conn = cf.CreateConnection())

            {

                Console.WriteLine(cf.Port);

                Console.WriteLine("ProducerMQ In 1nd using");

                using (IModel ch = conn.CreateModel())

                {

                   

                    if (exchangeType != null)

                    {

                        //ch.ExchangeDeclare(exchange, exchangeType);//,true,true,false,false, true,null);

                        ch.ExchangeDeclare(exchange, exchangeType);                       

                        //ch.QueueDeclare("q1",true,true,false,null);//true, true, true, false, false, null);

                        ch.QueueDeclare("q1", true, false, false, null);//参数分别是string queuename,bool durable,bool exclusive,bool autoDelete,IDictionary arguments

                        //ch.QueueBind("q1","ex1","m1",null);

                        ch.QueueBind("q1", exchange, "m1",null);

                    }

                   // ch.BasicPublish("","q1",null,Encoding.Default.GetBytes("hello world"));

                    Console.WriteLine("ProducerMQ In 2nd using");

                    

                    IMapMessageBuilder b = new MapMessageBuilder(ch);

                    IDictionary target = b.Headers;

                    target["header"] = "hello world";

                    IDictionary targetBody = b.Body;

                    targetBody["body"] = "mazhiyuan";

                    if (persistMode)

                    {

                        ((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2;

                    }

 

                    ch.BasicPublish(exchange, routingKey, (IBasicProperties)b.GetContentHeader(), b.GetContentBody());

                    //ch.BasicPublish(exchange, routingKey, null, System.Text.Encoding.UTF8.GetBytes("Hello World")); //可以直接使用字符串信息

                }

            }

            //connection 结束

        }

    }

}

 

说明:rabbitmq交换方式分为三种,分别是:  Direct Exchange  处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog   Fanout Exchange  不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。   Topic Exchange  将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”

 

这里需要注意的地方一个是Uri的编写,因为这里是本地,所以取的是127.0.0.1,如果服务器是在别处,应该放上其本身的IP地址。端口号默认是5672,自己可在配置中修改。

这里的User在配置的时候提前加载,在设置的时候,有几点要注意,主要是设置权限的时候,我最开始没有设置,这个时候,我无法通过Web界面访问RabbitMQ的页面,在修改为administrator后,重新用这个User登录后,才方便查看队列等信息。

Rabbit安装文档

在设置队列的时候,如上面的语句ch.QueueDeclare("q1", true, false, false, null);其中的几个bool类型的数据,最重要的是设置bool exclusive参数为false,否则这个队列是专用的,在实验中发现,连接关闭后,这样的队列会自动删除,导致取不到消息。

 

rabbitmq中,获取消息可以使用两种方式,一种是主动获取,另一种是基于订阅模式,即让当前获取消息的线程阻塞,用于绑定到指定的队列上,当有新的消息入队之后,该阻塞线程会被运行,从队列中获取新入队的消息。

 

消费者客户端代码如下:

usingSystem;

usingSystem.Collections.Generic;

usingSystem.Linq;

usingSystem.Text;

usingRabbitMQ.ServiceModel;

usingRabbitMQ.Client;

usingRabbitMQ.Client.Events;

 

namespaceRabbitMQTEst

{

    class CustmerMq

    {

        public static int InitCustmerMq()

        {

            string exchange = "ex1";

            string exchangeType = "direct";

            string routingKey = "m1";

 

            string serverAddress = "127.0.0.1:5672";

            ConnectionFactory cf = new ConnectionFactory();

            cf.Address = serverAddress;

            cf.UserName = "mazhiyuan";

            cf.Password = "19870305";

            cf.VirtualHost = "dnt_mq";

            //cf.RequestedHeartbeat = 0;

           

            using (IConnection conn = cf.CreateConnection())

            {

                using (IModel channel = conn.CreateModel())

                {

                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);

                    channel.BasicConsume("q1", false, consumer);

                    while (true)

                    {

                        try

                        {

                            BasicDeliverEventArgs e = consumer.Queue.Dequeue() as BasicDeliverEventArgs; //handle the delivery

                            Console.WriteLine("Recieved message:" + Encoding.Default.GetString(e.Body));

                            //将消息从队列中删除

                            channel.BasicAck(e.DeliveryTag, false);

                        }

                        catch (System.IO.EndOfStreamException)

                        {

                            Console.WriteLine("The consumer was cancelled, the model closed, or the connection went away");

                            break;

                        }

                        catch (Exception ex)

                        {

                            Console.WriteLine(ex.Message);

                            break;

                        }

                    }

                }

            }

           

            return 0;

        }

    }

}

根据参考资料【@@@@@3】和实验的结果发现,RabbitMQ没有为发布者提供队列,而为每一个消费者提供一个队列来存储消息。在基于主题(topic)的消息订阅/发布中,目前观察,RabbitMQ是通过在声明交换机(exchange)的时候设定exchangeType来实现的,在exchangeType的模式为topic的时候,这意味着交换机将根据exchangeroutingKey来确定绑定其的Queue,进而投递消息。简单说来,除了Queueexchange相绑定外,Queue还需要有同样的routingKey才能获取消息。这样的routingKey实际上就可以在逻辑上代表一个主题。

要实现这样服务器能对多个消费者提供服务,只需要将exchange绑定多个Queue就行了。这里我实现了两个消费者ConsumerConsumer2。这里需要注意的就是,如果消费者先于服务器订阅,那么就要对这个消费者事先声明队列和需要绑定的交换机。

根据项目里的需求,现在的实现方式是:

发布方:只声明一个交换机,没有对应的Queue

消费方:一个队列对应一个消费者。

 

结果说明

Rabbit安装文档

这里直接发送的是字符串信息 Hello World。确认已经能收到。

Rabbit安装文档

这里我通过消息队列发送了一次消息,又通过Web页面手动发布了两条消息,在客户端,这两条消息都正确地接收到了。

 

后续

RabbitMQ这里的实现的服务器是通过一个类似键值对的方式发布的数据,而在获取方,则是通过e.body获取的。根据参考文章http://www.cnblogs.com/7ero/articles/450233.html的例子,我们也可以对XML文件进行传输:

Rabbit安装文档

这里转化为byte[] data的字节数组可以直接用Public中提到的方法传递过去。至于其他方式,还有待进一步探讨。

从指定地点读取XML文件:http://developer.51cto.com/art/200610/32906.htm

传输XML的过程

在原来的客户端基础上,添加XML的读取和传输:

主要在程序PubMQSubMQ两个客户端中

在发布方PubMQ中,当准备好传输后,将原来的

ch.BasicPublish(exchange,routingKey,null,System.Text.Encoding.UTF8.GetBytes(“hello world”));

 

替换成

//Á?À?XMLä?º?

                    XmlDocument doc = new XmlDocument();

                    doc.Load("C:/Users/Administrator/Documents/Visual Studio 2010/Projects/RabbitMQPubClient/desitination.xml");

                    Console.Write(doc.OuterXml.ToString());

                    //a¤?®DÌ?¨º¬aê?¨¢?¤¡äÌ?aº?XMLÌ?¤¨¤¨ªê??®D??ºyY?D¨¢¨?

                    ch.BasicPublish(exchange,routingKey,null,System.Text.Encoding.UTF8.GetBytes(doc.OuterXml.ToString()));

在订阅方,这边的代码添加XML的存储:

//a?XML?t,a2?®DÌ?¨º¬a   //?¨²¨¦°?¢?

                                    Console.WriteLine("Recieved message:" + System.Text.Encoding.UTF8.GetString(e.Body));

                                    XMLContent = System.Text.Encoding.UTF8.GetString(e.Body);

                                    File.WriteAllText("Test.xml", XMLContent);

这里默认存在当前目录下,名为Test.xml的文件。

后期应该能进一步优化流程,做到程序能自动选择路径

RabbitMQ的消息队列

RabbitMQActiveMQ的生产者流量控制:

http://www.cnblogs.com/zhengyun_ustc/archive/2012/08/25/flowcontrol.html

一些队列理论、吞吐量、延迟和带宽:

http://datalj.blog.163.com/blog/static/3422303120124168474734/

参考资料:

【1】       win7安装RabbitMQhttp://hi.baidu.com/gavin_mars/item/b9b1972072b85e8d6e2cc391

【2】       RabbitMQ C#博客:http://blog.csdn.net/daizhj/article/category/744927

【3】       .NetRabbitMQ的使用:http://www.cnblogs.com/haoxinyue/archive/2012/09/27/2705660.html


文章原载于作者的文章,所述内容属作者个人观点,不代表本平台立场。
本文经过系统重新排版,阅读原内容可点击 阅读原文