什么是消息队列
在google的时候发现知乎一个很好的回答,很生动形象。
作者:ScienJus
原文链接
个人认为消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。使用场景的话,举个例子:假设用户在你的软件中注册,服务端收到用户的注册请求后,它会做这些操作:校验用户名等信息,如果没问题会在数据库中添加一个用户记录如果是用邮箱注册会给你发送一封注册成功的邮件,手机注册则会发送一条短信分析用户的个人信息,以便将来向他推荐一些志同道合的人,或向那些人推荐他发送给用户一个包含操作指南的系统通知等等……但是对于用户来说,注册功能实际只需要第一步,只要服务端将他的账户信息存到数据库中他便可以登录上去做他想做的事情了。至于其他的事情,非要在这一次请求中全部完成么?值得用户浪费时间等你处理这些对他来说无关紧要的事情么?所以实际当第一步做完后,服务端就可以把其他的操作放入对应的消息队列中然后马上返回用户结果,由消息队列异步的进行这些操作。或者还有一种情况,同时有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题,例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能。所以在软件的正常功能开发中,并不需要去刻意的寻找消息队列的使用场景,而是当出现性能瓶颈时,去查看业务逻辑是否存在可以异步处理的耗时操作,如果存在的话便可以引入消息队列来解决。否则盲目的使用消息队列可能会增加维护和开发的成本却无法得到可观的性能提升,那就得不偿失了。
我理解的消息队列就是当你的一个接口访问量大或者需要进行的操作很多,就比如注册这些功能(里面有发邮件,插入数据各种操作),但是用户只需要知道是否注册成功,他们需要马上收到注册结果(提高用户体验性),所以当用户的信息插入到数据库中我们就可以直接返回结果,至于后面的操作可以放入消息队列异步慢慢处理。
SpringBoot整合RabbitMQ
maven配置
首先当然是maven配置了,springboot中为我们提供了amqp的start,amqp是一个消息队列的协议,RabbitMQ实现了这个协议。
1
2
3
4<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>springboot配置文件的配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1
auto-startup: true
default-requeue-rejected: true
template:
retry:
enabled: true
initial-interval: 1000ms
max-attempts: 3
max-interval: 10000ms
multiplier: 1.0除了host,port什么的其他都可以不写,springboot已经默认帮我们配置好了。
RabbitMQ的配置类
1
2
3
4
5
6
7
8
9
10
11
public class RabbitConfig {
public static final String QUEUE = "queue";
public Queue queue(){
return new Queue(QUEUE,true);
}
}消息发送者(提供者)
1
2
3
4
5public void send(Object message){
logger.info("send");
//标注需要发送的某个队列
amqpTemplate.convertAndSend(RabbitConfig.QUEUE,message);
}消息接收者(消费者)
1
2
3
4
5
6//@RabbitListener注解需要标注刚刚我们的队列名
//其作用就是监听那个队列是否有消息,有消息则接收
(queues = RabbitConfig.QUEUE)
public void receive(String message){
logger.info("receive:"+message);
}
RabbitMQ的四种模式
RabbitMQ分为四种模式
* Direct模式
* Topic模式
* Fanout模式
* Headers模式
Direct模式(直接交换模式)
刚刚我们的配置类,接受者和发送者实现的就是Direct模式,其流程就是发送者指定一个Queue来发送消息给那个队列,然后消费者一直监听那个队列,有消息则接收
Topic模式
首先是Topic的配置类,这里定义两个Queue,并且生成一个TopicExchange(Topic交换机)。
然后我们通过Binding来将A和B的消息队列绑定到我们刚刚生成的TopicExchange和指定routingKey中

1 |
|
然后是我们的消息发送者
1 | public void topicSend(Object message){ |
最后是我们的消息消费者(接收者)
1 | //接收很简单,只要指定队列就行 |
- Fanout模式
首先是Fanout的配置类,跟上面的topic查不到,我们先配置一个FanoutExchange类,然后通过Binding将队列绑定到那个Exchange中,我们可以绑定多个队列到Exchange中。

1 |
|
然后就是我们的发送者
1 | //我们只要将消息发送给交换机就好 |
最后是我们的消费者,这时候我们会发现这个Fanout模式就是类似于广播模式,我们将所有需要的队列都绑定上Fanout的Exchange中,然后我们发送者只需要将消息发送给Exchange,然后我们的消费者在所有的Queue中都能接收到消息。
1 | (queues = RabbitConfig.FANOUT_QUEUE_A) |
- Headers模式
首先是我们的配置类,这个Header跟上面几种有些不一样
1 |
|
然后是我们的发送者
1 | public void headersSend(String message){ |
最后是我们的接收者
1 | //这个时候我们接收的就是字符数组了,因为我们再发送者的方法中是将消息变成byte[]再作为参数进行入队的 |