PHP使用enqueue/amqp-lib實(shí)現(xiàn)rabbitmq任務(wù)處理
更新時(shí)間:2024年03月11日 11:58:28 作者:huaweichenai
這篇文章主要為大家詳細(xì)介紹了PHP如何使用enqueue/amqp-lib實(shí)現(xiàn)rabbitmq任務(wù)處理,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以學(xué)習(xí)一下
一:拓展安裝
composer require enqueue/amqp-lib
文檔地址:https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md
二:方法介紹
1:連接rabbitmq
$factory = new AmqpConnectionFactory([
'host' => '192.168.6.88',//host
'port' => '5672',//端口
'vhost' => '/',//虛擬主機(jī)
'user' => 'admin',//賬號
'pass' => 'admin',//密碼
]);
$context = $factory->createContext();
2:聲明主題
//聲明并創(chuàng)建主題 $exchangeName = 'exchange'; $fooTopic = $context->createTopic($exchangeName); $fooTopic->setType(AmqpTopic::TYPE_FANOUT); $context->declareTopic($fooTopic); //刪除主題 $context->deleteTopic($fooTopic);
3:聲明隊(duì)列
//聲明并創(chuàng)建隊(duì)列 $queueName = 'rabbitmq'; $fooQueue = $context->createQueue($queueName); $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE); $context->declareQueue($fooQueue); //刪除隊(duì)列 $context->deleteQueue($fooQueue);
4:將隊(duì)列綁定到主題
$context->bind(new AmqpBind($fooTopic, $fooQueue));
5:發(fā)送消息
//向隊(duì)列發(fā)送消息
$message = $context->createMessage('Hello world!');
$context->createProducer()->send($fooQueue, $message);
//向隊(duì)列發(fā)送優(yōu)先消息
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue(queueName);
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
//設(shè)置隊(duì)列的最大優(yōu)先級
$fooQueue->setArguments(['x-max-priority' => 10]);
$context->declareQueue($fooQueue);
$message = $context->createMessage('Hello world!');
$context->createProducer()
->setPriority(5) //設(shè)置優(yōu)先級,優(yōu)先級越高,消息越快到達(dá)消費(fèi)者
->send($fooQueue, $message);
//向隊(duì)列發(fā)送延時(shí)消息
$message = $context->createMessage('Hello world!');
$context->createProducer()
->setDelayStrategy(new RabbitMqDlxDelayStrategy())
->setDeliveryDelay(5000) //消息延時(shí)5秒
->send($fooQueue, $message);
6:消費(fèi)消息【接收消息】
//消費(fèi)消息
$consumer = $context->createConsumer($fooQueue);
$message = $consumer->receive();
// process a message
//業(yè)務(wù)代碼
$consumer->acknowledge($message);//ack應(yīng)答,通知rabbitmq成功,刪除對應(yīng)任務(wù)
// $consumer->reject($message);ack應(yīng)答,通知rabbitmq失敗,不刪除對應(yīng)任務(wù)
//訂閱消費(fèi)者
$fooConsumer = $context->createConsumer($fooQueue);
$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
// process message
//業(yè)務(wù)代碼
$consumer->acknowledge($message);//ack應(yīng)答,通知rabbitmq成功,刪除對應(yīng)任務(wù)
// $consumer->reject($message);ack應(yīng)答,通知rabbitmq失敗,不刪除對應(yīng)任務(wù)
return true;
});
$subscriptionConsumer->consume();
//清除隊(duì)列消息
$queueName = 'rabbitmq';
$queue = $context->createQueue($queueName);
$context->purgeQueue($queue);
三:簡單實(shí)現(xiàn)
1:發(fā)送消息
//連接rabbitmq
$factory = new AmqpConnectionFactory([
'host' => '192.168.6.88',
'port' => '5672',
'vhost' => '/',
'user' => 'admin',
'pass' => 'admin',
'persisted' => false,
]);
$context = $factory->createContext();
//聲明主題
$exchangeName = 'exchange';
$fooTopic = $context->createTopic($exchangeName);
$fooTopic->setType(AmqpTopic::TYPE_FANOUT);
$context->declareTopic($fooTopic);
//聲明隊(duì)列
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue($queueName);
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
$context->declareQueue($fooQueue);
//將隊(duì)列綁定到主題
$context->bind(new AmqpBind($fooTopic, $fooQueue));
//發(fā)送消息到隊(duì)列
$message = $context->createMessage('Hello world!');
$context->createProducer()->send($fooQueue, $message);
2:消費(fèi)消息
$factory = new AmqpConnectionFactory([
'host' => '192.168.6.88',
'port' => '5672',
'vhost' => '/',
'user' => 'admin',
'pass' => 'admin',
'persisted' => false,
]);
$context = $factory->createContext();
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue($queueName);
$fooConsumer = $context->createConsumer($fooQueue);
$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
// process message
//業(yè)務(wù)代碼
$consumer->acknowledge($message);//ack應(yīng)答,通知rabbitmq成功,刪除對應(yīng)任務(wù)
// $consumer->reject($message);ack應(yīng)答,通知rabbitmq失敗,不刪除對應(yīng)任務(wù)
return true;
});
$subscriptionConsumer->consume();到此這篇關(guān)于PHP使用enqueue/amqp-lib實(shí)現(xiàn)rabbitmq任務(wù)處理的文章就介紹到這了,更多相關(guān)PHP rabbitmq任務(wù)處理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
兩種php去除二維數(shù)組的重復(fù)項(xiàng)方法
這篇文章主要介紹了兩種php去除二維數(shù)組的重復(fù)項(xiàng)方法,大家可以進(jìn)行比較看哪一種更適合自己,需要的朋友可以參考下2015-11-11
PHP開發(fā)環(huán)境配置(MySQL數(shù)據(jù)庫安裝圖文教程)
下載完軟件后開始PHP開發(fā)環(huán)境的配置。注意是開發(fā)環(huán)境,不是服務(wù)器環(huán)境。2010-04-04

