前面一篇文章松哥和大家聊了 MQ 高可用之如何确保消息成功发送,高可功消各种配置齐上阵,何确最终确保了消息的保消成功发送,甚至在一些极端情况下还可能发生同一条消息重复发送的息成情况,不管怎么样,高可功消消息总算发送出去了,何确如果小伙伴们还没看过上篇文章,保消建议先看看,息成再来学习本文: 四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?高可功消 今天我们就来聊一聊消息消费的问题,看看如何确保消息消费成功,何确并且确保幂等性。保消 RabbitMQ 的息成消息消费,整体上来说有两种不同的高可功消思路: 两种方式我都举个例子看下。 先来看推(push): 这种方式大家比较常见,就是通过 @RabbitListener 注解去标记消费者,如下: 当监听的队列中有消息时,就会触发该方法。 再来看拉(pull): 调用 receiveAndConvert 方法,方法参数为队列名称,方法执行完成后,会从 MQ 上拉取一条消息下来,如果该方法返回值为 null,表示该队列上没有消息了。receiveAndConvert 方法有一个重载方法,可以在重载方法中传入一个等待超时时间,例如 3 秒。此时,假设队列中没有消息了,则 receiveAndConvert 方法会阻塞 3 秒,3 秒内如果队列中有了新消息就返回,3 秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0。 这是消息两种不同的消费模式。 如果需要从消息队列中持续获得消息,就可以使用推模式;如果只是服务器租用单纯的消费一条消息,则使用拉模式即可。切忌将拉模式放到一个死循环中,变相的订阅消息,这会严重影响 RabbitMQ 的性能。 在上篇文章中,我们想尽办法确保消息能够发送成功,对于消息消费成功,其实官方提供了相关的机制,我们一起来看下。 为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式。 当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除。 当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的源码库消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者。 我们来看一张图: 如上图所示,在 RabbitMQ 的 web 管理页面: 这是我们可以从 UI 层面观察消息的消费情况确认情况。 当我们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分: 换句话说,当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。 综上所述,确保消息被成功消费,无非就是手动 Ack 或者自动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息时,解决幂等性问题。 当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。我们来看下拒绝的方式: 消费者收到消息之后,可以选择拒绝消费该条消息,拒绝的步骤分两步: 调用 basicReject 方法时,第二个参数是 requeue,即是否重新入队。如果第二个参数为 true,则这条被拒绝的消息会重新进入到消息队列中,等待下一次被消费;如果第二个参数为 false,则这条被拒绝的消息就会被丢掉,不会有新的消费者去消费它了。 需要注意的是,basicReject 方法一次只能拒绝一条消息。 消息确认分为自动确认和手动确认,我们分别来看。 先来看看自动确认,在 Spring Boot 中,默认情况下,消息消费就是自动确认的。 我们来看如下一个消息消费方法: 通过 @Componet 注解将当前类注入到 Spring 容器中,然后通过 @RabbitListener 注解来标记一个消息消费方法,默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了。 手动确认我又把它分为两种:推模式手动确认与拉模式手动确认。 4.2.1 推模式手动确认 要开启手动确认,需要我们首先关闭自动确认,关闭方式如下: 这个配置表示将消息的确认模式改为手动确认。 接下来我们来看下消费者中的代码: 将消费者要做的事情放到一个 try..catch 代码块中。 如果消息正常消费成功,则执行 basicAck 完成确认。 如果消息消费失败,则执行 basicNack 方法,告诉 RabbitMQ 消息消费失败。 这里涉及到两个方法: 当 basicNack 中最后一个参数设置为 false 的时候,还涉及到一个死信队列的问题,这个松哥以后再专门写文章和大家细聊。 4.2.2 拉模式手动确认 拉模式手动 ack 比较麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,所以我们得用原生的办法,如下: 这里涉及到的 basicAck 和 basicNack 方法跟前面的一样,我就不再赘述。 最后我们再来说说消息的幂等性问题。 大家设想下面一个场景: 消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,此时由于网络断开或者其他原因导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条消息删除,当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次(参见四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?)。种种原因导致我们在消费消息时,一定要处理好幂等性问题。 幂等性问题的处理倒也不难,基本上都是从业务上来处理,我来大概说说思路。 采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下: 如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在(说明之前有人消费过该消息),获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。 极端情况:第一个消费者在执行业务时,出现了死锁,在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId。 当然这只是一个简单思路供大家参考。 松哥在 vhr 项目中也处理了消息幂等性问题,感兴趣的小伙伴可以查看 vhr 源码(https://github.com/lenve/vhr),代码在 mailserver 中。 好啦,今天就和小伙伴们聊了下 RabbitMQ 中和消息消费相关的几个话题,感兴趣的小伙伴可以实践下哦~ 本文转载自微信公众号「江南一点雨」,可以通过以下二维码关注。转载本文请联系江南一点雨公众号。1. 两种消费思路 2. 确保消费成功两种思路 3. 消息拒绝 4. 消息确认 4.1 自动确认 4.2 手动确认 5. 幂等性问题 6. 小结
1. 两种消费思路
2. 确保消费成功两种思路
3. 消息拒绝
4. 消息确认
4.1 自动确认
4.2 手动确认
5. 幂等性问题
6. 小结