MQ学习概要
前言
罗列一些日常开发中常用的消息队列相关原理与知识点
ActiveMQ
相关知识点总结
一、基础概念与架构
什么是 ActiveMQ?它的核心作用是什么?
- ActiveMQ 是 Apache 开源的 消息中间件(Message Broker),实现了 JMS(Java Message Service) 规范。
- 核心作用:解耦生产者和消费者,实现异步通信、流量削峰、系统间可靠数据传输。
JMS 的两种消息模型是什么?
- 点对点(Queue):消息被一个消费者消费,队列持久化保证消息不丢失。
- 发布订阅(Topic):消息广播给所有订阅者,默认非持久化(需客户端设置持久订阅)。
ActiveMQ 的核心组件有哪些?
- Broker:消息代理服务器,负责接收、存储和转发消息。
- Transport Connector:通信协议(如 TCP、NIO、WebSocket)。
- Persistence Adapter:持久化存储(如 KahaDB、JDBC)。
- Network Connector:Broker 集群间消息路由。
二、消息传输与可靠性
ActiveMQ 如何保证消息不丢失?
- 持久化存储:消息保存到磁盘(如 KahaDB 或 MySQL)。
- 生产者确认:
JMSProducer
使用DeliveryMode.PERSISTENT
模式。 - 消费者确认:通过
CLIENT_ACKNOWLEDGE
或事务会话手动确认。
消息确认机制(Acknowledge)有哪些模式?
- AUTO_ACKNOWLEDGE:自动确认(消费后立即确认,可能丢失消息)。
- CLIENT_ACKNOWLEDGE:手动调用
message.acknowledge()
确认。 - DUPS_OK_ACKNOWLEDGE:延迟批量确认,允许重复消息(性能高)。
- 事务会话:通过
session.commit()
提交事务时批量确认。
什么是死信队列(DLQ)?如何配置?
- 消息重发多次失败后会被转移到死信队列(默认名
ActiveMQ.DLQ
)。 - 配置参数:
1
2
3
4
5<policyEntry queue=">">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
- 消息重发多次失败后会被转移到死信队列(默认名
三、持久化与高可用
ActiveMQ 的持久化存储方式有哪些?
- KahaDB:默认的基于文件的存储(高性能,日志式存储)。
- JDBC:消息存储到数据库(如 MySQL),支持事务但性能较低。
- LevelDB(已弃用)/ RocksDB:高性能 KV 存储。
如何实现 ActiveMQ 的高可用?
- 主从架构:
- 共享存储(Shared Storage):多个 Broker 共享同一存储(如 KahaDB)。
- 基于 ZooKeeper 的主从选举:自动故障转移(推荐方案)。
- 网络连接器(Network of Brokers):多个 Broker 组成集群,消息自动路由。
- 主从架构:
什么是消息重发(Redelivery)?如何控制重发策略?
- 消费者处理失败时,消息会被重新投递。
- 配置重发策略:
1
2
3RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setMaximumRedeliveries(3); // 最大重试次数
policy.setInitialRedeliveryDelay(1000); // 初始延迟
四、性能优化与监控
如何提升 ActiveMQ 的吞吐量?
- 使用 NIO 协议(代替 TCP)提升并发连接性能。
- 开启 生产者流量控制(
producerFlowControl
)避免 Broker 过载。 - 优化 持久化存储(如 KahaDB 的索引缓存)。
- 合理配置 内存限制(
memoryLimit
)防止内存溢出。
ActiveMQ 的消息堆积如何处理?
- 增加消费者数量或优化消费逻辑。
- 开启 Pending Message Limit Strategy,丢弃旧消息或写入临时存储。
- 监控并扩容 Broker。
如何监控 ActiveMQ 的运行状态?
- JMX:通过 JConsole 或 VisualVM 查看队列、主题、内存等指标。
- Web 控制台:默认端口 8161,提供实时监控。
- 日志分析:监控
activemq.log
中的警告和错误信息。
五、底层原理与高级特性
ActiveMQ 的消息存储原理(以 KahaDB 为例)?
- KahaDB 使用 日志追加方式 存储消息,索引文件(
db.data
)记录消息位置,数据文件(db-*.log
)存储消息内容。 - 通过 内存页缓存(Page Cache) 提升读写性能。
- KahaDB 使用 日志追加方式 存储消息,索引文件(
消息的异步发送与同步发送有什么区别?
- 同步发送:生产者阻塞直到 Broker 确认收到消息(可靠但性能低)。
- 异步发送:生产者非阻塞,Broker 异步确认(高性能,可能丢失消息)。
- 配置方式:在连接 URI 中添加
jms.useAsyncSend=true
。
如何实现消息的延迟投递或定时投递?
- 在消息头中设置属性:
1
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000); // 延迟5秒
- 需 Broker 开启调度器支持:在配置文件中添加
<broker ... schedulerSupport="true">
。
- 在消息头中设置属性:
六、常见问题排查
消费者无法接收到消息的可能原因?
- 检查消费者是否订阅正确的队列/主题。
- 确认消息持久化配置与消费者订阅模式(如 Topic 需持久订阅)。
- 网络问题或 Broker 宕机。
ActiveMQ 内存溢出(OOM)如何解决?
- 限制 Broker 内存:
<systemUsage><memoryUsage limit="512mb"/></systemUsage>
。 - 优化消费者处理速度,避免消息堆积。
- 使用异步发送并调整生产者流量控制。
- 限制 Broker 内存:
七、与其他中间件对比
- ActiveMQ 与 RabbitMQ、Kafka 的区别?
- ActiveMQ:支持 JMS 规范,适合传统企业级异步通信,但吞吐量低于 Kafka。
- RabbitMQ:基于 AMQP 协议,灵活的路由机制,社区活跃。
- Kafka:高吞吐量、分布式日志系统,适合大数据场景,但消息延迟较高。
死信通常出现的原因及解决办法
在分布式消息队列(如ActiveMQ)中,私信队列(Dead Letter Queue,DLQ)是用于存储无法正常处理的消息的专用队列。以下是导致消息进入私信队列的常见原因及对应的解决方案:
一、消息进入私信队列的常见原因
- 消息处理失败
- 原因:消费者在处理消息时抛出未捕获的异常(如业务逻辑错误、数据格式不匹配等)。
- 解决方案:
- 优化消费者代码,增加异常处理(如
try-catch
块)。 - 记录错误日志并触发告警,便于及时排查问题。
- 使用事务性会话,确保消息处理失败时回滚并重新投递。
- 优化消费者代码,增加异常处理(如
- 消息超时(TTL过期)
- 原因:消息设置了
timeToLive
(TTL),在指定时间内未被消费。 - 解决方案:
- 根据业务需求合理设置消息的TTL值(如
producer.setTimeToLive()
)。 - 优化消费者性能,避免因处理速度过慢导致消息堆积。
- 监控队列积压情况,动态调整消费者数量(横向扩展)。
- 根据业务需求合理设置消息的TTL值(如
- 原因:消息设置了
- 重试次数超限
- 原因:ActiveMQ默认会重试6次(取决于配置),若重试后仍失败,消息会被移至DLQ。
- 解决方案:
- 调整重试策略(
RedeliveryPolicy
):1
2
3RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setMaximumRedeliveries(10); // 增加最大重试次数
policy.setInitialRedeliveryDelay(1000); // 设置重试间隔(毫秒) - 对不可恢复的错误(如数据格式错误),在消费者中直接确认消息,避免无意义重试。
- 调整重试策略(
- 队列不存在或权限不足
- 原因:生产者向不存在的队列发送消息,或消费者无权限访问队列。
- 解决方案:
- 检查队列名称和权限配置(如ActiveMQ的
activemq.xml
中的授权配置)。 - 使用管理接口(如ActiveMQ Web Console)确认队列是否被正确创建。
- 检查队列名称和权限配置(如ActiveMQ的
- 系统或网络故障
- 原因:消费者因宕机、网络中断等临时故障无法处理消息。
- 解决方案:
- 设计高可用架构(如集群部署消费者)。
- 启用持久化消息,防止消息在Broker重启后丢失。
二、ActiveMQ私信队列的配置与管理
- 自定义私信队列名称
- 在
activemq.xml
中配置私信队列策略:1
2
3<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
- 在
- 禁用私信队列(不推荐)
1
2
3<deadLetterStrategy>
<sharedDeadLetterStrategy processNonPersistent="true" />
</deadLetterStrategy> - 处理私信队列中的消息
- 手动处理:通过管理工具(如ActiveMQ Web Console)查看DLQ中的消息,重新投递或删除。
- 自动处理:编写DLQ消费者程序,根据错误类型决定重试或记录日志。
三、最佳实践
- 监控与告警
- 监控DLQ中的消息数量,触发阈值告警。
- 使用工具(如Prometheus + Grafana)可视化监控队列状态。
- 合理设置重试策略
- 根据业务容忍度调整重试次数和间隔,避免消息积压。
- 区分可恢复与不可恢复错误
- 对可恢复错误(如依赖服务超时)增加重试次数。
- 对不可恢复错误(如数据校验失败)直接记录日志并确认消息。
- 使用事务和ACK机制
- 在消费者中使用事务或手动ACK,确保消息正确处理后才确认。
RocketMQ
相关知识点
一、基础概念与核心组件
RocketMQ 是什么?它的核心优势是什么?
- RocketMQ 是阿里开源的分布式消息中间件,支持高吞吐、低延迟、高可用、海量消息堆积。
- 核心优势:
- 严格的消息顺序保证。
- 事务消息、延迟消息等高级特性。
- 分布式架构(支持水平扩展和故障自动恢复)。
RocketMQ 的核心组件有哪些?
- NameServer:轻量级注册中心,管理 Broker 元数据(无状态,可集群部署)。
- Broker:消息存储与转发节点(分主从角色,支持同步/异步复制)。
- Producer:消息生产者(支持同步/异步/单向发送)。
- Consumer:消息消费者(支持集群消费和广播消费)。
RocketMQ 与 Kafka 的主要区别?
- 消息模型:RocketMQ 基于 Topic + Queue,Kafka 基于 Partition。
- 消息顺序:RocketMQ 支持严格顺序消息(通过队列锁),Kafka 仅分区内有序。
- 延迟消息:RocketMQ 内置延迟消息支持,Kafka 需自行实现。
- 事务消息:RocketMQ 提供完整的事务消息机制,Kafka 需外部协调。
二、消息存储与高可用
RocketMQ 的消息存储机制(CommitLog + ConsumeQueue)?
- CommitLog:所有消息按顺序追加写入日志文件(顺序写高性能)。
- ConsumeQueue:逻辑队列,记录消息在 CommitLog 中的偏移量(消费者按队列拉取)。
- IndexFile:基于消息 Key 的哈希索引,用于快速查询。
如何保证 Broker 的高可用?
- 主从架构:
- 同步复制(Master-Slave):主节点写入后同步到从节点(强一致,性能较低)。
- 异步复制:主节点写入后异步复制到从节点(高吞吐,可能丢数据)。
- 故障转移:NameServer 检测 Broker 存活状态,自动切换消费者到从节点。
- 主从架构:
Broker 的刷盘机制(同步刷盘 vs 异步刷盘)?
- 同步刷盘:消息写入内存后,立即刷到磁盘(数据可靠,性能低)。
- 异步刷盘:消息写入内存后返回成功,由后台线程批量刷盘(高性能,宕机可能丢失数据)。
三、消息可靠性
如何保证消息不丢失?
- 生产者端:
- 使用同步发送 + 重试机制。
- 开启事务消息(最终一致性)。
- Broker 端:同步刷盘 + 主从同步复制。
- 消费者端:消费成功后返回
CONSUME_SUCCESS
,失败则重试。
- 生产者端:
RocketMQ 的事务消息实现原理?
- 半事务消息:
- Producer 发送半消息(对消费者不可见)。
- 执行本地事务并通知 Broker 提交/回滚。
- Broker 根据事务状态提交消息或丢弃。
- 事务回查:若 Producer 未提交状态,Broker 主动回查事务状态。
- 半事务消息:
顺序消息如何实现?
- 全局顺序:单 Topic 仅一个队列(牺牲并发能力)。
- 分区顺序:同一业务 Key 的消息发送到同一队列(如订单 ID)。
- 消费者端:单线程顺序消费队列。
四、消息消费
PushConsumer 和 PullConsumer 的区别?
- PushConsumer:Broker 主动推送消息(默认推荐,内部封装拉取逻辑)。
- PullConsumer:需手动调用
pull
方法拉取消息(灵活性高,复杂度高)。
如何避免消息重复消费?
- 幂等处理:业务逻辑保证重复消息的幂等性(如数据库唯一键)。
- 去重表:记录已处理消息的唯一 ID(如订单号)。
- RocketMQ 消息 ID:同一消息在重试时 ID 不变,可据此去重。
消息重试机制(Retry Queue)与死信队列(DLQ)?
- 重试队列:消费失败的消息进入重试队列,按延迟级别(如 10s、30s)重新投递。
- 死信队列:重试 16 次后仍失败的消息进入死信队列(需人工处理)。
五、性能优化
如何提升 RocketMQ 的吞吐量?
- 生产者端:
- 批量发送消息(
sendBatch
)。 - 开启异步发送(
sendAsync
)。
- 批量发送消息(
- Broker 端:
- 异步刷盘 + 异步复制。
- 调整
sendMessageThreadPoolNums
(发送线程数)。
- 消费者端:
- 增加消费者实例数(集群消费模式)。
- 提高并发消费线程数(
consumeThreadMin
/Max
)。
- 生产者端:
零拷贝(Zero-Copy)技术如何应用?
- MappedByteBuffer:通过内存映射文件(MMAP)减少数据从内核态到用户态的拷贝。
- FileChannel.transferTo:直接传输文件数据到网络通道(避免多次拷贝)。
如何优化消息堆积(Backlog)?
- 增加消费者实例:水平扩展消费者集群。
- 跳过非关键消息:动态降低消费延迟(如日志场景)。
- 预分区扩容:提前增加 Topic 队列数(需 Producer 配合重哈希)。
六、高级特性与底层原理
延迟消息的实现原理?
- RocketMQ 内置 18 个延迟级别(1s~2h),消息存入
SCHEDULE_TOPIC_XXXX
Topic。 - 定时任务扫描到期消息,重新投递到目标 Topic。
- RocketMQ 内置 18 个延迟级别(1s~2h),消息存入
消息过滤的两种方式?
- Tag 过滤:消费者订阅时指定 Tag(服务端过滤,效率高)。
- SQL 过滤:基于消息属性编写 SQL 表达式(需 Broker 开启
enablePropertyFilter=true
)。
NameServer 的作用与高可用实现?
- 作用:维护 Broker 路由信息(无状态,不参与消息传输)。
- 高可用:多节点部署,客户端轮询连接,单点故障不影响整体服务。
七、实战与排查
消息发送慢的可能原因?
- 网络延迟或 Broker 负载过高。
- 同步刷盘或同步复制配置导致性能瓶颈。
- Producer 未开启线程池优化或批量发送。
消费者收不到消息的可能原因?
- 订阅关系不一致(Topic/Tag 不匹配)。
- 消费者组未正确配置(集群模式 vs 广播模式)。
- Broker 权限问题或 NameServer 路由信息未更新。
八、与其他中间件对比
- RocketMQ 与 Kafka 的适用场景?
- RocketMQ:金融级事务消息、顺序消息、延迟消息(电商、交易系统)。
- Kafka:日志采集、流处理、大数据场景(高吞吐,但功能较单一)。
RocketMQ与ActiveMQ比较
以下是 RocketMQ 与 ActiveMQ 的对比分析,涵盖核心特性、架构设计、适用场景等关键点:
一、核心定位与设计理念
特性 | RocketMQ | ActiveMQ |
---|---|---|
定位 | 分布式、高吞吐、低延迟、金融级可靠性 | 传统企业级消息中间件,遵循 JMS 规范 |
设计目标 | 支持海量消息堆积、严格顺序、事务消息 | 提供标准消息模型(Queue/Topic)和基础功能 |
协议支持 | 自定义协议(Remoting) | 支持 JMS、AMQP、MQTT、STOMP 等多协议 |
二、架构与核心组件
特性 | RocketMQ | ActiveMQ |
---|---|---|
核心架构 | 分布式架构:NameServer + Broker 集群 | 单节点或主从集群 |
元数据管理 | NameServer(轻量级,无状态) | 无独立组件,依赖 ZooKeeper(可选) |
存储模型 | CommitLog(顺序写) + ConsumeQueue | KahaDB(日志存储)或 JDBC(数据库存储) |
高可用方案 | 主从同步/异步复制 + 自动故障转移 | 共享存储(如 KahaDB)或 ZooKeeper 主从 |
三、性能与可靠性
特性 | RocketMQ | ActiveMQ |
---|---|---|
吞吐量 | 单机百万级 TPS(高并发场景) | 万级 TPS(适合中小规模场景) |
消息堆积能力 | 支持海量消息堆积(TB 级) | 堆积能力有限(依赖存储空间) |
消息可靠性 | 同步刷盘 + 主从同步复制(金融级可靠) | 异步刷盘 + 主从异步复制(可能丢数据) |
顺序消息 | 严格保证分区顺序(同一队列) | 需额外配置(如 Exclusive Consumer) |
四、功能特性
特性 | RocketMQ | ActiveMQ |
---|---|---|
事务消息 | 完整事务消息机制(半事务 + 回查) | 支持 JMS 事务(功能较弱) |
延迟消息 | 内置 18 个延迟级别(无需插件) | 需通过插件(如 activemq-scheduler ) |
消息过滤 | Tag 过滤(高效) + SQL 属性过滤 | 基于 Selector(JMS 标准) |
消息轨迹 | 内置消息轨迹追踪 | 需插件或自定义实现 |
五、适用场景
场景 | RocketMQ | ActiveMQ |
---|---|---|
推荐场景 | - 高并发交易系统(电商、金融) - 大数据日志采集 - 严格顺序消息(如订单) |
- 传统企业应用(ERP、CRM) - 轻量级异步通信 - 多协议兼容需求 |
不适用场景 | - 需要多协议支持的异构系统 | - 超高吞吐(如日志流处理) - 海量消息堆积场景 |
六、优缺点对比
维度 | RocketMQ | ActiveMQ |
---|---|---|
优点 | - 高吞吐、低延迟 - 严格消息顺序保证 - 分布式架构易扩展 - 完善的监控和运维工具 |
- 协议支持丰富 - 轻量级部署简单 - 社区成熟,文档完善 |
缺点 | - 功能复杂度高(需深入调优) - 对 JMS 规范支持较弱 |
- 性能瓶颈明显(单机架构) - 海量消息堆积能力不足 - 高可用实现复杂 |
七、综合建议
选择 RocketMQ:
- 需要高吞吐、低延迟、海量消息堆积能力。
- 业务场景涉及金融级事务消息或严格顺序消息。
- 系统需分布式架构和水平扩展能力。
选择 ActiveMQ:
- 传统企业应用,需遵循 JMS 规范。
- 系统复杂度低,无需超高并发或海量消息堆积。
- 需要支持多种消息协议(如 AMQP、MQTT)。