消息重复消费解决方案

前言

在实际生产环境下,消息被消费者重复消费是经常遇到问题之一。如何避免消息被重复消费,是消息队列领域的基本问题,也是在设计系统架构时必须考虑的问题。

RabbitMQ、RocketMQ、Kafka,各种消息中间件都有可能出现消息重复消费的问题,需要开发者来保证消息不被重复消费。

怎么保证消息不被重复消费?(以Kafka为例)

生产者在向Kafka写数据时,每条消息会有一个offset,表示消息写入顺序的序号。当消费者消费后,每隔一段时间会把自己已消费消息的offset通过Zookeeper提交给Kafka,告知Kafka自己offset的位置。这样一来,如果消费者重启,则会从Kafka记录的offset之后的数据开始消费,从而避免重复消费。

但是,可能出现一种意外情况。由于消费者提交offset是定期的,当消费者处理了某些消息,但还未来及提交offset时,此时如果重启消费者,则会出现消息的重复消费

例:数据 1/2/3 依次进入 Kafka,Kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,假设分配的 offset 依次是 152/153/154。消费者从 Kafka 消费时,也是按照这个顺序去消费。假如当消费者消费了 offset=153 的这条数据,刚准备去提交 offset 到 Zookeeper,此时消费者进程被重启了。那么此时消费过的数据1和数据2的 offset 并没有提交,Kafka 也就不知道你已经消费了 offset=153 这条数据。此时当消费者重启后,消费者会找 Kafka 说,嘿,哥儿们,你给我接着把上次我消费到的那个地方后面的数据继续给我传递过来。由于之前的 offset 没有提交成功,那么数据1和数据2会再次传过来,如果此时消费者没有去重的话,那么就会导致重复消费。

如上图,可能出现数据1和数据2插入数据库两遍的问题。

其实重复消费消息并不可怕,重要的是在发生重复消费后,如何保证消息消费时的幂等性。如果消费者可以在消费消息时先判断一下,自己是否已经消费了该消息,如果是就不消费,那么就可以保证系统的幂等性。

一条数据被消费者重复消费两次,但数据库中只有一条数据,这就保证了系统幂等性。

简单来说,保证系统幂等性就是确保消息重复发送后数据库中数据的正确性

那么,如何保证消息队列的幂等性?

  1. 向数据库insert数据时,先根据主键查询,若数据存在则不insert,改为update
  2. 向Redis中写数据可以用set去重,天然保证幂等性
  3. 生产者发送每条消息时,增加一个全局唯一id(类似订单id),消费者消费到时,先根据这个id去Redis中查询是否消费过该消息。如果没有消费过,就处理,将id写入Redis;如果消费过了,那么就不处理,保证不重复处理相同消息。
  4. 基于数据库的唯一键约束来保证不会插入重复的数据,当消费者企图插入重复数据到数据库时,会报错。

总结

  • Kafka采取类似断点续传的策略保证消息不被重复消费。具体是通过每隔一段时间把已消费消息的offset通过Zookeeper提交给Kafka实现的。
  • 但是当消费者处理完成但尚未提交offset的时间段宕机或重启等意外情况发生时,还是可能出现消息被重复消费。
  • 保证消息不被重复消费(保证消息消费时的幂等性)其实是保证数据库中数据的正确性。几种保证系统幂等性的思路:通过主键查询,若存在则update;Redis天然set去重;根据全局id查询,若已消费则不处理;唯一键约束保证不插入重复数据等。

注:本系列文章是来自中华石杉老师课程的整理和总结。






欢迎关注微信公众号,一起交流技术↓

{mywechat}

0%