消息队列

想象一家生意火爆的奶茶店。如果顾客点了单、店员立刻做、做完立刻喊号——顾客挤满吧台,店员手忙脚乱。聪明的老板会在前台放一个取号牌架:顾客点单后拿号走开,店员按号做,做完叫号——大家各跑各的,互不阻塞。

这个”取号牌架”就是消息队列(Message Queue,MQ)。它是个中间人——生产者把消息丢进来就走,消费者按自己的节奏取。简单到不能再简单,却解决了三个让系统头疼的问题:异步、解耦、削峰。

这一章我们看消息队列为什么这么重要,RabbitMQ 和 Kafka 两大主流长什么样,以及 Java 怎么和它们打交道。

一、为什么需要消息队列

1.1 三大作用

异步 —— 用户注册时,要发欢迎邮件、发短信、送积分。如果同步调用,用户要等所有事情做完才返回;用消息队列,主流程只把”注册成功”消息丢进队列就返回,邮件/短信/积分各消费各的——用户感觉飞快。

解耦 —— 订单服务下单后要通知库存、物流、积分服务。如果直接调用,订单服务要依赖三个服务,任一挂了订单就失败。用消息队列,订单只管发消息,谁消费、怎么消费、何时消费,订单不关心——订单与下游服务解耦

削峰 —— 秒杀活动瞬时 10 万请求,数据库扛不住。请求先丢进消息队列,消费者按数据库能承受的速度慢慢处理——峰值被队列”削平”了。

1.2 代价

消息队列不是免费的午餐:

  • 复杂度上升——多一个组件,多一种故障可能。
  • 延迟增加——异步意味着不立即可见,用户要等消费者处理完。
  • 一致性问题——消息可能丢失、重复、乱序,需要专门处理。

二、RabbitMQ:灵巧的邮差

RabbitMQ 是 Erlang 写的消息中间件,基于 AMQP(Advanced Message Queuing Protocol)协议。它的模型像一个邮政系统——发件人不知道收件人是谁,只把信投到邮筒(Exchange),邮局按规则(Binding)把信投到信箱(Queue),收件人从信箱取信。

2.1 核心概念

Producer → Exchange ──(binding)──→ Queue → Consumer
  • Producer(生产者)——发消息的。
  • Exchange(交换机)——消息的”分拣中心”,按规则把消息路由到一个或多个队列。
  • Queue(队列)——消息存储的地方,FIFO。
  • Binding(绑定)——Exchange 和 Queue 之间的关联规则。
  • Consumer(消费者)——取消息的。

2.2 四种 Exchange 类型

类型路由规则场景
Direct按 routing key 完全匹配点对点定向
Fanout广播到所有绑定的队列群发
Topic按 routing key 模式匹配(通配符)订阅子集
Headers按 header 匹配(不用 routing key)复杂条件

Direct 例子——routing key 是 order.create,只有绑定了 order.create 的队列能收到。

Fanout 例子——广播日志,所有日志队列都收到同一条消息。

Topic 例子——routing key 是 order.create.paid,绑定模式 order.create.* 能匹配,order.# 也能匹配(* 匹配一段,# 匹配多段)。

2.3 Java 客户端示例

// 生产者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
     Channel channel = conn.createChannel()) {
    // 声明队列
    channel.queueDeclare("hello", false, false, false, null);
    // 发消息
    String message = "Hello RabbitMQ!";
    channel.basicPublish("", "hello", null, message.getBytes());
}

// 消费者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare("hello", false, false, false, null);
DeliverCallback callback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println("收到: " + message);
};
channel.basicConsume("hello", true, callback, consumerTag -> {});

2.4 Spring Boot 整合

Spring Boot 把 RabbitMQ 封装得极简——用注解消费,用 RabbitTemplate 发送:

// 生产者
@Service
public class OrderService {
    @Autowired
    private RabbitTemplate rabbit;

    public void createOrder(Order order) {
        rabbit.convertAndSend("order.exchange", "order.create", order);
    }
}

// 消费者
@Component
public class OrderConsumer {
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue("order.queue"),
        exchange = @Exchange("order.exchange"),
        key = "order.create"
    ))
    public void handle(Order order) {
        System.out.println("处理订单: " + order.getId());
    }
}

三、Kafka:日志的巨人

Kafka 由 LinkedIn 开源,用 Scala 写。它的设计哲学和 RabbitMQ 完全不同——RabbitMQ 像邮局,Kafka 像日志。消息按顺序追加到一个不可变日志,消费者各自维护”读到哪了”(offset)。

3.1 核心概念

  • Topic(主题)——消息分类,类似 RabbitMQ 的 routing key。
  • Partition(分区)——一个 Topic 切成多个分区,分布在不同 broker,是 Kafka 水平扩展的关键。
  • Offset(偏移量)——消费者在分区中的位置,由消费者自己管理。
  • Consumer Group(消费者组)——同组消费者分担消费一个 Topic,每个分区只被组内一个消费者消费。
  • Broker(代理)——Kafka 集群中的一个服务器节点。
Topic: order-events (3 partitions)
  Partition 0: [msg0, msg1, msg4, msg7, ...]
  Partition 1: [msg2, msg3, msg5, msg8, ...]
  Partition 2: [msg6, msg9, msg10, ...]

3.2 Kafka vs RabbitMQ

维度RabbitMQKafka
模型队列(推/拉)分区日志(拉)
消息保留消费后删除按时间/大小保留
吞吐量万级 QPS百万级 QPS
延迟微秒级毫秒级
顺序队列内有序分区内有序
场景业务消息、任务分发日志、流处理、事件溯源

简单说——RabbitMQ 适合业务消息(订单、通知、任务),Kafka 适合大数据流(日志、监控、用户行为)。

3.3 Java 客户端示例

// 生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "StringSerializer");
props.put("value.serializer", "StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("order-events", "order-1", "created"));
producer.close();

// 消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-group");
props.put("key.deserializer", "StringDeserializer");
props.put("value.deserializer", "StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("order-events"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> r : records) {
        System.out.println("收到: " + r.value());
    }
}

3.4 Spring Boot 整合

// 生产者
@Service
public class EventPublisher {
    @Autowired
    private KafkaTemplate<String, String> kafka;

    public void publish(String event) {
        kafka.send("order-events", event);
    }
}

// 消费者
@Component
public class EventConsumer {
    @KafkaListener(topics = "order-events", groupId = "order-group")
    public void handle(String event) {
        System.out.println("消费: " + event);
    }
}

四、消息可靠性:不丢不重不乱

消息队列的三大难题——消息丢失、消息重复、消息乱序

4.1 生产者确认:消息不丢

生产者发消息,怎么知道 broker 收到了?

  • RabbitMQ——开启 publisher confirm,broker 收到后回 ack。
  • Kafka——配置 acks=all,所有副本都确认才算成功。
// Kafka 生产者配置
props.put("acks", "all");              // 所有副本确认
props.put("retries", Integer.MAX_VALUE); // 失败重试
props.put("enable.idempotence", "true"); // 幂等生产

4.2 消费者确认:处理完再确认

消费者取到消息,处理到一半崩了,消息算消费了吗?

  • RabbitMQ——手动 ACK,处理完才告诉 broker”我消费完了”。自动 ACK 会在消息一发出就确认,处理失败消息就丢了。
  • Kafka——手动提交 offset,处理完再 consumer.commitSync()
// RabbitMQ 手动 ACK
channel.basicConsume("queue", false, (tag, delivery) -> {
    try {
        process(delivery);          // 处理业务
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  // 确认
    } catch (Exception e) {
        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); // 重新入队
    }
}, tag -> {});

4.3 死信队列:处理失败的消息

消息消费失败、过期、队列满了,会被丢到死信队列(Dead Letter Queue,DLQ)。死信队列可以被另一个消费者监听,做告警、重试、人工处理。

// RabbitMQ 死信队列: 绑定时指定 x-dead-letter-exchange
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
channel.queueDeclare("main.queue", false, false, false, args);

4.4 幂等:消息不重复处理

网络抖动会让消息重复投递——同一条订单消息消费两次,库存扣两次就糟了。幂等(Idempotent)——同一操作执行多次结果一致。

常用方案:

  • 业务唯一键——消费前查”这条消息处理过没”,处理过就跳过。
  • 数据库唯一约束——用消息 ID 作为唯一键,重复插入会失败。
  • Redis 去重——SETNX msgId 1,已存在就跳过。
@Transactional
public void consume(OrderMessage msg) {
    if (redis.setNx("msg:" + msg.getId(), "1", 86400)) {
        // 第一次处理
        orderService.process(msg);
    } else {
        log.info("消息 {} 已处理, 跳过", msg.getId());
    }
}

4.5 顺序:分区内有序

Kafka 保证分区内有序——同一 key 的消息进同一分区,消费时按 offset 顺序。跨分区不保证。

RabbitMQ 队列内 FIFO——天然有序。但多消费者分担消费时,处理快慢不同,最终顺序可能乱——需要单消费者或顺序锁。

五、其他常见消息队列

  • RocketMQ——阿里开源,Java 写,事务消息支持好,国内电商常用。
  • ActiveMQ——老牌,JMS 规范实现,逐渐被取代。
  • Pulsar——Yahoo 开源,存算分离,云原生新秀。

六、实战:用 Java SE 模拟消息队列

Piston 在线环境跑不了 RabbitMQ/Kafka 服务器。我们用 Java SE 的 BlockingQueue 模拟一个迷你消息队列——支持 Topic 订阅、多消费者、消息确认、死信处理,演示消息队列的核心机制。

Java · 在线运行

观察重点:消费者处理失败会重试,超过 3 次进死信队列;同 ID 消息被幂等去重,不会重复处理;削峰场景下,生产者瞬时发 20 条,消费者按自己节奏慢慢消费——队列吸收了峰值。

七、消息队列的取舍

选 MQ 时要考虑:

需求推荐
业务消息、任务分发、延迟队列RabbitMQ
高吞吐日志、流处理、事件溯源Kafka
事务消息、电商订单RocketMQ
简单内部异步Redis Stream / Java BlockingQueue

不要为了用 MQ 而用——内部简单异步用 BlockingQueue 就够了,引入 MQ 等于引入运维负担。

八、本章小结

知识点要点
三大作用异步、解耦、削峰
RabbitMQ 模型Exchange → Binding → Queue
四种 ExchangeDirect(点对点)/ Fanout(广播)/ Topic(通配)/ Headers
Kafka 模型Topic → Partition → Offset
Kafka vs RabbitMQRabbitMQ 业务消息,Kafka 大数据流
可靠性生产者确认 + 消费者手动 ACK + 死信队列
幂等业务唯一键 / 数据库唯一约束 / Redis SETNX
顺序Kafka 分区内有序,RabbitMQ 队列内有序

记忆口诀

  • 三作用——异(异步)解(解耦)削(削峰)。
  • RabbitMQ 四路由——直(Direct)广(Fanout)题(Topic)头(Headers)。
  • Kafka 三件套——主(Topic)分(Partition)偏(Offset)。
  • 可靠性三招——生产确认、消费 ACK、死信兜底。
  • 幂等三法——唯一键、唯一约束、SETNX 去重。

结语

消息队列让系统从”同步紧耦合”走向”异步松耦合”——生产者和消费者各跑各的,靠队列这座桥传递消息。它换来了吞吐和韧性,代价是延迟、复杂度、一致性的权衡。

下一章我们看微服务——当单体应用大到无法维护时,怎么把它拆成多个独立服务,每个服务各自部署、各自演进。Spring Cloud、Nacos、Gateway、Sentinel,这是云原生时代的全家桶。