说明
本节主要叙述
RabbitMQ
中的AMQP 0-9-1
协议即Advance Message Queuing Protocol - 高级消息队列协议
,该协议是理解RabbitMQ
消息模型的重要理论基础。
AMQP
模型和AMQP 0-9-1
协议概览
什么是AMQP 0-9-1
AMQP 0-9-1
是一种高级消息协议,它允许特定的客户端应用程序能够与特定的消息代理中间件(messaging middleware brokers)
进行消息传输和交流。
消息代理中间件和其扮演的角色
中间件接收来自于消息发布者(也叫生产者,产生消息的应用程序组)传递的消息并且把它们路由给接收者(也叫消费者,处理消息的应用程序组)
由于
AMQP 0-9-1
是一种网络协议,所以生产者、消费者、和消息代理
都可以运行在网络中不同的机器上。
AMQP 0-9-1 模型简介
AMQP 0-9-1
模型定义了如下规则:消息被发送到交换机
exchanges
,而后交换机利用绑定规则bindings
将消息副本发布到消息队列queue
,然后消息队列中的消息,要么被传递给了订阅消息队列的消费者,要么消费者按照需要从消息队列拉取fetch/pull
消息。
当一条消息发布的时候,发布者可能会指定一些消息属性message attributes(也叫message meta-data消息元数据)
,其中有一些消息属性是用于消息中间件处理消息,其余的部分则是用于消息消费者对于消息中间件完全透明。由于网络的不稳定性,消息在传输过程中可能出现失败的情况,鉴于此
AMQP 0-9-1
提供了一种消息确认机制message acknowledgements
: 当一条消息传递给消费者后该消费者发送一条通知notifies
给消息中间件来确认消息,无论是自动的还是开发者自己这样做,当消息确认机制使用时,只有当消息代理收到通知后才会将该条或该组消息从消息队列中移除。在一些特定情况下,比如当消息不能被路由时,消息可能会被返回给消息发布者,或者删除,或者如果消息代理实现了某种扩展插件,则这些无法被路由的消息可能会被放入一个称之为
dead letter queue
(死亡标记队列)的队列中,发布者可以通过指定一些确定的消息属性 message attributes
来响应出现这种情景时消息应该如何被处理。
exchange、bindings、queue
均被称为AMQP实体-AMQP entities
AMQP是一种可编程的协议
AMQP 0-9-1
是一种可编程协议,实体和路由规则应该被实现该协议的应用来定义。协议只是为具体操作提供一种处理的规范,具体实现(exchange和queue如何路由,如何定制绑定规则等
)应该有具体应用来完成。这种方式给了开发者很大的自由空间,但是也需要他们能意识到可能潜在的定义冲突,实际上定义冲突很少见,大多都由错误的设定引起。
交换机和交换机的类型
交换机
exchanges
是AMQP 0-9-1
协议定义的消息应该被送达的实体。交换机拿到消息并且将它路由到一个或者多个队列中去,具体的路由算法由交换机的种类决定。交换机类型和路由规则被统称为(binding)
绑定规则(交换机和消息队列间的绑定规则),AMQP 0-9-1
代理提供了四种交换机类型
名称 默认预定义名称 Direct exchange (Empty string) and amq.direct
Fanout exchange amq.fanout
Topic exchange amq.topic
Headers exchange amq.match (and amq.headers in RabbitMQ)
除了交换机类型以外,交换机还定义了一些属性,其中最重要的属性如下:
- Name 名称
- Durability 持续性(控制交换机能从在重启后恢复)
- Auto-delete 交换机在最后一条消息出栈之后是否删除
- Arguments 参数,可选用于插件或者消息代理的一些特殊用途
交换机可以是持续性的也可以是瞬态的,持续性的交换机可以在重启后恢复,而瞬态的交换机则需要重新定义。
Default exchange/默认交换机
默认交换机是被消息代理预定义的没有名称的直转交换机
Direct exchange也有叫直连交换机
他有一个特殊属性使得它非常有用,每一个创建的消息队列都会使用队列的名称绑定路由键routing key
来和默认交换机绑定。比如当你定义了一个名为
search_index_online
的消息队列时,队列会使用search_index_online
自动绑定到默认交换机,消息发布者可以使用默认的交换机附带search_index_online
的路由键来发送消息到消息队列,看起来没有定义任何交换机,像是直接将消息传递给了队列。
Direct exchange/直转交换机/直连交换机
直转交换机基于路由键
routing key
给消息队列转发消息,直转交换机可用于消息单播(尽管实际上可以用于多播消息)直转交换机模式如下
- 消息队列通过路由键K与交换机绑定
- 当交换机收到一个需要转发到路由键等于R的消息队列时,如果K=R,则交换机将消息发送给消息队列
直转交换机通常用于任务分发,但在使用时必须注意,在
AMQP 0-9-1
协议中消费者之间才有负载均衡,而不是消息队列之间。直转路由过程如下
Fanout Exchange/扇面交换机
扇面交换机是一种多播交换机,会将消息路由到所有绑定到该交换机上的消息队列,并且会忽略路由键,如果有N个消息队列绑定到该交换机上,则交换机会将消息副本传递给每一个绑定到上面的消息队列,扇面交换机的应用场景
- 大量多用户在线游戏,可以将消息广播给所有人比如流行游戏吃鸡,全员广播时很切合此类业务需求。
- 体育新闻类场景,可以利用扇面交换机将最新分数以及信息广播到所有手机端
扇面交换机图示如下
Topic Exchange / 主题交换机
主题交换机可以将消息路由到一个或者多个与路由键规则匹配的消息队列,具体匹配规则以后详述类似于数据库模糊匹配规则
Header Exchange / 头部交换机
头部交换机会根据 消息指定的头部信息将消息路由到不同的消息队列,头部交换机会忽略路由键,取而代之的是使用头部信息来判断路由地址
消息可以指定一组或者多组头部信息用于头部交换机匹配消息队列,我们可以使用
x-match
头来指定满足任意一组还是必须所有满足才能将消息传递给消息队列x
,可以选any/all
需要注意的是在交换机中以
x-
开头的头部将不会参与解析。
Queue / 队列
队列用于储存消息,和其他消息系统和任务系统大致相同,队列和交换机共享一些属性,也定了一些额外的属性
- Name
- Durable 当一个代理重启之后是否会恢复
- Exclusive 独占,只维持一个链接,当链接关闭后,该队列是否会被删除
- Auto-delete 至少有一个订阅者,当最后一个订阅者取销订阅之后,是否会被删除
- Arguments 额外属性,一般用于消息代理
要使用一个队列必须声明队列,在声明队列时如果队列已经存在并且声明的队列和已存在队列的属性不一致时会抛出一个通道级别的异常
PRECONDITION_FAILED
不满足前提条件
Queue Name
应用程序可以自己指定一个队列名称,也可以让消息代理生成一个名称,队列名称最多为255位的
UTF-8
字符集,AMQP 0-9-1
代理可以自动生成队列名称,要使用这一特性只需传入一个空的名称,代理会返回创建的队列名称。以
amq.
开头的队列名称是消息代理内部使用的,当应用创建这类名称的队列时,会抛出ACCESS_REFUSED
异常
Queue Durability / 队列持续性
持续性的队列将会被持久化的磁盘中,当消息代理重启之后,被持久化的队列将会被重新声明,但只有持久化的消息才会被恢复队列持久化和消息持久化是彼此独立的。
Bindings / 绑定
绑定是交换机将消息路由到队列所使用的规则
Consumers / 消费者
在队列中保存消息是无意义,只有当消息能够被应用消费时才有意义在
AMQP 0-9-1
协议模型中消费者有两种方式获得队列中的消息
- 将消息传递给他们
push API
- 获取需要的消息
pull API
在
push api
模式下,消费者必须标明对某种类型的消息有兴趣,此种情况也叫注册一个消费者或者订阅一个主题
当然可能完全可能存在不止一个消费者订阅了某一个消息队列,每一个消费者都有一个标识符叫consumer tag
消费标签,消费标签是一串字符串,可以用来退订消息队列。
Message Acknowledgements / 消息确认
当消费者接收和处理一个消息时有可能出现一些意外情况,比如消息无法处理,或者消费应用程序由于某些原因崩溃亦有甚者由于网络原因导致接收到的消息产生问题。这种情景下就产生了一个问题!?
AMQP
代理该什么时候从队列里将这些消息删除呢?AMQP 0-9-1
规范推荐两种选择
- 在代理将消息发送给消费者之后(不管是使用
basic.deliver
还是使用basic.get-ok
这两个AMQP
方法)- 在消费者返回一个确认标记后(使用
basic.ack
-AMQP
方法)前一种称之为
自动确认模型
,后一种称为显示确认模型
在显示确认模型
的情况下,消费者应用可以控制何时发送确认标记,可以是在收到消息后,也可以是在消息处理完成或者被保存之后,或者对消息的处理全部完成之后。如果一个消费者由于宕机或者崩溃没有返回确认标记,那么消息队列将重发该消息到另外的消费者应用,如果没有消费者应用程序订阅到该消息队列,那么消息队列将会等到有消费者订阅之后,再将消息重新传递给消费者。
AMQP 0-9-1 方法
AMQP 0-9-1被构造成一系列的方法。方法即操作(像HTTP方法),而且与面向对象开发语言的方法不相同。AMQP方法被分组为类。类仅仅是AMQP方法的逻辑分组。AMQP 0-9-1 reference提供了所有关于AMQP方法的完整介绍。
让我们看一下exchange类,一系列有关exchange操作的方法。它包含了一下的操作。
- exchange.declare
- exchange.declare-ok
- exchange.delete
- exchange.delete-ok
(请注意,RabbitMQ网站参考资料也包含了对exchange类特属于RabbitMQ的扩展,这些我们不在这篇文章中讨论)
以上的操作行成了逻辑队:exchange.declare和exchange.declare-ok,exchange.delete和exchange.delete-ok。这些操作是“请求”(被客户端发送)和“响应”(brokers在响应前面提到的“请求”时发送)。
例如,客户端请求broker使用exchange.declare方法申明一个新的exchange:
如上图所示,exchange.declare携带了几个参数。它们使客户端能够指定exchange名称,类型,持久性等待。
如果操作成功,broker使用exchange.declare-ok方法响应。
除了channel号码exchange.declare-ok不携带任何参数(稍后将在这篇文章后面描述channel)
事件的顺序与另外一组AMQP queue类中的方法很相似:queue.declare和queue.declare-ok:
并不是所有的AMQP方法都有与之相对应的方法。一些(basic.publish是使用最广发的一个)没有相对于的“响应”方法和其他一些(例如basic.get)有多个可能的“响应”。
Connections
AMQP连接通常是长期存活的。AMQP是一个应用级协议,它使用TCP保持稳定传输。AMQP连接使用身份认证并且可以使用TLS(SSL)来保护连接。当应用程序不再需要连接到AMQP broker时,它应该优雅的关闭AMQP连接而不是突然关闭底层的TCP连接。
Channels
一些应用程序需要多余AMQP broker进行多连接。然而在同一时间保持多个TCP连接是不可取的,因为这么做消耗系统资源并且让配置防火墙变得更加困难。AMQP 0-9-1连接是与channel多路复用的,channel被认为是“共享单个TCP连接的轻量级连接”。
对于使用多线程或者多进程进行处理的应用程序,在每一个线程或者进程中打开一个新的channel而不是彼此间共享一个channel是非常常见的。
一个特定channel上的通信与另一个channel上的通信是完全分开的,所以每个AMQP方法也包含channel号码,客户端用该channel号码来计算方法用于哪个通道(比如哪个事件处理程序需要被激活)
Rejecting Messages / 拒绝消息
消费者在处理消息时可能成功也可能失败,当消费者处理消息失败时,消费者可以通过拒绝一个消息来表明消息处理失败,此种情况下,消费者应用可以指示消息代理丢弃或者重新入队消息,当只有一个消息队列或者只有一个消费者时需要防止 消息的无限循环(拒绝-重入队列-拒绝-重入队列....)
Negative Acknowledgements / 消极确认
在
AMQP 0-9-1
模型下basic.reject
只能拒绝单条消息,如果使用RabbitMQ的话,RabbitMQ提供了一个扩展叫消极确认
也叫nacks
,要拒绝多条消息可以设置multiple = true
详情点我
Prefetching Message / 预取消息
当多个消费者订阅一个消息队列时,如果能在下次发送确认标记之前指定每一个消费者可以接收多少条消息,这将非常有用,因为者可以用于实现简单的负载均衡或在消息生产者批量发布消息时提高吞吐量,要注意的是RabbitMQ仅支持管道级别的预取计数
prefetch-count
而非基于链接connection
或者大小size
的预取
Message Attributes and Payload / 消息属性和负载
在
AMQP
模型下消息是有属性的,有一些属性是AMQP 0-9-1
规范定义的,下面是一些例子
- Content type 文档类型
- Content encoding 文档编码
- Routing key 路由键
- Delivery mode (persistent or not) 交付模式 - 持久化/不持久化
- Message priority 消息优先级
- Message publishing timestamp 消息发布时间
- Expiration period 过期周期
- Publisher application id 发布应用id
除此之外消息还有一个负载
payload(消息携带的数据)
,该负载对AMQP
协议是不透明的,对消息来讲负载可有可无,在负载区应该存放序列化数据如JSON、Thrift、Protocol Buffers and MessagePack
使用的文档类型和编码应该在content-type
和content-encoding
中指定消息的持久化需要消息自己指定持久化模型,当消息需要持久化时,消息将会被保存到磁盘中,当消息代理重启后,这些消息将会被恢复,但是消息持久化会影响性能。
Virtual Hosts / 虚拟主机
为了能使单个消息代理能够承载多套独立的环境(用户组,交换机组,消息队列等都成套独立)所以
AMQP
有了虚拟主机的概念,就像多个web应用使用虚拟主机一样,彼此之间完全独立,每套虚拟主机里面都包含其exchange,queue,consumer,publisher
等。
AMQP是可扩展的
AMQP 0-9-1具有几个扩展点:
- 自定义交换类型使开发人员可以实现路由方案,这些路由方案提供的现成交换类型不能很好地覆盖,例如 geodata-based 路由。
- 交换和队列的声明可以包含代理可以使用的其他属性。例如,RabbitMQ中的按队列消息TTL是通过这种方式实现的。
- 特定于代理的协议扩展。例如,参见 RabbitMQ实现的扩展。
- 可以引入新的AMQP 0-9-1方法类。
- 可以使用其他插件扩展代理,例如,RabbitMQ管理 前端和HTTP API被实现为插件。
这些功能使AMQP 0-9-1模型更加灵活,可适用于非常广泛的问题。
参考
https://www.rabbitmq.com/tutorials/amqp-concepts.html
https://www.cnblogs.com/xiaochengzi/p/6895126.html
https://segmentfault.com/a/1190000019218541