RabbitMQ基本用法与场景实例

应10年php大佬推荐,记录一下rabbitmq的使用,最后一节以RabbitMQ和Redis实现秒杀Demo,推荐和隔壁篇《redis总结与在java中的简单使用》共同食用。

一、基础篇


1.基本概念

1-1.MQ的作用

以下取自维基百科:

计算机科学中,消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步通信协议,每一个贮列中的纪录包含详细说明的资料,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。[1]

一个 WIMP 环境像是 Microsoft Windows,借由优先的某些形式(通常是事件的时间或是重要性的顺序)来存储用户产生的事件到一个 事件贮列 中。系统把每个事件从事件贮列中传递给目标的应用程序。

一般用作:解耦、存储、扩展、削峰、缓冲、异步等。

1-2.关于RabbitMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而聚类和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端

RabbitMQ官网:www.rabbitmq.com

  • 虚拟主机:RabbitMQ有一个默认的虚拟主机,每个虚拟主机都有多个交换机,队列。
  • 队列:如上,消息的暂存。
  • 交换机:交换机与队列绑定,用于向队列转发消息。若没有绑定队列,直接丢弃消息。
  • 路由键:至于交换机如何知道向哪个队列转发消息,就以路由键控制。

2.准备工作

2-1.安装(windows)

官网下载rabbitmq-server,运行安装程序会检测Erlang环境:

点击是前往下载OTP xx版本 Windows 64-bit Binary File,安装完Erlang继续安装RabbitMQ。

安装完成后进入sbin路径下打开cmd,输入 rabbitmq-plugins.bat enable rabbitmq_management 命令安装可视化界面:

完成后,可以通过 http://127.0.0.1:15672/ 查看详细信息。默认账号密码都是guest。若想恢复配置或想清空队列,可输入rabbitmqctl stop_app,rabbitmqctl reset、rabbitmqctl start_app三条命令以重置。

2-2.创建测试项目(springboot)

创建时选择引入MabbitMQ依赖,这里包括生产者消费者两个模块。配置文件:

1
2
3
4
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
2-3.模型

官网说明提供了6种模型,除了RPC,分别为基本消息模型、work消息模型、Fanout订阅模型、Direct订阅模型、Topic订阅模型。

3.基本使用

3-1.一对一基本消息模型

producer下新建配置类:

1
2
3
4
5
6
7
@Configuration
public class RabbitConfig {
@Bean
public Queue Queue() {
return new Queue("Queue_test1");
}
}

新建一个名为“Queue_test1”的队列,新建测试方法send1():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@SpringBootTest
class ProducerApplicationTests {

@Test
void contextLoads() {
}

@Autowired
RabbitTemplate rabbitTemplate;

@Test
void send1(){
String msg = "msg_test1";
rabbitTemplate.convertAndSend("Queue_test1", msg);
}
}

运行一下send1()方法。打开15672可视化界面:

发现多了一个队列,点击它,并查看message:

留意框住部分,在测试方法中我们并没有指定交换机,然而这个队列自动与默认的交换机绑定了。查询资料得知,当新增了一个Queue时,会生成一个与Queue同名的路由键RoutingKey并告知默认交换机。

接着在consumer新建接收类Listener:

1
2
3
4
5
6
7
8
@Component
public class Listener {

@RabbitListener(queues = "Queue_test1")
private void receiver(String msg){
System.out.println("接收到信息:" + msg);
}
}

运行后控制台输出了msg_test1,查看可视化界面,该队列的消息也被消费了。

注意:使用@RabbitListener时必须保证所指定的队列存在,比如上面@RabbitListener(queues = “Queue_test1”)必须保证存在”Queue_test1”队列,否则启动报错。所以,若先启动consumer消费者模块(即监听方)的话,要把新建队列的配置类(即上面的RabbitConfig)放在consumer下。此处仅为了显示可视化界面的队列消息而先启动生产者,后面都会先启动消费者。

3-2.一对多的work模型

consumer新建队列:

1
2
3
4
5
6
7
@Configuration
public class RabbitConfig {
@Bean
public Queue Queue2() {
return new Queue("Queue_test2");
}
}

consumer新建两个接收者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
public class Listener {

@RabbitListener(queues = "Queue_test1")
private void receiver(String msg){
System.out.println("接收到信息:" + msg);
}

@RabbitListener(queues = "Queue_test2")
private void receiver1(String msg){
System.out.println("接收者1号接收到信息:" + msg);
}

@RabbitListener(queues = "Queue_test2")
private void receiver2(String msg){
System.out.println("接收者2号接收到信息:" + msg);
}
}

producer新建测试方法send2(),直接生成20条消息:

1
2
3
4
5
6
7
@Test
void send2(){
String msg = "msg_test2_";
for (int i = 1; i <= 20; i++){
rabbitTemplate.convertAndSend("Queue_test2", msg + i);
}
}

首先运行consumer项目,再运行producer的send2()方法,接收者结果:

结果每个接收者都消费了10条消息,测试多次均为消息平均分配。

3-3.Topic模式

在开始前先理一下各个关系。首先我们有一个TopicExchange交换机,它有可能跟多个队列绑定,生产者发送消息首先到达交换机,交换机根据绑定配置匹配到对应的队列,然后向这些队列转发消息,消费者监听的队列一旦有了消息,就可以开始消费。

consumer的RabbitConfig类新建两个队列,一个交换机并按规则绑定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Configuration
public class RabbitConfig {
@Bean
public Queue Queue2() {
return new Queue("Queue_test2");
}

@Bean
public Queue topicQueue1() {
// 创建一个名为topic.aaa的队列
return new Queue("topic.aaa");
}

@Bean
public Queue topicQueue2() {
// 创建一个名为topic.bbb的队列
return new Queue("topic.bbb");
}

@Bean
TopicExchange topicExchange() {
// 创建一个名为topic的交换机,类型为TopicExchange模型
return new TopicExchange("topic");
}

@Bean
Binding bindingExchangeA(Queue topicQueue1, TopicExchange topicExchange) {
// 生产者向名为topic.xxx的路由键发送的消息都可以转发到topicQueue1队列
return BindingBuilder.bind(topicQueue1).to(topicExchange).with("topic.*");
}

@Bean
Binding bindingExchangeB(Queue topicQueue2, TopicExchange topicExchange) {
// 生产者向名为topic.xxx、topic.xxx.xxx等等的路由键发送的消息都可以转发到topicQueue2队列
return BindingBuilder.bind(topicQueue2).to(topicExchange).with("topic.#");
}
}

Listener类新增两个接收者3和4:

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "topic.aaa")
private void receiver3(String msg){
System.out.println("接收者3号接收到信息:" + msg);
}

@RabbitListener(queues = "topic.bbb")
private void receiver4(String msg){
System.out.println("接收者4号接收到信息:" + msg);
}

producer新增两个测试方法:

1
2
3
4
5
6
7
8
9
10
11
@Test
void send3(){
String msg = "msg_test3";
rabbitTemplate.convertAndSend("topic","topic.abcd", msg);
}

@Test
void send4(){
String msg = "msg_test4";
rabbitTemplate.convertAndSend("topic","topic.abcd.efg", msg);
}

首先启动消费者consumer,在运行producer的send3()方法和send4()方法:

证明msg_test3消息发向名为topic的交换机时,交换机根据匹配规则转发到两个队列,于是分别监听这两个队列的消费者就都能消费到这条消息。但msg_test4这条消息只会被交换机转发到一个队列。结果与预期相符。

3-4.Fanout模型

Fanout类似于广播-订阅模式,队列只要跟某个交换机绑定了,生产者只要向此交换机发送消息(广播),监听(订阅)这些队列的消费者都可以收到消息。

RabbitConfig类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.aaa");
}

@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.bbb");
}

@Bean
public Queue fanoutQueue3() {
return new Queue("fanout.ccc");
}

@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout");
}

@Bean
Binding bindingExchange1(Queue fanoutQueue1,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}

@Bean
Binding bindingExchange2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}

@Bean
Binding bindingExchange3(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
}

Listener类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RabbitListener(queues = "fanout.aaa")
private void receiver5(String msg){
System.out.println("接收者5号接收到信息:" + msg);
}

@RabbitListener(queues = "fanout.bbb")
private void receiver6(String msg){
System.out.println("接收者6号接收到信息:" + msg);
}

@RabbitListener(queues = "fanout.bbb")
private void receiver7(String msg){
System.out.println("接收者7号接收到信息:" + msg);
}

测试方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
void send5(){
String msg = "msg_test5";
rabbitTemplate.convertAndSend("fanout","", msg);
}

@Test
void send6(){
String msg = "msg_test6";
rabbitTemplate.convertAndSend("fanout","fanout.aaa", msg);
}

@Test
void send7(){
String msg = "msg_test7";
rabbitTemplate.convertAndSend("fanout","xxx", msg);
}

运行3个测试方法结果:

可以看到,此模式下,无论生产者有没有指定了路由键,甚至指向一个错误的路由键,都不影响交换机向绑定的队列转发消息,即队列一旦绑定(订阅)了这个交换机,就会收到消息。

3-5.Direct模型

既然Fanout不指定路由键,那么能不能指定呢,用Direct模型可以在Fanout的基础上加上指定路由键。

RabbitConfig类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Bean
public Queue directQueue1() {
return new Queue("direct.aaa");
}

@Bean
public Queue directQueue2() {
return new Queue("direct.bbb");
}

@Bean
DirectExchange directExchange() {
return new DirectExchange("direct");
}

@Bean
Binding bindingExchange4(Queue directQueue1, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue1).to(directExchange).with("aaa");
}

@Bean
Binding bindingExchange5(Queue directQueue2, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue2).to(directExchange).with("bbb");
}

Listener类:

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "direct.aaa")
private void receiver8(String msg){
System.out.println("接收者8号接收到信息:" + msg);
}

@RabbitListener(queues = "direct.bbb")
private void receiver9(String msg){
System.out.println("接收者9号接收到信息:" + msg);
}

测试方法:

1
2
3
4
5
6
7
8
9
10
11
@Test
void send8(){
String msg = "msg_test8";
rabbitTemplate.convertAndSend("direct","xxx", msg);
}

@Test
void send9(){
String msg = "msg_test9";
rabbitTemplate.convertAndSend("direct","aaa", msg);
}

运行8,9测试方法结果:

测试8因为指定了路由键为“xxx”,交换机无法找到与”xxx”对应的队列,所以丢弃了消息。而路由键“aaa”的消息被交换机根据配置转发到了名为direct.aaa的队列,于是可以被接收者8号接收到。

RabbitMQ的基本用法如上,当然,每种用法都有其适用的场景,我们将在下一节进行实战。

二、实战篇


此篇举例适用场景。每个场景不一定只用到了MQ的某一个特点,只是选择其中一个点进行分析。

在实际场景中,消息生产者和消费者很大可能不在一个模块或一个项目中,如上面的基础篇示例。而为了简化编程,以下示例不再分为producer和consumer,直接写在同一模块。

1.异步

模拟用户登录和修改昵称,记录用户操作。用户操作记录不属于业务流程,而且在业务代码上加入处理会增加耗时,所以可以利用消息队列进行异步操作。

新建log模块,创建用户操作类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Data
public class UserOperation implements Serializable {

private String username;

private String ip;

private String operation;

private Date date;

public UserOperation(String username, String ip, String operation, Date date) {
this.username = username;
this.ip = ip;
this.operation = operation;
this.date = date;
}
}

注意若用mq传输对象需要序列化。

创建RabbitConfig:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class RabbitConfig {

@Bean
public Queue logQueue() {
return new Queue("queue.log");
}

@Bean
DirectExchange logExchange() {
return new DirectExchange("exchange.log");
}

@Bean
Binding bindingLogExchange(Queue logQueue, DirectExchange logExchange) {
return BindingBuilder.bind(logQueue).to(logExchange).with("log");
}
}

创建接收类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class LogListener {

@RabbitListener(queues = "queue.log")
private void logReceiver(UserOperation userOperation){
System.out.println("日志接收者接收到信息:" + userOperation.toString());
writingLog(userOperation);
}

private void writingLog(UserOperation userOperation){
// 模拟写入数据库
System.out.println("写入数据库成功");
}
}

创建测试控制器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@RestController
public class UserController {

@Autowired
RabbitTemplate rabbitTemplate;

@PostMapping("/login")
private String login(String username, String password, HttpServletRequest httpServletRequest){
if (login(username)){
UserOperation userOperation = new UserOperation(username, httpServletRequest.getRemoteAddr(), "用户登录", new Date());
rabbitTemplate.convertAndSend("exchange.log","log", userOperation);
return "login success";
}
return "login fail";
}

@PutMapping("/nickname")
private String modifyNickname(String username, String nickname, HttpServletRequest httpServletRequest){
if (modifyNickname(nickname)){
UserOperation userOperation = new UserOperation(username, httpServletRequest.getRemoteAddr(), "修改昵称", new Date());
rabbitTemplate.convertAndSend("exchange", "log", userOperation);
return "modify nickname success";
}
return "modify nickname fail";
}

private boolean login(String username){
// 模拟数据库操作
return !StringUtils.isEmpty(username);
}

private boolean modifyNickname(String nickname){
// 模拟数据库操作
return !StringUtils.isEmpty(nickname);
}
}

启动项目,用postman发起登录和修改昵称请求,观察控制台输出:

整个过程请求接口的处理逻辑中只包含业务流程,并不需要等待操作记录完成数据库写入才返回结果,达到了异步的效果。同样在“账号注册后邮箱验证”这一场景也十分适用这种方法,客户端不必等待邮件发送成功才返回注册结果。

2.解耦

订单系统与库存系统解耦。若订单系统直接与库存系统通讯,当库存系统出现故障,那么订单系统也会跟着出现等待甚至崩溃的现象,然而在很多情况下(比如库存充足),订单系统没必要在接收订单时访问库存系统后才返回下订结果。

新建order模块,直接show code:

1
2
3
4
5
6
7
8
9
@Data
public class Order implements Serializable {

private Integer orderId;

private Integer shopId;

private Integer count;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class RabbitConfig {

@Bean
public Queue orderQueue() {
return new Queue("queue.order");
}

@Bean
DirectExchange orderExchange() {
return new DirectExchange("exchange.order");
}

@Bean
Binding bindingOrderExchange(Queue orderQueue, DirectExchange orderExchange) {
return BindingBuilder.bind(orderQueue).to(orderExchange).with("order");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class OrderListener {

@RabbitListener(queues = "queue.order")
private void orderReceiver(Order order){
System.out.println("订单接收者接收到信息:" + order.toString());
handle(order);
}

private void handle(Order order){
// 模拟写入数据库
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("库存系统已扣减");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RestController
public class OrderController {

@Autowired
RabbitTemplate rabbitTemplate;

@PostMapping("/order")
private String order(Order order){
for (int i = 1; i <= 5; i++){
order.setOrderId(i);
rabbitTemplate.convertAndSend("exchange.order", "order", order);
System.out.println("已生成订单,订单号:" + order.getOrderId());
}
return "order success";
}
}

访问接口,结果:

订单同时就生成出来了,并没有等待库存系统的模拟5秒延时操作。若库存系统崩溃,也不会影响到订单系统的正常服务。

3.流量削峰

流量削峰顾名思义就是在高并发的请求下,用消息队列作为一个缓冲器,其实上个实例中也体现了缓冲的作用。流量削峰在秒杀业务里是一个重要的环节,下面示例将会体现到。

4.综合应用(完整demo)

以下使用RabbitMQ + Redis实现一个简易秒杀Demo,仅分析核心代码,完整代码见文末。

购买的实际场景比较复杂,本次Demo主要完成以下目标:

  • 限制同一用户不能多次参与秒杀
  • 下单
  • 支付
4-1.使用redis进行用户重复秒杀验证

用户参与秒杀时,先判断该用户是否已参与过本活动。

利用redisson实现分布式锁实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public boolean isAlreadySeckill(String userId, String goodsId){
// 以userId和goodsId保证唯一
String key = userId + "-" + goodsId;
RLock lock = redissonClient.getLock(key);
try {
lock.lock();
// set进缓存成功,代表没参加过
return !redisUtil.setnx(key, "1", 100);
} catch (Exception e){
log.error("系统错误:{}", e.getMessage());
return true;
} finally {
lock.unlock();
}
}
4-2.下单消息放进队列等待消费

首先定义好交换机、队列和他们的绑定关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* order队列
* @return
*/
@Bean
public Queue orderQueue(){
return new Queue(orderQueueName);
}

/**
* order交换机
* @return
*/
@Bean
public DirectExchange orderDirectExchange(){
return new DirectExchange(orderExchangeName);
}

/**
* order队列交换机绑定
* @return
*/
@Bean
public Binding simpleBinding(Queue orderQueue, DirectExchange orderDirectExchange){
return BindingBuilder.bind(orderQueue).to(orderDirectExchange).with(orderRoutingKey);
}

上述示例直接使用@RabbitListener注解声明一个方法以处理消息的消费,实际场景一般定义一个Listener继承ChannelAwareMessageListener,然后重写onMessage()方法来处理消费逻辑,这样可以手动确认消费,更加灵活:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
@Component
public class OrderListener implements ChannelAwareMessageListener {

@Autowired
SeckillService seckillService;

@Override
public void onMessage(Message message, Channel channel) throws Exception {
try{
// 获取tag
long tag = message.getMessageProperties().getDeliveryTag();
String str = new String(message.getBody(),"utf-8");
log.info("消费者接收到的消息:{}",str);
JSONObject obj = JSONObject.parseObject(str);
Order order = JSONObject.toJavaObject(obj, Order.class);
// 下单
if (seckillService.order(order) == 1){
log.info("下单成功,订单id:{}", order.getId());
} else {
log.info("下单失败,库存不足");
}
// 消费确认
channel.basicAck(tag,true);
}catch(Exception e){
log.error("消费者消费消息发生异常:{}",e.getMessage());
}
}
}

下单对应的数据库操作:

1
2
3
4
5
6
7
8
9
10
11
@Override
public int order(Order order) {
int updates = goodsMapper.reduceStock(order.getGoodsId());
if (updates > 0) {
// 库存-1成功
orderMapper.insert(order);
return 1;
} else {
return 0;
}
}

然后,在用户下单时,向队列发送消息即可,不需要等待数据库操作,至于到底能不能抢到,就看消费者能不能消费成功了。(可以继续考虑消息没有被成功消费的后续处理)

4-3.延时支付——死信队列

成功创建了订单不代表该订单一定有效。一般的业务是,用户需在限定时间内完成支付,不支付视为放弃,此时需要把订单作废,商品库存要重新添加回来。

修改下单代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public int order(Order order) {
int updates = goodsMapper.reduceStock(order.getGoodsId());
if (updates > 0) {
// 库存-1成功
orderMapper.insert(order);
// 生产支付消息(该消息会延时一段时间才会被消费者监听到,消费消息时检验在这段时间内是否已支付)
rabbitTemplate.convertAndSend(payExchangeName, payRoutingKey, order);
return 1;
} else {
return 0;
}
}

增加了下单后向支付交换机发送消息。

但是,我们知道交换机不会保留消息,它会根据路由键马上把消息转发到对应队列,显然不足以应付延时支付的业务需求,于是引入了一个难点——死信队列,下面逐步分析。

首先,标准的模式不会变,永远是这样:

可以看到,消息一旦被生产出来放到交换机上,交换机根据路由键立马就会把消息给到对应的队列,消费者一直监听着队列,队列一有消息就会被消费。那死信队列的作用?消息加入死信队列后,可以通过配置,设置过期时间,当到达了过期时间,根据设定的规则把消息丢弃,或者加到某个交换机上。(我个人理解为死信队列充当了一次生产者?不知道这样理解对不对)

于是,对于以上模式,我们做点手脚:

那么,在这种改造下,队列2就是死信队列了。接下来就是该死信队列的设置和如何与交换机2绑定的问题了,这个好办。先完善一下最终的图示:

即最终需要两个交换机,两个队列。

指定死信队列的x-dead-letter-exchange,x-dead-letter-routing-key,x-message-ttl三个属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 死信队列
* 消息在里面过期后会传给指定的交换机
* @return
*/
@Bean
public Queue deadLetterQueue() {
Map<String, Object> map = new HashMap<>(16);
// 死信交换机
map.put("x-dead-letter-exchange", deadLetterExchangeName);
// 死信routingkey
map.put("x-dead-letter-routing-key", payRoutingKey);
// 死信队列中的消息过期时间
map.put("x-message-ttl", ttl);
// 死信队列
return new Queue(deadLetterQueueName, true, false, false, map);
}

绑定照旧:

1
2
3
4
5
6
7
8
/**
* pay交换机与死信队列绑定
* @return
*/
@Bean
public Binding payExAndDeadLetterQueueBinding(Queue deadLetterQueue, DirectExchange payDirectExchange){
return BindingBuilder.bind(deadLetterQueue).to(payDirectExchange).with(payRoutingKey);
}

经过ttl设定的时间后,消费到达死信交换机,死信交换机把消息转发到支付队列,消费者开始消费:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
Long tag = message.getMessageProperties().getDeliveryTag();
String str = new String(message.getBody(), "utf-8");
log.info("消费者接收到的消息:{}",str);
JSONObject obj = JSONObject.parseObject(str);
String orderId = obj.getString("id");
// 确认该订单是否付款
if (payService.checkPay(orderId) == 1){
log.info("交易成功,订单id:{}", orderId);
} else {
log.info("交易失败,订单id:{}未支付", orderId);
}
// 无论是否支付,都消费此消息
channel.basicAck(tag, true);
}catch(Exception e){
log.error("消费者消费消息发生异常:{}",e.getMessage());
}
}

checkPay()方法检查订单是否已支付,订单的支付状态会在用户支付后改变。

测试一下:

  • 用户下单并在限定时间内支付:

  • 用户下单后没有支付:

源码地址: