发布 / 订阅模式可以理解为广播模式:即exchange会将消息转发到所有绑定到这个exchange的队列上。针对这种广播模式,RabbitMQ增加了exchange Type的选项 AMQP_EX_TYPE_FANOUT,这种类型在发送消息,queue bind时,都将忽略route key,也就是说不需要设置 route key。
举了实际应用的场景,比方说用户注册(注销,更改姓名等)新浪,同时需要开通微博、博客、邮箱等等,如果不采用队列,按照常规的线性处理,可能注册用户会特别的慢,因为在注册的时候,需要调各种其他服务器接口,如果服务很多的话,可能客户端就超时了。如果采用普通的队列,可能在处理上也会特别的慢(不是最佳方案)。如果采用订阅模式,则是最优的选择。
看下面的处理过程:
- 用户提交username、pwd….等之类的基本信息,将数据提交register.php中.
- register.php对数据进行校验,符合注册要求,生成uid,并将和基本信息json后,发布一条信息,同时直接显示用户注册成功。
- exchange中的多个队列,如(queue.process、queue.boke、queue.weibo、queue.email)都接受到了这个消息,根据各业务自身的逻辑来处理。
下面来看下代码,和RabbitMQ的PHP教程之工作队列 (二) 的主要区别:
- 不在send.php中申明队列,因为发布 / 订阅模式下,是可以随时添加新的订阅队列。
- exchange的Type指定为fanout(广播模式)
- 队列不需要指定route key,绑定exchange。
send.php
$channel = new \AMQPChannel($conn); $channel->qos(0,0); $exchange = new \AMQPExchange($channel); $exchange->setName('exchange.register'); $exchange->setType(AMQP_EX_TYPE_FANOUT); $exchange->setFlags(AMQP_DURABLE); $exchange->declareExchange(); for($i=0; $i<6; $i++) { $result = $exchange->publish("message [$i]"); var_dump($result); }
receive.php
$channel = new \AMQPChannel($conn); $channel->qos(0,1); $queue = new \AMQPQueue($channel); $queue->setName("queue.process"); //其他的业务队列可以更改为weibo、boke $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); $queue->bind('exchange.register'); $queue->consume('processMessage',AMQP_AUTOACK); function processMessage($envelope, $queue) { global $i; echo "Message $i: " . $envelope->getBody() . "\n"; $i++; }
相关阅读: