上一篇文章《PHP使用RabbitMQ入门1——hello,world》通过最简单的示例跑起来了,可以对php如何使用RabbitMQ有了一个简单的了解。本篇文章主要讲解创建一个工作队列(Work Queue),它会发送一些耗时的任务给多个工作者(Worker)。
有时候我们需要处理一些耗时任务,例如裁剪图片,转换pdf,发送邮件,下载文件等,都需要较长的时间。例如用户在点击发送验证码的按钮时,完全可以将要发送的邮件内容写入到RabbitMQ的队列中,然后无需等待发送邮件,就可以直接返回,提示用户去邮箱查看。然后后台worker程序再从RabbitMQ的队列中取出邮件内容执行发送操作。这样一来,就将原本需要同步等待的操作变为了异步操作,节省了时间,提高了用户体验。
下面就以发送邮件为例来讲解RabbitMQ的工作队列。
创建生产者:send.php
创建send.php文件用于将邮件内容推送至RabbitMQ的工作队列。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('x.x.x.x', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('sendemail', false, false, false, false);
$code = mt_rand(1000,9999);
$emailAddress = implode(' ', array_slice($argv, 1));
$msg = new AMQPMessage('你的验证码是:'.$code.'=>'.$emailAddress);
$channel->basic_publish($msg, '', 'sendemail');
echo " send email to {$emailAddress} \n";
$channel->close();
$connection->close();
执行程序:
php send.php 884358@qq.com
运行结果:
在rabbitmq服务器上运行命令:
rabbitmqctl list_queues
可以看到name为sendemail的队列的message字段的值为1,代表sendemail队列有一条消息还未消费。
创建消费者:worker.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('x.x.x.x', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('sendemail', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo " [√] Received ", $msg->body, "\n";
$arr = explode('=>',$msg->body);
echo " [√] Send email to ", $arr[1], "...\n";
sleep(2);
echo " [√] Done\n";
};
$channel->basic_consume('sendemail', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
执行程序:
php worker.php
消费者运行后,rabbitmq会将队列中未消费的消息推送给消费者,并将消息删除。
再到rabbitmq服务器上运行命令:
rabbitmqctl list_queues
可以看到未消费的消息变为0了。
循环调度
使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了。
分别打开两个终端,分别输入以下命令:
php worker.php
然后再打开一个终端,运行send.php发送两次任务
php send.php 1@qq.com
php send.php 2@qq.com
执行两次send.php后,可以看到两个worker.php都获取到了任务(1个worker执行一个)
默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量的消息。这种发送消息得方式叫做——轮询(round-robin)。
消息确认
当处理一个耗时的任务时,如果其中某个worker在处理任务时,挂掉了,那么消息也就丢失了。例如发送验证码时,worker在发送过程中,挂掉了,验证码没有发送成功,那么用户就不会收到验证码。而我们希望的是当其中一个worker挂掉后,RabbitMQ能把这个消息转发到另一个worker,以保证验证码可以发送成功。
为了防止消息丢失,RabbitMQ提供了消息响应机制(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。
如果worker挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他worker。这样,就算worker偶尔的挂掉,也不会丢失消息。
设置basic_consume的第四个参数为false就可以打开消息确认了:
$channel->basic_consume('sendemail', '', false, false, false, false, $callback);
同时,在$callback匿名函数中增加一行响应:
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
完整的worker.php代码如下(把sleep改成10秒,便于我们有足够的时间来kill进程):
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('x.x.x.x', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('sendemail', false, false, false, false);
echo 'Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo "Received ", $msg->body, "\n";
$arr = explode('=>',$msg->body);
echo "Send email to ", $arr[1], "...\n";
sleep(10);
echo "Done\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('sendemail', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
运行上面的代码,即使使用CTRL+C杀掉了一个工作者(worker)进程,消息也不会丢失。
【注意】一个很容易犯的错误就是忘了basic_ack,后果很严重。消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,RabbitMQ就会占用越来越多的内存。
为了排除这种错误,你可以使用rabbitmqctl命令,输出messages_unacknowledged字段:
rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化
有些时候RabbitMQ服务器突然挂掉了,或者服务器突然重启后,原本保存在队列中的消息就会丢失,为了让服务器重启后消息的队列依旧可以使用,就需要将消息持久化保存到硬盘上。
持久化需要设置两点:
一是队列持久化。当RabbitMQ重启后,队列不会消失。
二是消息持久化。当RabbitMQ重启后,队列中的消息不会丢失。
必须将队列和消息都设置为持久化,才能保证消息不会丢失。
队列持久化
首先,为了不让队列消失,需要把队列声明为持久化(durable)。通过设置queue_declare的第三参数为true:
$channel->queue_declare('sendemail', false, true, false, false);
注意:设置前,需要重启下RabbitMq服务,或者重新取一个队列名。
需要注意的是,queue_declare在send.php和worker.php中都需要修改才能生效。因为RabbitMq不允许使用不同的参数重新定义一个队列,它会返回一个错误。
通过以上设置,就可以确保在RabbitMq重启之后queue_declare队列不会丢失。
消息持久化
消息持久化需要设置send.php中的delivery_mode = 2
$msg = new AMQPMessage('Your verification code is:'.$code.'=>'.$emailAddress,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
通过以上设置,RabbitMq就会把队列以及消息持久化的保存到硬盘上了,这样在重启后,数据也不会丢失。下面测试下:
首先重启下RabbitMq,让之前设置的队列清空:
systemctl restart rabbitmq-server.service
然后执行send.php发送消息到RabbitMq:
php send.php 111@qq.com
在RabbitMq服务器执行命令查看未发送的消息数:
rabbitmqctl list_queues
可以看到sendemail队列有一个消息还未发送。现在重启rabbitmq服务器:
systemctl restart rabbitmq-server.service
重启后再次执行命令查看未发送的消息数:
rabbitmqctl list_queues
可以看到,未消费的消息依然存在。
公平调度
RabbitMq目前只是按轮询的方式,把消息依次发送给运行的worker,如果有的worker比较繁忙,rabbimq依然会把消息派送给他。下面举例说明,新建两个worker,分别为worker1.php和worker10.php,其中把worker1.php的sleep函数的参数设置未1,worker10.php设置为10,通过该方式来简单的演示worker10.php需要10秒才能处理完消息,而worker1只需要1秒。通过send.php向RabbitMQ发送消息,然后观察RabbitMQ的消息调度方式。
worker1.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('x.x.x.x', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('sendemail', false, true, false, false);
echo 'Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo "Received ", $msg->body, "\n";
$arr = explode('=>',$msg->body);
echo "Send email to ", $arr[1], "...\n";
sleep(1);
echo "Done\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('sendemail', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
worker10.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('x.x.x.x', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('sendemail', false, true, false, false);
echo 'Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo "Received ", $msg->body, "\n";
$arr = explode('=>',$msg->body);
echo "Send email to ", $arr[1], "...\n";
sleep(10);
echo "Done\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('sendemail', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
分别打开两个终端,分别运行worker1.php和worker10.php。然后打开第三个终端,运行send.php发送消息。
从实验结果可以看出,虽然worker10.php更繁忙,worker1.php更闲,但rabbitmq依然会按顺序依次派发,并不会因为worker10.php更繁忙而少派发消息。
如果我们想根据worker的繁忙程度来调度消息,可以通过设置worker.php中channel对象的basic_qos方法的第二个参数为1,这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。
$channel->basic_qos(null, 1, null);
打开worker1.php和worker10.php,将以上代码加入到basic_consume的上方。再次进行以上实验:
可以看到,一共9条消息,给worker1派发了8条,只给worker10派发了1条。
参考:
工作队列