应10年php大佬推荐,记录一下rabbitmq的使用,最后一节以RabbitMQ和Redis实现秒杀Demo,推荐和隔壁篇《redis总结与在java中的简单使用》共同食用。
一、基础篇
1.基本概念
1-1.MQ的作用
以下取自维基百科:
在计算机科学中,消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的资料,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。[1]
一个 WIMP 环境像是 Microsoft Windows,借由优先的某些形式(通常是事件的时间或是重要性的顺序)来存储用户产生的事件到一个 事件贮列 中。系统把每个事件从事件贮列中传递给目标的应用程序。
一般用作:解耦、存储、扩展、削峰、缓冲、异步等。
1-2.关于RabbitMQ
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而聚类和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
RabbitMQ官网:www.rabbitmq.com
- 虚拟主机:RabbitMQ有一个默认的虚拟主机,每个虚拟主机都有多个交换机,队列。
- 队列:如上,消息的暂存。
- 交换机:交换机与队列绑定,用于向队列转发消息。若没有绑定队列,直接丢弃消息。
- 路由键:至于交换机如何知道向哪个队列转发消息,就以路由键控制。
2.准备工作
2-1.安装(windows)
官网下载rabbitmq-server,运行安装程序会检测Erlang环境:

点击是前往下载OTP xx版本 Windows 64-bit Binary File,安装完Erlang继续安装RabbitMQ。
安装完成后进入sbin路径下打开cmd,输入 rabbitmq-plugins.bat enable rabbitmq_management 命令安装可视化界面:
完成后,可以通过 http://127.0.0.1:15672/ 查看详细信息。默认账号密码都是guest。若想恢复配置或想清空队列,可输入rabbitmqctl stop_app,rabbitmqctl reset、rabbitmqctl start_app三条命令以重置。
2-2.创建测试项目(springboot)

创建时选择引入MabbitMQ依赖,这里包括生产者消费者两个模块。配置文件:
1 2 3 4
| spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
|
2-3.模型
官网说明提供了6种模型,除了RPC,分别为基本消息模型、work消息模型、Fanout订阅模型、Direct订阅模型、Topic订阅模型。
3.基本使用
3-1.一对一基本消息模型
producer下新建配置类:
1 2 3 4 5 6 7
| @Configuration public class RabbitConfig { @Bean public Queue Queue() { return new Queue("Queue_test1"); } }
|
新建一个名为“Queue_test1”的队列,新建测试方法send1():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @SpringBootTest class ProducerApplicationTests {
@Test void contextLoads() { } @Autowired RabbitTemplate rabbitTemplate;
@Test void send1(){ String msg = "msg_test1"; rabbitTemplate.convertAndSend("Queue_test1", msg); } }
|
运行一下send1()方法。打开15672可视化界面:

发现多了一个队列,点击它,并查看message:

留意框住部分,在测试方法中我们并没有指定交换机,然而这个队列自动与默认的交换机绑定了。查询资料得知,当新增了一个Queue时,会生成一个与Queue同名的路由键RoutingKey并告知默认交换机。
接着在consumer新建接收类Listener:
1 2 3 4 5 6 7 8
| @Component public class Listener {
@RabbitListener(queues = "Queue_test1") private void receiver(String msg){ System.out.println("接收到信息:" + msg); } }
|
运行后控制台输出了msg_test1,查看可视化界面,该队列的消息也被消费了。
注意:使用@RabbitListener时必须保证所指定的队列存在,比如上面@RabbitListener(queues = “Queue_test1”)必须保证存在”Queue_test1”队列,否则启动报错。所以,若先启动consumer消费者模块(即监听方)的话,要把新建队列的配置类(即上面的RabbitConfig)放在consumer下。此处仅为了显示可视化界面的队列消息而先启动生产者,后面都会先启动消费者。
3-2.一对多的work模型
consumer新建队列:
1 2 3 4 5 6 7
| @Configuration public class RabbitConfig { @Bean public Queue Queue2() { return new Queue("Queue_test2"); } }
|
consumer新建两个接收者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Component public class Listener {
@RabbitListener(queues = "Queue_test1") private void receiver(String msg){ System.out.println("接收到信息:" + msg); } @RabbitListener(queues = "Queue_test2") private void receiver1(String msg){ System.out.println("接收者1号接收到信息:" + msg); }
@RabbitListener(queues = "Queue_test2") private void receiver2(String msg){ System.out.println("接收者2号接收到信息:" + msg); } }
|
producer新建测试方法send2(),直接生成20条消息:
1 2 3 4 5 6 7
| @Test void send2(){ String msg = "msg_test2_"; for (int i = 1; i <= 20; i++){ rabbitTemplate.convertAndSend("Queue_test2", msg + i); } }
|
首先运行consumer项目,再运行producer的send2()方法,接收者结果:
结果每个接收者都消费了10条消息,测试多次均为消息平均分配。
3-3.Topic模式
在开始前先理一下各个关系。首先我们有一个TopicExchange交换机,它有可能跟多个队列绑定,生产者发送消息首先到达交换机,交换机根据绑定配置匹配到对应的队列,然后向这些队列转发消息,消费者监听的队列一旦有了消息,就可以开始消费。
consumer的RabbitConfig类新建两个队列,一个交换机并按规则绑定:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Configuration public class RabbitConfig { @Bean public Queue Queue2() { return new Queue("Queue_test2"); }
@Bean public Queue topicQueue1() { return new Queue("topic.aaa"); }
@Bean public Queue topicQueue2() { return new Queue("topic.bbb"); }
@Bean TopicExchange topicExchange() { return new TopicExchange("topic"); }
@Bean Binding bindingExchangeA(Queue topicQueue1, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue1).to(topicExchange).with("topic.*"); }
@Bean Binding bindingExchangeB(Queue topicQueue2, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue2).to(topicExchange).with("topic.#"); } }
|
Listener类新增两个接收者3和4:
1 2 3 4 5 6 7 8 9
| @RabbitListener(queues = "topic.aaa") private void receiver3(String msg){ System.out.println("接收者3号接收到信息:" + msg); }
@RabbitListener(queues = "topic.bbb") private void receiver4(String msg){ System.out.println("接收者4号接收到信息:" + msg); }
|
producer新增两个测试方法:
1 2 3 4 5 6 7 8 9 10 11
| @Test void send3(){ String msg = "msg_test3"; rabbitTemplate.convertAndSend("topic","topic.abcd", msg); }
@Test void send4(){ String msg = "msg_test4"; rabbitTemplate.convertAndSend("topic","topic.abcd.efg", msg); }
|
首先启动消费者consumer,在运行producer的send3()方法和send4()方法:

证明msg_test3消息发向名为topic的交换机时,交换机根据匹配规则转发到两个队列,于是分别监听这两个队列的消费者就都能消费到这条消息。但msg_test4这条消息只会被交换机转发到一个队列。结果与预期相符。
3-4.Fanout模型
Fanout类似于广播-订阅模式,队列只要跟某个交换机绑定了,生产者只要向此交换机发送消息(广播),监听(订阅)这些队列的消费者都可以收到消息。
RabbitConfig类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Bean public Queue fanoutQueue1() { return new Queue("fanout.aaa"); }
@Bean public Queue fanoutQueue2() { return new Queue("fanout.bbb"); }
@Bean public Queue fanoutQueue3() { return new Queue("fanout.ccc"); }
@Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanout"); }
@Bean Binding bindingExchange1(Queue fanoutQueue1,FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); }
@Bean Binding bindingExchange2(Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); }
@Bean Binding bindingExchange3(Queue fanoutQueue3, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange); }
|
Listener类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @RabbitListener(queues = "fanout.aaa") private void receiver5(String msg){ System.out.println("接收者5号接收到信息:" + msg); }
@RabbitListener(queues = "fanout.bbb") private void receiver6(String msg){ System.out.println("接收者6号接收到信息:" + msg); }
@RabbitListener(queues = "fanout.bbb") private void receiver7(String msg){ System.out.println("接收者7号接收到信息:" + msg); }
|
测试方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Test void send5(){ String msg = "msg_test5"; rabbitTemplate.convertAndSend("fanout","", msg); }
@Test void send6(){ String msg = "msg_test6"; rabbitTemplate.convertAndSend("fanout","fanout.aaa", msg); }
@Test void send7(){ String msg = "msg_test7"; rabbitTemplate.convertAndSend("fanout","xxx", msg); }
|
运行3个测试方法结果:

可以看到,此模式下,无论生产者有没有指定了路由键,甚至指向一个错误的路由键,都不影响交换机向绑定的队列转发消息,即队列一旦绑定(订阅)了这个交换机,就会收到消息。
3-5.Direct模型
既然Fanout不指定路由键,那么能不能指定呢,用Direct模型可以在Fanout的基础上加上指定路由键。
RabbitConfig类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Bean public Queue directQueue1() { return new Queue("direct.aaa"); }
@Bean public Queue directQueue2() { return new Queue("direct.bbb"); }
@Bean DirectExchange directExchange() { return new DirectExchange("direct"); }
@Bean Binding bindingExchange4(Queue directQueue1, DirectExchange directExchange) { return BindingBuilder.bind(directQueue1).to(directExchange).with("aaa"); }
@Bean Binding bindingExchange5(Queue directQueue2, DirectExchange directExchange) { return BindingBuilder.bind(directQueue2).to(directExchange).with("bbb"); }
|
Listener类:
1 2 3 4 5 6 7 8 9
| @RabbitListener(queues = "direct.aaa") private void receiver8(String msg){ System.out.println("接收者8号接收到信息:" + msg); }
@RabbitListener(queues = "direct.bbb") private void receiver9(String msg){ System.out.println("接收者9号接收到信息:" + msg); }
|
测试方法:
1 2 3 4 5 6 7 8 9 10 11
| @Test void send8(){ String msg = "msg_test8"; rabbitTemplate.convertAndSend("direct","xxx", msg); }
@Test void send9(){ String msg = "msg_test9"; rabbitTemplate.convertAndSend("direct","aaa", msg); }
|
运行8,9测试方法结果:

测试8因为指定了路由键为“xxx”,交换机无法找到与”xxx”对应的队列,所以丢弃了消息。而路由键“aaa”的消息被交换机根据配置转发到了名为direct.aaa的队列,于是可以被接收者8号接收到。
RabbitMQ的基本用法如上,当然,每种用法都有其适用的场景,我们将在下一节进行实战。
二、实战篇
此篇举例适用场景。每个场景不一定只用到了MQ的某一个特点,只是选择其中一个点进行分析。
在实际场景中,消息生产者和消费者很大可能不在一个模块或一个项目中,如上面的基础篇示例。而为了简化编程,以下示例不再分为producer和consumer,直接写在同一模块。
1.异步
模拟用户登录和修改昵称,记录用户操作。用户操作记录不属于业务流程,而且在业务代码上加入处理会增加耗时,所以可以利用消息队列进行异步操作。
新建log模块,创建用户操作类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Data public class UserOperation implements Serializable {
private String username;
private String ip;
private String operation;
private Date date;
public UserOperation(String username, String ip, String operation, Date date) { this.username = username; this.ip = ip; this.operation = operation; this.date = date; } }
|
注意若用mq传输对象需要序列化。
创建RabbitConfig:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Configuration public class RabbitConfig {
@Bean public Queue logQueue() { return new Queue("queue.log"); }
@Bean DirectExchange logExchange() { return new DirectExchange("exchange.log"); }
@Bean Binding bindingLogExchange(Queue logQueue, DirectExchange logExchange) { return BindingBuilder.bind(logQueue).to(logExchange).with("log"); } }
|
创建接收类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Component public class LogListener {
@RabbitListener(queues = "queue.log") private void logReceiver(UserOperation userOperation){ System.out.println("日志接收者接收到信息:" + userOperation.toString()); writingLog(userOperation); }
private void writingLog(UserOperation userOperation){ System.out.println("写入数据库成功"); } }
|
创建测试控制器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @RestController public class UserController {
@Autowired RabbitTemplate rabbitTemplate;
@PostMapping("/login") private String login(String username, String password, HttpServletRequest httpServletRequest){ if (login(username)){ UserOperation userOperation = new UserOperation(username, httpServletRequest.getRemoteAddr(), "用户登录", new Date()); rabbitTemplate.convertAndSend("exchange.log","log", userOperation); return "login success"; } return "login fail"; }
@PutMapping("/nickname") private String modifyNickname(String username, String nickname, HttpServletRequest httpServletRequest){ if (modifyNickname(nickname)){ UserOperation userOperation = new UserOperation(username, httpServletRequest.getRemoteAddr(), "修改昵称", new Date()); rabbitTemplate.convertAndSend("exchange", "log", userOperation); return "modify nickname success"; } return "modify nickname fail"; }
private boolean login(String username){ return !StringUtils.isEmpty(username); }
private boolean modifyNickname(String nickname){ return !StringUtils.isEmpty(nickname); } }
|
启动项目,用postman发起登录和修改昵称请求,观察控制台输出:

整个过程请求接口的处理逻辑中只包含业务流程,并不需要等待操作记录完成数据库写入才返回结果,达到了异步的效果。同样在“账号注册后邮箱验证”这一场景也十分适用这种方法,客户端不必等待邮件发送成功才返回注册结果。
2.解耦
订单系统与库存系统解耦。若订单系统直接与库存系统通讯,当库存系统出现故障,那么订单系统也会跟着出现等待甚至崩溃的现象,然而在很多情况下(比如库存充足),订单系统没必要在接收订单时访问库存系统后才返回下订结果。
新建order模块,直接show code:
1 2 3 4 5 6 7 8 9
| @Data public class Order implements Serializable {
private Integer orderId;
private Integer shopId;
private Integer count; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Configuration public class RabbitConfig {
@Bean public Queue orderQueue() { return new Queue("queue.order"); }
@Bean DirectExchange orderExchange() { return new DirectExchange("exchange.order"); }
@Bean Binding bindingOrderExchange(Queue orderQueue, DirectExchange orderExchange) { return BindingBuilder.bind(orderQueue).to(orderExchange).with("order"); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Component public class OrderListener {
@RabbitListener(queues = "queue.order") private void orderReceiver(Order order){ System.out.println("订单接收者接收到信息:" + order.toString()); handle(order); }
private void handle(Order order){ try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("库存系统已扣减"); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @RestController public class OrderController {
@Autowired RabbitTemplate rabbitTemplate;
@PostMapping("/order") private String order(Order order){ for (int i = 1; i <= 5; i++){ order.setOrderId(i); rabbitTemplate.convertAndSend("exchange.order", "order", order); System.out.println("已生成订单,订单号:" + order.getOrderId()); } return "order success"; } }
|
访问接口,结果:

订单同时就生成出来了,并没有等待库存系统的模拟5秒延时操作。若库存系统崩溃,也不会影响到订单系统的正常服务。
3.流量削峰
流量削峰顾名思义就是在高并发的请求下,用消息队列作为一个缓冲器,其实上个实例中也体现了缓冲的作用。流量削峰在秒杀业务里是一个重要的环节,下面示例将会体现到。
4.综合应用(完整demo)
以下使用RabbitMQ + Redis实现一个简易秒杀Demo,仅分析核心代码,完整代码见文末。
购买的实际场景比较复杂,本次Demo主要完成以下目标:
4-1.使用redis进行用户重复秒杀验证
用户参与秒杀时,先判断该用户是否已参与过本活动。
利用redisson实现分布式锁实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Override public boolean isAlreadySeckill(String userId, String goodsId){ String key = userId + "-" + goodsId; RLock lock = redissonClient.getLock(key); try { lock.lock(); return !redisUtil.setnx(key, "1", 100); } catch (Exception e){ log.error("系统错误:{}", e.getMessage()); return true; } finally { lock.unlock(); } }
|
4-2.下单消息放进队列等待消费
首先定义好交换机、队列和他们的绑定关系:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
@Bean public Queue orderQueue(){ return new Queue(orderQueueName); }
@Bean public DirectExchange orderDirectExchange(){ return new DirectExchange(orderExchangeName); }
@Bean public Binding simpleBinding(Queue orderQueue, DirectExchange orderDirectExchange){ return BindingBuilder.bind(orderQueue).to(orderDirectExchange).with(orderRoutingKey); }
|
上述示例直接使用@RabbitListener注解声明一个方法以处理消息的消费,实际场景一般定义一个Listener继承ChannelAwareMessageListener,然后重写onMessage()方法来处理消费逻辑,这样可以手动确认消费,更加灵活:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Slf4j @Component public class OrderListener implements ChannelAwareMessageListener {
@Autowired SeckillService seckillService;
@Override public void onMessage(Message message, Channel channel) throws Exception { try{ long tag = message.getMessageProperties().getDeliveryTag(); String str = new String(message.getBody(),"utf-8"); log.info("消费者接收到的消息:{}",str); JSONObject obj = JSONObject.parseObject(str); Order order = JSONObject.toJavaObject(obj, Order.class); if (seckillService.order(order) == 1){ log.info("下单成功,订单id:{}", order.getId()); } else { log.info("下单失败,库存不足"); } channel.basicAck(tag,true); }catch(Exception e){ log.error("消费者消费消息发生异常:{}",e.getMessage()); } } }
|
下单对应的数据库操作:
1 2 3 4 5 6 7 8 9 10 11
| @Override public int order(Order order) { int updates = goodsMapper.reduceStock(order.getGoodsId()); if (updates > 0) { orderMapper.insert(order); return 1; } else { return 0; } }
|
然后,在用户下单时,向队列发送消息即可,不需要等待数据库操作,至于到底能不能抢到,就看消费者能不能消费成功了。(可以继续考虑消息没有被成功消费的后续处理)
4-3.延时支付——死信队列
成功创建了订单不代表该订单一定有效。一般的业务是,用户需在限定时间内完成支付,不支付视为放弃,此时需要把订单作废,商品库存要重新添加回来。
修改下单代码:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Override public int order(Order order) { int updates = goodsMapper.reduceStock(order.getGoodsId()); if (updates > 0) { orderMapper.insert(order); rabbitTemplate.convertAndSend(payExchangeName, payRoutingKey, order); return 1; } else { return 0; } }
|
增加了下单后向支付交换机发送消息。
但是,我们知道交换机不会保留消息,它会根据路由键马上把消息转发到对应队列,显然不足以应付延时支付的业务需求,于是引入了一个难点——死信队列,下面逐步分析。
首先,标准的模式不会变,永远是这样:

可以看到,消息一旦被生产出来放到交换机上,交换机根据路由键立马就会把消息给到对应的队列,消费者一直监听着队列,队列一有消息就会被消费。那死信队列的作用?消息加入死信队列后,可以通过配置,设置过期时间,当到达了过期时间,根据设定的规则把消息丢弃,或者加到某个交换机上。(我个人理解为死信队列充当了一次生产者?不知道这样理解对不对)
于是,对于以上模式,我们做点手脚:

那么,在这种改造下,队列2就是死信队列了。接下来就是该死信队列的设置和如何与交换机2绑定的问题了,这个好办。先完善一下最终的图示:

即最终需要两个交换机,两个队列。
指定死信队列的x-dead-letter-exchange,x-dead-letter-routing-key,x-message-ttl三个属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
@Bean public Queue deadLetterQueue() { Map<String, Object> map = new HashMap<>(16); map.put("x-dead-letter-exchange", deadLetterExchangeName); map.put("x-dead-letter-routing-key", payRoutingKey); map.put("x-message-ttl", ttl); return new Queue(deadLetterQueueName, true, false, false, map); }
|
绑定照旧:
1 2 3 4 5 6 7 8
|
@Bean public Binding payExAndDeadLetterQueueBinding(Queue deadLetterQueue, DirectExchange payDirectExchange){ return BindingBuilder.bind(deadLetterQueue).to(payDirectExchange).with(payRoutingKey); }
|
经过ttl设定的时间后,消费到达死信交换机,死信交换机把消息转发到支付队列,消费者开始消费:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Override public void onMessage(Message message, Channel channel) throws Exception { try { Long tag = message.getMessageProperties().getDeliveryTag(); String str = new String(message.getBody(), "utf-8"); log.info("消费者接收到的消息:{}",str); JSONObject obj = JSONObject.parseObject(str); String orderId = obj.getString("id"); if (payService.checkPay(orderId) == 1){ log.info("交易成功,订单id:{}", orderId); } else { log.info("交易失败,订单id:{}未支付", orderId); } channel.basicAck(tag, true); }catch(Exception e){ log.error("消费者消费消息发生异常:{}",e.getMessage()); } }
|
checkPay()方法检查订单是否已支付,订单的支付状态会在用户支付后改变。
测试一下:


源码地址: