澳门至尊网站-首页

您的位置:澳门至尊网站 > 程序编程 > 新闻队列

新闻队列

2019-10-23 08:43

RabbitMQ队列

首先大家在讲rabbitMQ此前我们要说一下python里的queue跋山涉水的近义词二者干的职业是如出大器晚成辙的,都是队列,用于传递音信

在python的queue中有五个一个是线程queue,三个是进程queue(multiprocessing中的queue)。线程queue不能跨进度,用于多少个线程之间开展数据同步交互;进度queue只是用于父进度与子进程,也许同属于同意父进度下的四个子进程举行交互。也正是说即使是八个精光独立的主次,就算是python程序,也依旧无法用这一个进程queue来通信。那假诺我们有多个单身的python程序,分属于多个进度,也许是python和任何语言

安装:windows下

先是要求设置 Erlang遇到

官网: 

Windows版下载地址:

Linux版:     使用yum安装

 

然后安装RabbitMQ了 

率先下载RabbitMQ 的Windows版本

下载地址:

安装pika:

以前设置过了pip,直接展开cmd,运转pip install pika

安装实现之后,完成一个最简便的行列通讯爬山涉水

图片 1

producer:

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 #声明一个管道
 5 channel = connection.channel()
 6 
 7 #声明queue
 8 channel.queue_declare(queue = 'hello')
 9 #routing_key是queue的名字
10 channel.basic_publish(exchange='',
11                       routing_key='hello',#queue的名字
12                       body='Hello World!',
13                       )
14 print("[x] Send 'Hello World!'")
15 connection.close()

 

先创设壹当中坚的socket,然后创建一个管道,在管道中发新闻,然后声雅培个queue,起个体系的名字,之后真正的发音讯(basic_publish)

consumer:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 3 channel = connection.channel()
 4 
 5 channel.queue_declare(queue='hello')
 6 
 7 
 8 def callback(ch, method, properties, body):#回调函数
 9     print("---->",ch,method,properties)
10     print(" [x] Received %r" % body)
11 
12 channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
13                       queue='hello',
14                       no_ack=True
15                        )
16 
17 print(' [*] Waiting for messages. To exit press CTRL+C')
18 channel.start_consuming()

 

 start_consuming()只要一运转就一向运行下去,他不停收一条,长久在此卡住。

在上头不管是produce依旧consume,里面都宣示了八个queue,那些是干什么吗?因为我们不掌握是主顾先最早运转照旧生产者先运营,那样只要未有注脚的话就能报错。

上边大家来看一下生机勃勃对多,即八个劳动者对应多个客户:

第风流倜傥大家运维3个客商,然后不断的用produce去发送数据,大家得以看来客户是通过风度翩翩种轮询的主意张开连发的承担多少,每种花费者花费二个。

那么如日中天旦我们客商收到了消息,然后管理那个信息要求30分钟,在拍卖的进度中,花费者断电了宕机了,那花费者还未曾拍卖完,大家设那一个职分大家不可以小看理完,那大家应有有贰个认可的音讯,说那一个义务到位了可能是还没达成,所以笔者的生产者要确认花费者是还是不是把那么些任务管理完了,花费者管理完事后要给这一个生产者服务器端发送八个承认音信,生产者才会把那么些任务从新闻队列中删除。若无拍卖完,成本者宕机了,未有给劳动者发送确认音讯,那就意味着还未有管理完,那大家看看rabbitMQ是怎么管理的

咱俩能够在客户的callback中增添一个time.sleep()进行模拟宕机。callback是一个回调函数,只要事件一触发就能够调用这些函数。函数施行完了就意味着信息管理完了,要是函数未有管理完,这就评释。。。。

咱俩得以看到在客商代码中的basic_consume()中有叁个参数叫no_ack=True,那一个意思是这条音讯是不是被拍卖完都不会发送确认音讯,日常我们不加那几个参数,rabbitMQ暗中认可就能够给你设置成音讯管理完了就活动发送确认,我们今日把那个参数去掉,並且在callback中增添一句话运营爬山涉水ch.basic_ack(delivery_tag=method.delivery_tag)(手动管理)

def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

 

图片 2图片 3图片 4

运营的结果正是,笔者先运营贰次生产者,数据被开销者1抽取到了,可是本身把客户1宕机,截至运行,那么花费者2就收到了消息,即只要花费者绝非发送确认音信,生产者就不会把消息删除。

RabbitMQ消息长久化爬山涉水

我们能够变越来越许多的音讯队列,那我们怎么查看音信队列的状态呢跋山涉水的近义词rabbitmqctl.bat list_queues

图片 5

现行反革命的场所是,新闻队列中还会有音信,可是服务器宕机了,那那个消息就丢了,那小编就要求那个新闻强制的持久化跋山涉水的近义词

channel.queue_declare(queue='hello2',durable=True)

 

在历次评释队列的时候增进贰个durable参数(客商端和劳务器端都要加上这一个参数),

图片 6

