工作队列

工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。

这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务。

生产者 (task.php)

<?php 
	require_once __DIR__ . '/vendor/autoload.php';
	use PhpAmqpLib\Connection\AMQPStreamConnection;
	use PhpAmqpLib\Message\AMQPMessage;

	require_once __DIR__ . '/config.php';
	$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password']);
	$channel = $connection->channel();

	$channel->queue_declare('hello', false, false, false, false);//声明队列

	$data = implode(' ', array_slice($argv, 1));
	if(empty($data)) $data = "Hello World!";

	$msg = new AMQPMessage($data);
	$channel->basic_publish($msg, '', 'hello');//发布消息到队列
	echo " [x] Sent ", $data, "\n";
	$channel->close(); //通道关闭
	$connection->close();//链接关闭

消费者

<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

require_once __DIR__ . '/config.php';
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password']);
$channel = $connection->channel();

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg) {
	echo " [x] Received ", $msg->body, "\n";
	sleep(substr_count($msg->body, '.'));//模拟任务延迟,通过.控制延迟的秒数
	echo " [x] Done", "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
		$channel->wait();
}

$channel->close();
$connection->close();

循环调度

使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。

我们开启多个console,一个生产者,2个work消费者,执行php task_consumper.php,查看下效果。

默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。

生产者执行4次脚本:

vilay@zhujianpingdeMacBook-Pro www % docker exec -i vilay_php bash -c "cd /var/www/mq;php task.php 'A very hard task which takes two seconds1..'"
 [x] Sent A very hard task which takes two seconds1..
vilay@zhujianpingdeMacBook-Pro www % docker exec -i vilay_php bash -c "cd /var/www/mq;php task.php 'A very hard task which takes two seconds2..'"
 [x] Sent A very hard task which takes two seconds2..
vilay@zhujianpingdeMacBook-Pro www % docker exec -i vilay_php bash -c "cd /var/www/mq;php task.php 'A very hard task which takes two seconds3..'"
 [x] Sent A very hard task which takes two seconds3..
vilay@zhujianpingdeMacBook-Pro www % docker exec -i vilay_php bash -c "cd /var/www/mq;php task.php 'A very hard task which takes two seconds4..'"
 [x] Sent A very hard task which takes two seconds4..

消费者1结果:

[*] Waiting for messages. To exit press CTRL+C
 [x] Received Hello vilay!
 [x] Done
 [x] Received Hello vilay!
 [x] Done
 [x] Received A very hard task which takes two seconds..
 [x] Done
 [x] Received A very hard task which takes two seconds1..
 [x] Done
 [x] Received A very hard task which takes two seconds3..
 [x] Done

消费者2结果:

[*] Waiting for messages. To exit press CTRL+C
 [x] Received A very hard task which takes two seconds2..
 [x] Done
 [x] Received A very hard task which takes two seconds4..
 [x] Done

消息确认

当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中移除。这种情况,你只要把一个工作者(worker)停止,正在处理的消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。

我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。

为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。

如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。

消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。

消息响应默认是开启的。之前的例子中我们可以使用no_ack=True标识把它关闭。是时候移除这个标识了,当工作者(worker)完成了任务,就发送一个响应。

代码修改

$callback = function($msg){
	echo " [x] Received ", $msg->body, "\n";
	sleep(substr_count($msg->body, '.'));
	echo " [x] Done", "\n";
	$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);  //第四个参数$no_ack

basic_consume 参数

$queue = '',
	 $consumer_tag = '',
	 $no_local = false,
	 $no_ack = false,
	 $exclusive = false,
	 $nowait = false,
	 $callback = null,
	 $ticket = null,
	 $arguments = array()

运行上面的代码,我们发现即使使用CTRL+C杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。

忘记确认

一个很容易犯的错误就是忘了basic_ack,后果很严重。消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,RabbitMQ就会占用越来越多的内存。

为了排除这种错误,你可以使用rabbitmqctl命令,输出messages_unacknowledged字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

消息持久化

如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。

首先,为了不让队列消失,需要把队列声明为持久化(durable):

$channel->queue_declare('hello', false, true, false, false);//第三个参数设置为true,默认为false

尽管这行代码本身是正确的,但是仍然不会正确运行。因为我们已经定义过一个叫hello的非持久化队列。RabbitMq不允许你使用不同的参数重新定义一个队列,它会返回一个错误。但我们现在使用一个快捷的解决方法——用不同的名字,例如task_queue。

$channel->queue_declare('task_queue', false, true, false, false);

这个queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改。

这时候,我们就可以确保在RabbitMq重启之后queue_declare队列不会丢失。另外,我们需要把我们的消息也要设为持久化——将delivery_mode的属性设为2。

$msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) );

消息持久化注意事项:

将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果你一定要保证持久化,你需要改写你的代码来支持事务(transaction)。

公平调度

你应该已经发现,它仍旧没有按照我们期望的那样进行分发。比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。

这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。

我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。

$channel->basic_qos(null, 1, null);

关于队列大小

如果所有的工作者都处理繁忙状态,你的队列就会被填满。你需要留意这个问题,要么添加更多的工作者(workers),要么使用其他策略。

参考博客:

RabbitMQ+PHP 教程二(Work Queues)

工作队列