RabbitMQ

RabbitMQ

RabbitMq介绍

Mq作用

服务异步调用

  • 问题

    服务A如何保证异步请求一定能被服务B接收到并处理

    image

  • 解决

    image

削峰

  • 问题

    海量请求,如何实现削峰的效果,将请求全部放到一个队列中,慢慢的消费,这个队列怎么实现?

    image

  • 解决

    image

服务解耦

  • 问题

    如何尽量的降低服务之间的耦合问题,如果在订单服务与积分和商家服务解耦,需要一个队列,而这个队列依然需要实现上述两种情况功能。

    image

  • 解决

    image

RabbitMq介绍

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

  • Erlang:Erlang是一种通用的面向并发的编程语言,Erlang充分发挥CPU的性能,延迟特别低,相比其他的MQ(Kafka,RocketMQ)延迟是最低的。
  • RabbitMQ支持多种语言通讯:Java,Python等都有相应的API。
  • RabbitMQ支持海量的插件去实现一些特殊功能,RabbitMQ自带了一款图形化界面,操作异常的简单。

RabbitMq安装

Docker安装RabbitMQ

  • docker-compose.yml

    1
    2
    3
    [root@localhost docker]# mkdir rabbitmq # 创建文件夹
    [root@localhost docker]# vim docker-compose.yml # 将下面配置信息复制进去
    [root@localhost docker]# docker-compose up -d # 后台执行启动程序
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    version: "3.1"
    services:
    rabbitmq:
    image: daocloud.io/library/rabbitmq:3.8.5
    container_name: rabbitmq
    restart: always
    volumes:
    - ./data/:/var/lib/rabbitmq/
    ports:
    - 5672:5672
    - 15672:15672
  • 访问测试

    在Linux内部执行:curl localhost:5672,如下表示安装成功

    image

启动图形化界面

  • 启动management

    1
    2
    3
    4
    [root@localhost docker]# docker exec -it rabbitmq bash # 进入容器内部
    root@811781e25401:/# cd opt/rabbitmq/ # 进入rabbitmq目录
    root@811781e25401:/opt/rabbitmq# ls # 查看目录下的内容(这里可以看到两个目录一个是plugins一个是sbin)
    root@811781e25401:/opt/rabbitmq/sbin# ./rabbitmq-plugins enable rabbitmq_management # 进入sbin目录启动图形化界面

    image

  • 访问测试

    如果本地虚拟机访问不了,看下这篇文章是否网络配置不对,VMWare虚拟机配置

    1
    [root@localhost ~]# ifconfig # 查看IP

    image

    访问http://ip:15672​端口:默认的用户名和密码均为:guest

    image

RabbitMq架构

image

Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Counsumer

消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。

Message

消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。

Connection

每个Publisher(生产者)或者Consumer(消费者)要通过RabbitMQ发送与消费消息,首先就要与RabbitMQ建立连接,这个连接就是Connection。Connection是一个TCP长连接。

Channel

Connection与Channel之间的关系可以比作光纤电缆,如果把Connection比作一条光纤电缆,那么Channel就相当于是电缆中的一束光纤。

Channel是在Connection的基础上建立的虚拟连接,RabbitMQ中大部分的操作都是使用Channel完成的,比如:声明Queue、声明Exchange、发布消息、消费消息等。

Exchange

Exchange是消息到达RabbitMQ的第一站,主要负责根据不同的分发规则将消息分发到不同的Queue,供订阅了相关Queue的消费者消费到指定的消息。Exchange的四种类型:directfanouttopicheaders

  • DIRECT

    DIRECT的意思是直接的,DIRECT类型的Exchange会将消息转发到指定Routing key的Queue上,Routing key的解析规则为精确匹配。也就是只有当Publisher发送的消息的Routing key与某个Binding key相等时,消息才会被分发到对应的Queue上。

    image

  • FANOUT

    FANOUT是扇形的意思,该类型通常叫作广播类型。FANOUT类型的Exchange不处理Routing key,而是会将发送给它的消息路由到所有与它绑定的Queue上。

    image

  • TOPIC

    TOPIC的意思是主题,TOPIC类型的Exchange会根据通配符对Routing key进行匹配,只要Routing key满足某个通配符的条件,就会被路由到对应的Queue上。通配符的匹配规则如下:

    • Routing key必须是一串字符串,每个单词用.分隔;
    • 符号#表示匹配一个或多个单词;
    • 符号*表示匹配一个单词。

    例如:“*.123” 能够匹配到 “abc.123”,但匹配不到 “abc.def.123”;“#.123” 既能够匹配到 “abc.123”,也能匹配到 “abc.def.123”。

    image

  • HEADERS

    HEADERS类型的Exchange不依赖于Routing key与Binding key的匹配规则来路由消息,而是根据发送的消息内容中的HEADERS属性进行匹配。(HEADERS类型的交换器性能差,不实用,基本不会使用。)

Queue

Queue是一个用来存放消息的队列,生产者发送的消息会被放到Queue中,消费者消费消息时也是从Queue中取走消息。

Virtual Host

Virtual host是一个虚拟主机的概念,一个Broker中可以有多个Virtual host,每个Virtual host都有一套自己的Exchange和Queue,同一个Virtual host中的Exchange和Queue不能重名,不同的Virtual host中的Exchange和Queue名字可以一样。这样,不同的用户在访问同一个RabbitMQ Broker时,可以创建自己单独的Virtual host,然后在自己的Virtual host中创建Exchange和Queue,很好地做到了不同用户之间相互隔离的效果。

RabbitMQ通讯方式

  • 导入依赖:amqp-client,junit
1
2
3
4
5
6
7
8
9
10
11
12
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
  • 构建工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQConnectionUtil {

public static final String RABBITMQ_HOST = "192.168.11.32";

public static final int RABBITMQ_PORT = 5672;

public static final String RABBITMQ_USERNAME = "guest";

public static final String RABBITMQ_PASSWORD = "guest";

public static final String RABBITMQ_VIRTUAL_HOST = "/";

/**
* 构建RabbitMQ的连接对象
* @return
*/
public static Connection getConnection() throws Exception {
//1. 创建Connection工厂
ConnectionFactory factory = new ConnectionFactory();

//2. 设置RabbitMQ的连接信息
factory.setHost(RABBITMQ_HOST);
factory.setPort(RABBITMQ_PORT);
factory.setUsername(RABBITMQ_USERNAME);
factory.setPassword(RABBITMQ_PASSWORD);
factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);

//3. 返回连接对象
Connection connection = factory.newConnection();
return connection;
}
}

Hello World

image

  • 生产者

使用默认exchange

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

public class Publisher {

public static final String QUEUE_NAME = "hello";

@Test
public void publish() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();

//2. 构建Channel
Channel channel = connection.createChannel();

//3. 构建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//4. 发布消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送成功!");
}
}
  • 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

public class Consumer {

@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();

//2. 构建Channel
Channel channel = connection.createChannel();

//3. 构建队列
channel.queueDeclare(Publisher.QUEUE_NAME, false, false, false, null);

//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获取到消息:" + new String(body, "UTF-8"));
}
};
channel.basicConsume(Publisher.QUEUE_NAME, true, callback);
System.out.println("开始监听队列");

System.in.read();
}
}

Work Queues

image

  1. 一个队列中的消息,只会被一个消费者成功消费
  2. 默认情况下,RabbitMq的队列会将消息以轮巡的方式交给不同的消费者消费
  3. 消费者拿到消息后,需要给RabbitMq一个ACK,RabbitMq则认为消费者拿到消息了
  • 生产者

生产者和Hello World的形式是一样的,都是将消息推送到默认交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

public class Publisher {

public static final String QUEUE_NAME = "work";

@Test
public void publish() throws Exception {
// 1. 获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();

// 2. 构建Channel
Channel channel = connection.createChannel();

// 3. 构建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 4. 发布消息
for (int i = 0; i < 10; i++) {
String message = "Hello World!" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("消息发送成功!");
}
}
  • 消费者

让消费者关闭自动ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

public class Consumer {

@Test
public void consume1() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();

//2. 构建Channel
Channel channel = connection.createChannel();

//3. 构建队列
channel.queueDeclare(Publisher.QUEUE_NAME, false, false, false, null);

//3.5 设置消息的流控
channel.basicQos(3);

//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号-获取到消息:" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Publisher.QUEUE_NAME, false, callback);
System.out.println("开始监听队列");

System.in.read();
}

@Test
public void consume2() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();

//2. 构建Channel
Channel channel = connection.createChannel();

//3. 构建队列
channel.queueDeclare(Publisher.QUEUE_NAME, false, false, false, null);

channel.basicQos(3);

//4. 监听消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2号-获取到消息:" + new String(body, "UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Publisher.QUEUE_NAME, false, callback);
System.out.println("开始监听队列");

System.in.read();
}
}

Publish/Subscribe

image

构建一个自定义交换机,并指定类型是FANOUT,让交换机跟多个Queue绑定在一起

  • 生产者

自行构建Exchange并绑定指定队列类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

public class Publisher {

public static final String EXCHANGE_NAME = "pubsub";
public static final String QUEUE_NAME1 = "pubsub-one";
public static final String QUEUE_NAME2 = "pubsub-two";

@Test
public void publish() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();

//2. 构建Channel
Channel channel = connection.createChannel();

//3. 构建交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

//4. 构建队列
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);

//5. 绑定交换机和队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "");

//6. 发消息到交换机
channel.basicPublish(EXCHANGE_NAME, "", null, "publish/subscribe!".getBytes());
System.out.println("消息成功发送!");
}
}

Routing

image

  • 生产者

在绑定Exchange和Queue时,需要指定好routingKey,同时在发送消息时,也指定routingKey,只有routingKey一致时,才会把指定的消息路由到指定的Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

public class Publisher {

public static final String EXCHANGE_NAME = "routing";
public static final String QUEUE_NAME1 = "routing-one";
public static final String QUEUE_NAME2 = "routing-two";

@Test
public void publish() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();

//2. 构建Channel
Channel channel = connection.createChannel();

//3. 构建交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

//4. 构建队列
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);

//5. 绑定交换机和队列
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "ORANGE");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "BLACK");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "GREEN");

//6. 发消息到交换机
channel.basicPublish(EXCHANGE_NAME, "ORANGE", null, "大橙子!".getBytes());
channel.basicPublish(EXCHANGE_NAME, "BLACK", null, "黑布林大狸子".getBytes());
channel.basicPublish(EXCHANGE_NAME, "WHITE", null, "小白兔!".getBytes());
System.out.println("消息成功发送!");
}
}

Topic

image

  • 生产者

TOPIC类型可以编写带有特殊意义的routingKey的绑定方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

public class Publisher {

public static final String EXCHANGE_NAME = "topic";
public static final String QUEUE_NAME1 = "topic-one";
public static final String QUEUE_NAME2 = "topic-two";

@Test
public void publish() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQConnectionUtil.getConnection();

//2. 构建Channel
Channel channel = connection.createChannel();

//3. 构建交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

//4. 构建队列
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);

//5. 绑定交换机和队列,
// TOPIC类型的交换机在和队列绑定时,需要以aaa.bbb.ccc..方式编写routingkey
// 其中有两个特殊字符:*(相当于占位符),#(相当通配符)
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "*.orange.*");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "lazy.#");

//6. 发消息到交换机
channel.basicPublish(EXCHANGE_NAME, "big.orange.rabbit", null, "大橙兔子!".getBytes());
channel.basicPublish(EXCHANGE_NAME, "small.white.rabbit", null, "小白兔".getBytes());
channel.basicPublish(EXCHANGE_NAME, "lazy.dog.dog.dog.dog.dog.dog", null, "懒狗狗狗狗狗狗".getBytes());
System.out.println("消息成功发送!");
}
}

SpringBoot集成RabbitMq

基础依赖以及配置

  • 导入依赖
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 配置yml文件
1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.11.32
port: 5672
username: guest
password: guest
virtual-host: /
  • 声明交换机&队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

public static final String EXCHANGE = "boot-exchange";
public static final String QUEUE = "boot-queue";
public static final String ROUTING_KEY = "*.black.*";


@Bean
public Exchange bootExchange(){
// channel.DeclareExchange
return ExchangeBuilder.topicExchange(EXCHANGE).build();
}

@Bean
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE).build();
}

@Bean
public Binding bootBinding(Exchange bootExchange,Queue bootQueue){
return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class PublisherTest {

@Autowired
public RabbitTemplate rabbitTemplate;

@Test
public void publish() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message");
System.out.println("消息发送成功");
}

@Test
public void publishWithProps() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "messageWithProps", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setCorrelationId("123");
return message;
}
});
System.out.println("消息发送成功");
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import com.rabbitmq.client.Channel;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class ConsumeListener {

@RabbitListener(queues = RabbitMQConfig.QUEUE)
public void consume(String msg, Channel channel, Message message) throws IOException {
System.out.println("队列的消息为:" + msg);
String correlationId = message.getMessageProperties().getCorrelationId();
System.out.println("唯一标识为:" + correlationId);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

RabbitMQ保证消息可靠性

生产者保证消息可靠性

Confirm机制

保证消息一定送达到Exchange

  • SpringMVC
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 开启confirms
channel.confirmSelect();

// 设置confirms的异步回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息成功的发送到Exchange!");
}

@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作!");
}
});
  • SpringBoot
  1. 配置文件开启Confirm机制
1
2
3
4
spring:
rabbitmq:
publisher-confirm-type: correlated # 新版本
publisher-confirms: true # 老版本
  1. 在发送消息时,配置RabbitTemplate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void publishWithConfirms() throws IOException {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息已经送达到交换机!!");
} else {
System.out.println("消息没有送达到Exchange,需要做一些补偿操作!");
}
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message");
System.out.println("消息发送成功");

System.in.read();
}

Return机制

保证Exchange上的消息一定送达到Queue

  • SpringMVC
1
2
3
4
5
6
7
8
9
10
11
// 设置Return回调,确认消息是否路由到了Queue
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息没有路由到指定队列,做其他的补偿措施!");
}
});

// 在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调
channel.basicPublish("", "confirms", true, props, message.getBytes());
  • SpringBoot
  1. 配置文件开启Return机制
1
2
3
spring:
rabbitmq:
publisher-returns: true # 开启Return机制
  1. 在发送消息时,配置RabbitTemplate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void publishWithReturn() throws IOException {
// 新版本用 setReturnsCallback ,老版本用setReturnCallback
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
String msg = new String(returned.getMessage().getBody());
System.out.println("消息:" + msg + "路由队列失败!!做补救操作!!");
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message");
System.out.println("消息发送成功");

System.in.read();
}

持久化消息

保证消息持久化,重启MQ服务后队里依然有消息存在

  • SpringMVC
1
2
3
4
5
6
7
8
// 设置消息持久化,DeliveryMode设置为2代表持久化,如果设置为1,就代表不会持久化
AMQP.BasicProperties props = new AMQP.BasicProperties()
.builder()
.deliveryMode(2)
.build();

// 发布消息
channel.basicPublish("", "confirms", true, props, message.getBytes());
  • SpringBoot
  1. 在发送消息时,配置RabbitTemplate
1
2
3
4
5
6
7
8
9
10
11
public void publishWithBasicProperties() throws IOException {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置消息的持久化!
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
});
System.out.println("消息发送成功");
}

消费者保证消息可靠性

ACK确认

消息确认消费后手动ack确认

  • SpringMVC
1
2
3
4
5
6
7
8
9
10
11
12
13
14
DefaultConsumer callback = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者获取到消息:" + new String(body,"UTF-8"));

// 开启ACK确认消息被成功消费
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
  • SpringBoot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RabbitListener(queues = "test.confirms")
@RabbitHandler
public void processForAck(String msg, Channel channel, Message message) throws IOException {
try {
// deliveryTag是消息传送的次数,我这里是为了让消息队列的第一个消息到达的时候抛出异常,处理异常让消息重新回到队列,然后再次抛出异常,处理异常拒绝让消息重回队列
if (message.getMessageProperties().getDeliveryTag() == 1 || message.getMessageProperties().getDeliveryTag() == 2) {
throw new RuntimeException("test exception");
}

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("消息已重复处理失败,拒绝再次接收" + msg);
// 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("消息即将再次返回队列处理" + msg);
// requeue为是否重新回到队列,true重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}

消息的幂等性

幂等(idempotence)实际上是一个数学与计算机学概念,在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。通俗点说就是:同样的参数或者数据去调用同一个接口,无论重复调用多少次,总能保证数据的正确性,不能出错,这就是接口的幂等性。这里“数据的正确性”和具体的业务相关,不同的业务,对于幂等性的定义是不一样的。

基于幂等性的要求,我们需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果这得结合具体的业务来考虑:

  • 只读或者更新的业务

    如果一个业务是只读业务,或者是更新的业务,那么多次读取或者多次更新相同的数据基本上都没什么问题。

  • 需要插入数据的业务

    如果业务需要插入数据,需要在消息里面增加全局唯一的ID,以便消费者用于确定是否重复消费。

    • 强校验

      可以新建一张消息消费表,或者在已有业务表中增加消息全局唯一ID字段,消费者每次消费时拿全局唯一ID去数据库中查找数据是否已经消费过,是则丢弃,不是则进行业务处理。“强校验”类型会一定程度上影响数据库性能,通常涉及到金钱的都需要强校验。

    • 弱校验

      可以将全局唯一ID写入Redis中,生产者可先将ID作为key存入到Redis中,消费者消费时先判断缓存中是否存在ID,存在则消费,且消费完后删除key。这种对于发送短信的业务比较合适,多发送两条,或者发送失败都没关系。

MQ消息队列的重复消费问题的通用解决办法以及幂等性的原理

死信队列

前提是已经配置了死信队列以及做了绑定,死信队列跟普通队列没两样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

// 普通队列Exchange
public static final String NORMAL_EXCHANGE = "normal-exchange";

// 普通队列Queue
public static final String NORMAL_QUEUE = "normal-queue";

// 普通队列RoutingKey
public static final String NORMAL_ROUTING_KEY = "normal.#";

// 死信队列Exchange
public static final String DEAD_EXCHANGE = "dead-exchange";

// 死信队列Queue
public static final String DEAD_QUEUE = "dead-queue";

// 死信队列RoutingKey
public static final String DEAD_ROUTING_KEY = "dead.#";

/**
* 构造topic类型的普通队列Exchange
*
* @return 普通队列Exchange
*/
@Bean
public Exchange normalExchange() {
return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
}

/**
* 1. 构造普通队列Queue,并做持久化处理
*
* 2. 绑定死信队列Exchange,并设置绑定死信队列Exchange的RoutingKey为dead.abc
*
* @return 普通队列Queue
*/
@Bean
public Queue normalQueue() {
// 这里需要重新绑定一下RoutingKey,因为原先的消息的RoutingKey是normal.#,如果不重新配置,消息达到不了死信队列Queue
return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").build();
}

/**
* 设置普通队列Queue与普通队列Exchange的绑定,并设置RoutingKey为NORMAL_ROUTING_KEY
*
* @param normalQueue 普通队列Queue
* @param normalExchange 普通队列Exchange
* @return 绑定Binding
*/
@Bean
public Binding normalBinding(Queue normalQueue, Exchange normalExchange) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
}

/**
* 构造topic类型的死信队列Exchange
*
* @return 死信队列Exchange
*/
@Bean
public Exchange deadExchange() {
return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
}

/**
* 构造死信队列Queue
*
* @return 死信队列Queue
*/
@Bean
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE).build();
}

/**
* 设置死信队列Queue与死信队列Exchange的绑定,并设置RoutingKey为DEAD_ROUTING_KEY
*
* @param deadQueue 死信队列Queue
* @param deadExchange 死信队列Exchange
* @return 绑定Binding
*/
@Bean
public Binding deadBinding(Queue deadQueue, Exchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
}
}
  1. 消息被消费者拒绝(reject)或未确认(nack),且requeue设置为false的消息进入死信队列
1
2
3
4
5
6
@RabbitListener(queues = RabbitMqConfig.NORMAL_QUEUE)
public void consumer(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到normal队列的消息:" + msg);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
  1. 指定消息或者队列所有消息的生存时间,生存时间到了还没被消费消息进入死信队列
  • 给消息设置生存时间
1
2
3
4
5
6
7
8
9
10
11
@Test
public void publishExpire() {
String msg = "dead letter expire";
rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
return message;
}
});
}
  • 给队列设置消息的生存时间
1
2
3
4
@Bean
public Queue normalQueue() {
return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").ttl(10000).build();
}
  1. 队列已经达到消息的最大长度后,再路由过来的消息消息进入死信队列
1
2
3
4
@Bean
public Queue normalQueue() {
return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").maxLength(1).build();
}

image

延迟交换机

死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。下载地址

  • 构建延迟交换机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
public class DelayedConfig {

public static final String DELAYED_EXCHANGE = "delayed-exchange";
public static final String DELAYED_QUEUE = "delayed-queue";
public static final String DELAYED_ROUTING_KEY = "delayed.#";

@Bean
public Exchange delayedExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "topic");
Exchange exchange = new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, arguments);
return exchange;
}

@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(DELAYED_QUEUE).build();
}

@Bean
public Binding delayedBinding(Queue delayedQueue, Exchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
  • 发送消息
1
2
3
4
5
6
7
8
9
10
@Test
public void publish() {
rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(30000);
return message;
}
});
}