Skip to content

RabbitMQ消息队列

基础

什么是消息队列?

答案: 消息队列(Message Queue)是一种异步通信机制,用于系统间的解耦和削峰填谷。

核心概念

  • 生产者(Producer):发送消息
  • 消费者(Consumer):接收消息
  • 队列(Queue):存储消息
  • 交换机(Exchange):路由消息

优点

  • 解耦:生产者和消费者独立
  • 异步:提高响应速度
  • 削峰:缓冲高并发请求
  • 可靠性:消息持久化

缺点

  • 系统复杂度增加
  • 消息可能丢失
  • 消息顺序性问题
  • 消息重复消费

常见的消息队列对比?

答案:

特性RabbitMQRocketMQKafka
语言ErlangJavaScala
吞吐量万级十万级百万级
延迟微秒级毫秒级毫秒级
可用性高(主从)非常高(分布式)非常高(分布式)
功能丰富丰富简单
适用场景中小规模大规模日志、大数据

RabbitMQ核心概念

RabbitMQ的架构?

答案:

Producer → Exchange → Queue → Consumer

           Binding

核心组件

  • Broker:消息队列服务器
  • Virtual Host:虚拟主机,隔离不同应用
  • Exchange:交换机,接收消息并路由
  • Queue:队列,存储消息
  • Binding:绑定,Exchange和Queue的关系
  • Channel:信道,复用TCP连接

RabbitMQ的交换机类型?

答案:

1. Direct Exchange(直连交换机)

  • 根据routing key精确匹配
  • 一对一路由
java
// 发送消息
channel.basicPublish("direct_exchange", "info", null, message.getBytes());

// 绑定队列
channel.queueBind("queue_info", "direct_exchange", "info");

2. Fanout Exchange(扇出交换机)

  • 广播模式,忽略routing key
  • 发送到所有绑定的队列
java
// 发送消息
channel.basicPublish("fanout_exchange", "", null, message.getBytes());

// 绑定队列
channel.queueBind("queue1", "fanout_exchange", "");
channel.queueBind("queue2", "fanout_exchange", "");

3. Topic Exchange(主题交换机)

  • 根据routing key模糊匹配
  • 支持通配符:*(一个词)、#(零个或多个词)
java
// 发送消息
channel.basicPublish("topic_exchange", "user.info", null, message.getBytes());

// 绑定队列
channel.queueBind("queue1", "topic_exchange", "user.*");    // 匹配user.info
channel.queueBind("queue2", "topic_exchange", "user.#");    // 匹配user.info
channel.queueBind("queue3", "topic_exchange", "*.info");    // 匹配user.info

4. Headers Exchange(头交换机)

  • 根据消息头属性匹配
  • 很少使用

Spring Boot整合RabbitMQ

如何整合RabbitMQ?

答案:

1. 添加依赖

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置连接

yaml
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

3. 配置交换机和队列

java
@Configuration
public class RabbitConfig {

    @Bean
    public Queue queue() {
        return new Queue("test_queue", true);  // durable=true持久化
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("test_exchange");
    }

    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("test_key");
    }
}

4. 发送消息

java
@Service
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String message) {
        rabbitTemplate.convertAndSend("test_exchange", "test_key", message);
    }
}

5. 接收消息

java
@Component
public class Consumer {

    @RabbitListener(queues = "test_queue")
    public void receive(String message) {
        System.out.println("收到消息: " + message);
    }
}

高级特性

如何保证消息不丢失?

答案:

1. 生产者确认(Publisher Confirms)

yaml
spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 开启确认
    publisher-returns: true             # 开启退回
java
@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        // 确认回调
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("消息发送成功");
            } else {
                System.out.println("消息发送失败: " + cause);
            }
        });

        // 退回回调
        template.setReturnsCallback(returned -> {
            System.out.println("消息被退回: " + returned.getMessage());
        });

        return template;
    }
}

2. 消息持久化

java
// 交换机持久化
DirectExchange exchange = new DirectExchange("test_exchange", true, false);

// 队列持久化
Queue queue = new Queue("test_queue", true);

// 消息持久化
rabbitTemplate.convertAndSend("test_exchange", "test_key", message,
    msg -> {
        msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return msg;
    });

3. 消费者确认(ACK)

yaml
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # 手动确认
java
@RabbitListener(queues = "test_queue")
public void receive(Message message, Channel channel) throws IOException {
    try {
        String msg = new String(message.getBody());
        System.out.println("收到消息: " + msg);

        // 业务处理
        // ...

        // 手动确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // 拒绝消息,重新入队
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}

如何避免消息重复消费?

答案:

1. 消息幂等性

  • 使用唯一ID(如订单号)
  • 数据库唯一索引
  • Redis去重
java
@RabbitListener(queues = "test_queue")
public void receive(String message) {
    String messageId = extractMessageId(message);

    // Redis去重
    if (redisTemplate.opsForValue().setIfAbsent(messageId, "1", 1, TimeUnit.HOURS)) {
        // 处理消息
        processMessage(message);
    } else {
        System.out.println("消息已处理,跳过");
    }
}

2. 数据库唯一索引

sql
CREATE TABLE message_log (
    message_id VARCHAR(64) PRIMARY KEY,
    content TEXT,
    create_time DATETIME
);

如何保证消息顺序性?

答案:

问题:多个消费者并发消费,无法保证顺序

解决方案

1. 单队列单消费者

  • 简单但性能低

2. 根据业务分队列

  • 相同订单的消息发到同一队列
  • 使用routing key区分
java
// 发送消息时指定routing key
String routingKey = "order_" + (orderId % 10);  // 根据订单ID取模
rabbitTemplate.convertAndSend("order_exchange", routingKey, message);

3. 消息分组

  • 使用RabbitMQ的一致性哈希交换机

如何处理死信队列?

答案:

死信(Dead Letter)产生原因

  • 消息被拒绝(basic.reject/basic.nack)且requeue=false
  • 消息TTL过期
  • 队列达到最大长度

配置死信队列

java
@Configuration
public class DeadLetterConfig {

    // 正常队列
    @Bean
    public Queue normalQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "dead_exchange");  // 死信交换机
        args.put("x-dead-letter-routing-key", "dead_key");    // 死信routing key
        args.put("x-message-ttl", 10000);  // 消息TTL 10秒
        return new Queue("normal_queue", true, false, false, args);
    }

    // 死信交换机
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange("dead_exchange");
    }

    // 死信队列
    @Bean
    public Queue deadQueue() {
        return new Queue("dead_queue", true);
    }

    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead_key");
    }
}

消费死信

java
@RabbitListener(queues = "dead_queue")
public void receiveDeadLetter(String message) {
    System.out.println("收到死信: " + message);
    // 记录日志、发送告警等
}

如何实现延时队列?

答案:

方案1:TTL + 死信队列

java
// 发送延时消息
rabbitTemplate.convertAndSend("delay_exchange", "delay_key", message,
    msg -> {
        msg.getMessageProperties().setExpiration("10000");  // 10秒后过期
        return msg;
    });

方案2:延时插件

bash
# 安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
java
@Bean
public CustomExchange delayExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange("delay_exchange", "x-delayed-message", true, false, args);
}

// 发送延时消息
rabbitTemplate.convertAndSend("delay_exchange", "delay_key", message,
    msg -> {
        msg.getMessageProperties().setDelay(10000);  // 延时10秒
        return msg;
    });

练习题

  1. RabbitMQ如何实现高可用?
  2. RabbitMQ的消息堆积如何处理?
  3. RabbitMQ和Kafka如何选择?
  4. 如何监控RabbitMQ?

Released under the MIT License.