欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

python對RabbitMQ的簡單入門使用教程

 更新時間:2022年06月27日 09:20:17   作者:想個名字蒸難  
RabbitMq是實現(xiàn)了高級消息隊列協(xié)議(AMQP)的開源消息代理中間件,下面這篇文章主要給大家介紹了關(guān)于python對RabbitMQ的簡單入門使用,文中通過實例代碼介紹的非常詳細(xì),需要的朋友可以參考下

(一)RabbitMQ的簡介

RabbitMq 是實現(xiàn)了高級消息隊列協(xié)議(AMQP)的開源消息代理中間件。消息隊列是一種應(yīng)用程序?qū)?yīng)用程序的通行方式,應(yīng)用程序通過寫消息,將消息傳遞于隊列,由另一應(yīng)用程序讀取 完成通信。而作為中間件的 RabbitMq 無疑是目前最流行的消息隊列之一。目前使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。

RabbitMQ總體架構(gòu)

PS:生產(chǎn)者和消費者可能在不同的程序或主機(jī)中,當(dāng)然也有可能一個程序有可能既是生產(chǎn)者,也是消費者。

RabbitMq 應(yīng)用場景廣泛:

1.系統(tǒng)的高可用:日常生活當(dāng)中各種商城秒殺,高流量,高并發(fā)的場景。當(dāng)服務(wù)器接收到如此大量請求處理業(yè)務(wù)時,有宕機(jī)的風(fēng)險。某些業(yè)務(wù)可能極其復(fù)雜,但這部分不是高時效性,不需要立即反饋給用戶,我們可以將這部分處理請求拋給隊列,讓程序后置去處理,減輕服務(wù)器在高并發(fā)場景下的壓力。

2.分布式系統(tǒng),集成系統(tǒng),子系統(tǒng)之間的對接,以及架構(gòu)設(shè)計中常常需要考慮消息隊列的應(yīng)用。

(二)RabbitMQ的安裝

apt-get update
apt-get install erlang
apt-get install rabbitmq-server


#啟動rabbitmq: service rabbitmq-server start
#停止rabbitmq: service rabbitmq-server stop
#重啟rabbitmq: service rabbitmq-server restart

#啟動rabbitmq插件:rabbitmq-plugins enable rabbitmq_management

啟用rabbitmq_management插件后就可以登錄后臺管理頁面了,瀏覽器輸入ip:15672

自帶的密碼和用戶名都是guest,但是只能本機(jī)登錄

所以下面我們添加新用戶,和自定義權(quán)限

#添加新用戶
rabbitmqctl add_user 用戶名 密碼

#給指定用戶添加管理員權(quán)限
rabbitmqctl set_user_tags 用戶名 administrator

給用戶添加權(quán)限
rabbitmqctl set_permissions -p / 用戶名 ".*" ".*" ".*"

在web頁面輸入用戶名,和密碼

(三)python操作RabbitMQ

python中使用pika操作RabbitMQ

pip install pika
#皮卡皮卡,哈哈

(四)RabbitMQ簡單模式

上代碼

# coding=utf-8
### 生產(chǎn)者

import pika
import time

user_info = pika.PlainCredentials('root', 'root')#用戶名和密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))#連接服務(wù)器上的RabbitMQ服務(wù)

# 創(chuàng)建一個channel
channel = connection.channel()

# 如果指定的queue不存在,則會創(chuàng)建一個queue,如果已經(jīng)存在 則不會做其他動作,官方推薦,每次使用時都可以加上這句
channel.queue_declare(queue='hello')

for i in range(0, 100):
    channel.basic_publish(exchange='',#當(dāng)前是一個簡單模式,所以這里設(shè)置為空字符串就可以了
                          routing_key='hello',# 指定消息要發(fā)送到哪個queue
                          body='{}'.format(i)# 指定要發(fā)送的消息
                          )
    time.sleep(1)
    
# 關(guān)閉連接
# connection.close()

PS:RabbitMQ中所有的消息都要先通過交換機(jī),空字符串表示使用默認(rèn)的交換機(jī)

# coding=utf-8
### 消費者

import pika

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

# 如果指定的queue不存在,則會創(chuàng)建一個queue,如果已經(jīng)存在 則不會做其他動作,生產(chǎn)者和消費者都做這一步的好處是
# 這樣生產(chǎn)者和消費者就沒有必要的先后啟動順序了
channel.queue_declare(queue='hello')


# 回調(diào)函數(shù)
def callback(ch, method, properties, body):
    print('消費者收到:{}'.format(body))

# channel: 包含channel的一切屬性和方法
# method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
# properties: basic_publish 通過 properties 傳入的參數(shù)
# body: basic_publish發(fā)送的消息


channel.basic_consume(queue='hello',  # 接收指定queue的消息
                      auto_ack=True,  # 指定為True,表示消息接收到后自動給消息發(fā)送方回復(fù)確認(rèn),已收到消息
                      on_message_callback=callback  # 設(shè)置收到消息的回調(diào)函數(shù)
                      )

print('Waiting for messages. To exit press CTRL+C')

# 一直處于等待接收消息的狀態(tài),如果沒收到消息就一直處于阻塞狀態(tài),收到消息就調(diào)用上面的回調(diào)函數(shù)
channel.start_consuming()

對于上面的這種模式,有一下兩個不好的地方:

一個是在我們的消費者還沒開始消費完隊列里的消息,如果這時rabbitmq服務(wù)掛了,那么消息隊列里的消息將會全部丟失,解決方法是在聲明隊列時,聲明隊列為可持久化存儲隊列,并且在生產(chǎn)者在將消息插入到消息隊列時,設(shè)置消息持久化存儲,具體如下

# coding=utf-8
### 生產(chǎn)者
import pika
import time

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))

# 創(chuàng)建一個channel
channel = connection.channel()

# 如果指定的queue不存在,則會創(chuàng)建一個queue,如果已經(jīng)存在 則不會做其他動作,官方推薦,每次使用時都可以加上這句
channel.queue_declare(queue='durable_queue',durable=True)
#PS:這里不同種隊列不允許名字相同

for i in range(0, 100):
    channel.basic_publish(exchange='',
                          routing_key='durable_queue',
                          body='{}'.format(i),
                          properties=pika.BasicProperties(delivery_mode=2)
                          )
# 關(guān)閉連接
# connection.close()

消費者與上面的消費者沒有什么不同,具體的就是消費聲明的隊列,也要是可持久化的隊列,還有就是,即使在生產(chǎn)者插入消息時,設(shè)置當(dāng)前消息持久化存儲(properties=pika.BasicProperties(delivery_mode=2)),并不能百分百保證消息真的被持久化,因為RabbitMQ掛掉的時候它可能還保存在緩存中,沒來得及同步到磁盤中

在生產(chǎn)者插入消息后,立刻停止rabbitmq,并重新啟動,其實我們在web管理頁面也可看到未被消費的信息,當(dāng)然在啟動消費者后也成功接收到了消息

上面說的第二點不好就是,如果在消費者獲取到隊列里的消息后,在回調(diào)函數(shù)的處理過程中,消費者突然出錯或程序崩潰等異常,那么就會造成這條消息并未被實際正常的處理掉。為了解決這個問題,我們只需在消費者basic_consume(auto_ack=False),并在回調(diào)函數(shù)中設(shè)置手動應(yīng)答即可ch.basic_ack(delivery_tag=method.delivery_tag),具體如下

# coding=utf-8
### 消費者

import pika
import time

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

# 如果指定的queue不存在,則會創(chuàng)建一個queue,如果已經(jīng)存在 則不會做其他動作,生產(chǎn)者和消費者都做這一步的好處是
# 這樣生產(chǎn)者和消費者就沒有必要的先后啟動順序了
channel.queue_declare(queue='queue')


# 回調(diào)函數(shù)
def callback(ch, method, properties, body):
    time.sleep(5)
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('消費者收到:{}'.format(body.decode('utf-8')))


# channel: 包含channel的一切屬性和方法
# method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
# properties: basic_publish 通過 properties 傳入的參數(shù)
# body: basic_publish發(fā)送的消息


channel.basic_consume(queue='queue',  # 接收指定queue的消息
                      auto_ack=False,  # 指定為False,表示取消自動應(yīng)答,交由回調(diào)函數(shù)手動應(yīng)答
                      on_message_callback=callback  # 設(shè)置收到消息的回調(diào)函數(shù)
                      )

# 應(yīng)答的本質(zhì)是告訴消息隊列可以將這條消息銷毀了

print('Waiting for messages. To exit press CTRL+C')

# 一直處于等待接收消息的狀態(tài),如果沒收到消息就一直處于阻塞狀態(tài),收到消息就調(diào)用上面的回調(diào)函數(shù)
channel.start_consuming()

這里只需要配置消費者,生產(chǎn)者并不要修改

還有就是在上的使用方式在,都是一個生產(chǎn)者和一個消費者,還有一種情況就是,一個生產(chǎn)者和多個消費者,即多個消費者同時監(jiān)聽一個消息隊列,這時候隊列里的消息就是輪詢分發(fā)(即如果消息隊列里有100條信息,如果有2個消費者,那么每個就會收到50條信息),但是在某些情況下,不同的消費者處理任務(wù)的能力是不同的,這時還按照輪詢的方式分發(fā)消息并不是很合理,那么只需要再配合手動應(yīng)答的方式,設(shè)置消費者接收的消息沒有處理完,隊列就不要給我放送新的消息即可,具體配置方式如下:

# coding=utf-8
### 消費者

import pika
import time

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

# 如果指定的queue不存在,則會創(chuàng)建一個queue,如果已經(jīng)存在 則不會做其他動作,生產(chǎn)者和消費者都做這一步的好處是
# 這樣生產(chǎn)者和消費者就沒有必要的先后啟動順序了
channel.queue_declare(queue='queue')


# 回調(diào)函數(shù)
def callback(ch, method, properties, body):
    time.sleep(0)#通過設(shè)置休眠時間來模擬不同消費者的處理時間
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('消費者收到:{}'.format(body.decode('utf-8')))


# prefetch_count表示接收的消息數(shù)量,當(dāng)我接收的消息沒有處理完(用basic_ack標(biāo)記消息已處理完畢)之前不會再接收新的消息了
channel.basic_qos(prefetch_count=1)  # 還有就是這個設(shè)置必須在basic_consume之上,否則不生效

channel.basic_consume(queue='queue',  # 接收指定queue的消息
                      auto_ack=False,  # 指定為False,表示取消自動應(yīng)答,交由回調(diào)函數(shù)手動應(yīng)答
                      on_message_callback=callback  # 設(shè)置收到消息的回調(diào)函數(shù)
                      )

# 應(yīng)答的本質(zhì)是告訴消息隊列可以將這條消息銷毀了

print('Waiting for messages. To exit press CTRL+C')

# 一直處于等待接收消息的狀態(tài),如果沒收到消息就一直處于阻塞狀態(tài),收到消息就調(diào)用上面的回調(diào)函數(shù)
channel.start_consuming()

PS:這種情況必須關(guān)閉自動應(yīng)答ack,改成手動應(yīng)答。使用basicQos(perfetch=1)限制每次只發(fā)送不超過1條消息到同一個消費者,消費者必須手動反饋告知隊列,才會發(fā)送下一個

(五)RabbitMQ發(fā)布訂閱模式

發(fā)布訂閱會將消息發(fā)送給所有的訂閱者,而消息隊列中的數(shù)據(jù)被消費一次便消失。所以,RabbitMQ實現(xiàn)發(fā)布和訂閱時,會為每一個訂閱者創(chuàng)建一個隊列,而發(fā)布者發(fā)布消息時,會將消息放置在所有相關(guān)隊列中

這個模式中會引入交換機(jī)的概念,其實在RabbitMQ中,所有的生產(chǎn)者都不會直接把消息發(fā)送到隊列中,甚至生產(chǎn)者都不知道消息在發(fā)出后有沒有發(fā)送到queue中,事實上,生產(chǎn)者只能將消息發(fā)送給交換機(jī),由交換機(jī)來決定發(fā)送到哪個隊列中。

交換機(jī)的一端用來從生產(chǎn)者中接收消息,另一端用來發(fā)送消息到隊列,交換機(jī)的類型規(guī)定了怎么處理接收到的消息,發(fā)布訂閱模式使用到的交換機(jī)類型為 fanout ,這種交換機(jī)類型非常簡單,就是將接收到的消息廣播給已知的(即綁定到此交換機(jī)的)所有消費者。

當(dāng)然,如果不想使用特定的交換機(jī),可以使用 exchange=’’ 表示使用默認(rèn)的交換機(jī),默認(rèn)的交換機(jī)會將消息發(fā)送到 routing_key 指定的queue,可以參考簡單模式。

上代碼:

#生產(chǎn)者
import pika

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

# 創(chuàng)建一個指定名稱的交換機(jī),并指定類型為fanout,用于將接收到的消息廣播到所有queue中
channel.exchange_declare(exchange='交換機(jī)', exchange_type='fanout')

# 將消息發(fā)送給指定的交換機(jī),在fanout類型中,routing_key=''表示不用發(fā)送到指定queue中,
# 而是將發(fā)送到綁定到此交換機(jī)的所有queue
channel.basic_publish(exchange='交換機(jī)', routing_key='', body='這是一條測試消息')
#消費者
import pika

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

channel.exchange_declare(exchange='交換機(jī)', exchange_type='fanout')

# 使用RabbitMQ給自己生成一個專有的queue
result = channel.queue_declare(queue='333')
# result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 這里如果設(shè)置exclusive=True參數(shù),那么該隊列就是一個只有隊列,在消費者結(jié)束后,該專有隊列也會自動清除,如果queue=''沒有設(shè)置名字的話,那么就會自動生成一個
# 不會重復(fù)的隊列名

# 將queue綁定到指定交換機(jī)
channel.queue_bind(exchange='交換機(jī)', queue=queue_name)

print(' [*] Waiting for  message.')


def callback(ch, method, properties, body):
    print("消費者收到:{}".format(body.decode('utf-8')))

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

該模式與簡單模式的還有一個區(qū)別就是,這里的消息隊列都是由消費者聲明的,所以如果是生產(chǎn)者先啟動,并將消息發(fā)給交換機(jī)的畫,這里的消息就會丟失,所以我們也可以在消費者端聲明隊列并綁定交換機(jī)(不能是專有隊列),所以仔細(xì)想想,其實這所謂的發(fā)布訂閱模式并沒有說什么了不起,它不過是讓交換機(jī)同時推送多條消息給綁定的隊列,我們當(dāng)然也可以在簡單模式的基礎(chǔ)上多進(jìn)行幾次basic_publish發(fā)送消息到指定的隊列。當(dāng)然我們這樣做的話,可能就沒辦法做到由交換機(jī)的同時發(fā)送了,效率可能也沒有一次basic_publish的高

(六)RabbitMQ RPC模式

下面實現(xiàn)由rpc遠(yuǎn)程調(diào)用加減運算

客戶端

import pika
import uuid
import json

class RPC(object):

    def __init__(self):
        self.call_id = None
        self.response = None
        user_info = pika.PlainCredentials('root', 'root')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
        self.channel = self.connection.channel()

        # 創(chuàng)建一個此客戶端專用的queue,用于接收服務(wù)端發(fā)過來的消息
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        # 判斷接收到的response是否屬于對應(yīng)request
        if self.call_id == props.correlation_id:
            self.response = json.loads(body.decode('utf-8')).get('result')

    def call(self, func, param):
        self.response = None
        self.call_id = str(uuid.uuid4())  # 為該消息指定uuid,類似于請求id
        self.channel.queue_declare(queue='rpc_queue')
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',  # 將消息發(fā)送到該queue
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,  # 從該queue中取消息
                correlation_id=self.call_id,  # 為此次消息指定uuid
            ),
            body=json.dumps(
                {
                    'func': func,
                    'param': {'a': param[0], 'b': param[1]}
                }
            )
        )
        
        self.connection.process_data_events(time_limit=3)# 與start_consuming()相似,可以設(shè)置超時參數(shù)
        return self.response

rpc = RPC()

print("發(fā)送消息到消費者,等待返回結(jié)果")

response = rpc.call(func='del', param=(1, 2))

print("收到來自消費者返回的結(jié)果:{}".format(response))

服務(wù)端

import pika
import json

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))

channel = connection.channel()

# 指定接收消息的queue
channel.queue_declare(queue='rpc_queue')

def add_number(a, b):
    return a + b

def del_num(a, b):
    return a - b

execute_map = {
    'add': add_number,
    'del': del_num
}

def on_request(ch, method, props, body):
    body = json.loads(body.decode('utf-8'))
    func = body.get('func')
    param = body.get('param')
    result = execute_map.get(func)(param.get('a'), param.get('b'))
    print('進(jìn)行{}運算,并將結(jié)果返回個消費者'.format(func))

    ch.basic_publish(exchange='',  # 使用默認(rèn)交換機(jī)
                     routing_key=props.reply_to,  # response發(fā)送到該queue
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),  # 使用correlation_id讓此response與請求消息對應(yīng)起來
                     body=json.dumps({'result': result}))

    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
# 從rpc_queue中取消息,然后使用on_request進(jìn)行處理
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

(七)說點啥

對于rabbitmq的模式還有Routing模式和Topics模式等,這里就不復(fù)述了,其實pika對于RabbitMQ的使用還有很多細(xì)節(jié)和參數(shù)值得深究。這篇博客也就是簡單的記錄下我對pika操作raabbitmq過程和簡單的理解

參考鏈接:

https://www.cnblogs.com/guyuyun/p/14970592.html

https://blog.csdn.net/wohu1104/category_9023593.html

(八)結(jié)語

到此這篇關(guān)于python對RabbitMQ的簡單入門使用的文章就介紹到這了,更多相關(guān)python RabbitMQ使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • python logging模塊的使用總結(jié)

    python logging模塊的使用總結(jié)

    這篇文章主要介紹了python logging模塊使用總結(jié)以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。,需要的朋友可以參考下
    2019-07-07
  • 用sqlalchemy構(gòu)建Django連接池的實例

    用sqlalchemy構(gòu)建Django連接池的實例

    今天小編就為大家分享一篇用sqlalchemy構(gòu)建Django連接池的實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2019-08-08
  • python rolling regression. 使用 Python 實現(xiàn)滾動回歸操作

    python rolling regression. 使用 Python 實現(xiàn)滾動回歸操作

    這篇文章主要介紹了python rolling regression. 使用 Python 實現(xiàn)滾動回歸操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-06-06
  • Python Django 命名空間模式的實現(xiàn)

    Python Django 命名空間模式的實現(xiàn)

    這篇文章主要介紹了Python Django 命名空間模式的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-08-08
  • Effective Python bytes 與 str 的區(qū)別

    Effective Python bytes 與 str 的區(qū)別

    這篇文章主要介紹了Effective Python bytes 與 str 的區(qū)別,Python 有兩種類型可以表示字符序列,下面圍繞Python bytes 與 str 的相關(guān)資料展開內(nèi)容,需要的朋友可以參考一下
    2021-11-11
  • 詳解Python數(shù)據(jù)可視化編程 - 詞云生成并保存(jieba+WordCloud)

    詳解Python數(shù)據(jù)可視化編程 - 詞云生成并保存(jieba+WordCloud)

    這篇文章主要介紹了Python數(shù)據(jù)可視化編程 - 詞云生成并保存(jieba+WordCloud),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-03-03
  • 關(guān)于微信小程序爬蟲token自動更新問題

    關(guān)于微信小程序爬蟲token自動更新問題

    本文主要介紹了關(guān)于微信小程序爬蟲關(guān)于token自動更新問題,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-09-09
  • Python-apply(lambda x: )的使用及說明

    Python-apply(lambda x: )的使用及說明

    這篇文章主要介紹了Python-apply(lambda x: )的使用及說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-02-02
  • 簡要講解Python編程中線程的創(chuàng)建與鎖的使用

    簡要講解Python編程中線程的創(chuàng)建與鎖的使用

    這篇文章主要介紹了簡要講解Python編程中線程的創(chuàng)建與鎖的使用,Python中雖然有GIL的存在,但依然是能夠創(chuàng)建多個線程來交替使用的,需要的朋友可以參考下
    2016-02-02
  • Pycharm及python安裝詳細(xì)步驟及PyCharm配置整理(推薦)

    Pycharm及python安裝詳細(xì)步驟及PyCharm配置整理(推薦)

    這篇文章主要介紹了Pycharm及python安裝詳細(xì)步驟以及PyCharm配置整理,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-04-04

最新評論