澳门至尊网站-首页

您的位置:澳门至尊网站 > 技术教程 > 初识RabbitMQ体系之三

初识RabbitMQ体系之三

2019-10-23 02:26

网络参考大神们的博客,本身做了三个RabbitMQ即时发音讯的德姆o。

话十分少说,直接上代码!

 

风度翩翩爬山涉水搭建一个实施方案框架爬山涉水RabbitMQ_Demo

1.采取VS的NuGet安装包管理工科具安装RabbitMQ.Client:

个中带有4个部分跋山涉水的近义词

图片 1

1跋山涉水的近义词RabbitMQ 公用类库项目

 

2爬山涉水二个劳动者调节台项目

2.劳动者端代码:

3爬山涉水五个顾客调节台项目

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 
 8 namespace RabbitMQ.Producter
 9 {
10     class Program
11     {
12         /// <summary>
13         /// 连接配置
14         /// </summary>
15         private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
16         {
17             HostName="localhost",
18             UserName = "guest",
19             Password = "guest",
20             Port = 5672,
21             //VirtualHost = "JentVirtualHost"
22         };
23         /// <summary>
24         /// 路由名称
25         /// </summary>
26         const string ExchangeName = "Jent.Exchange";
27         /// <summary>
28         /// 队列名称
29         /// </summary>
30         const string QueueName = "Jent.Queue";
31         static void Main(string[] args)
32         {
33             DirectExchangeSendMsg();
34             Console.WriteLine("按任意键退出程序!");
35             Console.ReadKey();
36         }
37         /// <summary>
38         /// 单点精确路由模式
39         /// </summary>
40         private static void DirectExchangeSendMsg()
41         {
42             using (IConnection conn = rabbitMqFactory.CreateConnection())
43             {
44                 using (IModel channel = conn.CreateModel())
45                 {
46                     channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
47                     channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
48                     channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
49 
50                     var props = channel.CreateBasicProperties();
51                     props.Persistent = true;
52                     Console.WriteLine("请输入需要发送的消息:");
53                     string vadata = Console.ReadLine();
54                     while (vadata != "exit")
55                     {
56                         var msgBody = Encoding.UTF8.GetBytes(vadata);
57                         channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
58                         Console.WriteLine(string.Format("发送时间:{0},发送完毕,输入exit退出消息发送", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")));
59                         vadata = Console.ReadLine();
60                     }
61                 }
62             }
63         }
64     }
65 }

项目结构如图跋山涉水的近义词

3.主顾端代码爬山涉水

图片 2

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 
 8 namespace RabbitMQ.Consumer
 9 {
10     class Program
11     {
12         /// <summary>
13         /// 连接配置
14         /// </summary>
15         private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
16         {
17             HostName = "127.0.0.1",
18             UserName = "guest",
19             Password = "guest",
20             Port = 5672,
21             //VirtualHost = "JentVirtualHost"
22         };
23         /// <summary>
24         /// 路由名称
25         /// </summary>
26         const string ExchangeName = "Jent.Exchange";
27         /// <summary>
28         /// 队列名称
29         /// </summary>
30         const string QueueName = "Jent.Queue";
31 
32         static void Main(string[] args)
33         {
34             DirectAcceptExchange();
35 
36             Console.WriteLine("输入任意值退出程序!");
37             Console.ReadKey();
38         }
39 
40         private static void DirectAcceptExchange()
41         {
42             using (IConnection conn = rabbitMqFactory.CreateConnection())
43             {
44                 using (IModel channel = conn.CreateModel())
45                 {
46                     channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
47                     channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
48                     channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
49 
50                     while (true)
51                     {
52                         BasicGetResult msgResponse = channel.BasicGet(QueueName, autoAck: false);
53                         if (msgResponse != null)
54                         {
55                             var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
56                             Console.WriteLine(string.Format("接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
57                         }
58                         //System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
59                     }
60                 }
61             }
62         }
63     }
64 }

 

4.主次结果爬山涉水

二跋山涉水的近义词开采早先,要求引用RabbitMQ包

图片 3

设置相应的Nuget包,或然下载相关dll也足以,不过提议在线安装nuget,更低价

 

探求跋山涉水的近义词RabbitMQ.Client   安装新型版就能够,不知道怎么设置nuget,请移步 

注爬山涉水在率先步事先,你须要安装RabbitMQ客商端,可从

安装好以往,最早激动的写代码啦!

        然而RabbitMQ又是借助于Erlang OTP平台,所以,安装RabbitMQ此前,须要先从

 

        关于那旭日初升部分的开始和结果,推荐阅读跋山涉水的近义词

三爬山涉水达成RabbitMQ基本收发消息的功能

 

1:在RabbitMQ_Lib类库中新建类跋山涉水的近义词MyRabbitMQ.cs

  此德姆o只是‘direct’格局的音讯发送选拔格局。

图片 4图片 5

public class MyRabbitMQ
    {
        //连接工厂
        private ConnectionFactory factory { get; set; } = new ConnectionFactory
        {
            HostName = "localhost",
            Port = 5672,
            UserName = "guest",
            Password = "guest"
        };

        private IConnection connection { get; set; }

        private IModel channel { get; set; }

        //生产方 构造函数
        public MyRabbitMQ(string exchangeName = "",string exchangeType = ExchangeType.Direct)
        {
            //创建一个连接
            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            //创建一个转发器
            channel.ExchangeDeclare(exchangeName, exchangeType);
        }

        //消费方 构造函数
        public MyRabbitMQ(string exchangeName = "", string queueName = "", string routingKey = "")
        {
            //创建一个连接
            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            //创建一个队列
            channel.QueueDeclare(queueName, true, false, false);

            //队列绑定
            channel.QueueBind(queueName, exchangeName, routingKey);
        }

        public void SendMessage(string message="", string exchangeName = "", string routingKey = "")
        {
            channel.BasicPublish(exchangeName, routingKey, null, Encoding.UTF8.GetBytes(message));
        }

        public QueueingBasicConsumer ReceiveMessage(string queueName = "")
        {
            //EventingBasicConsumer
            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queueName, true, consumer);
            return consumer;
        }
    }

View Code

能够见到,首先有五个构造函数,三个是给劳动者选用,三个是给买主应用,注意参数有所分歧,可以见到生产者与顾客关注的点是相当的小器晚成致的。

无论是生产或然开支,都以一个顾客端,都亟需成立二个RabbitMQ连接并创制贰个channel,才足以打开有关的操作,这么些操作都是由channel发起的,那样说应该比较白话了。

组织生产者的时候,主倘使创建多个转载器,转载器的名字及项目供给定义,

转载器常用类型包罗二种爬山涉水direct、fanout、topic,

这二种等级次序这里说的更清楚爬山涉水

本例子中是以topic为例子的

 

2:MQ_Producter项目中发送消息(生产者中发送音讯)

图片 6图片 7

class Program
    {
        static void Main(string[] args)
        {
            string exchangeName = "07281616_exchange_topic";
            string routingkeya = "0728.a.c.routingkey";
            string routingkeyb = "0728.b.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, ExchangeType.Topic);
            for (int i = 0; i < 3600; i++)
            {
                System.Threading.Thread.Sleep(1000);
                if (i % 2 == 0)
                {
                    var message = $"{routingkeyb} -- {DateTime.Now.ToLongTimeString()}";
                    Console.WriteLine($"auto send: {message}  for {routingkeyb}");
                    myMQ.SendMessage(message, exchangeName, routingkeyb);
                }
                else
                {
                    var message = $"{routingkeya} -- {DateTime.Now.ToLongTimeString()}";
                    Console.WriteLine($"auto send: {message}  for {routingkeya}");
                    myMQ.SendMessage(message, exchangeName, routingkeya);
                }

            }
        }
    }

View Code

此地发送了3600次,每秒发二次消息,奇数和偶数发送的路由法则不等同,会有五个差别的客商端来抽取,那样便于大家测量试验新闻是不是被分发到了分裂的连串上

 

3爬山涉水几个顾客项目开展消息的吸收接纳

开销者大器晚成爬山涉水

图片 8图片 9

class Program
    {
        static void Main(string[] args)
        {
            string queueName = "07281616_queue";
            string exchangeName = "07281616_exchange_topic";
            var routingRule = "0728.*.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, queueName, routingRule);
            var consumer = myMQ.ReceiveMessage(queueName);
            while (true)
            {
                //BasicConsume 方法是可阻塞的,比较好
                var msgResponse = consumer.Queue.Dequeue();
                //这种方法不好,没有阻塞等待
                //var msgResponse = channel.BasicGet("zzs_queue", true);
                var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                Console.WriteLine($"Received: {msgBody}  (only for {routingRule})");
            }
        }
    }

View Code

 

买主二爬山涉水

图片 10图片 11

class Program
    {
        static void Main(string[] args)
        {
            string queueName = "07281626_queue";
            string exchangeName = "07281616_exchange_topic";
            var routingRule = "0728.a.*.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, queueName, routingRule);
            var consumer = myMQ.ReceiveMessage(queueName);
            while (true)
            {
                //BasicConsume 方法是可阻塞的,比较好
                var msgResponse = consumer.Queue.Dequeue();
                //这种方法不好,没有阻塞等待
                //var msgResponse = channel.BasicGet("zzs_queue", true);
                var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                Console.WriteLine($"Received: {msgBody} (only for {routingRule})");
            }
        }
    }

View Code

 

三个客商分别收受不一样队列上的音讯

 

4:运行!

先编写翻译一下,到bin目录下先运营生产者,在运作多个买主

图片 12

 

也足以先关掉开支端,过7秒再关闭生产端,在web 管理分界面能够看见今后有2个种类里有消息,贰个3条贰个4条

图片 13

 

 四:总结

全部例子的享有代码都在那间了,代码里有关切释也很明白,是自己要好达成的率先个RabbitMQ收发成效,实际应用中必定能够有过多扩大,新手们有纠结或然本身了然的有窘迫的地点,烦请商议处提出哈,我们共同进步!

本文由澳门至尊网站发布于技术教程,转载请注明出处:初识RabbitMQ体系之三

关键词: