RabbitMQ消息队列
基础
什么是消息队列?
答案: 消息队列(Message Queue)是一种异步通信机制,用于系统间的解耦和削峰填谷。
核心概念:
- 生产者(Producer):发送消息
- 消费者(Consumer):接收消息
- 队列(Queue):存储消息
- 交换机(Exchange):路由消息
优点:
- 解耦:生产者和消费者独立
- 异步:提高响应速度
- 削峰:缓冲高并发请求
- 可靠性:消息持久化
缺点:
- 系统复杂度增加
- 消息可能丢失
- 消息顺序性问题
- 消息重复消费
常见的消息队列对比?
答案:
| 特性 | RabbitMQ | RocketMQ | Kafka |
|---|---|---|---|
| 语言 | Erlang | Java | Scala |
| 吞吐量 | 万级 | 十万级 | 百万级 |
| 延迟 | 微秒级 | 毫秒级 | 毫秒级 |
| 可用性 | 高(主从) | 非常高(分布式) | 非常高(分布式) |
| 功能 | 丰富 | 丰富 | 简单 |
| 适用场景 | 中小规模 | 大规模 | 日志、大数据 |
RabbitMQ核心概念
RabbitMQ的架构?
答案:
Producer → Exchange → Queue → Consumer
↓
Binding核心组件:
- Broker:消息队列服务器
- Virtual Host:虚拟主机,隔离不同应用
- Exchange:交换机,接收消息并路由
- Queue:队列,存储消息
- Binding:绑定,Exchange和Queue的关系
- Channel:信道,复用TCP连接
RabbitMQ的交换机类型?
答案:
1. Direct Exchange(直连交换机)
- 根据routing key精确匹配
- 一对一路由
java
// 发送消息
channel.basicPublish("direct_exchange", "info", null, message.getBytes());
// 绑定队列
channel.queueBind("queue_info", "direct_exchange", "info");2. Fanout Exchange(扇出交换机)
- 广播模式,忽略routing key
- 发送到所有绑定的队列
java
// 发送消息
channel.basicPublish("fanout_exchange", "", null, message.getBytes());
// 绑定队列
channel.queueBind("queue1", "fanout_exchange", "");
channel.queueBind("queue2", "fanout_exchange", "");3. Topic Exchange(主题交换机)
- 根据routing key模糊匹配
- 支持通配符:
*(一个词)、#(零个或多个词)
java
// 发送消息
channel.basicPublish("topic_exchange", "user.info", null, message.getBytes());
// 绑定队列
channel.queueBind("queue1", "topic_exchange", "user.*"); // 匹配user.info
channel.queueBind("queue2", "topic_exchange", "user.#"); // 匹配user.info
channel.queueBind("queue3", "topic_exchange", "*.info"); // 匹配user.info4. Headers Exchange(头交换机)
- 根据消息头属性匹配
- 很少使用
Spring Boot整合RabbitMQ
如何整合RabbitMQ?
答案:
1. 添加依赖
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>2. 配置连接
yaml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /3. 配置交换机和队列
java
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
return new Queue("test_queue", true); // durable=true持久化
}
@Bean
public DirectExchange exchange() {
return new DirectExchange("test_exchange");
}
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("test_key");
}
}4. 发送消息
java
@Service
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String message) {
rabbitTemplate.convertAndSend("test_exchange", "test_key", message);
}
}5. 接收消息
java
@Component
public class Consumer {
@RabbitListener(queues = "test_queue")
public void receive(String message) {
System.out.println("收到消息: " + message);
}
}高级特性
如何保证消息不丢失?
答案:
1. 生产者确认(Publisher Confirms)
yaml
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启确认
publisher-returns: true # 开启退回java
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 确认回调
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败: " + cause);
}
});
// 退回回调
template.setReturnsCallback(returned -> {
System.out.println("消息被退回: " + returned.getMessage());
});
return template;
}
}2. 消息持久化
java
// 交换机持久化
DirectExchange exchange = new DirectExchange("test_exchange", true, false);
// 队列持久化
Queue queue = new Queue("test_queue", true);
// 消息持久化
rabbitTemplate.convertAndSend("test_exchange", "test_key", message,
msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
});3. 消费者确认(ACK)
yaml
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动确认java
@RabbitListener(queues = "test_queue")
public void receive(Message message, Channel channel) throws IOException {
try {
String msg = new String(message.getBody());
System.out.println("收到消息: " + msg);
// 业务处理
// ...
// 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 拒绝消息,重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}如何避免消息重复消费?
答案:
1. 消息幂等性
- 使用唯一ID(如订单号)
- 数据库唯一索引
- Redis去重
java
@RabbitListener(queues = "test_queue")
public void receive(String message) {
String messageId = extractMessageId(message);
// Redis去重
if (redisTemplate.opsForValue().setIfAbsent(messageId, "1", 1, TimeUnit.HOURS)) {
// 处理消息
processMessage(message);
} else {
System.out.println("消息已处理,跳过");
}
}2. 数据库唯一索引
sql
CREATE TABLE message_log (
message_id VARCHAR(64) PRIMARY KEY,
content TEXT,
create_time DATETIME
);如何保证消息顺序性?
答案:
问题:多个消费者并发消费,无法保证顺序
解决方案:
1. 单队列单消费者
- 简单但性能低
2. 根据业务分队列
- 相同订单的消息发到同一队列
- 使用routing key区分
java
// 发送消息时指定routing key
String routingKey = "order_" + (orderId % 10); // 根据订单ID取模
rabbitTemplate.convertAndSend("order_exchange", routingKey, message);3. 消息分组
- 使用RabbitMQ的一致性哈希交换机
如何处理死信队列?
答案:
死信(Dead Letter)产生原因:
- 消息被拒绝(basic.reject/basic.nack)且requeue=false
- 消息TTL过期
- 队列达到最大长度
配置死信队列:
java
@Configuration
public class DeadLetterConfig {
// 正常队列
@Bean
public Queue normalQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dead_exchange"); // 死信交换机
args.put("x-dead-letter-routing-key", "dead_key"); // 死信routing key
args.put("x-message-ttl", 10000); // 消息TTL 10秒
return new Queue("normal_queue", true, false, false, args);
}
// 死信交换机
@Bean
public DirectExchange deadExchange() {
return new DirectExchange("dead_exchange");
}
// 死信队列
@Bean
public Queue deadQueue() {
return new Queue("dead_queue", true);
}
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead_key");
}
}消费死信:
java
@RabbitListener(queues = "dead_queue")
public void receiveDeadLetter(String message) {
System.out.println("收到死信: " + message);
// 记录日志、发送告警等
}如何实现延时队列?
答案:
方案1:TTL + 死信队列
java
// 发送延时消息
rabbitTemplate.convertAndSend("delay_exchange", "delay_key", message,
msg -> {
msg.getMessageProperties().setExpiration("10000"); // 10秒后过期
return msg;
});方案2:延时插件
bash
# 安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchangejava
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delay_exchange", "x-delayed-message", true, false, args);
}
// 发送延时消息
rabbitTemplate.convertAndSend("delay_exchange", "delay_key", message,
msg -> {
msg.getMessageProperties().setDelay(10000); // 延时10秒
return msg;
});练习题
- RabbitMQ如何实现高可用?
- RabbitMQ的消息堆积如何处理?
- RabbitMQ和Kafka如何选择?
- 如何监控RabbitMQ?