关于RabbitMQ,看这篇文章就够了

RabbitMQ学习笔记

一:消息中间件

  • 什么是消息中间件:
    消息队列中间件(简称MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
    一般分为传递模式:点对点(P2P)和发布/订阅(Pub/sub)模式。
    目前主流的消息中间件:RabbitMQ、Kafka、RocketMQ、ActiveMQ。它们提供了基于存储和转发的应用程序之间的异步数据发送,即应用程序彼此不直接通信,而是与作为中介的中间件通信。消息中间件提供了有保证的消息发送,应用程序开发任务无需了解远程过程调用(RPC)和网络通信协议的细节。
  • 消息中间件作用:
    • 异步:
      很多时候应用不想也没有必要立即同步处理一些消息,可以将消息放入消息中间件中,由另外的线程或应用去处理。
    • 解耦:两个应用系统之间如果有关联,一般使用接口进行交互。这样两套应用就会有耦合,其中一个应用出现问题,有可能会导致另一个关联应用异常。使用MQ作为中间介质,两个应用彼此之和MQ交互,可以松耦合。
    • 削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见,如果以能处理这类峰值能力为标准而投入资源,无疑是巨大的浪费。使用MQ能够使关键组件支撑突发访问压力,不会因为突发的超负荷而完全崩溃。

二:RabbitMQ简介

RabbitMQ是采用Erlang语言基于AMQP协议的消息中间件。那么RabbitMQ的模型架构是什么?AMQP又是什么?这两者之间又有什么关系?消息从生产者发出到消费者消费这一过程要经历什么?

  • 相关概念:
    • 生产者和消费者:
      Producer:生产者,就是投递消息的一方。
      生产者创建消息,然后发布到RabbitMQ中。消息一般可以包含2部分:消息体和标签(Label)。消息体也可以称之为payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由建。生产者把消息交由RabbitMQ,RabbitMQ之后会根据标签把消息发送给感兴趣的消费者(Consumer)。
      Consumer:消费者,就是接收消息的一方。
      消费者连接到RabbitMQ服务器,并订阅到队列上啊。当消费者消费一条消息时,只是消费消息的消息体(payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然也不需要知道。
      Broker:消息中间件的服务节点。
      对于RabbitMQ来说,一个RabbitMQ Broker可以简单的看做一个RabbitMQ服务节点或者实例。
    • 队列:
      Queue:队列,是RabbitMQ的内部对象,用于存储消息。消费者可以从队列中获取消息并消费。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息。RabbitMQ不支持队列层面的广播消费,如果需要广播消费,需要在其上进行二次开发,处理起来异常复杂。
    • 交换器、路由建、绑定:
      Exchange:交换器。我们暂时可以理解成生产者将消息投递到队列中,实际上这个在RabbitMQ中不会发生。真实情况是,生产者将消息发送给Exchange(交换器),由交换器将消息路由到一个或者多个队列中。如果路由不到,会返回给生产者,或者直接丢弃。这里可以将RabbitMQ中的交换器看做一个实体。RabbitMQ中交换器有四种类型,不同的类型有着不同的路由策略,后续的交换器类型(Exchange Types)中会介绍。
      RoutingKey:路由键。生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。在交换器类型和绑定键(BindingKey)固定的情况下,生产者可以在发送消息给交换器时,通过指定的routingKey来决定消息流向哪里。
      Binding:绑定。RabbitMQ中通过绑定将交换器和队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列了。
      生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和RoutingKey相匹配时,消息会路由到对应的队列中。在绑定多个队列到同一个交换器的时候,它依赖于交换器类型,比如fanout类型的交换器就会无视BindingKey,而是将消息路由到所有绑定到该交换器的队列中。而实际上,在某些情形下,RoutingKey和BindingKey可以看做同一个东西,尤其是在direct交换器类型下,Routingkey和BindingKey需要完全匹配才能使用,所以直接都写成routingKey会显得方便很多。但在topic交换器类型下,RoutingKey和BindingKey之间需要做模糊匹配,两者并不是相同的。
    • 交换器类型:
      RabbitMQ常用的交换器类型有fanout、direct、topic、headers这四种。
      fanout:
      它会将所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
      direct:
      direct类型的交换器路由规则也很简单,它会把消息路由到那些RoutingKey和BindingKey完全匹配的队列中。
      topic:
      前面说到的direct类型的交换器路路由规则是完全匹配BindingKey和RoutingKey,但是这种严格的匹配方式在很多情况下并不能完全满足实际业务的需求。topic类型的交换器在匹配规则上进行扩展,它与direct类型的交换器类似,也是讲消息路由到BindingKey和RoutingKey相匹配的队列中,但匹配规则有些不同,会支持一些模糊匹配。可以为已点号“.”分割的字符串,也可以存在两种特殊字符“_”,“#”用于模糊匹配,其中“_”用于匹配一个单词,“#”用于匹配多规格单词(也可以是零个)。
      headers:
      headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配队列和交换器时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers类型的交换器交互性能很差,并且不实用,基本上不会看到它的存在。
    • RabbitMQ运转流程:
      了解了以上的术语之后,我们来回顾下整个消息队列的使用过程。
      生产者发送消息过程:
      1:生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
      2:生产者声明一个交换器,并设置相关属性,比如交换器类型,是否持久化等。
      3:生产者生命一个队列并设置相关属性,比如是否排他,是否持久化,是否自动删除等。
      4:生产者通过路由键(BindingKey)将交换器和路由绑定起来。
      5:生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等。
      6:相应的交换器根据接收到的路由键查找匹配的队列。
      7:如果找到,则将从生产者发送过来的消息存入相应的队列中。
      8:如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者。
      9:关闭信道。
      10:关闭连接。
      消费者接收消息过程:
      1:消费者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
      2:消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置一些相应的回调函数,以及做一些准备工作。
      3:等待RabbitMQ Broker回应并投递到队列中的消息,消费者接收消息。
      4:消费者确认(ack)接收到的消息。
      5:RabbitMQ从队列中删除相应已经被确认的消息。
      6:关闭信道。
      7:关闭连接。
      Connection和Channel:
      上面我们发现引入了两个新的概念:Connection和Channel。我们知道无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接。这个连接就是一条TCP连接,也就是Connection。可以理解Connection对象就是RabbitMQ对TCP连接的一个抽象。一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。
      我们完全可以直接使用Connection完成信道的工作,为什么还要引入信道呢?在实际情况中,一个应用程序中会有很多歌线程需要从RabbitMQ中生产或者消费消息。那么必然需要建立多个Connection,也就是许多个TCP连接。然而对于操作系统而言,建立和销毁TCP连接都是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之呈现。RabbitMQ采用类似NIO的做法,选择TCP连接复用,不仅可以减少性能开销,同时也便于管理。
      每个线程把持一个信道,所以信道服用了Connection的TCP连接。同时RabbitMQ可以确保每个线程的私密性,就像拥有独立的连接一样。
  • AMQP协议:
    从前面的内容可以了解到RabbitMQ是遵从AMQP协议的,换句话说,RabbitMQ就是AMQP协议的Erlang的实现。RabbitMQ中的交换器、队列、绑定、路由等都是遵循AMQP协议中的相应的概念。
    AMQP说到底还是一个通信协议,通信协议都会涉及报文交互。AMQP本身是应用层的协议,其填充于TCP协议层的数据部分。AMQP是通过协议命令进行交互的。AMQP协议可以看做一系列结构化命令的集合。这里的命令代表一种操作,类似于HTTP中的方法。RabbitMQ只不过封装了AMQP协议,内部还是根据AMQP协议调用了AMQP的命令。
  • 使用交换器和队列:
    RabbitMQ的消息存储在队列中,交换器的使用并不真正耗费服务器的性能,而队列会,如果要衡量RabbitMQ当前的QPS只需看队列即可。在实际业务应用中,需要对所创建的队列的流量、内存占用及网卡占用有一个清晰的认知,预估其平均值和峰值,以便在固定硬件资源的情况下能够进行合理有效的分配。
    生产者和消费者都可以尝试创建(生命操作)队列。但是如果业务本身在架构设计之初就已经充分预估了队列的使用情况,完全可以在业务程序上线之前在服务器上创建好,这样业务代码可以免去声明过程。预先创建还有一个好处就是:可以确保交换器和队列之间正确的绑定匹配:很多时候,由于人为原因、代码缺陷等,发送消息的交换器并没有绑定任何队列,那么消息将会丢失。
    另外预估队列的使用情况非常重要,可以合理的安排资源和后期更好的扩展。

代码:

  • 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RabbitProducer {  

private static final String EXCHANGE_NAME = "exchange_test";
private static final String ROUTING_KEY = "routing_test";
private static final String QUEUE_NAME = "queue_test";
private static final String IP_ADDRESS = "192.168.1.1";
private static final int PORT = 5672;

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("root");
factory.setPassword("root0000");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//type:direct, durable:true, autoDelete:false, arguments:null
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
//durable:true, exclusive:false, autoDelete:false, arguments:null
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
channel.close();
connection.close();
}
}

exchangeDeclare方法详解:
exchangeDeclare有多个重载方法,返回值为Exchange.DeclareOK,用来标识成功声明了一个交换器。
各个参数说明:
exchange:交换器名称。
type:交换器类型,如:direct,fanout,topic等
durable:设置是否持久化。durable设置为true表示持久化。反之。持久化可以将交换器存盘,在服务重启的时候不会丢失相关信息。
autoDelete:设置是否自动删除。设置为true表示自动删除。自动删除的前提是至少有一个队列或交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器解绑。注意不能错误地把这个参数理解为:当与此交换器的连接客户端都断开时,自动删除本交换器。
internal:设置是否是内置的。true则表示内置交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的这种方式。
argument:其他一些结构化参数。
queueDeclare方法详解:
不带任何参数的queueDeclare方法默认创建一个由RabbitMQ命名的匿名队列、排他的、自动删除、非持久化的队列。
各个参数说明:
queue:队列名字。
durable:持久化。
exclusive:设置是否排他。如果一个队列被声明为排他队列。改队列仅对首次声明它的连接可见,并在连接断开时自动删除。
autoDelete:设置是否自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误的理解为:当连接到此队列的所有客户端断开时,这个队列自动删除。不能把这个参数错误的理解为:当连接到此队列的所有客户端断开时,这个队列自动删除。因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
arguments:设置队列的其他一些参数。
queueBind方法详解:
将队列和交换器绑定的方法。
各个参数说明:
queue:队列名字。
exchange:交换器名称。
routingKey:用来绑定队列和交换器的路由键。
argum:定义绑定的一些参数。
除了queueBind()方法,也可以使用queueUnbind()方法将已绑定的队列和交换器进行解绑。
当然,不仅可以将交换器和队列绑定,也可以将交换器和交换器绑定。使用exchangeBinding()。但这两个方法都不太常用。
channel.basicPublish方法详解:
如果要发送一个消息,可以使用Channel类的basicPublish方法,比如发送内容为“hello world”的消息:

1
2
byte[] body = "Hello wrold".getBytes();  
channel.basicPublish(exchangeName, routingKey, null, body);

为了更好的控制发送,可以使用mandatory这个参数,或者发送一些特定属性的信息:

1
2
3
channel.basicPublish(exchange, routingKey, mandatory, immediate,   
MessageProperties.PERSISTENT_TEXT_PLAIN,
meessageBodyBytes);

例:下面这行代码发送了一条消息,投递模式为2,即消息会被持久化在服务器中。同时消息的优先级为1,content-type为“text/plain”。

1
2
3
4
5
6
7
8
channel.basicPublish(exchange, routingKey,   
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("root")
.build(),
messageBodyBytes);

各个参数说明:
BasicProperties:消息的基本属性值,其包含14个属性成员,分别有:contentType、contentEncoding、headers(Map<String, Object>)、deliverMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId。

  • 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RabbitConsumer {

private static final String QUEUE_NAME = "queue_test";
private static final String IP_ADDRESS = "192.168.1.1";
private static final int PORT = 5672;

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("root");
factory.setPassword("root0000");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//设置客户端最多接收未被ack的消息的个数
channel.basicQos(64);
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv message: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, consumer);
channel.close();
connection.close();
}
}

上面代码中显式的设置autoAck为false,然后在接收到消息之后进行显式ack操作。对于消费者来说,这个设置可以防止消息不必要的丢失。
basicCosume方法详解:

1
2
3
4
String basicConsume(String queue, boolean autoAck,   
String consumerTag, boolean noLocal,
boolean exclusive, Map<String, Object> arguments,
Consumer callback) throws IOException;

各个参数说明:
queue:队列名称。
autoAck:设置是否自动确认,建议为false。
consumerTag:消费者标签,用来区分多个消费者。
noLocal:设置为true表示不能将同一个Connection中生产者发送的消息传送给这个Connection的消费者。
exclusive:设置是否排他。
arguments:设置消费者的其他参数。
callback:设置消费者的回调函数,用来处理RabbitMQ推送过来的消息。

三:RabbitMQ进阶

  • 消息何去何从:
    以上channel.basicPublish()中有两个参数:mandatory和immediate。他们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。
    • mandatory:
      当mandatory参数为true时,交换器无法根据自身类型和路由找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当mandatory参数为false时,则消息会直接被丢弃。
    • immediate:
      当immediate参数为ture,如果交换器在消息路由到队列时发现并不存在消费者,那么这条消息不会存入队列中,当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回给生产者。不常用。
      RabbitMQ 3.0开始去掉了对immediate参数的支持。对此官方解释是:immediate参数会影响镜像队列的性能,增加代码复杂性。
  • 过期时间(TTL):
    Time To Live,RabbitMQ可以对消息和队列设置TTL。
    目前有两种方法设置消息TTL:第一种是通过队列属性设置,队列中所有的消息都有相同的过期时间。第二种是对消息本身进行单独设置,每条消息的TTL可以不同。如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过了TTL值,就会变成“死信”。
    通过队列属性设置消息TTL的方法是在channel.queueDeclare方法中加入x-message-ttl参数实现,参数的单位是毫秒。Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);如果不设置TTL,表示此消息不会过期。
    针对每条消息设置TTL的方法是在channel.basicPublish方法中加入expiration的属性参数:AMQP.BasicProperties properties = new AMQP.BasicProperties(); //持久化消息 properties.setDeliveryMode(2); properties.setExpiration("60000"); channel.basicPublish(exchangeName, routingKey, mandatory, properties, messageBodyBytes);
  • 死信队列:
    DLX: Dead-Letter-Exchange,可以称之为死信交换器,也被称为死信邮箱。当消息在一个队列中变成死信之后,它被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称为死信队列:
    • 消息被拒绝
    • 消息过期
    • 队列大道最大长度
      DLX也是一个正常的交换器,和一般的交换器并没有区别。它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ就会自动的将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息进行相应的处理。
      通过在channel.queueDeclare方法中设置 x-dead-letter-exchange 参数来为这个队列添加DLX://创建DLX:dlx_exchange channel.exchangeDeclare("dlx_exchange", "direct"); Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "dlx_exchange"); //为队列添加DLX channel.queueDeclare("myqueue", false, false, false, args);DLX是一个非常有用的特性。它可以处理异常情况下,消息不能被消费者正确消费而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况。
  • 延迟队列:
    延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
    使用场景:
    用户希望通过手机远程遥控家里的智能设备智能在指定的时间工作,这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再讲指令推送到智能设备。
    在AMQP协议中,RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过DLX和TTL模拟出延迟队列的功能。
    实现:
    死信队列的用户同样也是延迟队列的用法。对于死信队列,同样可以看作是延迟队列。假设一个应用中需要将每个消息都设置为10秒的延迟,生产者通过交换器exchange.x将消息发送到指定的队列xQueue中。此时queue.x配置DLX:exchange.y,当消息在queue.x中过期后,直接进入exchange.y,exchange.y将消息路由到死信队列:queue.y。消费者监听queue.y。
  • 优先级队列:
    优先级队列,具有高优先级的队列具有最高的优先权,优先级高的消息具备优先被消费的特权。可以通过设置队列的 x-max-priority参数来实现。Map<String, Object> args = new HashMap<>(); args.put("x-max-priority", 10); channel.queueDeclare("queue.priority", true, false, false, args);上述代码配置了一个最大优先级为10的队列,之后我们在发送消息的时候设置消息当前的优先级:AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(5); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchange_priority", "routing_priority", properties, messageBodyBytes);上面的代码中设置的消息的优先级为5,默认值最低为0,最高为队列设置的最大优先级10。优先级高的消息可以被优先消费。当然前提是:如果消费者的消费速度大于生产者,这样Broker中没有消息堆积的情况下,设置优先级也就没什么实际意义了。
  • RPC实现:
    RPC: Remote Procedure Call,即远程过程调用。
    一般在RabbitMQ中进行rpc调用很简单,为了接收相应的消息,我们需要在请求消息中发送一个回调队列:String callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties properties = new BasicProperties.Builder.replyTo(callbackQueueName).build(); channel.basicPublish("", "rpc_queue", properties, messageBodyBytes);对于代码中的BasicProperties这个类,上面说过,其包含14个,这里就用到两个属性。
    • replyTo:通常用来设置一个回调队列。
    • correlationId:用来关联请求和其调用RPC之后的回复。
      但是如上面代码一样,为每个RPC请求创建一个回调队列是非常低效的。但是幸运的是这里有一个通用的解决方案-可以为每个客户端创建一个单一的回调队列。
      这样就产生了一个新的问题,对于回调队列而言,在其接收到一条回复的消息之后,它并不知道这条消息应该和哪一个请求匹配。这里就用到了correlationId这个属性。我们应该为每一个请求设置一个唯一的correlationId。
  • 持久化:
    持久化可以提高RabbitMQ的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。针对此,RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。
    • 交换器持久化:是通过在声明交换器的时候将durable参数置为true实现的。如果交换器不设置持久化,那么在RabbitMQ重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器了。
    • 队列的持久化:同样,是通过在通过在声明队列时将durable参数设置为true实现。如果队列不设置持久化,那么在RabbitMQ服务重启之后,相关队列元数据会丢失,自然数据也会跟着丢失。队列的持久化能保证其本身的元数据不会因异常情况而丢失。但是并不能保证内部存储的消息不会丢失。
    • 消息的持久化:要确保消息不会丢失,需要将其设置为持久化。通过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2即可实现消息的持久化。
      注意:
      同时设置了队列和消息的持久化,当RabbitMQ服务重启之后,消息依旧存在。单单只设置队列持久化,重启之后消息会丢失;单单只设置消息持久化,重启之后队列会消失,继而消息也会丢失。将所有的消息都设置为持久化,会严重影响RabbitMQ的性能。写入磁盘的速度比写入内场的速度慢的不知一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在选择是否要讲消息持久化时,需要在可靠性和吞吐量之间做一个权衡。
      将交换器、队列、消息都设置了持久化之后,就能保证数据百分百不丢失了吗?答案是否定的。
      首先从消费者来说,如果在订阅消息队列时将autoAck参数设置为true,那么当消费者接收到消息之后,还没来得及处理就宕机了,这样也算数据丢失。当然这种情况很好解决,将autoAck参数设置为false,进行手动确认。
      其次,在持久化的消息正确存入RabbitMQ之后,还需要一段时间(虽然很短)才能存入磁盘之中。RabbitMQ并不会为每条消息进行同步存盘处理。有可能刚保存到操作系统缓存之中还没来得及刷到磁盘之中。如果在这段时间RabbitMQ服务节点发生了宕机或重启等异常情况,name这些消息也将会丢失。解决方式是引入RabbitMQ的镜像队列机制,相当于配置了副本。还可以在发送端引入事务机制来保证消息已经正确地发送并存储至RabbitMQ中。下面会说到这些。
  • 生产者确认:
    在使用RabbitMQ的时候,可以通过消息持久化操作来解决因为服务器的异常崩溃而导致的消息丢失。除此之外,我们还会遇到一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确的到达服务器呢?如果不进行特殊的设置,默认情况下发送消息的操作是不会返回任何信息给生产者的。如果消息在到达服务器之前已经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
    RabbitMQ针对这个问题,提供了两种解决方案:
    • 通过事务机制实现:
      RabbitMQ客户端与事务机制相关的方法有三个:channel.txSelect、channel.txCommint和channel.txRollback。channel.txSelect用于将当前的信道设置为事务模式,channel.txCommit用于提交事务,channel.txRollback用于事务回滚。try { channel.txSelect(); channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); int result = 1 / 0; channel.txCommit(); } catch (Exception e) { e.printStackTrace(); channel.txRollback(); }事务确实能够解决消息发送发和RabbitMQ之间消息确认的问题,只有消息成功被RabbitMQ接收,事务才会提交成功,否则便可以在捕获异常之后进行事务回滚,与此同时也可以进行消息重发。但是使用事务会导致RabbitMQ的性能大幅降低。有没有更好的方法既能够保证消息正确送达,又基本上不浪费性能的损失?
    • 通过发送方确认机制实现:
      生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含该消息的唯一ID)。这就使得生产者知晓消息已经正确到达目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号。
      事务机制在一条消息发送之后会使发送端阻塞,以等待RabbitMQ的回应,之后才能继续发送下一条消息。相比之下,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同事继续发送下一条消息。
      注意:
      事务机制和publish confirm机制两者是互斥的,不能共存。
      事务机制和publish confirm机制确保的是消息能够正确的发送至RabbitMQ,这里的RabbitMQ指的发送到交换器。如果此交换器没有匹配的队列,那么消息也会丢失。