在这里个情形下,大家把rabbitMQ服务注重启,开掘独有队列名留下了,可是队列中的新闻未有了,那样大家还要求在劳动者basic_publish中加多三个参数跋山涉水的近义词properties

producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()

#声明queue
channel.queue_declare(queue = 'hello2',durable=True)
#routing_key是queue的名字
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,#make message persistent
                      )
                      )
print("[x] Send 'Hello World!'")
connection.close()

 

如此就能够使得音信长久化

现行是一个劳动者对应七个买主,很公道的收发收发,然而其实的动静是,我们机器的布局是区别等的,有的配置是单核的黄金时代对配置是多核的,也许i7管理器管理4条新闻的时候和此外的Computer管理1条音讯的时日基本上,那差的微型电脑这里就汇集结音讯,而好的微处理器这里就能够造成闲置,在实际中做运转的,我们会在负载均衡中装置权重,何人的布局高权重高,职分就多一点,不过在rabbitMQ中,我们只做了二个大约的处理就足以兑现公道的新闻分发,你有多大的力量就管理多少音讯

即爬山涉水server端给客商端发送音讯的时候,先反省以往还只怕有多少音信,假设当前音讯并未有管理完毕,就不会发送给这些花费者新闻。假设当前的主顾绝非新闻就发送

其一只要求在开销者端实行改变加代码跋山涉水的近义词

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello2',durable=True)


def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
                      queue='hello2',
                      #no_ack=False
                       )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

 大家在转移二个consume2,在callback中sleep20秒来模拟

图片 7图片 8图片 9

自家先运维八个produce,被consume接收,然后在起步一个,就被consumer2接纳,不过因为consumer第22中学sleep20秒,处理慢,所以此时在开发银行produce,就又给了consume举行拍卖

 

PublishSubscrible(信息发表订阅)

方今都以1对1的出殡和安葬选取数据,那作者想1对多,想广播相近,生产者发送三个新闻,全体成本者都收到消息。那大家咋做吧?那个时候大家将在用到exchange了

exchange在豆蔻梢头端收音信,在另如日方升端就把音信放进queue,exchange必得正确的明亮收到的音讯要干什么,是还是不是合宜发到四个一定的queue如故发给大多queue,也许说把她放任,那些都被exchange的品类所定义

exchange在概念的时候是有品种的,以调节到底是那一个queue符合条件,可以接纳音讯:

fanout:全数bind到此exchange的queue都足以担负消息

direct跋山涉水的近义词通过rounroutingKey和exchange决定的老大唯黄金时代的queue能够收到消息

topic跋山涉水的近义词全部相符routingKey的routingKey所bind的queue能够采纳音讯

headers:通过headers来支配把音信发给哪些queue

消息publisher:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='log',type = 'fanout')
 9 
10 message = ' '.join(sys.argv[1:]) or 'info:Hello World!'
11 channel.basic_publish(exchange='logs',routing_key='',body=message)
12 print("[x] Send %r " % message)
13 connection.close()

 

这边的exchange以前是空的,以后赋值log;在那地也远非注脚queue,广播无需写queue

 消息subscriber:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 3 channel = connection.channel()
 4 channel.exchange_declare(exchange='logs',exchange_type='fanout')
 5 
 6 #exclusive唯一的,不指定queue名字,rabbit会随机分配一个名字
 7 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
 8 result = channel.queue_declare(exclusive=True)
 9 queue_name = result.method.queue
10 
11 channel.queue_bind(exchange='logs',queue=queue_name)
12 
13 print('[*] Waiting for logs,To exit press CTRL+C')
14 
15 def callback(ch,method,properties,body):
16     print("[X] %r" % body)
17 channel.basic_consume(callback,queue = queue_name,no_ack=True)
18 channel.start_consuming()

 

在耗费者这里咱们有定义了八个queue,注意一下解说中的内容。不过大家在发送端未有声明queue,为啥发送端无需选用端需求吗?在consume里有贰个channel.queue_bind()函数,里面绑定了exchange转变器上,当然里面还索要三个queue_name

运行结果爬山涉水

图片 10图片 11图片 12图片 13

就也就是收音机同样,实时播报,打开八个买主,生产者发送一条数据,然后3个客商同期收纳到

有取舍的收受信息(exchange_type = direct)

RabbitMQ还援助依照首要字发送,即爬山涉水队列绑定关键字,发送者将数据依照重大字发送到信息exchange,exchange依据重大字剖断应该将数据发送到钦点的连串

图片 14

publisher:

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
 7 
 8 severity = sys.argv[1] if len(sys.argv)>1 else 'info'
 9 message = ' '.join(sys.argv[2:]) or 'Hello World!'
10 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
11 
12 print("[X] Send %r:%r" %(severity,message))
13 connection.close()

 

subscriber:

import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]#
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]n" %sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

print('[*]Waiting for logs.To exit press CTRL+c')

def callback(ch,method,properties,body):
    print("[x] %r:%r"%(method.routing_key,body))

channel.basic_consume(callback,queue = queue_name,no_ack=True)
channel.start_consuming()

 

尤其紧凑的过滤(exchange_type=topic)

图片 15

 

publish:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

subscriber:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange='topic_logs',
 8                          exchange_type='topic')
 9 
10 result = channel.queue_declare(exclusive=True)
11 queue_name = result.method.queue
12 
13 binding_keys = sys.argv[1:]
14 if not binding_keys:
15     sys.stderr.write("Usage: %s [binding_key]...n" % sys.argv[0])
16     sys.exit(1)
17 
18 for binding_key in binding_keys:
19     channel.queue_bind(exchange='topic_logs',
20                        queue=queue_name,
21                        routing_key=binding_key)
22 
23 print(' [*] Waiting for logs. To exit press CTRL+C')
24 
25 
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28 
29 
30 channel.basic_consume(callback,
31                       queue=queue_name,
32                       no_ack=True)
33 
34 channel.start_consuming()

 

 

以上都是服务器端发音信,顾客端收音讯,音信流是单向的,那如果大家想要发一条命令给长途的客商端去推行,然后想让客商端实践的结果回到,则这种格局叫做rpc

RabbitMQ RPC

图片 16

rpc server:

 1 import pika
 2 import time
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.queue_declare(queue='rpc_queue')
 7 def fib(n):
 8     if n==0:
 9         return 0
10     elif n==1:
11         return 1
12     else:
13         return fib(n-1)+fib(n-2)
14 
15 def on_request(ch,method,props,body):
16     n = int(body)
17     print("[.] fib(%s)" %n)
18     response = fib(n)
19 
20     ch.basic_publish(exchange='',routing_key=props.reply_to,
21                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
22                      body = str(response))
23     ch.basic_ack(delivery_tag=method.delivery_tag)25 channel.basic_consume(on_request,queue='rpc_queue')
26 
27 print("[x] Awaiting rpc requests")
28 channel.start_consuming()

 

 

rpc client:

 1 import pika
 2 import uuid,time
 3 class FibonacciRpcClient(object):
 4     def __init__(self):
 5         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 6 
 7         self.channel = self.connection.channel()
 8 
 9         result = self.channel.queue_declare(exclusive=True)
10         self.callback_queue =  result.method.queue
11 
12         self.channel.basic_consume(self.on_response,#回调函数,只要一收到消息就调用
13                                    no_ack=True,queue=self.callback_queue)
14 
15     def on_response(self,ch,method,props,body):
16         if self.corr_id == props.correlation_id:
17             self.response = body
18 
19     def call(self,n):
20         self.response = None
21         self.corr_id = str(uuid.uuid4())
22         self.channel.basic_publish(exchange='',routing_key='rpc_queue',
23                                    properties=pika.BasicProperties(
24                                        reply_to=self.callback_queue,
25                                        correlation_id=self.corr_id
26                                    ),
27                                    body=str(n),#传的消息,必须是字符串
28                                    )
29         while self.response is None:
30             self.connection.process_data_events()#非阻塞版的start_consuming
31             print("no message....")
32             time.sleep(0.5)
33         return int(self.response)
34 fibonacci_rpc = FibonacciRpcClient()
35 print("[x] Requesting fib(30)")
36 response = fibonacci_rpc.call(30)
37 print("[.] Got %r"%response)

 

之前的start_consuming是跻身一个围堵情势,未有新闻就等候音讯,有新闻就收过来

self.connection.process_data_events()是一个非阻塞版的start_consuming,正是说发了叁个事物给顾客端,每过一点日子去反省有未有消息,若无音信,能够去干其余专门的工作

reply_to = self.callback_queue是用来收取反应队列的名字

corr_id = str(uuid.uuid4()),correlation_id第新生事物正在蒸蒸日上在客商端会通过uuid4生成,第二在劳务器端重临实践结果的时候也会传过来贰个,所以说假设服务器端发过来的correlation_id与协和的id相同,那么服务器端发出来的结果就明确是自个儿正要顾客端发过去的通令的施行结果。未来就八个服务器端三个客户端,不在乎缺人不认账。未来顾客端是非阻塞版的,大家能够不让它打字与印刷未有新闻,而是举行新的下令,这样就两条音讯,不自然按顺序实现,那我们就需求去明显种种重返的结果是哪个命令的试行结果。

总体的情势是这么的爬山涉水生产者发了一个限令给花费者,不晓得客商端何时回来,依旧要去收结果的,不过它又不想进去阻塞形式,想每过一段时间看这一个信息收回来没有,假使音讯收回来了,就象征收完了。 

运行结果跋山涉水的近义词

图片 17图片 18

服务器端开启,然后在起步顾客端,顾客端先是等待音信的发送,然后做出反应,直到算出斐波那契

 

 

 

 

 

 

 

 

 

 

本文由澳门至尊网站发布于程序编程,转载请注明出处:新闻队列

关键词: