RocketMQ
RocketMQ
RocketMq介绍
RocketMq消息模型

RocketMq部署模型

RocketMq基础概念
Producer(生产者) :负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 Broker 服务器。RocketMq 提供多种发送方式:同步发送、异步发送、顺序发送、单向发送。
Consumer(消费者) :负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费,推动式消费。
Topic(消息主题) :表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMq 进行消息订阅的基本单位。
Broker(代理服务器) :用于存储消息 Topic,是实际部署过程对应的代理服务器。Broker 主要负责消息的存储、投递和查询以及服务高可用保证。
- 每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
- Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
- Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。
MessageQueue(消息队列) :为了
消息写入能力的水平扩展,RocketMq 对 Topic 进行了分区,这种操作被称为队列。ConsumerGroup(消费组) :为了
消息消费能力的水平扩展,RocketMq 对 Consumer 进行了分组,这种操作被称为消费组。相同的 ConsumerGroup 下的消费者主要有两种负载均衡模式,即广播模式,集群模式(图中是最常用的集群模式)。- 集群模式:同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费,如图中 ConsumerGroupA 订阅 TopicA,TopicA 对应 3个队列,则 GroupA 中的 Consumer1 消费的是 MessageQueue 0和 MessageQueue 1的消息,Consumer2是消费的是MessageQueue2的消息。
- 广播模式:同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列。需要注意的是,广播模式下因为每个 Consumer 实例都需要处理全部的消息,因此这种模式仅推荐在通知推送、配置同步类小流量场景使用。
NameServer(命名服务器) :NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。NameServer 通常会有多个实例部署,各实例间相互不进行信息通讯。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,客户端仍然可以向其它 NameServer 获取路由信息。主要包括两个功能:
- Broker管理:NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;
- 路由信息管理:每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer 和 Consumer 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。
Producer(生产者)
Message 可以设置的属性值包括:
| 字段名 | 默认值 | 必要性 | 说明 |
|---|---|---|---|
| Topic | null | 必填 | 消息所属 topic 的名称 |
| Body | null | 必填 | 消息体 |
| Tags | null | 选填 | 消息标签,方便服务器过滤使用。目前只支持每个消息设置一个 |
| Keys | null | 选填 | 代表这条消息的业务关键词 |
| Flag | 0 | 选填 | 完全由应用来设置,RocketMQ 不做干预 |
| DelayTimeLevel | 0 | 选填 | 消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费 |
| WaitStoreMsgOK | true | 选填 | 表示消息是否在服务器落盘后才返回应答 |
发送消息模式
同步发送(Sync)
同步发送是指消息发送方发出数据后,会阻塞直到收到服务端响应。这种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
异步发送(Async)
异步发送是指发送方发出数据后,不等接收方发回响应,接着进行下一次发送。发送方通过回调接口接收服务器响应,并对响应结果进行处理。异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
单向发送(Oneway)
单向发送是指发送方只负责发送消息,不等待服务器响应且没有回调函数触发,即只发送请求不等待应答。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
Consumer(消费者)
消费模型
集群消费(Clustering)
消费者组(Consumer Group)内的实例分摊消费消息。如果一个 Consumer Group 有 3 个实例,Topic 有 6 个队列,那么每个实例消费 2 个队列。这是默认的消费模式,适用于大部分业务场景。
广播消费(Broadcasting)
消费者组内的每个实例都消费 Topic 的全量消息。适用于通知类消息,例如配置更新、缓存刷新等。
消费方式
推模式(Push)
Consumer 端注册 Listener,Broker 收到消息后立即主动推送给 Consumer。实际实现中,Push 模式通常是基于长轮询(Long Polling)机制实现的,即 Consumer 发送 Pull 请求,如果 Broker 没有新消息,会持有请求一段时间,待有新消息或超时后再返回。
拉模式(Pull)
Consumer 端主动发起请求到 Broker 拉取消息。Consumer 需要自己维护 Offset(消费进度),并根据业务逻辑控制拉取频率。
核心功能
顺序消息
RocketMQ 支持局部顺序消息,即保证同一个 MessageQueue 里的消息严格按照 FIFO 顺序进行消费。
- 全局顺序:一个 Topic 只有一个 MessageQueue,吞吐量受限。
- 分区顺序:一个 Topic 有多个 MessageQueue,通过 Sharding Key(如 OrderId)将同一业务逻辑的消息发送到同一个 MessageQueue。
延时消息
RocketMQ 不支持任意时间的延时,而是支持特定等级的延时消息。
默认支持 18 个等级:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
发送消息时设置 delayTimeLevel 即可。
批量消息
Producer 可以将多条消息合并成一个批量消息进行发送,减少网络 IO 开销,提高吞吐量。批量消息的大小不能超过 4MB(默认)。
事务消息
RocketMQ 提供了分布式事务消息支持,采用 2PC(两阶段提交)+ 补偿机制。
- 发送半消息(Half Message) :Producer 发送消息到 Broker,但消息对 Consumer 不可见。
- 执行本地事务:Producer 执行本地业务逻辑。
- 提交/回滚:根据本地事务结果,Producer 向 Broker 发送 Commit 或 Rollback 指令。
- 回查机制:如果 Broker 长时间未收到 Commit/Rollback,会主动向 Producer 发起回查,检查本地事务状态。
消息过滤
- Tag 过滤:Consumer 订阅时指定 Tag,Broker 端进行过滤。效率高,但仅支持字符串匹配。
- SQL92 过滤:支持简单的 SQL 语法(如
a > 5 AND b = 'abc'),需要 Broker 开启支持。
消息存储
RocketMQ 采用文件系统进行消息存储,主要包含三个核心文件:
CommitLog
消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容。消息内容不是定长的。单个文件大小默认 1G,文件名长度为 20 位,左边补零,剩余为起始偏移量。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
ConsumeQueue
消息消费队列,引入目的是为了提高消息消费的性能。CommitLog 是顺序写入的,但消费时是随机读取的(根据 Topic 和 Queue)。ConsumeQueue 存储了消息在 CommitLog 中的物理偏移量(Offset)、消息大小和 Tag HashCode。每个 Topic 下的每个 MessageQueue 都有一个对应的 ConsumeQueue 文件。
IndexFile
索引文件,提供了一种通过 key 或时间区间来查询消息的方法。
刷盘机制
- 同步刷盘(Sync Flush) :消息写入内存后,立刻调用刷盘线程进行刷盘,刷盘成功后才返回响应。数据安全性高,吞吐量低。
- 异步刷盘(Async Flush) :消息写入内存后,立刻返回响应,由后台线程异步刷盘。吞吐量高,但机器断电可能丢失数据。
高可用性
主从复制
- 同步复制:Master 和 Slave 都写入成功才返回成功。数据不丢失,可用性高。
- 异步复制:Master 写入成功即返回成功,Slave 异步同步。延迟低,Master 宕机可能丢失少量数据。
负载均衡
Producer 负载均衡:默认轮询发送到不同的 MessageQueue。
Consumer 负载均衡:
- 平均分配策略(AllocateMessageQueueAveragely)
- 环形分配策略(AllocateMessageQueueAveragelyByCircle)
- 一致性哈希策略(AllocateMessageQueueConsistentHash)
- 机房优先策略(AllocateMachineRoomNearby)
消息重试与死信
消息重试
Consumer 消费失败后,RocketMQ 会进行重试。
- 顺序消息:无限重试,直到成功(会阻塞后续消息)。
- 无序消息:默认重试 16 次,重试间隔逐步增加(10s, 30s, 1m…)。
死信队列(DLQ)
如果消息重试 16 次后仍然失败,消息会被投递到死信队列(Dead Letter Queue)。死信队列的 Topic 格式为 %DLQ%ConsumerGroupName。需人工干预处理。