一般的隊(duì)列系統(tǒng),是指linux中的crontab定時(shí)啟動(dòng)腳本來(lái)處理任務(wù):
首先下載一個(gè)rabbitmq的客戶(hù)端,他相當(dāng)于一個(gè)容器,裝排隊(duì)數(shù)據(jù)的容器
http://www.rabbitmq.com/download.html
默認(rèn)的端口是55672 訪問(wèn)地址http://127.0.0.1:55672/
默認(rèn)帳號(hào)密碼 guest guest
你可以看到rabbitmq 的管理界面
mq的任務(wù)是一個(gè)不浪費(fèi)資源,的一個(gè)隊(duì)列系統(tǒng)!
php使用需要下載一個(gè)amqp擴(kuò)展
或者直接點(diǎn)擊下面的地址找到適合自己的版本,下載
http://pecl.php.net/package/amqp/1.2.0/windows
rabbitmq.1.dll 放在C盤(pán)windows下
php_amqp.dll 放入php擴(kuò)展中
開(kāi)啟php_amqp.dll的引用
重啟服務(wù)器
用phpinfo();
查看是否引用成功,如果出現(xiàn)以下的amqp擴(kuò)展,那就說(shuō)明成功了
首先是rabbitmq的生產(chǎn)者:
創(chuàng)建第一個(gè)index文件:然后去mq中查看,如果添加一個(gè)test001的隊(duì)列名信息,就說(shuō)明已經(jīng)添加進(jìn)去了,xx22的信息已經(jīng)在mq中存儲(chǔ)!
接下來(lái)就需要跑數(shù)據(jù)了。
createQueue(array('xxx','2222'),'test001');
echo "ok";
function createQueue($message,$queueName,$exchangeName = '', $queueKey = '')
{
$queueName = self::getQueueName($queueName);
$conn_args = array('host' =>'localhost', 'port'=> '5672',
'login' =>'guest', //mq帳號(hào)
'password'=> '', //mq密碼
'vhost' => '/');
$conn = new AMQPConnection($conn_args);
$conn->connect();
$channel = new AMQPChannel($conn);
if (!$exchangeName) {
$exchangeName = $queueName;
}
$queueName = $queueName;
if (!$queueKey) {
$queueKey = $queueName;
}
$ex = new AMQPExchange($channel);
$ex->setName($exchangeName);
$ex->setType(AMQP_EX_TYPE_TOPIC);
$ex->setFlags(AMQP_DURABLE); //exchange持久化
$ex->declareExchange();
$q = new AMQPQueue($channel);
$q->setName($queueName);
$q->setFlags(AMQP_DURABLE); //queue持久化
$q->declareQueue();
$q->bind($exchangeName, $queueKey);
$channel->startTransaction();
/**
* 消息持久化,delivery_mode:2持久化、delivery_mode:1非持久化,其中priority是設(shè)置消息的優(yōu)先級(jí),測(cè)試中發(fā)現(xiàn)并未起作用。
* 消息還有其他屬性,請(qǐng)參考http://www.php.net/manual/zh/amqpexchange.publish.php
*/
$result = $ex->publish(json_encode($message), $queueKey, AMQP_NOPARAM, array('delivery_mode'=>2, 'priority'=> 9));
$channel->commitTransaction();
$conn->disconnect();
}
有了生產(chǎn)者,那就有消費(fèi)者。
腳本如果沒(méi)有其他的修改或問(wèn)題,基本上都是常年啟動(dòng)的:
消費(fèi)者基類(lèi):
class WorkerCommand{
function qInit($q_name,$e_name='',$k_route=''){
$q_name = Utils::getQueueName($q_name);
$conn_args = array(
'host' => '127.0.0.1', //mq的配置
'port' => '5672',
'login' => 'guest',
'password' => 'huoxingxing',
'vhost' => '/'
);
//創(chuàng)建連接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
//創(chuàng)建交換機(jī)
$ex = new AMQPExchange($channel);
if (!$e_name) {
$e_name = $q_name;
}
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct類(lèi)型
$ex->setFlags(AMQP_DURABLE); //持久化
// echo "Exchange Status:" . $ex->declareExchange() . "\n";
//創(chuàng)建隊(duì)列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
// echo "Message Total:" . $q->declareExchange() . "\n";
if (!$k_route) {
$k_route = $q_name;
}
//綁定交換機(jī)與隊(duì)列,并指定路由鍵
// echo 'Queue Bind: ' . $q->declareQueue($e_name, $k_route) . "\n";
//阻塞模式接收消息
echo "Message:\n";
while (True) {
$q->consume(array($this,'processMessage'));
//$q->consume('processMessage', AMQP_AUTOACK); //自動(dòng)ACK應(yīng)答
}
$conn->disconnect();
}
}
消費(fèi)者:
class WorkerWareSyncBackUpCommand extends WorkerCommand {
function actionIndex()
{
$this->qInit('SyncWareBackup');
}
function processMessage($envelope, $queue)
{
$msg = json_decode($envelope->getBody());
Utils::doBackUp('back',$msg,'');
$queue->ack($envelope->getDeliveryTag()); //手動(dòng)發(fā)送ACK應(yīng)答
}
}