四:RabbitMQ高阶:

到目前为止,我们了解了RabbitMQ客户端的使用,服务端的管理了。不过还没有从原理层面来进一步分析,了解一些RabbitMQ的实现原理。

  • 存储机制:
    不管是持久化的信息还是非持久化的信息都可以被写入到磁盘。持久化的消息在到达队列时被写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,当内存吃紧的时候会从内存中清除。非持久化的消息一般只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间。这两种类型的消息的落盘处理都在RabbitMQ“持久层”中完成。
    持久层是一个逻辑概念,实际包含两个部分:队列索引(rabbit_queue_index)和消息存储(rabbit_msg_store)。队列索引负责维护队列中落盘消息的信息,包括消息的存储地点、是否已交付给消费者、是否已被消费者ack等。每个队列都有与之对应的一个队列索引。消息存储(rabbit_msg_store)以键值对的形式存储消息,它被所有队列共享,在每个节点有且只有一个。从技术层面上来说,rabbit_msg_store具体还可以分为msg_store_persistent和msg_store_transient。persistent负责持久化消息的持久化,重启后消息不会丢失,transient负责非持久化消息的持久化,重启后消息会丢失。
    消息(包括消息体、属性和headers)可以直接存储在rabbit_queue_index中,也可以被保存在rabbit_msg_store中。默认在$RABBITMQ_HOME/var/lib/mnesiz/rabbit@$HOSTNAME/路径下包含queues、msg_store_persistent、msg_store_transient这三个文件夹。
    最佳的配备是较小的消息存储在rabbit_queue_index中,而较大的消息存储在rabbit_msg_store中。这个消息大小的界定可以通过queue_index_msgs_below来配置,默认大小为4096B。当一个消息小于设定的大小阈值时就可以存储在rabbit_queue_index中,这样可以得到性能上的优化。rabbit_queue_index中以顺序(文件名以0开始累加)的段文件来进行存储,后缀为”.idx”,每个段文件中包含固定的SEGMENT_ENTRY_COUNT条记录,默认值为16384。
    经过rabbit_msg_store处理的消息都会以追加的方式写入到文件中,当一个文件的大小超过指定的限制(file_size_limit)后,关闭这个文件再创创建一个新的文件以共新的消息写入。文件名(后缀是“.rdq”)从0开始累加,因此文件名最小的文件就是最老的文件。在进行消息的存储时,RabbitMQ会在ETS(Erlang Term Storge)表中记录消息在文件中的位置映射(index)和文件的相关信息(fileSummary)。
    在读取消息的时候,先根据消息ID(msg_id)找到对应存储的文件,如果文件存在并且未被锁定,则直接打开文件,从指定位置读取消息的内容,如果文件不存在或者被锁住了,则发送请求有rabbit_msg_store进行处理。
    消息的删除只是从ETS表删除指定消息的相关信息,同时更新消息对应的存储文件的相关信息。执行消息删除操作时,并不立即对在文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记为垃圾数据而已。当一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件夹中的有效数据可以合并到一个文件中,并且所有的垃圾数据的大小和所有文件(至少有三个文件存在的情况下)的数据大小的比值超过设置的阈值GARBAGE_FRACTION(默认值为0.5)时才会触发垃圾回收将两个文件合并。
    执行合并的两个文件一定是逻辑上相邻的两个文件。执行合并时首先锁定这两个文件,并先对前面文件中的有效数据进行整理,再将后面文件的有效数据写入到前面的文件,同时更新消息在ETS表中的记录,最后删除后面的文件。

    • 队列的结构:
      通常队列由rabbit_amqqueue_process和backing_queue这两部分组成。rabbit_amqqueue_process负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack)等。backing_queue是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。
      如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费者,不会经过队列这一步。而当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投递。消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断的流动,消息的状态会不断发生变化。RabbitMQ中的队列消息可能会处于一下四种状态:
      • alpha:消息内容(包括消息体、属性和headers)和消息索引都存储在内存中。
      • beta:消息内容保存在磁盘中,消息索引保存着内存中。
      • gamma:消息内容保存在磁盘中,消息索引在磁盘和内存中都有。
      • delta:消息内容和索引都在磁盘中。
        对于持久化的消息,内容和索引都必须先保存在磁盘上,才会处于上述状态中的一种。而gamma状态的消息是只有持久化的消息才有的状态。
        RabbitMQ在运行时会根据统计的消息传送速度定期计算一个当前内存中能够保存的最大消息数量(target_ram_count),如果alpha状态的消息数量大于此值时,就会引起消息的状态转换,多余的消息可能会转移到beta、gamma或delta状态。区分这四种状态的主要作用是满足不同的内存和CPU需求。alpha状态最耗内存,但很少消耗CPU。delta状态基本不消耗内存,但是需要消耗更多的CPU和I/O操作。delta状态需要执行两次I/O操作才能读取到消息,一次是读消息索引,一次是读消息内容。beta和gamma状态都只需要一次I/O操作就可以读取到消息。
    • 惰性队列:
      RabbitMQ从3.6.0开始引入惰性队列(Lazy Queue)的概念。惰性队列会尽可能地将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目的是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机,或者由于维护而关闭等)致使长时间不能消费消息而造成堆积时,惰性队列就很有必要了。
      默认情况下,当生产者发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同事也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收到新的消息。
      惰性队列会将收到的消息直接存入文件系统中个,而不管是持久化或者是非持久化的,这样可以减少内存的消耗,但会增加I/O的使用,如果消息是持久化的,那么这样的I/O操作不可避免,惰性队列和持久化的消息可谓是“最佳拍档”,
      队列具备两种模式:default和lazy。默认是default模式。lazy模式即为惰性队列模式,可以通过调用channel.queueDeclare方法的时候在参数中确认。Map<String, Object> args = new HashMap<>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare(queueName, false, false, false, args);惰性队列和普通队列相比,只有很小的内存开销。
      根据官方测试数据显示,对于普通队列,如果要发送1千万条消息,需要耗费801秒,平均发送速度约为13000条/秒。如果使用惰性队列,那么发送同样多的消息,耗时是421秒,平均速度为24000条/秒。出现性能偏差的原因是普通队列会由于内存不足而不得不将消息换页至磁盘。
  • 镜像队列:
    解决Broker单点问题:引入镜像队列。
    镜像队列机制,可以将队列镜像到集群中的其他Broker节点之上。如果集群中的一个节点失效了,队列能自动的切换到景象中的两一个节点上以保证服务的可用性。在通常的用法中,针对每一个配置镜像的队列都包含一个主节点(master)和若干个从节点(slave)。slave会准确的按照master执行命令的顺序进行动作,故slave和master上维护的状态应该是相同的。如果master由于某种原因失败,那么资历最老(根据slave加入的瞬间排序)的slave会被提升为master。发送到镜像队列的所有消息会被同时发往master和所有的slave上,如果此时master挂了,消息还会在slave上。这样slave提升为master的时候消息也不会丢失。
    除发送消息(Basic.Publish)外的所有动作都只会向master发送,然后再由master将命令执行的结果广播给各个slave。

快掏出你的大手机扫我

快掏出你的大手机扫我