RocketMQ

RocketMQ

RocketMq介绍

RocketMq消息模型

image

RocketMq部署模型

image

RocketMq基础概念

  • Producer(生产者) :负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 Broker 服务器。RocketMq 提供多种发送方式:同步发送、异步发送、顺序发送、单向发送。

  • Consumer(消费者) :负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费,推动式消费。

  • Topic(消息主题) :表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMq 进行消息订阅的基本单位。

  • Broker(代理服务器) :用于存储消息 Topic,是实际部署过程对应的代理服务器。Broker 主要负责消息的存储、投递和查询以及服务高可用保证。

    • 每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
    • Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
    • Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。
  • MessageQueue(消息队列) :为了消息写入能力的水平扩展,RocketMq 对 Topic 进行了分区,这种操作被称为队列。

  • ConsumerGroup(消费组) :为了消息消费能力的水平扩展,RocketMq 对 Consumer 进行了分组,这种操作被称为消费组。相同的 ConsumerGroup 下的消费者主要有两种负载均衡模式,即广播模式,集群模式(图中是最常用的集群模式)。

    • 集群模式:同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费,如图中 ConsumerGroupA 订阅 TopicA,TopicA 对应 3个队列,则 GroupA 中的 Consumer1 消费的是 MessageQueue 0和 MessageQueue 1的消息,Consumer2是消费的是MessageQueue2的消息。
    • 广播模式:同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列。需要注意的是,广播模式下因为每个 Consumer 实例都需要处理全部的消息,因此这种模式仅推荐在通知推送、配置同步类小流量场景使用。
  • NameServer(命名服务器) :NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。NameServer 通常会有多个实例部署,各实例间相互不进行信息通讯。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,客户端仍然可以向其它 NameServer 获取路由信息。主要包括两个功能:

    • Broker管理:NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;
    • 路由信息管理:每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer 和 Consumer 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。

Producer(生产者)

Message 可以设置的属性值包括:

字段名 默认值 必要性 说明
Topic null 必填 消息所属 topic 的名称
Body null 必填 消息体
Tags null 选填 消息标签,方便服务器过滤使用。目前只支持每个消息设置一个
Keys null 选填 代表这条消息的业务关键词
Flag 0 选填 完全由应用来设置,RocketMQ 不做干预
DelayTimeLevel 0 选填 消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费
WaitStoreMsgOK true 选填 表示消息是否在服务器落盘后才返回应答

发送消息模式

  • 同步发送(Sync)

    同步发送是指消息发送方发出数据后,会阻塞直到收到服务端响应。这种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

  • 异步发送(Async)

    异步发送是指发送方发出数据后,不等接收方发回响应,接着进行下一次发送。发送方通过回调接口接收服务器响应,并对响应结果进行处理。异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

  • 单向发送(Oneway)

    单向发送是指发送方只负责发送消息,不等待服务器响应且没有回调函数触发,即只发送请求不等待应答。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

Consumer(消费者)

消费模型

  • 集群消费(Clustering)

    消费者组(Consumer Group)内的实例分摊消费消息。如果一个 Consumer Group 有 3 个实例,Topic 有 6 个队列,那么每个实例消费 2 个队列。这是默认的消费模式,适用于大部分业务场景。

  • 广播消费(Broadcasting)

    消费者组内的每个实例都消费 Topic 的全量消息。适用于通知类消息,例如配置更新、缓存刷新等。

消费方式

  • 推模式(Push)

    Consumer 端注册 Listener,Broker 收到消息后立即主动推送给 Consumer。实际实现中,Push 模式通常是基于长轮询(Long Polling)机制实现的,即 Consumer 发送 Pull 请求,如果 Broker 没有新消息,会持有请求一段时间,待有新消息或超时后再返回。

  • 拉模式(Pull)

    Consumer 端主动发起请求到 Broker 拉取消息。Consumer 需要自己维护 Offset(消费进度),并根据业务逻辑控制拉取频率。

核心功能

