PHP實現(xiàn)基于Redis的MessageQueue隊列封裝操作示例
本文實例講述了PHP實現(xiàn)基于Redis的MessageQueue隊列封裝操作。分享給大家供大家參考,具體如下:
Redis的鏈表List可以用來做鏈表,高并發(fā)的特性非常適合做分布式的并行消息傳遞。
項目地址:https://github.com/huyanping/Zebra-PHP-Framework
左進右出
$redis->lPush($key, $value); $redis->rPop($key);
以下程序已在生產(chǎn)環(huán)境中正式使用。
基于Redis的PHP消息隊列封裝
<?php
/**
* Created by PhpStorm.
* User: huyanping
* Date: 14-8-19
* Time: 下午12:10
*
* 基于Redis的消息隊列封裝
*/
namespace Zebra\MessageQueue;
class RedisMessageQueue implements IMessageQueue
{
protected $redis_server;
protected $server;
protected $port;
/**
* @var 消息隊列標志
*/
protected $key;
/**
* 構(gòu)造隊列,創(chuàng)建redis鏈接
* @param $server_config
* @param $key
* @param bool $p_connect
*/
public function __construct($server_config = array('IP' => '127.0.0.1', 'PORT' => '6379'), $key = 'redis_message_queue', $p_connect = false)
{
if (empty($key))
throw new \Exception('message queue key can not be empty');
$this->server = $server_config['IP'];
$this->port = $server_config['PORT'];
$this->key = $key;
$this->check_environment();
if ($p_connect) {
$this->pconnect();
} else {
$this->connect();
}
}
/**
* 析構(gòu)函數(shù),關(guān)閉redis鏈接,使用長連接時,最好主動調(diào)用關(guān)閉
*/
public function __destruct()
{
$this->close();
}
/**
* 短連接
*/
private function connect()
{
$this->redis_server = new \Redis();
$this->redis_server->connect($this->server, $this->port);
}
/**
* 長連接
*/
public function pconnect()
{
$this->redis_server = new \Redis();
$this->redis_server->pconnect($this->server, $this->port);
}
/**
* 關(guān)閉鏈接
*/
public function close()
{
$this->redis_server->close();
}
/**
* 向隊列插入一條信息
* @param $message
* @return mixed
*/
public function put($message)
{
return $this->redis_server->lPush($this->key, $message);
}
/**
* 向隊列中插入一串信息
* @param $message
* @return mixed
*/
public function puts(){
$params = func_get_args();
$message_array = array_merge(array($this->key), $params);
return call_user_func_array(array($this->redis_server, 'lPush'), $message_array);
}
/**
* 從隊列頂部獲取一條記錄
* @return mixed
*/
public function get()
{
return $this->redis_server->lPop($this->key);
}
/**
* 選擇數(shù)據(jù)庫,可以用于區(qū)分不同隊列
* @param $database
*/
public function select($database)
{
$this->redis_server->select($database);
}
/**
* 獲得隊列狀態(tài),即目前隊列中的消息數(shù)量
* @return mixed
*/
public function size()
{
return $this->redis_server->lSize($this->key);
}
/**
* 獲取某一位置的值,不會刪除該位置的值
* @param $pos
* @return mixed
*/
public function view($pos)
{
return $this->redis_server->lGet($this->key, $pos);
}
/**
* 檢查Redis擴展
* @throws Exception
*/
protected function check_environment()
{
if (!\extension_loaded('redis')) {
throw new \Exception('Redis extension not loaded');
}
}
}
如果需要一次寫入多個隊列,可以使用如下調(diào)用方式:
<?php $redis = new RedisMessageQueue(); $redis->puts(1, 2, 3, 4); $redis->puts(5, 6, 7, 8, 9);
模仿HTTPSQS輸出結(jié)果的封裝如下,提供了寫入位置和讀取位置記錄的功能:
<?php
/**
* Created by PhpStorm.
* User: huyanping
* Date: 14-9-5
* Time: 下午2:16
*
* 附加了隊列狀態(tài)信息的RedisMessageQueue
*/
namespace Zebra\MessageQueue;
class RedisMessageQueueStatus extends RedisMessageQueue {
protected $record_status;
protected $put_position;
protected $get_position;
public function __construct(
$server_config = array('IP' => '127.0.0.1', 'PORT' => '6379'),
$key = 'redis_message_queue',
$p_connect = false,
$record_status=true
){
parent::__construct($server_config, $key, $p_connect);
$this->record_status = $record_status;
$this->put_position = $this->key . '_put_position';
$this->get_position = $this->key . '_get_position';
}
public function get(){
if($queue = parent::get()){
$incr_result = $this->redis_server->incr($this->get_position);
if(!$incr_result) throw new \Exception('can not mark get position,please check the redis server');
return $queue;
}else{
return false;
}
}
public function put($message){
if(parent::put($message)){
$incr_result = $this->redis_server->incr($this->put_position);
if(!$incr_result) throw new \Exception('can not mark put position,please check the redis server');
return true;
}else{
return false;
}
}
public function puts_status(){
$message_array = func_get_args();
$result = call_user_func_array(array($this, 'puts'), $message_array);
if($result){
$this->redis_server->incrBy($this->put_position, count($message_array));
return true;
}
return false;
}
public function size(){
return $this->redis_server->lSize($this->key);
}
public function status(){
$status['put_position'] = ($put_position = $this->redis_server->get($this->put_position)) ? $put_position : 0;
$status['get_position'] = ($get_position = $this->redis_server->get($this->get_position)) ? $get_position : 0;
$status['unread_queue'] = $this->size();
$status['queue_name'] = $this->key;
$status['server'] = $this->server;
$status['port'] = $this->port;
return $status;
}
public function status_normal(){
$status = $this->status();
$message = 'Redis Message Queue' . PHP_EOL;
$message .= '-------------------' . PHP_EOL;
$message .= 'Message queue name:' . $status['queue_name'] . PHP_EOL;
$message .= 'Put position of queue:' . $status['put_position'] . PHP_EOL;
$message .= 'Get position of queue:' . $status['get_position'] . PHP_EOL;
$message .= 'Number of unread queue:' . $status['unread_queue'] . PHP_EOL;
return $message;
}
public function status_json(){
return \json_encode($this->status());
}
}
更多關(guān)于PHP相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《php+redis數(shù)據(jù)庫程序設(shè)計技巧總結(jié)》、《php面向?qū)ο蟪绦蛟O(shè)計入門教程》、《PHP基本語法入門教程》、《PHP數(shù)組(Array)操作技巧大全》、《php字符串(string)用法總結(jié)》、《php+mysql數(shù)據(jù)庫操作入門教程》及《php常見數(shù)據(jù)庫操作技巧匯總》
希望本文所述對大家PHP程序設(shè)計有所幫助。
相關(guān)文章
微信小程序使用wx.request請求服務(wù)器json數(shù)據(jù)并渲染到頁面操作示例
這篇文章主要介紹了微信小程序使用wx.request請求服務(wù)器json數(shù)據(jù)并渲染到頁面操作,結(jié)合實例形式分析了微信小程序使用wx.request發(fā)送網(wǎng)絡(luò)請求及返回結(jié)果渲染到wxml界面相關(guān)操作技巧,需要的朋友可以參考下2019-03-03
JavaScript網(wǎng)絡(luò)請求工具庫axios使用實例探索
這篇文章主要為大家介紹了JavaScript網(wǎng)絡(luò)請求工具庫axios使用實例探索,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2024-01-01
JavaScript位移運算符(無符號) >>> 三個大于號 的使用方法詳解
這篇文章主要介紹了JavaScript位移運算符(無符號) >>> 三個大于號 的使用方法詳解的相關(guān)資料,需要的朋友可以參考下2016-03-03

