在上一篇文章中,简单实现了消息的发送与接受(参考:RabbitMQ的PHP教程之入门 (一)),但在实际的应用中,很少会使用,本人也不建议使用,因为这种隐藏式的申明,很容易给人造成困扰。但它展示了RabbitMQ的工作原理。
本人将介绍RabbitMQ work queues (工作队列),首先举例说明下工作队列的使用场景。有个高并发的某某活动,因为并发量很高,Mysql的写入的性能不能满足吞吐(其他类似于mongo的库除外),此时,可用借助RabbitMQ的工作队列来处理,将直接写入mysql的数据,以消息的形式先发送到RabbitMq的工作队列中,消费端从队列中消费数据入库,这样:1)客户端不需要考虑mysql的瓶颈,队列做为一个缓冲区。2)可用加入多个消费者,加速消费工作队列,避免造成工作队列消息积压。
我们再hello world的发送、接收的代码中,我做了几点调整。
- 指定exchange,并显式申明它的类型。
- 将exchange持久化,这样那怕Rabbit重启,exchange也不会消失。
- 将queue持久化,这样那怕Rabbit重启,queue也不会消失(包括queue中的消息)
- 在exchange publish消息时,指定 route key,同时在queue中绑定rout key,这样exchange在转发消息时,能够将消息转发到与route key匹配的的队列中。
我们看具体的代码
send.php
$conn = new \AMQPConnection($config); $conn->connect(); $channel = new \AMQPChannel($conn); $channel->qos(0,0); $queue = new \AMQPQueue($channel); $queue->setName('queue.activity'); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); $queue->bind('exchange.activity','route.activity'); $exchange = new \AMQPExchange($channel); $exchange->setName('exchange.activity'); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->setFlags(AMQP_DURABLE); $exchange->declareExchange(); for($i=0; $i<6; $i++) { $result = $exchange->publish("message [$i]",'route.activity'); var_dump($result); }
receive.php
$conn = new \AMQPConnection($config); $conn->connect(); $channel = new \AMQPChannel($conn); $channel->qos(0,0); $queue = new \AMQPQueue($channel); $queue->setName("queue.activity"); $queue->setFlags(AMQP_PASSIVE); $queue->declareQueue(); $queue->bind('exchange.activity','route.activity'); $queue->consume('processMessage',AMQP_AUTOACK); function processMessage($envelope, $queue) { global $i; echo "Message $i: " . $envelope->getBody() . "\n"; $i++; }
当开始多个receive.php进程时,就可以发现每个进程轮询接收到队列中message
相关阅读:
Pingback引用通告: RabbitMQ的PHP教程之发布/订阅 (三) | 精彩每一天
Pingback引用通告: RabbitMQ的原理与操作示例 | 精彩每一天