一、概述
1.1 本文档的目标
此文档定义了一个网络协议-高级消息队列协议(AMQP), 它使一致的客户端程序可以与一致的消息中间件服务器进行通信.
我们面对的是这个领域有经验的技术读者,同时还提供了足够的规范和指南.技术工程师可以根据这些文档,在任何硬件平台上使用各种编程语言来构建遵从该协议的解决方案。
1.2 摘要
1.2.1 为什么使用AMQP?
AMQP在一致性客户端和消息中间件(也称为”brokers”)之间创建了全功能的互操作.
我们的目标是实现一种在全行业广泛使用的标准消息中间件技术,以降低企业和系统集成的开销,并且向大众提供工业级的集成服务。
我们的宗旨是通过AMQP,让消息中间件的能力最终被网络本身所具有,并且通过消息中间件的广泛使用发展出一系列有用的应用程序.
1.2.2 AMQP范围
为了完全实现消息中间件的互操作性,需要充分定义网络协议和消息代理服务的功能语义。
因此,AMQP通过如下来定义了网络协议(AMQP是协议!)和服务端服务:
- 一套确定的消息交换功能,也就是“高级消息交换协议模型”。AMQP模型包括一套用于路由和存储消息的功能模块,以及一套在这些模块之间交换消息的规则
- 一个网络线级协议(数据传输格式),AMQP促使客户端可使用AMQ模型来与服务器交互.
可以只实现AMQP协议规范中的的部分语义,但是我们相信这些明确的语义有助于理解这个协议。
1.2.3 高级消息队列模型(AMQ 模型)
我们需要明确定义服务器语义,因为所有服务器实现都应该与这些语义保持一致性,否则就无法进行互操作. 因此AMQ 模型定义了一系列模块化组件和标准规则来进行协作. 有三种类型的组件可以连接服务器处理链来创建预期的功能:
- “交换器(exchange)” :接收来自发布者应用程序的消息,并基于任意条件(通常是消息属性和内容)将这些消息路由到消息队列(message queues).
- “消息队列(message queue)”:存储消息直到它们可以被消费客户端应用程序(或多线程应用程序)安全处理.
- “绑定(binding)”:定义了消息队列与交换器之间的关系,并提供了消息路由条件.
使用这些模型我们可以很容易地模拟经典的存储转发队列和面向消息中间件的主题订阅概念. 我们还可以表示更为复杂的概念,例如:基于内容的路由,工作负载分配和按需消息队列。
大致上讲, AMQP 服务器类似与邮件服务器, 每个交换器都扮演了消息传送代理,每个消息队列都作为邮箱,而绑定则定义了每个传送代理中的路由表.发布者发送消息给独立的传送代理,然后传送代理再路由消息到邮箱中.消费者从邮箱中收取消息. 相比较而言,在AMQP之前的许多中间件系统中,发布者直接发送消息到独立收件箱(在存储转发队列的情况下),或者发布到邮件列表中 (在主题订阅的情况下).
区别就在于用户可以控制消息队列和交换器之间的绑定规则,这可以做很多有趣的事情,比如定义一条规则:“将所有包含这样消息头的消息都复制一份再发送到消息队列中”。
AMQ模型是基于下面的需求来驱动设计的:
- 支持与主要消息产品相媲美的语义。.
- 提供与主要消息产品相媲美的性能水平.
- 允许通过应用程序使用服务器特定语义来编程.
- 灵活性,可扩展性,简单性
1.2.4 高级消息队列协议(AMQP)
AMQP协议是具有现代特征的二进制协议: 它是多通道的, 协商的,异步的,安全的,便携的,自然的,高效的。 AMQP通常划分成两层:
- 功能层( functional layer)定义了一系列命令(分成功能独立的逻辑类),可为应用程序做有用工作。
- 传输层,将这些方法从应用程序应用搬到服务器并返回,它同时会处理通道复用,帧同步, 内容编码,心跳检测, 以及数据表示和错误处理.
在不改变协议可视功能的前提下,可使用任意的传输协议来替换传输层.也可以将同一个传输层用于不同的高级协议.
AMQ模型是基于下面需求驱动的:
- 为了保证一致性实现之间的互操作性。.
- 提供服务质量的显式控制.
- 一贯的、明确的命名.
- 通过协议允许服务器完全可配置.
- 使用命令标记法可轻易地映射到应用程序级别API.
- 一目了然,每个操作都在做自己的事情。
AMQP传输层的设计是由以下需求驱动的(没有特定顺序):
- 坚实的,使用二进制编码来打包和解包.
- 处理任何大小的消息,没有明显限制(实际上有限制).
- 在单个连接上携带携带多个通道.
- 长时间生存的,没有显著的内建限制。
- 允许异步命令在管道中排队.
- 易于扩展,以处理新的更新的需求。
- 与未来版本兼容。
- 可修复,使用一个强大的断言模型。
- 对编程语言保持中立。
- 适合代码扩展过程。
1.2.5 规模化部署
AMQP范围涵盖不同等级规模,大致如下:
- 开发/临时使用: 1台服务器, 1个用户, 10个消息队列,每秒 1个消息.
- 产品应用程序: 2服务器, 10-100个用户, 10-50个消息队列,每秒10个消息(36Kmessages/hour).
- 部门任务关键应用: 4 台服务器, 100-500个用户, 50-100 个消息队列, 每秒100个消息(360K/hour).
- 区域任务关键应用: 16 台服务器, 500-2,000 个用户, 100-500 个消息队列和主题,
每秒1000个消息(3.6M/hour). - 全球任务关键应用: 64 台服务器, 2K-10K 个用户, 500-1000 个消息队列和主题,
每秒10,000 个消息(36M/hour). - 市场数据(交易): 200 台服务器, 5K 个用户, 10K 主题, 每秒100K个消息(360M/hour).
规模越大,消息传输延迟就越重要.例如,市场数据变化相当快. 实现可以依据不同的服务质量和管理能力不区分对待,但必须与本规范兼容.
1.2.6 功能范围
我们要支持多种消息传递架构:
- 使用多个writers和一个reader来存储转发.
- 使用多个writers和多个readers来分散工作负载.
- 使用多个writers和多个readers来发布订阅
- 使用多个writers和多个readers来基于内容路由.
- 使用多个writers和多个readers来进行队列文件传输.
- 在两个节点之间进行点对点连接.
- 使用多个源和多个readers来分布市场数据.
1.3 文档组织
文档分成五个章节, 其中大部分设计为可按你的兴趣来独立阅读:
- “概述” (本章). 读本章来了解介绍.
- “总体架构”,这章中我们描述了架构和AMQP总体设计. 本章的目的是帮助系统架构师了解AMQP如何工作的.
- “功能说明”, 这章中我们定义了应用程序如何与AMQP一起协同工作. 本章先讲述了一个可读性讨论,其次是每个协议命令的详细规范,以作为实施者参考。在阅读本章之前,你应该阅读总体构架.
- “技术说明”,这章中我们定义了AMQP传输层是如何工作的. 这章由简短讨论和线路结构的详细说明组成.你如果想线路级协议是如何工作的,可以阅读本章.
1.4 约定
1.4.1 实现者指导方针
- 我们使用了IETF RFC 2119中定义的术语 MUST, MUST NOT, SHOULD, SHOULD NOT, 和 MAY.
- 当讨论需要AMQP服务器的特定行为时,我们使用术语”the server”.
- 当讨论需要AMQP客户端的特定行为时,我们使用术语”the client”.
- 我们使用术语”the peer” 来代表服务器或客户端.
- 如果没有指示,数字值是十进制的.
- Protocol常量是以大写名称表示. AMQP实现应当在定义,源码,以及文档中使用这些名称.
- 属性名称,方法名称,以及帧字段是以小写名称来表示. AMQP实现应当在定义,源码,以及文档中使用这些名称.
- AMQP中的字符串是区分大小写的.例如, “amq.Direct”与"amq.direct”是两个不同的交换器.
1.4.2 版本编号方式
AMQP版本使用两个或三个数字进行表示 – 主版本号,次版本号以及可选的修订版本号.为了方便,版本号表示为:major-minor[-revision] 或major.minor[.revision]:
- 官方说明中,major, minor, 和revision均支持0到99的数字.
- Major, minor, 和 revision 中100及其以上的数字保留用于内部测试和开发.
- 版本号表明了语法和语义互操作性。
- 版本 0-9-1 表示 major = 0, minor = 9, revision = 1.
- 1.1版本表示为major = 1, minor = 1, revision = 0. AMQP/1.1等价于AMQP/1.1.0或AMQP/1-1-0.
1.4.3 技术术语
这些术语在本文档的上下文中有特殊的意义:
- AMQP 命令架构(AMQP command architecture): 用于在AMQ模型架构上执行操作的线级协议命令.
- AMQ模块架构(AMQ model architecture): 表示关键实体和语义的逻辑框架,它必须对兼容AMQP实现的服务器可用, 使得服务器的状态可以通过客户端按本规范中定义的语义来实现.
- Connection: 网络连接,如.一个TCP/IP socket连接.
- Channel: 两个AMQP节点之间双向通信流. 通道是多路复用的,因此单个网络连接可以支撑多个通道
- Client: AMQP连接或通道的发起人. AMQP不是对称的.客户端生产和消费消息,而服务器端入列和路由消息.
- Server:接受客户端连接,实现AMQP消息队列和路由功能的过程.也称为”broker”.
- Peer: AMQP连接中任意一方.AMQP连接明确包含两个节点(一个是client, 一个是server).
- Frame: 一个正式定义的连接数据包。框架总是连接写和读数据包-作为连接上的一个单元。
- Protocol class: 用于处理特定类型功能的AMQP命令集合 (也称为方法).
- Method: 用于在节点之间传递特定类型的AMQP命令帧.
- Content: 服务器和应用程序之间传送的数据.这个术语是“message”的同义词。
- Content header:描述内容属性特定类型帧.
- Content body: 包含原始应用程序数据的特定类型帧.内容体帧完全不透明-服务器不以任何方式检查或修改其body内容.
- Message: 与Content同义.
- Exchange: 服务器中接收来自生产者应用程序的消息的实体,并可选择将这些消息路由到服务器中的消息队列.
- Exchange type: 交换器特定模型的算法和实现.而”交换器实例”是服务器中用于接收和路由消息的实体.
- Message queue: 保存消息并将它们转发给消费者应用程序的命名实体。
- Binding: 用于创建消息队列和交换器绑定关系的实现.
- Routing key: 一个虚拟地址,虚拟机可用它来确定如何路由一个特定消息.
- Durable: 服务器资源可在服务器重启时恢复.
- Transient: 服务器资源或消息会在服务器重启后擦除或重置.
- Persistent: 服务器存储在可靠的磁盘存储上的消息,并且在服务器重新启动后不丢失.
- Consumer: 从消息队列中请求消息的客户端应用程序.
- Producer: 发布消息到交换器中的客户端程序.
- Virtual host: 交换器,消息队列以及相关对象的集合. 虚拟主机是共享同一个身份验证和加密环境的独立服务器域。
- Assertion: 一个必须为true且可继续执行的条件.
- Exception: 一个失败的断言,可通过Channel或Connection来关闭.
在AMQP中这些术语没有特殊意义:
- Topic: 通常是分发消息的一种手段; AMQP使用一个或多个交换器类型来实现topics.
- Subscription:通常是从topics中接收数据的请求, AMQP以消息队列和绑定的方式来实现订阅.
- Service: 通常与server一个含义. AMQP使用“server”来遵循IETF标准命名。
- Broker: 通常与server一个含义。AMQP使用术语“client”和“server”来遵循IETF标准术语。
- Router: 有时用来描述交换器的动作.交换器也可以作为消息终点, “router” 在网络领域中有特殊意义,因此AMQP不会使用它.
二、总体架构
2.1 AMQ 模块架构
本节将讲解了服务器语法,须标准化来保证AMQP实现之间可互操作的语义。
2.1.1 主要实体
下面的图显示了整体AMQ模型:
我们可以总结一下中间件服务器是什么:它是一个接受消息的数据服务器,并主要做两件事情,依据条件将消息路由给不同的消费者,当消费者消费速度不够快时,它会把消息缓存在内存或磁盘上.
在AMQP之前的服务器中,它们会通过实现了特定类型路由和缓存的庞大引擎来完成. AMQ模块使用较小的模块结合更多样和稳健的方案来实现. 它把这些任务分成了两个不同角色:
- 交换器, 它接受来自生产者的消息并将它们路由到消息队列.
- 消息队列, 它存储消息消息并把它们转发给消费者应用程序.
在交换器和消息队列之间有一个明显的界面,称为绑定(binding),我们随后会进行讲解.
AMQP提供了运行时程序语义,主要有两方面:
- 运行时通过该协议可创建任意的交换器和消息队列类型的能力(有些是在标准中定义的,但可以添加其他作为服务器扩展)。
- 运行时通过协议包装交换器和消息队列来创建任何需要的消息处理系统的能力.
2.1.1.1 消息队列(Message Queue)
消息队列用于在内存或磁盘上存储消息, 并将它们依次投递给一个或多个消费者应用程序.消息队列是消息存储和分发的实体. 每个消息队列是完全独立的,且是一个相当聪明的对象。
消息队列有多个属性:私有的或共享的, 持久的或临时的,客户端命名的或服务器端命名的等等.
通过选择希望的属性,我们可以使用消息队列来实现传统的中间件实体,如:
- 共享存储转发队列:它可以持有消息,并以round-robin方式在消费者之间分发消息.存储转发队列通常是在多个消费者之间是持久化的.
- 私有回复队列:它可以持有消息,并把消息转发给单个消费者. 回复队列通常是临时的,服务端命名的,且对于某个消费者来说是私有的.
- 私有订阅队列:它可持有来自不同订阅源的消息,并将它们转发给单个消费者.
- 订阅队列通常是临时的,服务器端命名的,并对于某个消费者来说是私有的.
AMQP没有定义这些类别:这些只是如何使用消息队列的例子.创建如持久化,共享订阅队列的新实体没什么意义.
2.1.1.2 交换器(Exchange)
交换器接收来自生产者应用程序的消息,并将它们按照事先约定的规则路由到消息队列中.
这些预先约定的规则或条件称为绑定. 交换器会与路由引擎匹配.
也就是说,他们会检查消息,并使用他们的绑定表来决定如何将这些消息转发到消息队列或其他交换器中。交换器永远不会存储信息。“交换器”一词是指一类算法或算法实例。
更确切的说,我们谈到了交换器类型和交换器实例.
AMQP定义了许多交换器类型,它们覆盖了常见消息路由分发的基础类型. AMQP服务器提供了这些交换器的默认实例.使用AMQP的应用程序可额外创建它们自己的交换器实例.交换器类型是命名的,这样创建交换器的应用程序就可以告知服务器他们使用的交换器类型.
交换器实现也可以是命名的,这样应用程序可指定如何绑定队列来发布消息.
交换器还可以做更多的消息路由.它们可作为服务器内的智能路由代理,按需接受消息和生产消息. 交换器概念的目的是定义一套模型或标准,使得可以合理地扩展AMQP服务器,因为可扩展性会对互操作产生影响.
2.1.1.3 路由键 (Routing Key)
在一般情况下,交换器会检查消息的属性,如,它的header字段,body内容,并使用这些和其他来源中的数据来决定如何消息路由。
在大多数简单情况下,交换器会检查某个单一的键字段,我们称之为“路由键”。
- 路由键是一个虚拟地址,该虚拟地址可用来决定如何路由消息。
- 对于点对点的路由,路由键通常是消息队列的名称。
- 对于主题发布订阅路由,路由键通常是topic层次结构值。
- 在更复杂的情况下,路由键可以是消息header字段和/或消息内容的组合体。
2.1.1.4 类比电子邮件
如果我们做过一个类似的电子邮件系统,那我们会看到AMQP概念就不再是激进的:
- 一个AMQP消息类似于电子邮件;
- 消息队列就像一个邮箱;
- 消费者是一个可读取和删除电子邮件的邮件客户端;
- 交换器类似于一个MTA (邮件传输代理),它会检查电子邮件,并基于路由键和表来决定如何将电子邮件发送到一个或多个邮箱中;
- 路由键对应于电子邮件中To:,Cc: ,Bcc: 地址, 不包含服务端信息(路由完全是AMQP服务器内部的行为);
- 每个交换器实例类似于单独的MTA过程,用于处理一些电子邮件子域名或特定类型的电子邮件传输;
- 绑定类似于MTA路由表中的实体.
AMQP的强大来自于创建队列(邮箱),交换器(MTA过程),和绑定(路由实体)的能力,并可在运行时,将这些链接在一起,这远远超出了简单的”to” 地址到邮箱名称的映射.
我们不应该把email与AMQP类比得太远:它们之间有根本上的区别。AMQP面临的挑战是如何来存储和转发服务器中的消息,SMTP(IETF RFC 821)称其为“自治系统”。而在电子邮件中的挑战是自治系统之间如何路由消息。
在一个服务器内路由和在多个服务器之间路由,方式是不同的,应该有不同的解决方案.
在多个AMQP服务器(拥有不同实体)之间路由时,必须明确建立不同的桥梁, 为达到在多个独立实体之间传送消息的目的,一个AMQP服务器必须作为另一个AMQP服务器的客户端.这种工作方式很适合需要使用AMQP的业务类型,因为这些桥梁可以为业务流程,合同义务和安全问题打下基础.
2.1.2 消息流(Message Flow)
下面的图展示了通过AMQ模块服务器的消息流:
2.1.2.1 消息生命周期
一个AMQP消息由一组属性和不透明的内容组成。一个新消息是由生产者应用程序通过使用AMQP client API来创建的.生产者将“内容”附着在消息中,并对其设置一些消息“属性”。生产者使用路由信息来标记消息,其表面上类似于地址,但几乎可以创建任何模式。然后,生产者将消息发送到服务器上的交换器中。
当消息到达服务器时,交换器通常会将消息路由到一级存在于服务器上的消息队列中.如果消息不能路由,交换器会默默地丢弃或者将其返回给生产者. 生产者可以选择如何来处理未路由消息.
单个消息可存在于多个消息队列. 服务器可以不同方式进行处理,如通过拷贝消息或通过引用计数器等. 这不影响互操作性。然而,当一个消息被路由到多个消息队列时,它在每个消息队列上都是一样的。没有独特的标识符来区分不同的副本。
当消息到达消息队列时,消息队列会通过AMQP,立即尝试将消息传递给消费者应用程序.如果不行,消息队列会存储消息(按发布者要求存储在内存或磁盘中),并等待消费者准备好.如果没有消费者,消息队列通过AMQP将消息返回给生产者(再次地,如果生产者对此有要求的话).
当消息队列把消息投递给消费者后,它会从内部缓冲区中删除消息.这有可能立即发生,也有可能在消费者应答它已成功处理之后删除.消费者可选择如何以及何时来应答消息.同样地, 消费者也可以拒绝消息(一个否定应答).
生产者消息和消费者应答可以组成事务. 当一个应用程序同时扮演两种角色时,通常它会做混合工作:发送消息和发送应答,然后提交或回滚事务.
从服务器投递消息给消费者,这个过程不是事务的,它只能通过消息应答来处理.
2.1.2.2 生产者能看到什么
通过与电子邮件系统的类比,我们可以看到生产者不能直接向消息队列发送消息.
如果允许这样做,将会破坏AMQ模块中的抽象. 这就像允许电子邮件绕过MTA的路由表,直接发送到邮箱中一样. 这会导致在中间过程中不能插入过滤处理,例如,垃圾邮件检测.
AMQ 模块也使用了与电子邮件系统一样的准则:所有消息都发向单一的某个点:交换器或MTA,然后这个点根据规则和隐藏在发送者中的信息来检查消息,并将消息路由到落脚点(对于发送者来说,信息仍然是隐藏的).
2.1.2.3 消费者能看到什么
当我们站在消费者角度来看与电子邮件系统的类比,这就开始崩溃了(break down). Email客户端是被动的 – 它们可以读取它们的邮箱,但它们不能对邮箱的收取产生任何影响.而AMQP消费者也可以是被动的,就像email客户端一样. 也就是说,我们可以编写一个程序来希望特定消息队列准备好绑定, 并且应用程序脱离消息队列来简单处理消息(译者注:不懂).
此外,我们也允许AMQP 客户端程序执行下面的操作:
- 创建或销毁消息队列;
- 通过绑定来定义消息队列填充的方式;
- 选择不同的交换器,这将完全改变路由语义.
使用协议,这有点像电子邮件系统能做的:
- 创建一个新邮箱;
- 告诉MTA带特定header字段的消息都可以拷贝到这个邮箱中;
- 完全改变电子邮件系统解析地址和其它消息头的方式
我们看到AMQP更像是一种语言的连接片而非一个系统.这正是目标的一部分,通过协议来使服务器行为可编程化.
2.1.2.4 原子模式
大多数集成架构不需要这个级别的复杂度.就像业余摄影师一样,大多数AMQP用户需要傻瓜式的模式. AMQP通过两方面来简化了概念:
- 针对消息生产者的默认交换器;
- 基于队列名称作为路由键来匹配默认绑定.
实际上,给予适当的权限,默认的绑定让生产者直接发送消息到消息队列–它模拟了传统中间件中简单的“发送到目的地”的解决方案。
默认绑定不会阻止消息队列更复杂方式的使用。然而它使得在不需要了解交换器和绑定如何工作的情况下,就可以使用AMQP.
2.1.3 交换器
2.1.3.1 交换器类型
每种交换器类型都实现了某种路由算法.这里有许多标准的交换器类型(将在”功能说明"章节中讲解), 但有两点是很重要的:
- 基于路由键来路由的direct 交换器类型. 默认交换器是direct交换器.
- 基于路由模式来路由的topic 交换器类型.
在启动时,服务器将会创建一系列交换器,如direct 和topic交换器.
2.1.3.2 交换器生命周期
每个AMQP 服务器都预先创建了许多交换器(实例).这些交换器当服务器启动时就存在了,不能被销毁. AMQP 应用程序也可以创建它们自己的交换器.AMQP不会使用像这样的”create”方法,相反它使用 “declare”方法,其意义是:”如果你不存在就创建,否则继续”.这是合理的:应用程序可以为了私有使用而创建交换器,并在完成工作时进行销毁. AMQP提供了方法来销毁交换器,但一般来说,应用程序不会这样做.在本章我们的例子中,我们假设交换器已在服务器启动时创建过了. 我们不会展示声明交换器的应用程序.
2.1.4 消息队列
2.1.4.1 消息队列属性
当客户端程序创建了消息队列时,它可以选择一些重要的属性:
- name – 如果没有指定,服务器会选择一个名称,并将其提供给客户端.一般来说,当应用程序共享消息队列时,它们已经对消息队列名称作了事先的约定,当一个应用程序需要出于其自身目的来要求队列时,它可让服务器提供一个名称.
- exclusive – 如果设置了,队列将只属于当前连接,且在连接关闭时删除.
- durable – 如果设置了, 消息会进行存储,并在服务器重启时激活. 当服务器重启时,它可能会丢失瞬时消息.
2.1.4.2 队列生命周期
这里主要有两种消息队列生命周期:
- 持久化消息队列:它们可被多个消费者共享,并可独立地存在- 即.不管是否有消费者接收它们,它都可以继续存在收集消息.
- 临时消息队列:对某个消费者是私有的,只能绑定到此消费者.当消费者断开连接时,消息队列将被删除.
也存在一些变化,如共享消息队列会在最后一个消费才断开连接时删除消息队列.下面的图展示了临时消息队列创建和删除的过程:
2.1.5 绑定
绑定表示的是交换和消息队列之间的关系,该关系告诉交换器如何路由消息。绑定是从客户端应用程序命令(一个拥有和使用消息队列的应用程序)中来绑到交换器上的。我们可以在伪代码中表达一个绑定命令,如下所示:
Queue.Bind <queue> TO <exchange> WHERE <condition>
让我们看一下三种典型使用情况: 共享队列,私有回复队列,发布订阅.
2.1.5.1 构造共享队列
共享队列是经典的中间件"点对点队列".在AMQP中,我们可使用默认交换器和默认绑定.我们假设消息队列称为”app.svc01″. 这里是创建共享队列的伪代码:
Queue.Declare queue=app.svc01
在这个共享队列中,我们有许多消费者.要从共享队列中消费, 每个消费者可以这样做:
Basic.Consume queue=app.svc01
要发送到共享队列, 每个生产者都要将消息发布到默认交换器:
Basic.Publish routing-key=app.svc01
2.1.5.2 构建回复队列
回复队列通常是临时的,服务器分配名称的. 它们通常也是私有的,即只能由单个消费者读取. 除了这些特殊情况外,回复队列使用与标准队列相同的匹配条件,因此我们也可以使用默认交换器.
下面是创建回复队列的伪代码, 这里的S:表示一个服务器回复:
Queue.Declare queue=<empty> exclusive=TRUE S:Queue.Declare-Ok queue=tmp.1
要发布到回复队列,生产者需将消息发送到默认交换器中:
Basic.Publish exchange=<empty> routing-key=tmp.1
有一个标准消息属性-Reply-To, 它专门设置用来携带回复队列的名称.
2.1.5.3 构建Pub-Sub 订阅队列
在经典中间件中,术语”订阅(subscription)” 在概念上是模糊的,它至少涉及了两个不同的概念: 匹配消息的条件和用于保存匹配消息的临时队列. AMQP把这个工作分成了绑定和消息队列两个部分.在AMQP中没有称为订阅的实体.
让我们来描述pub-sub订阅:
- 为单个消费者(或某些情况下,多个消费者)保存消息
- 通过一系列匹配主题,消息字段或内容的不同绑定方式来从多个源中收集消息
订阅队列与命名队列或回复队列之间的关键区别是订阅队列名称与路由目的无关,路由是通过抽象匹配条件来完成的,而不是1对1路由键字段匹配来完成的.
我们以常见的主题树pub-sub模型进行讲解并对其实现. 我们需要一个能够在主题树上匹配的交换器类型. 在AMQP中,这是”topic” 交换器类型. topic交换器会匹配类似”STOCK.USD.*”通配符, 如像”STOCK.USD.NYSE”这样的路由键.
我们不能使用默认的交换器或绑定,因为它们不会做topic风格的路由. 因此我们必明确地创建一个绑定.以下是创建和绑定pub-sub订阅队列的伪代码:
Queue.Declare queue=<empty> exclusive=TRUE S:Queue.Declare-Ok queue=tmp.2 Queue.Bind queue=tmp.2 TO exchange=amq.topic WHERE routing-key=STOCK.USD.*
要从订阅队列中消费消息,消费者需要这样做:
Basic.Consume queue=tmp.2
当发布消息时,生产者可以这样做:
Basic.Publish exchange=amq.topic routing-key=STOCK.USD.ACME
topic交换器会使用其它绑定表处理传入的路由键(“STOCK.USD.ACME”),并会找到一个匹配项tmp.2.然后它会把消息路由到那个订阅队列上.
2.2 AMQP 命令架构
本章节解释了应用程序如何与服务器对话.
2.2.1 协议命令 (类&方法)
中间件是复杂的,我们在设计协议结构的挑战是要驯服其复杂性。
我们的方法是基于类来建立传统API模型,这个类中包含方法,并定义了方法明确应该做什么.
这会导致大量的命令集合,但一个命令应该相对容易理解.
AMQP命令组合在类中.每个类都覆盖了一个特定功能领域.有此类是可选的 -每个节点都实现了需要支持的类.
有两种不同方法对话:
- 同步请求-响应,在其中一个节点发送请求,另一个节点发送回复.
同步请求和响应适用于性能不是关键的地方. - 异步通知, 在其中,一个节点发送消息但不希望得到回复.异步方法适用于性能是至关重要的地方.
为使处理方法简单,我们为每个异步请求定义了不同的回复. 也就是说,没有哪个方法可作为两个不同请求的回复.这意味着一个节点,发送一个同步请求后,可以接受和处理传入的方法,直到得到一个有效的同步答复. 这使得AMQP与更加传统的RPC协议是有区别的.
方法可以形式上定义为同步请求,同步回复(针对特定请求),或者是异步的. 最后,每个方法都形貌地定义为客户端(即. 服务器到客户端),或服务端(客户端到服务器).
2.2.2 映射AMQP到中间件API
我们已经设计AMQP是可映射到中间件的API.这种映射有一些是智能的(不是所有方法, 也不是所有参数对应用程序来说都是有意义的),也有一些是机械的(给定一些规则,所有方法都可以在无人工干预的情况下映射).
这么做的优势是对于那些了解AMQP语义(本章描述的类)的开发者会在它们使用的环境中找到相同的语义.
例如,下面是Queue.Declare 方法示例:
Queue.Declare queue=my.queue auto-delete=TRUE exclusive=FALSE
这会转换为线路级帧(wire-level frame):
或者更高级的API:
映射为异步方法的伪代码逻辑是:
映射为同步方法的伪代码逻辑是:
值得一提的是,对于大部分应用程序, 中间件可以完全隐藏在技术层面中, 而且实际API使用的影响会小于中间件的健壮性和能力性.
2.2.3 无确认
一个聊天式协议(chatty protocol)是很慢的. 如果在这些情况中,性能是很严重的,我们会使用异步.
一般我们从一个节点发送消息到另一个节点. 我们会使发送方法尽可能地快,而不用等待确认.在必要时,我们可以以较高级别来实现窗口和限制,如以消费者水平.
我们免除了确认,因为我们为所有操作使用了断言模型. 要么它们成功,要么就是我们有关闭通道或连接的异常.
AMQP中可以没有确认.成功是寂静的,而失败是喧闹的.当应用程序明确需要追踪成功和失败时,它们应该使用事务.
2.2.4 Connection类
AMQP是一个连接协议. 连接设计为长期的,且可运载多个通道. 连接生命周期是这样的:
- client打开与服务器的TCP/IP连接并发送一个协议头(protocol header).这只是client发送的数据,而不是作为方法格式的数据.
- server使用其协议版本和其它属性,包括它支持安全机制列表(Start方法)进行响应.
- client选择一种安全机制(Start-Ok).
- server开始认证过程, 它使用SASL的质询-响应模型(challenge-response model). 它向客户端发送一个质询(Secure).
- client向server发送一个认证响应(Secure-Ok). 例如,对于使用”plain”机制,响应会包含登录用户名和密码.
server 重复质询(Secure) 或转到协商,发送一系列参数,如最大帧大小(Tune). - client接受或降低这些参数(Tune-Ok).
- client 正式打开连接并选择一个虚拟主机(Open).
- 服务器确认虚拟主机是一个有效的选择 (Open-Ok).
- 客户端现在使用希望的连接.
- 一个节点(client 或 server) 结束连接(Close).
- 另一个节点对连接结束握手(Close-Ok).
- server 和 client关闭它们的套接字连接.
没有为不完全打开的连接上的错误进行握手. 根据成功协议头协商(后面有详细定义),在发送或收到Open 或Open-Ok之前,如果一个节点检测到错误,这个节点必须关闭socket,而不需要发送任何进一步的数据。
2.2.5 Channel 类
AMQP是一个多通道协议. 通道提供了一种方式来将一个重量级TCP/IP连接分成多个轻量级连接.
这使得协议对于防火墙更加友好,因为端口使用是可预测的. 这也意味着传输调整和网络服务质量可以得到更好的利用.
通道是独立的,它们可以同时执行不同的功能,可用带宽会在当前活动之间共享.
这是令人期待的,我们鼓励多线程客户端应用程序经常使用”每个通道一个线程”编程模型.
然而,从单个client打开一个或多个AMQP servers连接也是完全可以接受的.
通道生命周期如下:
- client打开一个新通道(Open).
- server确认新通道准备就绪(Open-Ok).
- client和server按预期来使用通道.
- 一个节点(client或server) 关闭了通道(Close).
- 另一个节点对通道关闭进行握手(Close-Ok).
2.2.6 Exchange 类
交换器类让应用程序来管理服务器上的交换器。这个类可以让应用程序脚本自己布线(而不是依赖于一些配置接口)。注:大多数应用程序不需要这个级别的复杂度,传统的中间件是不太可能能够支持这种语义。
交换器生命周期如下:
- client 请求server确保交换器是否存在(Declare). client可细化到,”如果交换器不存在则进行创建”,或 "如果交换器不存在,警告我,不需要创建”.
- client发布消息到交换器.
- client可选择删除交换器(Delete).
2.2.7 Queue 类
queue类可让应用程序来管理服务器上的消息队列. 在几乎所有消费消息的应用程序中,这是基本步骤,至少要验证期望的消息队列是否实际存在.
持久化消息队列的生命周期相当简单:
- client断言消息队列存在(Declare, 使用”passive”参数).
- server确认消息队列存在(Declare-Ok).
- client从消息队列中读取消息。
临时消息队列的生命周期更加有趣:
- client创建消息队列(Declare,不提供队列名称,服务器会分配一个名称). server 确认(Declare-Ok).
- client 在消息队列上启动一个消费者. 消费者的精确功能是由Basic类定义的。
- client 取消消费者, 要么是显示取消,要么是通过关闭通道/连接隐式取消的
- 当最后一个消费者从消息队列中消失的时候,在过了礼貌性超时后,server会删除消息队列.
AMQP 像消息队列一样为主题订阅实现了分发机制. 这使结构更多有趣,订阅可以在合作订阅应用程序池中进行负载均衡.
订阅生命周期涉及到额外的绑定阶段:
- client 创建消息队列(Declare),server进行确认(Declare-Ok).
- client 绑定消息队列到一个topic交换器 (Bind),server进行确认(Bind-Ok).
- client像前面的例子来使用消息队列.
2.2.8 Basic 类
Basic 类实现本规范中描述的消息功能.它支持如下主要语义:
- 从client发送消息给server, 异步发生(Publish)
- 启动和停止消费者(Consume, Cancel)
- 从server发送消息给client, 异步发生(Deliver, Return)
- 应答消息(Ack, Reject)
- 同步从消息队列中取消息 (Get).
2.2.9 Transaction 类
AMQP 支持两种类型的事务:
1. 自动事务: 每个发布的消息和应答都处理为独立事务.
2. Server 本地事务, 服务器会缓存发布的消息和应答,并会根据需要由client来提交它们.
Transaction 类(“tx”) 使应用程序可访问第二种类型,即服务器事务。这个类的语义是:
1. 应用程序要求在每个通道中都有事务(Select).
2. 应用程序做一些工作(Publish, Ack).
3. 应用程序提交或回滚工作(Commit, Roll-back).
4. 应用程序做一些工作,循环往复。
事务能覆盖发布内容和应答,但不能覆盖投递(deliveries). 因此回滚不能导致消息重新入队或者重新投递, 客户端有权在事务中确认这些消息。
2.3 AMQP 传输架构
这个章节解释了命令是如何映射到线路协议的.
2.3.1 一般描述
AMQP是二进制协议. 信息被组织成各种类型的帧(frames). Frames可以携带协议方法和其它信息.所有 帧(frames)都有同样的格式: 帧头(frame header),帧负载(frame payload)和帧尾(frame end).帧负载( frame payload)的格式依赖于帧类型(frame type).
我们假设有一个可靠的面向流的网络传输层(TCP/IP或相当的).
在单个套接字连接中,可以存在多个独立控制线程,它们被称为通道.
每个帧都使用通道编号来编号.通过交织它们的帧,不同的通道共享连接。对于任何给定的通道,帧运行在一个严格的序列,这样可以用来驱动一个协议解析器(通常是一个状态机).
我们使用一组小的数据类型,如位,整数,字符串和字段表来构造帧。帧字段是紧密包装的,不会使得它们缓慢或解析复杂。从协议规范中生成框架层是相对简单的。
线路级格式的设计是可扩展性,一般可以用于任意的高层协议(不只是AMQP)。我们假设AMQP将来会扩展、改进,随时间的推移线路级格式仍然会得到支持。
2.3.2 数据类型
AMQP数据类型用于方法帧中,它们是:
- Integers ( 1到8个字节),用来表示大小,数量,范围等等. Integers通常是无符号的,在帧中可能是未对齐的.
- Bits,用来表示开/关值.位被包装成字节。
- 短字符串(short string),用来保存短的文本属性.短字符串限制为255个字节,可以在无缓冲区溢出的情况下进行解析.
- 长字符串(long string),用来保存二进制数据块.
- 字段表(Field tables),用来保存名称-值对(name-value pairs). 字段值类型可以是字符串,整数等等.
2.3.3 协议协商
AMQP client 和server 可对协议进行协商.这就是说当client连接时,server可处理client能接受或修改的操作.当两个点对结果达成一致时, 连接会继续前行.协商是一种有用的技术,因为它让我们可以断言假设和前提条件。在AMQP,我们协商协议的下面方面:
- 实现协议和版本. server 可在同一个端口上保存多个协议.
- 加密参数和两者之间的认证.这是功能层的一部分,以前解释过。
- 最大帧大小,通道数量,以及其它操作限制.
达成一致的限制可能会使两者重新分配关键缓存区以避免死锁.每个传入的帧要么服从达成的限制(这是安全的),或者超过它们(在这种情况下,另一方必须断开连接).这非常符合"它要么工作,要么就完全不工作”的AMQP哲学.
两个节点达成一致的最低限度为:
- 服务器必须告诉客户端它提出了什么限制。
- 客户端进行响应,并可能减少其连接的限制。
2.3.4 限制帧
TCP/IP是一个流协议,即没有限制帧的内建机制. 现有协议可以几种不同的方式解决这个问题:
- 每个连接中只发送单个帧.这很简单,但很慢.
- 在流中添加帧定界符.这很简单,但解析较慢.
- 计算帧的大小, 并在每个帧的前面发送大小。这是简单和快速,和我们的选择.
2.3.5 帧细节
所有的帧都由一个头(header,7个字节),任意大小的负载(payload),和一个检测错误的帧结束(frame-end)字节组成:
要读取一个帧,我们必须:
- 读取header,检查帧类型(frame type)和通道(channel).
- 根据帧类型,我们读取负载并进行处理.
- 读取帧结束字节.
在实际实现中,如果性能很关键的话,我们应该使用读前缓冲(read-ahead buffering)”或“收集读取(gathering reads)”,以避免为了读一个帧而做三次独立的系统调用。
2.3.5.1 方法帧
方法帧可以携带高级协议命令(我们称之为方法(methods)).一个方法帧携带一个命令. 方法帧负载有下面的格式:
要处理一个方法帧,我们必须:
- 读取方法帧负载.
- 将其拆包成结构. 方法通常有相同的结构,因此我们可以快速对方法进行拆包.
- 检查在当前上下文中是否允许出现方法.
- 检查方法参数是否有效.
- 执行方法.
方法主体(bodies) 由AMQP数据字段(位,整数, 字符串和字符串表组成)构成. 编组代码直接从协议规范中生成,因此是非常快速地.
2.3.5.2 内容帧
内容是我们通常AMQP服务器在客户端与客户端之间传送和应用数据. 粗略地说,内容是由一组属性加上一个二进制数据部分组成的。它所允许的属性集合由Basic类定义,而这些属性的形式为内容头帧(content header frame)。其数据可以是任何大小,也有可能被分解成几个(或多个)块,每一个都有内容体帧(content body frame)。
看一个特定通道的帧,当它们在线路上传输时,我们可能会看到下面这样的东西:
某些方法(如Basic.Publish, Basic.Deliver等等.)通常情况下定义为传输内容.
当一个节点发送像这样的方法帧时,它总是会遵循一个内容头帧(conent header frame)和零个或多个内容体帧(content body frame)的形式.
一个内容头帧有下面的格式:
某些方法(如Basic.Publish, Basic.Deliver等等.)通常情况下定义为传输内容.
当一个节点发送像这样的方法帧时,它总是会遵循一个内容头帧(conent header frame)和零个或多个内容体帧(content body frame)的形式.
一个内容头帧有下面的格式:
我们将内容体放置在不同的帧中(并不包含在方法中),因此AMQP可支持零拷贝技术,这样其内容就不需要编组或编码. 我们将内容属性安放在它们自己的帧中,以便收件人可以有选择地丢弃他们不想处理的内容。
2.3.5.3 心跳帧
心跳是一种设计用来撤销(undo)TCP/IP功能的技术,也就是说在长时间超时后,它有能力通过关闭broker物理连接来进行恢复.在某些情景下,我们需要快速知道节点连接是否断开了,或者是由于什么原因不能响应了.因为心跳可以在较低水平上进行,我们在传输层次上按节点交换的特定帧类型来处理,而不是按类方法.
2.3.6 错误处理
AMQP使用异常来处理错误.任何操作错误(未找到消息队列,访问权限不足)都会导致一个通道异常. 任何结构化的错误(无效参数,坏序列的方法.)都会导致一个连接异常.异常会关闭通道或连接,同时也会向客户端应用返回响应码和响应文本.我们使用了类似于HTTP等协议和其它大多数协议中的三位回复代码和文字回复文本方案.
2.3.7 关闭通道和连接
连接或通道,对于客户端来说,当其发送Open时则被认为是“打开”的,对于服务器端来说,当其发送Open-Ok时则被认为是打开的。基于这一点,一个希望关闭通道或连接的对等体也必须使用握手协议来这样做。
可出于任何原因,可能会正常地或异常地关闭一个通道或连接-因此必须仔细小心。
对于突然或意外关闭,并不能得到快速探测,因此当发生异常时,我们可能会丢失错误回复代码。
正确的设计是对于所有关闭必须进行握手,使我们关闭后对方知道相应的情况。
当一个节点决定关闭一个通道或连接时,它发送一个Close方法。接收节点必须使用Close-Ok来响应Close,然后双方可以关闭他们的通道或连接。请注意,如果节点忽略了关闭,当两个节点同时发送Close时,可能会发生死锁。
2.4 AMQP Client 架构
可直接从应用程序中读写AMQP帧,但这是相当糟糕的设计.
即使是最简单的对话框也比较复杂(比如同HTTP比较),应用程序开发者没必要为了向消息队列发送消息, 而来理解二进制这样的东西. 推荐的AMQP client架构须由下面的多个抽象层组成:
- 帧层. 此层接受AMQP协议方法,并按某种语言格式(结构,类等等) 来序列化成线路级帧.帧层可以根据AMQP规范机械产生(这是在一个协议的建模语言,专为AMQP定义了XML实现).
- 连接管理层. 此层用于读写AMQP帧,并管理所有连接,会话逻辑.在此层中,我们可以封装打开连接和会话,错误处理,内容传输和接收的全部逻辑. 此层的大部分都可通过AMQP规范来生成.例如,规范定义了哪些方法可以携带内容, 因为逻辑发送方法和可选的发送内容可以机械的生成.
- API 层. 此层暴露了应用程序工作的特定API. API层可能会反映一些现有的标准,或暴露高层AMQP的方法,或对本节前面介绍的内容做一个映射。AMQP方法设计为使这些映射简单有用。API层本身可能是由多个层组成的,如.构建于AMQP方法API之上的高级API.
此外,通常还会有一些I / O层,这此可以是非常简单的(同步套接字读取和写入)或复杂的(完全异步多线程I / O)。此图显示了整体推荐的架构:
在本文档中,当我们说”client API”的时候,我们指的则是应用程序下的所有层(i/o,帧,连接按理和API层).我们通常将客户端API和应用程序分开说, 在这里,应用程序会使用客户端API来同中间件服务器进行对话.
三、功能说明
3.1 Server 功能说明
3.1.1 消息和内容
消息是中间件路由和队列系统处理的原子单元。消息可携带一份内容,它包括一个内容头,一组属性,和一个内容体,和持有一个不透明的二进制数据块。
一个消息可以对应到许多不同应用程序的实体:
- 一个应用程序级消息
- 一个传输文件
- 一个数据流帧等等.
消息可以持久化.一个持久化消息可以安全地存储在磁盘上,即使是在严重的网络故障,服务器崩溃、溢出等情况下也可确保投递.消息也可以有优先级.高优先级消息会在等待同一个消息队列时,在低优先级消息之前发送. 当消息必须被丢弃以确保服务器质量水平,将会优先丢弃低优先级消息.
服务器不能修改接收到并将传递给消费者应用程序的消息内容体. 服务器可在内容头中添加额外信息,但不能删除或修改现有信息.
虚拟主机(Virtual Hosts)
虚拟主机是服务器内的数据分区, 它为在共享基础设施上的管理带来了方便.
一个虚拟主机包括其命名空间,一组交换器,消息队列以及所有相关对象. 每个连接必须关联一个单个虚拟主机.
在认证后,客户端可在Connection.Open方法中选择虚拟主机. 这意味着,服务器上的认证方案可在此服务器上的所有虚拟主机上共享. 然而,对于每个虚拟主机来说,也可以独特的认证方案. 对于每个虚拟主机需要不同的身份验证方案的管理员应该使用单独的服务器。
连接中的所有通道都在同一个虚拟主机上工作.在同一个连接中,没有与不同虚拟主机通信的方式, 也没有在不断开连接重新开始的情况下,切换到其它虚拟主机的可能性.
该协议没有提供用于创建或配置虚拟主机的机制-这在服务器内是一个不确定的方式,是完全依赖于实现的。
3.1.3 交换器
交换器是一个虚拟主机内的消息路由代理。交换器实例(我们通常称之为“交换器”)接受消息和路由信息-主要是一个路由键-或者将消息传递到消息队列,或到内部服务。交换器是基于每个虚拟主机命名的。
应用程序可以在权限范围内自由地创建、共享、使用和销毁交换器实例.交换器可能是持久的、临时的或自动删除的。持久化的交换器会持续到他们被删除,临时的交换器会持续到服务器关闭。自动删除的交换器直到他们不再使用。服务器提供了一组特定的交换器类型。每个交换器类型都实现了一个特定的匹配和算法,如下一节中定义的。AMQP只要求少量的交换器类型,并推荐了一些。此外,每个服务器实现可以添加自己的交换类型。
交换器可以将单个消息并发地路由到的消息队列中。这将创建一个独立消息的多个实例。
3.1.3.1 Direct交换器类型
direct 交换器按如下方式来工作:
1. 消息队列使用路由键K来绑定交换器.
2. 发布者使用路由键R来向交换器发送消息.
3. 在K=R时,消息会传递到消息队列中.
server必须实现direct交换器,并且在每个虚拟主机中必须预定义两个direct交换器: 一个名为 amq.direct, 另一个无公共名称(为Publish方法的默认交换器).
注意,消息队列可以使用任何有效的路由键值进行绑定,但通常消息队列使用它们自己的名称作路由键来绑定.
事实上,所有消息队列必须能使用其自身队列名称作路由键自动绑定无名称的交换器上.
3.1.3.2 Fanout 交换器类型
fanout交换器类型按如下方式来工作:
1. 消息队列不使用参数来绑定交换器.
2. 发布者向交换器发送消息.
3. 消息无条件传递给消息队列。
fanout 交换器是微不足道的设计与实现.此交换器类型和预声明的交换器称为amq.fanout,它是强制的.
3.1.3.3 Topic交换器类型
topic交换器类型按如下方式来工作:
1. 消息队列使用路由模式P来绑定到交换器.
2. 发布者使用路由键R来向交换器发送消息.
3. 当R匹配P时,消息将被传递到消息队列.
用于topic交换器的路由键必须由0个或多个由点号
用于topic交换器的路由键必须由点分隔的零或多个单词组成.每个单词必须包含字母A-Z和a-z 以及数字0-9.
路由模式与路由键遵循相同的规则,* 用于匹配单个单词,# 用于匹配0个或多个单词.因此路由模式*.stock.# 会匹配路由键usd.stock 和eur.stock.db 但不匹配stock.nasdaq.
对于topic交换器我们建议的设计是保持所有已知路由键的集合,当发布者使用了新的路由键时,才更新此集合. 通过给定一个路由键来确实所有绑定是可能的,因此可为消息快速找到消息队列. 此交换器类型是可选的.
server应该实现topic交换器类型,在这种情况下,server 必须在每个虚拟主机中预先定义至少一个 topic交换器,其名称为amq.topic.
3.1.3.4 Headers交换器类型
headers交换器类型按如下方式进行工作:
1. 消息队列使用包含匹配绑定和带有默认值的header参数表来绑定交换器.在这种交换器类型中,不使用路由键.
2.发布者向交换器发送消息,这些消息的headers属性中包含名称-值对的表.
3.如果消息头属性与队列绑定的参数相匹配,则消息传递给队列。
匹配算法是由参数表中的名称值对这样的特殊绑定参数来控制的. 这个参数的名称是’x-match’.
它可以接受两种值, 以表示表格中其它的名称值对将如何来进行匹配:
- ‘all’ 则表明所有其它的名称值对必须与路由消息的头属性相匹配(即.AND匹配)
- ‘any’ 则表明只要消息头属性中的任何一个字段匹配参数表中的字段,则消息就应该被路由(即. OR匹配).
绑定参数中的字段必须与消息字段中的字段相匹配,这些情况包括:如果绑定参数中的字段没有值且在消息头中存在相同名称的字段,或者绑定参数中的字段有值,且消息属性中存在同样的字段且有相同的值。
任何以’x-‘而不是’x-match’开头的字段为将来保留使用并会被忽略.
server应该实现headers交换器类型, 且server必须在每个虚拟主机中预先声明至少一个headers交换器,且名称为amq.match.
3.1.3.5 System交换器类型
system交换器类型按如下方式进行工作:
1. 发布者使用路由键S来向交换器发送消息.
2. system交换器将其传递给系统服务S.
系统服务以”amq.”开头,为AMQP保留使用. 在服务器环境中,所有其它名称可自由使用. 此交换器类型是可选的.
3.1.3.6 实现定义的交换器类型
所有非规范交换器类型必须以”x-“开头. 不以”x-“开头的交换器作为将来AMQP标准保留使用.
3.1.4 消息队列
消息队列是一个名为FIFO的缓冲区且为一组消费者应用程序保存消息.
在其权限范围内,应用程序可以自由地创建、共享、使用和销毁消息队列.
注意,在一个队列中可能存在多个读者,或存在客户端事务,或存在使用了优先级字段,或存在使用了消息选择器,或特定实现了投递优化的队列可能不会真正地展现出FIFO特性. 唯一可以确保FIFO的方式是只有一个消费者连上了队列.在那些情况下,队列可描述为弱-FIFO.
消息队列可能是持久化的或自动删除的.持久化消息队列会持续到它们删除时为止. 临时消息队列可持续到服务器关闭时为止.自动删除消息队列可持续到它们不再使用时为止.
Message队列可将消息存储在内存,磁盘,或两者的组合中.消息队列是基于虚拟主机来命名的.
消息队列保存信息,并可在一个或多个消费客户端之间进行分发.路由到消息队列中的消息不能再发给多个客户端,除非在失败或拒绝后进行重发.
单个消息队列可在同个时间可独立地持有不同类型的内容.也就是,如果Basic和文件内容都发给了同一个消息队列,这些将会作为请求独立地分发给消费应用程序.
3.1.5 绑定
绑定是消息队列和交换器之间的关系.绑定特有的路由参数将告诉交换器那些队列应该得到消息. 应用程序可根据需要来驱动消息流向它们的消息队列. 绑定的寿命依赖于定义它们的消息队列 – 当消息队列被销毁时,其绑定也会被销毁.Queue.Bind 方法的特定语义将依赖于交换器类型.
3.1.6 消费者
我们使用术语”consumer”来表示应用程序和控制客户端程序来接收消息队列中的实体.当客户端启动一个消费者,它就在服务器中创建了一个消费实体 .当客户端退出一个消费者时,它就销毁了一个服务器中的消费者实体. 属于单个客户端通道的消费者可异步地将消息发送到队列中.
3.1.7 服务质量
服务质量控制了消息发送的速度. 服务质量依赖于被分发的内容类型.一般的服务质量,在客户端应答消息前,会使用预提取的概念来指定发送多少个消息或多少个字节的数量. 目标是提前发送消息数据,以减少延迟。
3.1.8 确认/应答
应答是从客户端程序发出的正式信号,用以表示消息队列中的消息已经得到成功处理. 有两种应答模型:
1. 自动地(Automatic), 在这种情况下,只要消息投递到了应用程序,服务器就会立即从消息队列中删除消息(通过 Deliver 或 Get-Ok 方法).
2. 明确地(Explicit),在这种情况下,客户端程序必须对每个消息发磅一个Ack方法以表示消息被处理了.客户端层可以不同方式来实现明确应答,如.只要收到了消息或当应用程序表示消息已经处理了.
这些区别不会影响AMQP或互操作性.
3.1.9 流控制(Flow Control)
流控制是一个用来中止节点消息流的紧急过程. 它在客户端和服务器端都按同样方式工作,且都是由Channel.Flow命令实现的. 流控制是唯一可以阻止一个过度生产发布者的机制.如果它使用消息确认(这通常意味着使用事务), 消费者则可以使用更优雅的预取机制窗口。
3.1.10 命名约定
这些约定规范了AMQP实体命名. 服务器和客户端必须遵守这些约定:
- 用户定义的交换器类型前辍必须是”x-“
- 标准交换器实例前辍是”amq.”
- 标准系统服务前辍是”amq.”
- 标准消息队列前辍是”amq.”
- 所有其他的交换器、系统服务和消息队列名称都在应用程序空间中。
3.2 AMQP 命令说明(Classes & Methods)
3.2.1 解释性注释
出于互操作原因,AMQP方法可以定义特定的最小值(如每消息队列的消费者数量)。这些极小值被定义在每个类的描述中。
遵从AMQP的实现应该为这些字段实现合理值, 最小值只用在最小能力的平台上.
语法使用这样的标记法:
- ‘S:’ 指示从服务器发送到客户端的数据或方法;
- ‘C:’ 指示从客户端发送到服务器的数据或方法;
- +term or +(…) 表达式表示1个或多个实例;
- *term or *(…) 表达式表示0个或多个实例.
我们定义的方法是:
- 一个同步请求(“syn request”).发送节点应该等待特定的回复方法,但可以异步实现此方法;
- 一个同步回复(“syn reply for XYZ”);
- 一个异步请求或答复 (“async”).
3.2.2 类和方法细节
这部分是由生成的文件amqp-xml-spec.odt提供。
四、技术说明
4.1 IANA分配的端口号
IANA为标准AMQP的TCP和UDP分配了5672端口。UDP端口被保留用于将来的组播实现。
4.2 AMQP 线程级格式
4.2.1 正式协议语法
我们为AMQP提供了一个完整语法(这只是AMQP提供的参考,跳到下一节,你会发现不同的帧类型和格式):
我们使用了IETF RFC 2234中定义的增强BNF语法. 总体而言,
- 规则的名称仅仅是名称本身。
- 终端是由一个或多个数字字符指定的,这些字符的基本解释为“d”或“x”。
- 通过列出一系列规则名称,一个规则可以定义一个简单的,有序的字符串的值.
- 其他数值的范围可以简洁指定,使用破折号(“-”)来表示替代值的范围。
- 在圆括号中的元素被视为单个元素,其内容是严格有序的。
- 由/分隔的元素是可替代值.
- 元素之间的操作符 “*”表示重复.完整格式为: “<a>*<b>element”,这里<a>的<b>是可选的十进制值, 表示只能出现大于<a>而小于<b>的元素.
- 规则形式: “<n>element” 等价于<n>*<n>element.
- 方括号中的元素是可选元素.
4.2.2 协议头
client必须通常发送一个协议头开始新连接.它是8字节序列:
协议头由大写字母”AMQP”,其后跟常量%d0组成:
- 1. 协议主版本号, 按照章节1.4.2中描述的使用.
- 2. 协议次版本号, 按照章节1.4.2中描述的使用.
- 3. 协议修订版本, 按照章节1.4.2中描述的使用.
该协议协商模型与现有HTTP协议兼容,使用常量文本字符串来发起连接, 并使用防火墙来检测协议的开始以决定应用什么规则.
client和服务通过以下方式来达成协议版本一致:
- client打开一个到AMQP服务器的新socket连接,并发送协议头.
- server可接受或拒绝协议头.如果它拒绝了协议头,它将会输出一个有效的协议头到socket,然后再关闭socket.
- 否则它会同意(leaves)socket打开,并相应地实现协议.
示例:
实现者指导方针:
- server可接受非AMQP协议,如HTTP.
- 如果server无法识别socket数据中的前5个字节,或者它不支持client请求的协议版本,它必须输出一个有效的协议头到socket,然后再关闭socket (必须确保client应用程序能收到数据) ,最后再关闭socket连接.服务器可以打印诊断信息以辅助调试。
- client可使用服务器支持的最高版本来进行检测,如果收到了服务器发回的这种信息,就可使用较低版本来进行重连
- 实现了多版本AMQ的Clients和servers都应该使用8字节的协议头来标识协议.
4.2.3 通用帧格式
所有帧都以7个字节的头开始,其中包括一个type字段 ,一个channel字段和一个size字段:
AMQP 定义了如下的帧类型:
- Type = 1, “METHOD”: 方法帧
- Type = 2, “HEADER”: 内容头帧
- Type = 3, “BODY”: 内容体帧.
- Type = 4, “HEARTBEAT”: 心跳帧.
通道编号为0的代表全局连接中的所有帧,1-65535代表特定通道的帧.
size字段是负载的大小,不包括结束帧字节. 由于AMQP假设是一个可靠的连接协议,我们使用结束帧来检测错误客户端和服务器实现引起的错误.
实现者指导方针:
- 结束帧必须是十六进制值%xCE.
- 如果一个节点收到了未定义类型的帧,它必须将其视为致命的协议错误,并关闭连接,而不进一步地发送任何数据
- 当一个节点读取到帧时,在解码帧前,它必须检查结束帧是否是有效的. 如果结束帧无效,它必须将其视为致使的协议错误,并关闭连接,而不进一步地发送任何数据. 它应该记录相关问题的日志信息,这样就可以服务器或客户端帧代码实现中表示错误.
- 节点发送的帧大小不能超过约定的大小. 节点收到超过大小的帧时,必须发出一个回复码为501(帧错误)的连接异常信号.
- 对于所有心跳帧,方法帧,连接类的头和体,通道编号必须为0. 节点收到非0通道编号的这些帧必须使用回复码503(无效命令)来发出异常信号.
4.2.4 方法负载
方法帧的体包括一个不可变的数据字段列表,称为”arguments”.所有方法体都以类型和方法的标识符开始:
实现者指导方针:
- class-id 和 method-id是由AMQP类和方法定义的常量.
- arguments 是特定于每个方法中的一组AMQP字段.
- Class id 中%x00.01-%xEF.FF范围内的值被AMQP标准类保留使用.
- Class id 中%xF0.00-%xFF.FF (%d61440-%d65535) 范围内的值可用于非标准扩展类实现.
4.2.5 AMQP 数据字段
AMQP有两种级别的数据字段:用于方法参数的原生数据字段, 以及用于多个应用之间传递数据的字段表. 字段表是原生数据字段的超集.
4.2.5.1 Integers
AMQP定义了这些原生整数类型:
- 无符号字节(8 bits).
- 无称号短整形(16 bits).
- 无符号长整形(32 bits).
- 无符号长长整形(64 bits).
整形和字符串长度总是无符号的,且按网络字节顺序保存. 当存在两个高低系统时(如.两个Intel CPUS),我们不会对它们的交互尝试优化.
实现方针:
实现不能假设帧内的整形编码在内存边界中是对齐的.
4.2.5.2 Bits
AMQP定义了一个原生位字段类型. 位累积成整个字节. 当在帧中两个或更多位相邻时,它们会被包装成一个或多个字节,且在每个字节中以低位开始.
没有要求在一个帧中的所有位必须是连续的,但这通常是做,以尽量减少帧尺寸。
4.2.5.3 Strings
AMQP 字符串是可变长度,由一个整数长度后跟零个或多个字节数据表示. AMQP定义了两种原生字符串类型:
- 短字符串(Short strings),以8位无称号整形长度后跟0个或多个字节数据存储. 短字符串可携带最多255字节的UTF-8数据, 但不能包含二进制零字节.
- 长字符串(Long strings), 以32位无称号整形长度后跟0个或多个字节数据存储. 长字符串可包含任意数据.
4.2.5.4 时间戳(Timestamps)
时间戳是以精度为1秒的64位POSIX time_t 格式保存的.使用64伴可以避免31位和32位相关的time_t值概括问题(wraparound issues).
4.2.5.5 字段表
字段表是包含名称-值对的长字符串. 名称-值对编码为:以短字符串定义名称,字节定义值类型和值. 有效的表字段类型是原生整形,位,字符串,时间戳类型的扩展. 多字节整形字段通常是按网络字节顺序保存的.
指导方针:
- 字段名称必须以字母开头,其后可跟’$,’#’,数字,下划线,最大长度为128个字符.
- server应该验证字段名称,如果收到了无效的字段名称,它应该使用回复码503(语法错误)来发出异常信号.
- 十进制值不用于支持浮点值,它是固定的业务值,如货币汇率和金额。其字节编码代表了位置编号,其后跟着一个无符号的长整数.“十进制”是无符号的.
- 重复字段是非法的。对于一个包含重复字段的表,其行为是未定义的。
4.2.6 内容帧
某些特定的方法(Publish, Deliver, etc.) 会携带内容.请参考 “Functional Specifications” 来了解每种方法的说明,以及它们是否是携带内容的方法.
内容由1个或多个帧组成:
- 只有一个内容头帧能提供内容属性.
- 可选的, 可以有1个或多个内容体帧.
特定通道上的内容帧是严格有序的. 也就是说,它们可以和其它通道的帧混合,但同一个通道内两个帧是不可能混合或重叠, 也不可能出现单个内容上的内容帧与相同通道上的方法帧相混合.
注意,任何非内容帧都会明确地标识内容的结束. 尽管可从内容头中知道内容的大小,但也允许发送者在不关闭通道的情况下中止内容发送.
实现者指导方针:
收到不完整或错误格式内容的节点必须使用回复码500(非希望帧)抛出一个连接异常. 这包括缺少内容头,内容头中错误的class IDs,缺少内容体帧等等.
4.2.6.1 内容头
内容头负载有下面的格式:
实现者指导方针:
- class-id必须与方法帧class id匹配. 节点必须对无效的class-id使用501回复码(帧错误)抛出一个连接异常.
- weight字段未使用且必须是0.
- body大小是一个64位值,它定义了内容体的总大小,也就是后面内容体帧的body大小的总和. 0表示无内容体帧.
- property flags是位数组,它表示每个属性的存在性. 位是从最高到最低进行排序的,位15代表第一个属性.
property flags可指定多于16属性.如果最后位(0)被设置了,这表明其后有进一步的属性标志字段。根据需要,这里有许多属性标志字段。
- 属性值是特定类的AMQP数据字段.
- 位属性仅由它们各自的属性标志(0或1)表示,并且在属性列表中不存在。
- 内容帧中的通道编码不能为0.在内容帧中收到0通道编号的节点必须使用504回复码(通道错误)来发出异常信号
4.2.6.2 内容体
内容体负载是是不透明的二进制块,其后跟着一个结束帧字节:
内容体可以根据需要分成多个帧.帧负载的最大大小可在连接时,由两端进行协商.
实现者指导方针:
节点必须要能将分成多个帧的内容体作为单一集合进行存储处理,要么分成更小的帧重新传输,要么 连接成单个块分发给应用程序.
4.2.7 心跳帧
心跳帧告诉收件人发件人仍然是活的. 在连接时,心跳帧的速率和时间都可以调整.
实现者指导方针:
- 心跳帧的通道编号必须为0. 收到无效心跳帧的节点需使用501回复码(帧错误)来抛出异常.
- 如果节点不支持心跳,它必须在不发出错误或失败信号的情况下丢弃心跳帧.
- client收到Connection.Tune方法后,必须要开始发送心跳, 并在收到Connection.Open后,必须要开始监控.server在收到Connection.Tune-Ok后,需要开始发送和监控心跳.
- 节点应该尽最大努力按固定频率来发送心跳. 心跳可在任何时候发送. 任何发送字节都可作为心跳的有效替代,因此当超过固定频率还没有发送非AMQP心跳时,必须发送心跳.如果节点在两个心跳间隔或更长时间内,未探测到传入的心跳,它可在不遵循Connection.Close/Close-Ok握手的情况下,关闭连接,并记录错误信息.
- 心跳应该具有持续性,除非socket连接已经被关闭, 包括在Connection.Close/Close-Ok 握手期间或之后的时间.
4.3 通道复用
AMQP 允许节点创建多个独立的控制线程.每个通道都可作为共享单个socket的虚拟连接:
实现者指导方针:
- AMQP节点可支持多个通道.在连接协商期间,可定义最大通道数目,节点可协商这个数值为1.
- 每个节点都应该以公平的方式平衡所有打开通道的流量. 这种平衡可以每帧为基础,也可以以每个通道上的总交通流量为基础. 节点不应该允许一个非常繁忙的通道让一个不太繁忙的通道饿死.
4.4 可见性保证
服务器必须确保客户端对服务器状态的观察是一致的。
下面的示例说明了在这种情况下,客户端的观察方法:
- Client 1 和 Client 2 连上了同一个虚拟主机
- Client 1 声明了一个队列
- Client 1 收到了Declare.Ok回复 (观察”的一个例子)
- Client 1 将其告知了Client 2
- Client 2 对同一个队列做了被动声明
可见性必须保证Client 2能看到队列(在没有删除的情况下)
4.5 通道关闭
当发生以下事件时,server会考虑通道已经关闭了:
- 节点关闭了通道或其父连接使用了Close/Close-Ok握手.
- 节点在通道或父连接上抛出了异常.
- 节点未使用 Close/Close-Ok握手关闭了父连接socket.
当服务器关闭通道时,通道上任何未应答的消息将标记为重新分发.
当服务器关闭连接时,它会删除连接所拥有的自动删除信息.
4.6 内容同步
在某些情况下,同步请求响应方法会对同一个信道上的异步内容传递产生影响,包括:
- Basic.Consume 和 Basic.Cancel 方法, 这些会启动和停止消息队列中的消息流.
- Basic.Recover 方法,它会要求服务器重新分发消息到通道.
- Queue.Bind, Queue.Unbind, 和Queue.Purge 方法, 它会影响消息进入消息队列.
实现者指导方针:
请求-响应效果在response方法之前必须不可见,但在之后必须可见.
4.7 内容排序保证
流经通道的方法顺序是稳定的:方法按发送时的顺序接收. 这是由AMQP使用的TCP/IP传输所保证的.
此外,服务器也会按一种稳定的方式来处理内容.尤其是,经过服务器中单个路径的内容会保持顺序.
对于设定了优先级并经过单个路径内容,我们定义了一个内容处理路径-由一个传入通道,一个交换器,一个队列和一个传出通道组成.
实现者指导方针:
server必须保持流经单个内容处理路径上的顺序性,除非在Basic.Deliver或Basic.Get-Ok方法上设置了redelivered字段,可根据条件规则来设置字段.
4.8 错误处理
4.8.1 异常
使用标准的异常编程模型, AMQP不会发出成功信号,只在失败时才发出信号. AMQP定义了两种异常级别:
- 通道异常.指那些关闭通道引起的错误.通道异常通常是因为软错误引起的,这些错误并不影响应用程序的其它部分.
- 连接异常. 这些关闭socket连接的异常通常是因为硬错误造成的,如程序错误,错误配置或其他需要干预的情况。
4.8.2 回复代码格式
AMQP 回复代码按照 IETF RFC 2821的回复代码的严重程度和理论进行定义.
4.9 限制
AMQP规范为将来的AMQP扩展或同种线路级格式使用了如下限制:
- 每个连接上的通道数量: 16位通道数量.
- 协议类数量: 16位class id.
- 每个协议内的方法数量: 16位 method id.
AMQP规范对于数据做了如下限制:
- 短字符串的最大长度为: 255字节.
- 长字符串或字段表的最大长度: 32位大小.
- 帧负载的最大大小: 32位大小
- 内容的最大长度: 64位大小.
服务器或客户端也可以对资源施加自己的限制,如并发连接的数量、每个通道的消费者数量、队列的数量等。这些不影响互操作性,因此并没有指定。
4.10 安全
4.10.1 目标和原则
为了防止缓冲区溢出,我们在所有地方都使用特定长度的缓冲区. 当读取数据时,所有数据都可以使用允许的最大长度来进行验证.无效的数据可以被明确地处理,通过关闭通道或连接。
4.10.2 拒绝服务攻击
AMQP 通过回复码并关闭通道或连接来处理错误.这避免了错误出现后的模糊状态.在连接协商期间,服务器可假设特殊条件是因获取访问服务器的敌对尝试所造成的.对于连接协商中的任何异常,一般处理是暂停该连接 (可能是一个线程)几秒种时间,然后再关闭网络连接. 这包括语法错误,过大数据,或认证失败.服务器应该记录所有这些异常标志或阻止客户端挑起多个故障.
Pingback引用通告: PHP中的AMQP类 | 精彩每一天