顺序消息

RocketMQ 支持局部顺序消息,即保证同一个 MessageQueue 里的消息严格按照 FIFO 顺序进行消费。

  • 全局顺序:一个 Topic 只有一个 MessageQueue,吞吐量受限。
  • 分区顺序:一个 Topic 有多个 MessageQueue,通过 Sharding Key(如 OrderId)将同一业务逻辑的消息发送到同一个 MessageQueue。

延时消息

RocketMQ 不支持任意时间的延时,而是支持特定等级的延时消息。
默认支持 18 个等级:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h​。
发送消息时设置 delayTimeLevel 即可。

批量消息

Producer 可以将多条消息合并成一个批量消息进行发送,减少网络 IO 开销,提高吞吐量。批量消息的大小不能超过 4MB(默认)。

事务消息

RocketMQ 提供了分布式事务消息支持,采用 2PC(两阶段提交)+ 补偿机制。

  1. 发送半消息(Half Message) :Producer 发送消息到 Broker,但消息对 Consumer 不可见。
  2. 执行本地事务:Producer 执行本地业务逻辑。
  3. 提交/回滚:根据本地事务结果,Producer 向 Broker 发送 Commit 或 Rollback 指令。
  4. 回查机制:如果 Broker 长时间未收到 Commit/Rollback,会主动向 Producer 发起回查,检查本地事务状态。

消息过滤

  • Tag 过滤:Consumer 订阅时指定 Tag,Broker 端进行过滤。效率高,但仅支持字符串匹配。
  • SQL92 过滤:支持简单的 SQL 语法(如 a > 5 AND b = 'abc'),需要 Broker 开启支持。

消息存储

RocketMQ 采用文件系统进行消息存储,主要包含三个核心文件:

  • CommitLog

    消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容。消息内容不是定长的。单个文件大小默认 1G,文件名长度为 20 位,左边补零,剩余为起始偏移量。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。

  • ConsumeQueue

    消息消费队列,引入目的是为了提高消息消费的性能。CommitLog 是顺序写入的,但消费时是随机读取的(根据 Topic 和 Queue)。ConsumeQueue 存储了消息在 CommitLog 中的物理偏移量(Offset)、消息大小和 Tag HashCode。每个 Topic 下的每个 MessageQueue 都有一个对应的 ConsumeQueue 文件。

  • IndexFile

    索引文件,提供了一种通过 key 或时间区间来查询消息的方法。

刷盘机制

  • 同步刷盘(Sync Flush) :消息写入内存后,立刻调用刷盘线程进行刷盘,刷盘成功后才返回响应。数据安全性高,吞吐量低。
  • 异步刷盘(Async Flush) :消息写入内存后,立刻返回响应,由后台线程异步刷盘。吞吐量高,但机器断电可能丢失数据。

高可用性

主从复制

  • 同步复制:Master 和 Slave 都写入成功才返回成功。数据不丢失,可用性高。
  • 异步复制:Master 写入成功即返回成功,Slave 异步同步。延迟低,Master 宕机可能丢失少量数据。

负载均衡

  • Producer 负载均衡:默认轮询发送到不同的 MessageQueue。

  • Consumer 负载均衡

    • 平均分配策略(AllocateMessageQueueAveragely)
    • 环形分配策略(AllocateMessageQueueAveragelyByCircle)
    • 一致性哈希策略(AllocateMessageQueueConsistentHash)
    • 机房优先策略(AllocateMachineRoomNearby)

消息重试与死信

消息重试

Consumer 消费失败后,RocketMQ 会进行重试。

  • 顺序消息:无限重试,直到成功(会阻塞后续消息)。
  • 无序消息:默认重试 16 次,重试间隔逐步增加(10s, 30s, 1m…)。

死信队列(DLQ)

如果消息重试 16 次后仍然失败,消息会被投递到死信队列(Dead Letter Queue)。死信队列的 Topic 格式为 %DLQ%ConsumerGroupName。需人工干预处理。