带你认识消息队列之RabbitMQ

一只会飞的鱼儿 1年前 ⋅ 1278 阅读
ad

一、RabbitMQ 基本概念

AMQP是一种使用广泛的独立于语言的消息协议,它的全称是Advanced Message Queuing Protocol,即高级消息队列协议,它定义了一种二进制格式的消息流,任何编程语言都可以实现该协议。实际上,Artemis也支持AMQP,但实际应用最广泛的AMQP服务器是使用Erlang编写的RabbitMQ

AMQP模型如下:

 

内部流程图如下:

 
  1. Message
    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
     
  2. Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序。
     
  3. Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
     
  4. Binding
    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
     
  5. Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
     
  6. Connection
    网络连接,比如一个TCP连接。
     
  7. Channel
    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
     
  8. Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
     
  9. Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
     
  10. Broker
    表示消息队列服务器实体。

 

二、常见交换机

  • Default Exchange
      默认交换机。实则为名称为空的直连交换机。特点是每个队列都会绑定到默认交换机上,且路由键与队列名称相同。例如:创建名称为“simple-queue”的队列,AMQP代理会自动将该队列绑定到默认交换机上,绑定的路由键也为“simple-queue”。
  • Direct Exchange
      直流交换机。消息携带路由键(RoutingKey)经过该类型交换机时会绑定到规则匹配的队列中。

  • Fanout Exchange
      扇形交换机。交换机直接与队列绑定,及时绑定路由键也不起作用。当生产者发送消息后,消息经过该类型交换机直接绑定到与该交换机绑定的所有队列上。

  • Topic Exchange
      主题交换机。实则为直连交换机和扇形交换机相结合,即模糊匹配。其中规则为星号代表必须为一个单词,井号为零个或多个单词。例如:队列1的绑定键为A.*,队列2的绑定键为B.#,如果消息绑定的路由键为A.C,则队列1会收到;如果消息绑定的路由键为A.C.D,则队列2会收到。

  • Header Exchange
      头交换机。该类型交换机不依赖于路由键,而是由消息内容中的header属性进行匹配。

 

三、消息确认

     消费者在消费消息往往会因为网络或其他原因导致异常,因此需要和队列建立确认机制才能表明该条消息已经成功消费。因此在AMQP协议中给出两种建议:
  (1)自动确认模式:即消息发送给消费者后立即从队列中删除;
  (2)手动确认模式:即消费者者将消息者处理后给队列发送确认回执(ACK机制,Acknowledgement),再根据情况删除消息。

当多个消费者消费同一个队列的时候,MQ会采用轮询发送消息的机制,这个其实比较简单,就不展开说明了。具体参见官网就可以,这里主要总结一下消息确认机制。

通常情况下消费者处理一个消息至少需要耗费几秒,我们可能会好奇,如果一个消费者处理耗时较长,或者在处理消息的中途挂掉了将会发生什么。如果任何一个消费者出现处理消息耗时过长,或者在处理消息的中途挂掉则消息会被删除,同时RabbitMQ也会删除发送给该消费者的消息。

但是RabbitMQ不想让任何一个消息丢失,如果处理这条消息的消费者在毫无征兆的情况下挂掉了,RabbitMQ应该将这条没有正常处理的消息发送给其他消费者处理。为了确保消息不丢失,RabbitMQ支持消息确认机制——这种确认消息是由消费者发送给RabbitMQ,告知RabbitMQ该消息已经被消费,RabbitMQ可以将这个消息删除了。

如果消费者没有发送确认的消息,则RabbitMQ会认为这个消息没有正确消费,会将这个消息重新入队。然后将这个消息转发给其他消费者。

这里还是说一下RabbitMQ中的autoAck参数,官网中有这样一段话

 

 

这段话的意思就是,手动确认消息机制是默认开启的,如果autoAck(channel.basicConsume的第二个参数)设置为false,则手动确认消息机制将会关闭。当时看到这里有一丝疑惑,难道还分自动确认和手动确认?为什么得到消费者的回执需要关闭自动确认,直接开启自动确认不是更加省事么?后来我看到了下面一段话才恍然大悟:

 

 

RabbitMQ有两种消息确认机制,客户端告知RabbitMQ一条确认的消息被消费了这种即为手动确认机制,如果RabbitMQ将消息从队列中发出后立即给予确认,这种即为自动确认机制,RabbitMQ默认开启了第二种确认机制。

为了演示消息分发的机制,我们需要将autoAck属性置为false,关闭自动确认机制


关于Webfunny

Webfunny专注于前端监控系统,前端埋点系统的研发。 致力于帮助开发者快速定位问题,帮助企业用数据驱动业务,实现业务数据的快速增长。支持H5/Web/PC前端、微信小程序、支付宝小程序、UniApp和Taro等跨平台框架。实时监控前端网页、前端数据分析、错误统计分析监控和BUG预警,第一时间报警,快速修复BUG!支持私有化部署,Docker容器化部署,可支持千万级PV的日活量!

  点赞 0   收藏 1
  • 一只会飞的鱼儿
    共发布43篇文章 获得8个收藏
全部评论: 0