RabbitMQ
RabbitMq介绍
Mq作用
服务异步调用
问题
服务A如何保证异步请求一定能被服务B接收到并处理

解决

削峰
服务解耦
RabbitMq介绍
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
- Erlang:Erlang是一种通用的面向并发的编程语言,Erlang充分发挥CPU的性能,延迟特别低,相比其他的MQ(Kafka,RocketMQ)延迟是最低的。
- RabbitMQ支持多种语言通讯:Java,Python等都有相应的API。
- RabbitMQ支持海量的插件去实现一些特殊功能,RabbitMQ自带了一款图形化界面,操作异常的简单。
RabbitMq安装
Docker安装RabbitMQ
启动图形化界面
RabbitMq架构

Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Counsumer
消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。
Message
消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。
Connection
每个Publisher(生产者)或者Consumer(消费者)要通过RabbitMQ发送与消费消息,首先就要与RabbitMQ建立连接,这个连接就是Connection。Connection是一个TCP长连接。
Channel
Connection与Channel之间的关系可以比作光纤电缆,如果把Connection比作一条光纤电缆,那么Channel就相当于是电缆中的一束光纤。
Channel是在Connection的基础上建立的虚拟连接,RabbitMQ中大部分的操作都是使用Channel完成的,比如:声明Queue、声明Exchange、发布消息、消费消息等。
Exchange
Exchange是消息到达RabbitMQ的第一站,主要负责根据不同的分发规则将消息分发到不同的Queue,供订阅了相关Queue的消费者消费到指定的消息。Exchange的四种类型:direct、fanout、topic、headers。
DIRECT
DIRECT的意思是直接的,DIRECT类型的Exchange会将消息转发到指定Routing key的Queue上,Routing key的解析规则为精确匹配。也就是只有当Publisher发送的消息的Routing key与某个Binding key相等时,消息才会被分发到对应的Queue上。

FANOUT
FANOUT是扇形的意思,该类型通常叫作广播类型。FANOUT类型的Exchange不处理Routing key,而是会将发送给它的消息路由到所有与它绑定的Queue上。

TOPIC
TOPIC的意思是主题,TOPIC类型的Exchange会根据通配符对Routing key进行匹配,只要Routing key满足某个通配符的条件,就会被路由到对应的Queue上。通配符的匹配规则如下:
- Routing key必须是一串字符串,每个单词用
.分隔;
- 符号
#表示匹配一个或多个单词;
- 符号
*表示匹配一个单词。
例如:“*.123” 能够匹配到 “abc.123”,但匹配不到 “abc.def.123”;“#.123” 既能够匹配到 “abc.123”,也能匹配到 “abc.def.123”。

HEADERS
HEADERS类型的Exchange不依赖于Routing key与Binding key的匹配规则来路由消息,而是根据发送的消息内容中的HEADERS属性进行匹配。(HEADERS类型的交换器性能差,不实用,基本不会使用。)
Queue
Queue是一个用来存放消息的队列,生产者发送的消息会被放到Queue中,消费者消费消息时也是从Queue中取走消息。
Virtual Host
Virtual host是一个虚拟主机的概念,一个Broker中可以有多个Virtual host,每个Virtual host都有一套自己的Exchange和Queue,同一个Virtual host中的Exchange和Queue不能重名,不同的Virtual host中的Exchange和Queue名字可以一样。这样,不同的用户在访问同一个RabbitMQ Broker时,可以创建自己单独的Virtual host,然后在自己的Virtual host中创建Exchange和Queue,很好地做到了不同用户之间相互隔离的效果。
RabbitMQ通讯方式
1 2 3 4 5 6 7 8 9 10 11 12
| <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class RabbitMQConnectionUtil {
public static final String RABBITMQ_HOST = "192.168.11.32";
public static final int RABBITMQ_PORT = 5672;
public static final String RABBITMQ_USERNAME = "guest";
public static final String RABBITMQ_PASSWORD = "guest";
public static final String RABBITMQ_VIRTUAL_HOST = "/";
public static Connection getConnection() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(RABBITMQ_HOST); factory.setPort(RABBITMQ_PORT); factory.setUsername(RABBITMQ_USERNAME); factory.setPassword(RABBITMQ_PASSWORD); factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);
Connection connection = factory.newConnection(); return connection; } }
|
Hello World

使用默认exchange
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test;
public class Publisher {
public static final String QUEUE_NAME = "hello";
@Test public void publish() throws Exception { Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("消息发送成功!"); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import com.rabbitmq.client.*; import org.junit.Test;
import java.io.IOException;
public class Consumer {
@Test public void consume() throws Exception { Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Publisher.QUEUE_NAME, false, false, false, null);
DefaultConsumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者获取到消息:" + new String(body, "UTF-8")); } }; channel.basicConsume(Publisher.QUEUE_NAME, true, callback); System.out.println("开始监听队列");
System.in.read(); } }
|
Work Queues

- 一个队列中的消息,只会被一个消费者成功消费
- 默认情况下,RabbitMq的队列会将消息以轮巡的方式交给不同的消费者消费
- 消费者拿到消息后,需要给RabbitMq一个ACK,RabbitMq则认为消费者拿到消息了
生产者和Hello World的形式是一样的,都是将消息推送到默认交换机
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test;
public class Publisher {
public static final String QUEUE_NAME = "work";
@Test public void publish() throws Exception { Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 10; i++) { String message = "Hello World!" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } System.out.println("消息发送成功!"); } }
|
让消费者关闭自动ack,并且设置消息的流控,最终实现消费者可以尽可能去多消费消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| import com.rabbitmq.client.*; import org.junit.Test;
import java.io.IOException;
public class Consumer {
@Test public void consume1() throws Exception { Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Publisher.QUEUE_NAME, false, false, false, null);
channel.basicQos(3);
DefaultConsumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者1号-获取到消息:" + new String(body, "UTF-8")); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(Publisher.QUEUE_NAME, false, callback); System.out.println("开始监听队列");
System.in.read(); }
@Test public void consume2() throws Exception { Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Publisher.QUEUE_NAME, false, false, false, null);
channel.basicQos(3);
DefaultConsumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者2号-获取到消息:" + new String(body, "UTF-8")); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(Publisher.QUEUE_NAME, false, callback); System.out.println("开始监听队列");
System.in.read(); } }
|
Publish/Subscribe

构建一个自定义交换机,并指定类型是FANOUT,让交换机跟多个Queue绑定在一起
自行构建Exchange并绑定指定队列类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test;
public class Publisher {
public static final String EXCHANGE_NAME = "pubsub"; public static final String QUEUE_NAME1 = "pubsub-one"; public static final String QUEUE_NAME2 = "pubsub-two";
@Test public void publish() throws Exception { Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
channel.queueDeclare(QUEUE_NAME1, false, false, false, null); channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, ""); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "");
channel.basicPublish(EXCHANGE_NAME, "", null, "publish/subscribe!".getBytes()); System.out.println("消息成功发送!"); } }
|
Routing

在绑定Exchange和Queue时,需要指定好routingKey,同时在发送消息时,也指定routingKey,只有routingKey一致时,才会把指定的消息路由到指定的Queue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test;
public class Publisher {
public static final String EXCHANGE_NAME = "routing"; public static final String QUEUE_NAME1 = "routing-one"; public static final String QUEUE_NAME2 = "routing-two";
@Test public void publish() throws Exception { Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME1, false, false, false, null); channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "ORANGE"); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "BLACK"); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "GREEN");
channel.basicPublish(EXCHANGE_NAME, "ORANGE", null, "大橙子!".getBytes()); channel.basicPublish(EXCHANGE_NAME, "BLACK", null, "黑布林大狸子".getBytes()); channel.basicPublish(EXCHANGE_NAME, "WHITE", null, "小白兔!".getBytes()); System.out.println("消息成功发送!"); } }
|
Topic

TOPIC类型可以编写带有特殊意义的routingKey的绑定方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.junit.Test;
public class Publisher {
public static final String EXCHANGE_NAME = "topic"; public static final String QUEUE_NAME1 = "topic-one"; public static final String QUEUE_NAME2 = "topic-two";
@Test public void publish() throws Exception { Connection connection = RabbitMQConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_NAME1, false, false, false, null); channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "*.orange.*"); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "lazy.#");
channel.basicPublish(EXCHANGE_NAME, "big.orange.rabbit", null, "大橙兔子!".getBytes()); channel.basicPublish(EXCHANGE_NAME, "small.white.rabbit", null, "小白兔".getBytes()); channel.basicPublish(EXCHANGE_NAME, "lazy.dog.dog.dog.dog.dog.dog", null, "懒狗狗狗狗狗狗".getBytes()); System.out.println("消息成功发送!"); } }
|
SpringBoot集成RabbitMq
基础依赖以及配置
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.11.32 port: 5672 username: guest password: guest virtual-host: /
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitMQConfig {
public static final String EXCHANGE = "boot-exchange"; public static final String QUEUE = "boot-queue"; public static final String ROUTING_KEY = "*.black.*";
@Bean public Exchange bootExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE).build(); }
@Bean public Queue bootQueue(){ return QueueBuilder.durable(QUEUE).build(); }
@Bean public Binding bootBinding(Exchange bootExchange,Queue bootQueue){ return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs(); } }
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import org.junit.jupiter.api.Test; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest public class PublisherTest {
@Autowired public RabbitTemplate rabbitTemplate;
@Test public void publish() { rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message"); System.out.println("消息发送成功"); }
@Test public void publishWithProps() { rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "messageWithProps", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setCorrelationId("123"); return message; } }); System.out.println("消息发送成功"); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import com.rabbitmq.client.Channel; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component;
import java.io.IOException;
@Component public class ConsumeListener {
@RabbitListener(queues = RabbitMQConfig.QUEUE) public void consume(String msg, Channel channel, Message message) throws IOException { System.out.println("队列的消息为:" + msg); String correlationId = message.getMessageProperties().getCorrelationId(); System.out.println("唯一标识为:" + correlationId); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
|
RabbitMQ保证消息可靠性
生产者保证消息可靠性
Confirm机制
保证消息一定送达到Exchange
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息成功的发送到Exchange!"); }
@Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作!"); } });
|
- 配置文件开启Confirm机制
1 2 3 4
| spring: rabbitmq: publisher-confirm-type: correlated publisher-confirms: true
|
- 在发送消息时,配置RabbitTemplate
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public void publishWithConfirms() throws IOException { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息已经送达到交换机!!"); } else { System.out.println("消息没有送达到Exchange,需要做一些补偿操作!"); } } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message"); System.out.println("消息发送成功");
System.in.read(); }
|
Return机制
保证Exchange上的消息一定送达到Queue
1 2 3 4 5 6 7 8 9 10 11
| channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消息没有路由到指定队列,做其他的补偿措施!"); } });
channel.basicPublish("", "confirms", true, props, message.getBytes());
|
- 配置文件开启Return机制
1 2 3
| spring: rabbitmq: publisher-returns: true
|
- 在发送消息时,配置RabbitTemplate
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public void publishWithReturn() throws IOException { rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { String msg = new String(returned.getMessage().getBody()); System.out.println("消息:" + msg + "路由队列失败!!做补救操作!!"); } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message"); System.out.println("消息发送成功");
System.in.read(); }
|
持久化消息
保证消息持久化,重启MQ服务后队里依然有消息存在
1 2 3 4 5 6 7 8
| AMQP.BasicProperties props = new AMQP.BasicProperties() .builder() .deliveryMode(2) .build();
channel.basicPublish("", "confirms", true, props, message.getBytes());
|
- 在发送消息时,配置RabbitTemplate
1 2 3 4 5 6 7 8 9 10 11
| public void publishWithBasicProperties() throws IOException { rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; } }); System.out.println("消息发送成功"); }
|
消费者保证消息可靠性
ACK确认
消息确认消费后手动ack确认
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| DefaultConsumer callback = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者获取到消息:" + new String(body,"UTF-8")); channel.basicAck(envelope.getDeliveryTag(),false); } };
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @RabbitListener(queues = "test.confirms") @RabbitHandler public void processForAck(String msg, Channel channel, Message message) throws IOException { try { if (message.getMessageProperties().getDeliveryTag() == 1 || message.getMessageProperties().getDeliveryTag() == 2) { throw new RuntimeException("test exception"); }
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { System.out.println("消息已重复处理失败,拒绝再次接收" + msg); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { System.out.println("消息即将再次返回队列处理" + msg); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
|
消息的幂等性
幂等(idempotence)实际上是一个数学与计算机学概念,在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。通俗点说就是:同样的参数或者数据去调用同一个接口,无论重复调用多少次,总能保证数据的正确性,不能出错,这就是接口的幂等性。这里“数据的正确性”和具体的业务相关,不同的业务,对于幂等性的定义是不一样的。
基于幂等性的要求,我们需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果这得结合具体的业务来考虑:
MQ消息队列的重复消费问题的通用解决办法以及幂等性的原理
死信队列
前提是已经配置了死信队列以及做了绑定,死信队列跟普通队列没两样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitMqConfig {
public static final String NORMAL_EXCHANGE = "normal-exchange";
public static final String NORMAL_QUEUE = "normal-queue";
public static final String NORMAL_ROUTING_KEY = "normal.#";
public static final String DEAD_EXCHANGE = "dead-exchange";
public static final String DEAD_QUEUE = "dead-queue";
public static final String DEAD_ROUTING_KEY = "dead.#";
@Bean public Exchange normalExchange() { return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build(); }
@Bean public Queue normalQueue() { return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").build(); }
@Bean public Binding normalBinding(Queue normalQueue, Exchange normalExchange) { return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs(); }
@Bean public Exchange deadExchange() { return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build(); }
@Bean public Queue deadQueue() { return QueueBuilder.durable(DEAD_QUEUE).build(); }
@Bean public Binding deadBinding(Queue deadQueue, Exchange deadExchange) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs(); } }
|
- 消息被消费者拒绝(reject)或未确认(nack),且requeue设置为false的消息进入死信队列
1 2 3 4 5 6
| @RabbitListener(queues = RabbitMqConfig.NORMAL_QUEUE) public void consumer(String msg, Channel channel, Message message) throws IOException { System.out.println("接收到normal队列的消息:" + msg); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); }
|
- 指定消息或者队列所有消息的生存时间,生存时间到了还没被消费消息进入死信队列
1 2 3 4 5 6 7 8 9 10 11
| @Test public void publishExpire() { String msg = "dead letter expire"; rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); return message; } }); }
|
1 2 3 4
| @Bean public Queue normalQueue() { return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").ttl(10000).build(); }
|
- 队列已经达到消息的最大长度后,再路由过来的消息消息进入死信队列
1 2 3 4
| @Bean public Queue normalQueue() { return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").maxLength(1).build(); }
|

延迟交换机
死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。下载地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Configuration public class DelayedConfig {
public static final String DELAYED_EXCHANGE = "delayed-exchange"; public static final String DELAYED_QUEUE = "delayed-queue"; public static final String DELAYED_ROUTING_KEY = "delayed.#";
@Bean public Exchange delayedExchange() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "topic"); Exchange exchange = new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, arguments); return exchange; }
@Bean public Queue delayedQueue() { return QueueBuilder.durable(DELAYED_QUEUE).build(); }
@Bean public Binding delayedBinding(Queue delayedQueue, Exchange delayedExchange) { return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
|
1 2 3 4 5 6 7 8 9 10
| @Test public void publish() { rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(30000); return message; } }); }
|