PHP使用enqueue/amqp-lib實(shí)現(xiàn)rabbitmq任務(wù)處理
一:拓展安裝
composer require enqueue/amqp-lib
文檔地址:https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md
二:方法介紹
1:連接rabbitmq
$factory = new AmqpConnectionFactory([ 'host' => '192.168.6.88',//host 'port' => '5672',//端口 'vhost' => '/',//虛擬主機(jī) 'user' => 'admin',//賬號(hào) 'pass' => 'admin',//密碼 ]); $context = $factory->createContext();
2:聲明主題
//聲明并創(chuàng)建主題 $exchangeName = 'exchange'; $fooTopic = $context->createTopic($exchangeName); $fooTopic->setType(AmqpTopic::TYPE_FANOUT); $context->declareTopic($fooTopic); //刪除主題 $context->deleteTopic($fooTopic);
3:聲明隊(duì)列
//聲明并創(chuàng)建隊(duì)列 $queueName = 'rabbitmq'; $fooQueue = $context->createQueue($queueName); $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE); $context->declareQueue($fooQueue); //刪除隊(duì)列 $context->deleteQueue($fooQueue);
4:將隊(duì)列綁定到主題
$context->bind(new AmqpBind($fooTopic, $fooQueue));
5:發(fā)送消息
//向隊(duì)列發(fā)送消息 $message = $context->createMessage('Hello world!'); $context->createProducer()->send($fooQueue, $message); //向隊(duì)列發(fā)送優(yōu)先消息 $queueName = 'rabbitmq'; $fooQueue = $context->createQueue(queueName); $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE); //設(shè)置隊(duì)列的最大優(yōu)先級(jí) $fooQueue->setArguments(['x-max-priority' => 10]); $context->declareQueue($fooQueue); $message = $context->createMessage('Hello world!'); $context->createProducer() ->setPriority(5) //設(shè)置優(yōu)先級(jí),優(yōu)先級(jí)越高,消息越快到達(dá)消費(fèi)者 ->send($fooQueue, $message); //向隊(duì)列發(fā)送延時(shí)消息 $message = $context->createMessage('Hello world!'); $context->createProducer() ->setDelayStrategy(new RabbitMqDlxDelayStrategy()) ->setDeliveryDelay(5000) //消息延時(shí)5秒 ->send($fooQueue, $message);
6:消費(fèi)消息【接收消息】
//消費(fèi)消息 $consumer = $context->createConsumer($fooQueue); $message = $consumer->receive(); // process a message //業(yè)務(wù)代碼 $consumer->acknowledge($message);//ack應(yīng)答,通知rabbitmq成功,刪除對(duì)應(yīng)任務(wù) // $consumer->reject($message);ack應(yīng)答,通知rabbitmq失敗,不刪除對(duì)應(yīng)任務(wù) //訂閱消費(fèi)者 $fooConsumer = $context->createConsumer($fooQueue); $subscriptionConsumer = $context->createSubscriptionConsumer(); $subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) { // process message //業(yè)務(wù)代碼 $consumer->acknowledge($message);//ack應(yīng)答,通知rabbitmq成功,刪除對(duì)應(yīng)任務(wù) // $consumer->reject($message);ack應(yīng)答,通知rabbitmq失敗,不刪除對(duì)應(yīng)任務(wù) return true; }); $subscriptionConsumer->consume(); //清除隊(duì)列消息 $queueName = 'rabbitmq'; $queue = $context->createQueue($queueName); $context->purgeQueue($queue);
三:簡(jiǎn)單實(shí)現(xiàn)
1:發(fā)送消息
//連接rabbitmq $factory = new AmqpConnectionFactory([ 'host' => '192.168.6.88', 'port' => '5672', 'vhost' => '/', 'user' => 'admin', 'pass' => 'admin', 'persisted' => false, ]); $context = $factory->createContext(); //聲明主題 $exchangeName = 'exchange'; $fooTopic = $context->createTopic($exchangeName); $fooTopic->setType(AmqpTopic::TYPE_FANOUT); $context->declareTopic($fooTopic); //聲明隊(duì)列 $queueName = 'rabbitmq'; $fooQueue = $context->createQueue($queueName); $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE); $context->declareQueue($fooQueue); //將隊(duì)列綁定到主題 $context->bind(new AmqpBind($fooTopic, $fooQueue)); //發(fā)送消息到隊(duì)列 $message = $context->createMessage('Hello world!'); $context->createProducer()->send($fooQueue, $message);
2:消費(fèi)消息
$factory = new AmqpConnectionFactory([ 'host' => '192.168.6.88', 'port' => '5672', 'vhost' => '/', 'user' => 'admin', 'pass' => 'admin', 'persisted' => false, ]); $context = $factory->createContext(); $queueName = 'rabbitmq'; $fooQueue = $context->createQueue($queueName); $fooConsumer = $context->createConsumer($fooQueue); $subscriptionConsumer = $context->createSubscriptionConsumer(); $subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) { // process message //業(yè)務(wù)代碼 $consumer->acknowledge($message);//ack應(yīng)答,通知rabbitmq成功,刪除對(duì)應(yīng)任務(wù) // $consumer->reject($message);ack應(yīng)答,通知rabbitmq失敗,不刪除對(duì)應(yīng)任務(wù) return true; }); $subscriptionConsumer->consume();
到此這篇關(guān)于PHP使用enqueue/amqp-lib實(shí)現(xiàn)rabbitmq任務(wù)處理的文章就介紹到這了,更多相關(guān)PHP rabbitmq任務(wù)處理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
兩種php去除二維數(shù)組的重復(fù)項(xiàng)方法
這篇文章主要介紹了兩種php去除二維數(shù)組的重復(fù)項(xiàng)方法,大家可以進(jìn)行比較看哪一種更適合自己,需要的朋友可以參考下2015-11-11php抽象類和接口知識(shí)點(diǎn)整理總結(jié)
這篇文章主要介紹了php抽象類和接口知識(shí)點(diǎn),整理總結(jié)了php抽象類與接口的概念、原理、操作技巧及相關(guān)使用注意事項(xiàng),需要的朋友可以參考下2019-08-08PHP開(kāi)發(fā)環(huán)境配置(MySQL數(shù)據(jù)庫(kù)安裝圖文教程)
下載完軟件后開(kāi)始PHP開(kāi)發(fā)環(huán)境的配置。注意是開(kāi)發(fā)環(huán)境,不是服務(wù)器環(huán)境。2010-04-04