MQ学习概要

前言

罗列一些日常开发中常用的消息队列相关原理与知识点

ActiveMQ

相关知识点总结


一、基础概念与架构

  1. 什么是 ActiveMQ?它的核心作用是什么?

    • ActiveMQ 是 Apache 开源的 消息中间件(Message Broker),实现了 JMS(Java Message Service) 规范。
    • 核心作用:解耦生产者和消费者,实现异步通信、流量削峰、系统间可靠数据传输。
  2. JMS 的两种消息模型是什么?

    • 点对点(Queue):消息被一个消费者消费,队列持久化保证消息不丢失。
    • 发布订阅(Topic):消息广播给所有订阅者,默认非持久化(需客户端设置持久订阅)。
  3. ActiveMQ 的核心组件有哪些?

    • Broker:消息代理服务器,负责接收、存储和转发消息。
    • Transport Connector:通信协议(如 TCP、NIO、WebSocket)。
    • Persistence Adapter:持久化存储(如 KahaDB、JDBC)。
    • Network Connector:Broker 集群间消息路由。

二、消息传输与可靠性

  1. ActiveMQ 如何保证消息不丢失?

    • 持久化存储:消息保存到磁盘(如 KahaDB 或 MySQL)。
    • 生产者确认JMSProducer 使用 DeliveryMode.PERSISTENT 模式。
    • 消费者确认:通过 CLIENT_ACKNOWLEDGE 或事务会话手动确认。
  2. 消息确认机制(Acknowledge)有哪些模式?

    • AUTO_ACKNOWLEDGE:自动确认(消费后立即确认,可能丢失消息)。
    • CLIENT_ACKNOWLEDGE:手动调用 message.acknowledge() 确认。
    • DUPS_OK_ACKNOWLEDGE:延迟批量确认,允许重复消息(性能高)。
    • 事务会话:通过 session.commit() 提交事务时批量确认。
  3. 什么是死信队列(DLQ)?如何配置?

    • 消息重发多次失败后会被转移到死信队列(默认名 ActiveMQ.DLQ)。
    • 配置参数
      1
      2
      3
      4
      5
      <policyEntry queue=">">
      <deadLetterStrategy>
      <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
      </deadLetterStrategy>
      </policyEntry>

三、持久化与高可用

  1. ActiveMQ 的持久化存储方式有哪些?

    • KahaDB:默认的基于文件的存储(高性能,日志式存储)。
    • JDBC:消息存储到数据库(如 MySQL),支持事务但性能较低。
    • LevelDB(已弃用)/ RocksDB:高性能 KV 存储。
  2. 如何实现 ActiveMQ 的高可用?

    • 主从架构
      • 共享存储(Shared Storage):多个 Broker 共享同一存储(如 KahaDB)。
      • 基于 ZooKeeper 的主从选举:自动故障转移(推荐方案)。
    • 网络连接器(Network of Brokers):多个 Broker 组成集群,消息自动路由。
  3. 什么是消息重发(Redelivery)?如何控制重发策略?

    • 消费者处理失败时,消息会被重新投递。
    • 配置重发策略
      1
      2
      3
      RedeliveryPolicy policy = connection.getRedeliveryPolicy();
      policy.setMaximumRedeliveries(3); // 最大重试次数
      policy.setInitialRedeliveryDelay(1000); // 初始延迟

四、性能优化与监控

  1. 如何提升 ActiveMQ 的吞吐量?

    • 使用 NIO 协议(代替 TCP)提升并发连接性能。
    • 开启 生产者流量控制producerFlowControl)避免 Broker 过载。
    • 优化 持久化存储(如 KahaDB 的索引缓存)。
    • 合理配置 内存限制memoryLimit)防止内存溢出。
  2. ActiveMQ 的消息堆积如何处理?

    • 增加消费者数量或优化消费逻辑。
    • 开启 Pending Message Limit Strategy,丢弃旧消息或写入临时存储。
    • 监控并扩容 Broker。
  3. 如何监控 ActiveMQ 的运行状态?

    • JMX:通过 JConsole 或 VisualVM 查看队列、主题、内存等指标。
    • Web 控制台:默认端口 8161,提供实时监控。
    • 日志分析:监控 activemq.log 中的警告和错误信息。

五、底层原理与高级特性

  1. ActiveMQ 的消息存储原理(以 KahaDB 为例)?

    • KahaDB 使用 日志追加方式 存储消息,索引文件(db.data)记录消息位置,数据文件(db-*.log)存储消息内容。
    • 通过 内存页缓存(Page Cache) 提升读写性能。
  2. 消息的异步发送与同步发送有什么区别?

    • 同步发送:生产者阻塞直到 Broker 确认收到消息(可靠但性能低)。
    • 异步发送:生产者非阻塞,Broker 异步确认(高性能,可能丢失消息)。
    • 配置方式:在连接 URI 中添加 jms.useAsyncSend=true
  3. 如何实现消息的延迟投递或定时投递?

    • 在消息头中设置属性:
      1
      message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000); // 延迟5秒
    • 需 Broker 开启调度器支持:在配置文件中添加 <broker ... schedulerSupport="true">

六、常见问题排查

  1. 消费者无法接收到消息的可能原因?

    • 检查消费者是否订阅正确的队列/主题。
    • 确认消息持久化配置与消费者订阅模式(如 Topic 需持久订阅)。
    • 网络问题或 Broker 宕机。
  2. ActiveMQ 内存溢出(OOM)如何解决?

    • 限制 Broker 内存:<systemUsage><memoryUsage limit="512mb"/></systemUsage>
    • 优化消费者处理速度,避免消息堆积。
    • 使用异步发送并调整生产者流量控制。

七、与其他中间件对比

  1. ActiveMQ 与 RabbitMQ、Kafka 的区别?
    • ActiveMQ:支持 JMS 规范,适合传统企业级异步通信,但吞吐量低于 Kafka。
    • RabbitMQ:基于 AMQP 协议,灵活的路由机制,社区活跃。
    • Kafka:高吞吐量、分布式日志系统,适合大数据场景,但消息延迟较高。

死信通常出现的原因及解决办法

在分布式消息队列(如ActiveMQ)中,私信队列(Dead Letter Queue,DLQ)是用于存储无法正常处理的消息的专用队列。以下是导致消息进入私信队列的常见原因及对应的解决方案:

一、消息进入私信队列的常见原因

  1. 消息处理失败
    • 原因:消费者在处理消息时抛出未捕获的异常(如业务逻辑错误、数据格式不匹配等)。
    • 解决方案
      • 优化消费者代码,增加异常处理(如try-catch块)。
      • 记录错误日志并触发告警,便于及时排查问题。
      • 使用事务性会话,确保消息处理失败时回滚并重新投递。
  2. 消息超时(TTL过期)
    • 原因:消息设置了timeToLive(TTL),在指定时间内未被消费。
    • 解决方案
      • 根据业务需求合理设置消息的TTL值(如producer.setTimeToLive())。
      • 优化消费者性能,避免因处理速度过慢导致消息堆积。
      • 监控队列积压情况,动态调整消费者数量(横向扩展)。
  3. 重试次数超限
    • 原因:ActiveMQ默认会重试6次(取决于配置),若重试后仍失败,消息会被移至DLQ。
    • 解决方案
      • 调整重试策略(RedeliveryPolicy):
        1
        2
        3
        RedeliveryPolicy policy = connection.getRedeliveryPolicy();
        policy.setMaximumRedeliveries(10); // 增加最大重试次数
        policy.setInitialRedeliveryDelay(1000); // 设置重试间隔(毫秒)
      • 对不可恢复的错误(如数据格式错误),在消费者中直接确认消息,避免无意义重试。
  4. 队列不存在或权限不足
    • 原因:生产者向不存在的队列发送消息,或消费者无权限访问队列。
    • 解决方案
      • 检查队列名称和权限配置(如ActiveMQ的activemq.xml中的授权配置)。
      • 使用管理接口(如ActiveMQ Web Console)确认队列是否被正确创建。
  5. 系统或网络故障
    • 原因:消费者因宕机、网络中断等临时故障无法处理消息。
    • 解决方案
      • 设计高可用架构(如集群部署消费者)。
      • 启用持久化消息,防止消息在Broker重启后丢失。

二、ActiveMQ私信队列的配置与管理

  1. 自定义私信队列名称
    • activemq.xml中配置私信队列策略:
      1
      2
      3
      <deadLetterStrategy>
      <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
      </deadLetterStrategy>
  2. 禁用私信队列(不推荐)
    1
    2
    3
    <deadLetterStrategy>
    <sharedDeadLetterStrategy processNonPersistent="true" />
    </deadLetterStrategy>
  3. 处理私信队列中的消息
    • 手动处理:通过管理工具(如ActiveMQ Web Console)查看DLQ中的消息,重新投递或删除。
    • 自动处理:编写DLQ消费者程序,根据错误类型决定重试或记录日志。

三、最佳实践

  1. 监控与告警
    • 监控DLQ中的消息数量,触发阈值告警。
    • 使用工具(如Prometheus + Grafana)可视化监控队列状态。
  2. 合理设置重试策略
    • 根据业务容忍度调整重试次数和间隔,避免消息积压。
  3. 区分可恢复与不可恢复错误
    • 对可恢复错误(如依赖服务超时)增加重试次数。
    • 对不可恢复错误(如数据校验失败)直接记录日志并确认消息。
  4. 使用事务和ACK机制
    • 在消费者中使用事务或手动ACK,确保消息正确处理后才确认。

RocketMQ

相关知识点


一、基础概念与核心组件

  1. RocketMQ 是什么?它的核心优势是什么?

    • RocketMQ 是阿里开源的分布式消息中间件,支持高吞吐、低延迟、高可用、海量消息堆积。
    • 核心优势
      • 严格的消息顺序保证。
      • 事务消息、延迟消息等高级特性。
      • 分布式架构(支持水平扩展和故障自动恢复)。
  2. RocketMQ 的核心组件有哪些?

    • NameServer:轻量级注册中心,管理 Broker 元数据(无状态,可集群部署)。
    • Broker:消息存储与转发节点(分主从角色,支持同步/异步复制)。
    • Producer:消息生产者(支持同步/异步/单向发送)。
    • Consumer:消息消费者(支持集群消费和广播消费)。
  3. RocketMQ 与 Kafka 的主要区别?

    • 消息模型:RocketMQ 基于 Topic + Queue,Kafka 基于 Partition。
    • 消息顺序:RocketMQ 支持严格顺序消息(通过队列锁),Kafka 仅分区内有序。
    • 延迟消息:RocketMQ 内置延迟消息支持,Kafka 需自行实现。
    • 事务消息:RocketMQ 提供完整的事务消息机制,Kafka 需外部协调。

二、消息存储与高可用

  1. RocketMQ 的消息存储机制(CommitLog + ConsumeQueue)?

    • CommitLog:所有消息按顺序追加写入日志文件(顺序写高性能)。
    • ConsumeQueue:逻辑队列,记录消息在 CommitLog 中的偏移量(消费者按队列拉取)。
    • IndexFile:基于消息 Key 的哈希索引,用于快速查询。
  2. 如何保证 Broker 的高可用?

    • 主从架构
      • 同步复制(Master-Slave):主节点写入后同步到从节点(强一致,性能较低)。
      • 异步复制:主节点写入后异步复制到从节点(高吞吐,可能丢数据)。
    • 故障转移:NameServer 检测 Broker 存活状态,自动切换消费者到从节点。
  3. Broker 的刷盘机制(同步刷盘 vs 异步刷盘)?

    • 同步刷盘:消息写入内存后,立即刷到磁盘(数据可靠,性能低)。
    • 异步刷盘:消息写入内存后返回成功,由后台线程批量刷盘(高性能,宕机可能丢失数据)。

三、消息可靠性

  1. 如何保证消息不丢失?

    • 生产者端
      • 使用同步发送 + 重试机制。
      • 开启事务消息(最终一致性)。
    • Broker 端:同步刷盘 + 主从同步复制。
    • 消费者端:消费成功后返回 CONSUME_SUCCESS,失败则重试。
  2. RocketMQ 的事务消息实现原理?

    • 半事务消息
      1. Producer 发送半消息(对消费者不可见)。
      2. 执行本地事务并通知 Broker 提交/回滚。
      3. Broker 根据事务状态提交消息或丢弃。
    • 事务回查:若 Producer 未提交状态,Broker 主动回查事务状态。
  3. 顺序消息如何实现?

    • 全局顺序:单 Topic 仅一个队列(牺牲并发能力)。
    • 分区顺序:同一业务 Key 的消息发送到同一队列(如订单 ID)。
    • 消费者端:单线程顺序消费队列。

四、消息消费

  1. PushConsumer 和 PullConsumer 的区别?

    • PushConsumer:Broker 主动推送消息(默认推荐,内部封装拉取逻辑)。
    • PullConsumer:需手动调用 pull 方法拉取消息(灵活性高,复杂度高)。
  2. 如何避免消息重复消费?

    • 幂等处理:业务逻辑保证重复消息的幂等性(如数据库唯一键)。
    • 去重表:记录已处理消息的唯一 ID(如订单号)。
    • RocketMQ 消息 ID:同一消息在重试时 ID 不变,可据此去重。
  3. 消息重试机制(Retry Queue)与死信队列(DLQ)?

    • 重试队列:消费失败的消息进入重试队列,按延迟级别(如 10s、30s)重新投递。
    • 死信队列:重试 16 次后仍失败的消息进入死信队列(需人工处理)。

五、性能优化

  1. 如何提升 RocketMQ 的吞吐量?

    • 生产者端
      • 批量发送消息(sendBatch)。
      • 开启异步发送(sendAsync)。
    • Broker 端
      • 异步刷盘 + 异步复制。
      • 调整 sendMessageThreadPoolNums(发送线程数)。
    • 消费者端
      • 增加消费者实例数(集群消费模式)。
      • 提高并发消费线程数(consumeThreadMin/Max)。
  2. 零拷贝(Zero-Copy)技术如何应用?

    • MappedByteBuffer:通过内存映射文件(MMAP)减少数据从内核态到用户态的拷贝。
    • FileChannel.transferTo:直接传输文件数据到网络通道(避免多次拷贝)。
  3. 如何优化消息堆积(Backlog)?

    • 增加消费者实例:水平扩展消费者集群。
    • 跳过非关键消息:动态降低消费延迟(如日志场景)。
    • 预分区扩容:提前增加 Topic 队列数(需 Producer 配合重哈希)。

六、高级特性与底层原理

  1. 延迟消息的实现原理?

    • RocketMQ 内置 18 个延迟级别(1s~2h),消息存入 SCHEDULE_TOPIC_XXXX Topic。
    • 定时任务扫描到期消息,重新投递到目标 Topic。
  2. 消息过滤的两种方式?

    • Tag 过滤:消费者订阅时指定 Tag(服务端过滤,效率高)。
    • SQL 过滤:基于消息属性编写 SQL 表达式(需 Broker 开启 enablePropertyFilter=true)。
  3. NameServer 的作用与高可用实现?

    • 作用:维护 Broker 路由信息(无状态,不参与消息传输)。
    • 高可用:多节点部署,客户端轮询连接,单点故障不影响整体服务。

七、实战与排查

  1. 消息发送慢的可能原因?

    • 网络延迟或 Broker 负载过高。
    • 同步刷盘或同步复制配置导致性能瓶颈。
    • Producer 未开启线程池优化或批量发送。
  2. 消费者收不到消息的可能原因?

    • 订阅关系不一致(Topic/Tag 不匹配)。
    • 消费者组未正确配置(集群模式 vs 广播模式)。
    • Broker 权限问题或 NameServer 路由信息未更新。

八、与其他中间件对比

  1. RocketMQ 与 Kafka 的适用场景?
    • RocketMQ:金融级事务消息、顺序消息、延迟消息(电商、交易系统)。
    • Kafka:日志采集、流处理、大数据场景(高吞吐,但功能较单一)。

RocketMQ与ActiveMQ比较

以下是 RocketMQActiveMQ 的对比分析,涵盖核心特性、架构设计、适用场景等关键点:


一、核心定位与设计理念

特性 RocketMQ ActiveMQ
定位 分布式、高吞吐、低延迟、金融级可靠性 传统企业级消息中间件,遵循 JMS 规范
设计目标 支持海量消息堆积、严格顺序、事务消息 提供标准消息模型(Queue/Topic)和基础功能
协议支持 自定义协议(Remoting) 支持 JMS、AMQP、MQTT、STOMP 等多协议

二、架构与核心组件

特性 RocketMQ ActiveMQ
核心架构 分布式架构:NameServer + Broker 集群 单节点或主从集群
元数据管理 NameServer(轻量级,无状态) 无独立组件,依赖 ZooKeeper(可选)
存储模型 CommitLog(顺序写) + ConsumeQueue KahaDB(日志存储)或 JDBC(数据库存储)
高可用方案 主从同步/异步复制 + 自动故障转移 共享存储(如 KahaDB)或 ZooKeeper 主从

三、性能与可靠性

特性 RocketMQ ActiveMQ
吞吐量 单机百万级 TPS(高并发场景) 万级 TPS(适合中小规模场景)
消息堆积能力 支持海量消息堆积(TB 级) 堆积能力有限(依赖存储空间)
消息可靠性 同步刷盘 + 主从同步复制(金融级可靠) 异步刷盘 + 主从异步复制(可能丢数据)
顺序消息 严格保证分区顺序(同一队列) 需额外配置(如 Exclusive Consumer)

四、功能特性

特性 RocketMQ ActiveMQ
事务消息 完整事务消息机制(半事务 + 回查) 支持 JMS 事务(功能较弱)
延迟消息 内置 18 个延迟级别(无需插件) 需通过插件(如 activemq-scheduler
消息过滤 Tag 过滤(高效) + SQL 属性过滤 基于 Selector(JMS 标准)
消息轨迹 内置消息轨迹追踪 需插件或自定义实现

五、适用场景

场景 RocketMQ ActiveMQ
推荐场景 - 高并发交易系统(电商、金融)
- 大数据日志采集
- 严格顺序消息(如订单)
- 传统企业应用(ERP、CRM)
- 轻量级异步通信
- 多协议兼容需求
不适用场景 - 需要多协议支持的异构系统 - 超高吞吐(如日志流处理)
- 海量消息堆积场景

六、优缺点对比

维度 RocketMQ ActiveMQ
优点 - 高吞吐、低延迟
- 严格消息顺序保证
- 分布式架构易扩展
- 完善的监控和运维工具
- 协议支持丰富
- 轻量级部署简单
- 社区成熟,文档完善
缺点 - 功能复杂度高(需深入调优)
- 对 JMS 规范支持较弱
- 性能瓶颈明显(单机架构)
- 海量消息堆积能力不足
- 高可用实现复杂

七、综合建议

  • 选择 RocketMQ

    • 需要高吞吐、低延迟、海量消息堆积能力。
    • 业务场景涉及金融级事务消息或严格顺序消息。
    • 系统需分布式架构和水平扩展能力。
  • 选择 ActiveMQ

    • 传统企业应用,需遵循 JMS 规范。
    • 系统复杂度低,无需超高并发或海量消息堆积。
    • 需要支持多种消息协议(如 AMQP、MQTT)。

RabbitMQ