SpringBoot整合RabbitMQ以及四种交换模式

什么是消息队列

在google的时候发现知乎一个很好的回答,很生动形象。

作者:ScienJus
原文链接

个人认为消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。使用场景的话,举个例子:假设用户在你的软件中注册,服务端收到用户的注册请求后,它会做这些操作:校验用户名等信息,如果没问题会在数据库中添加一个用户记录如果是用邮箱注册会给你发送一封注册成功的邮件,手机注册则会发送一条短信分析用户的个人信息,以便将来向他推荐一些志同道合的人,或向那些人推荐他发送给用户一个包含操作指南的系统通知等等……但是对于用户来说,注册功能实际只需要第一步,只要服务端将他的账户信息存到数据库中他便可以登录上去做他想做的事情了。至于其他的事情,非要在这一次请求中全部完成么?值得用户浪费时间等你处理这些对他来说无关紧要的事情么?所以实际当第一步做完后,服务端就可以把其他的操作放入对应的消息队列中然后马上返回用户结果,由消息队列异步的进行这些操作。或者还有一种情况,同时有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题,例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能。所以在软件的正常功能开发中,并不需要去刻意的寻找消息队列的使用场景,而是当出现性能瓶颈时,去查看业务逻辑是否存在可以异步处理的耗时操作,如果存在的话便可以引入消息队列来解决。否则盲目的使用消息队列可能会增加维护和开发的成本却无法得到可观的性能提升,那就得不偿失了。

我理解的消息队列就是当你的一个接口访问量大或者需要进行的操作很多,就比如注册这些功能(里面有发邮件,插入数据各种操作),但是用户只需要知道是否注册成功,他们需要马上收到注册结果(提高用户体验性),所以当用户的信息插入到数据库中我们就可以直接返回结果,至于后面的操作可以放入消息队列异步慢慢处理。

SpringBoot整合RabbitMQ

  • maven配置

    首先当然是maven配置了,springboot中为我们提供了amqp的start,amqp是一个消息队列的协议,RabbitMQ实现了这个协议。

    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • springboot配置文件的配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
    simple:
    concurrency: 10
    max-concurrency: 10
    prefetch: 1
    auto-startup: true
    default-requeue-rejected: true
    template:
    retry:
    enabled: true
    initial-interval: 1000ms
    max-attempts: 3
    max-interval: 10000ms
    multiplier: 1.0

    除了host,port什么的其他都可以不写,springboot已经默认帮我们配置好了。

  • RabbitMQ的配置类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Configuration
    public class RabbitConfig {

    public static final String QUEUE = "queue";

    @Bean
    public Queue queue(){
    return new Queue(QUEUE,true);
    }

    }
  • 消息发送者(提供者)

    1
    2
    3
    4
    5
    public void send(Object message){
    logger.info("send");
    //标注需要发送的某个队列
    amqpTemplate.convertAndSend(RabbitConfig.QUEUE,message);
    }
  • 消息接收者(消费者)

    1
    2
    3
    4
    5
    6
    //@RabbitListener注解需要标注刚刚我们的队列名
    //其作用就是监听那个队列是否有消息,有消息则接收
    @RabbitListener(queues = RabbitConfig.QUEUE)
    public void receive(String message){
    logger.info("receive:"+message);
    }

RabbitMQ的四种模式

RabbitMQ分为四种模式

* Direct模式
* Topic模式
* Fanout模式
* Headers模式
  • Direct模式(直接交换模式)

    刚刚我们的配置类,接受者和发送者实现的就是Direct模式,其流程就是发送者指定一个Queue来发送消息给那个队列,然后消费者一直监听那个队列,有消息则接收

  • Topic模式

首先是Topic的配置类,这里定义两个Queue,并且生成一个TopicExchange(Topic交换机)。

然后我们通过Binding来将A和B的消息队列绑定到我们刚刚生成的TopicExchange和指定routingKey中

RabbitMQ的topic模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Bean
public Queue topicQueueA(){
return new Queue(TOPIC_QUEUE_A, true);
}

@Bean
public Queue topicQueueB(){
return new Queue(TOPIC_QUEUE_B, true);
}

@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}

@Bean
public Binding topicBindingA(){
return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with("topic.keyA");
}

@Bean
public Binding topicBindingB(){
return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with("topic.#");
}

然后是我们的消息发送者

1
2
3
4
5
6
7
8
9
public void topicSend(Object message){
logger.info("topic send");
//我们指定某个routingkey发送到这个交换机上
//然后这个topic交换机会将我们的routingKey审核
//如果这个key和某个队列绑定的key匹配那么这个消息就会发送到这个队列里
//在队列绑定routingKey的时候可以使用#,*这些通配符,所以就会出现我发送一个消息,两个队列都收到了,我发送两个消息都是一个队列收到等情况,如图所示。
amqpTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "topic.keyA", message);
amqpTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "topic.keyB", message);
}

最后是我们的消息消费者(接收者)

1
2
3
4
5
6
7
8
9
10
//接收很简单,只要指定队列就行
@RabbitListener(queues = RabbitConfig.TOPIC_QUEUE_A)
public void topicReceiveA(String message){
logger.info("receive:" + message);
}

@RabbitListener(queues = RabbitConfig.TOPIC_QUEUE_B)
public void topicReceiveB(String message){
logger.info("receive:" + message);
}
  • Fanout模式

首先是Fanout的配置类,跟上面的topic查不到,我们先配置一个FanoutExchange类,然后通过Binding将队列绑定到那个Exchange中,我们可以绑定多个队列到Exchange中。

RabbitMQ的topic模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Bean
public Queue fanoutQueueA(){
return new Queue(FANOUT_QUEUE_A, true);
}

@Bean
public Queue fanoutQueueB(){
return new Queue(FANOUT_QUEUE_B, true);
}

@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUT_EXCHANGE);
}

@Bean
public Binding fanoutBindingA(){
return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
}

@Bean
public Binding fanoutBindingB(){
return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
}

然后就是我们的发送者

1
2
3
4
5
//我们只要将消息发送给交换机就好
public void fanoutSend(Object message){
logger.info("fanout send");
amqpTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE, "", message);
}

最后是我们的消费者,这时候我们会发现这个Fanout模式就是类似于广播模式,我们将所有需要的队列都绑定上Fanout的Exchange中,然后我们发送者只需要将消息发送给Exchange,然后我们的消费者在所有的Queue中都能接收到消息。

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_A)
public void fanoutReceiveA(String message){
logger.info("fanoutA receive:"+message);
}

@RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_B)
public void fanoutReceiveB(String message){
logger.info("fanoutB receive:"+message);
}
  • Headers模式

首先是我们的配置类,这个Header跟上面几种有些不一样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Bean
public Queue headersQueue(){
return new Queue(HEADERS_QUEUE);
}

@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange(HEADERS_EXCHANGE);
}

//前面两个和上述三种都差不多,这里的Binding有些不一样
@Bean
public Binding headersBinding(){
Map<String, Object> map = new HashMap<>(5);
map.put("HeadersAKey","HeadersAValue");
map.put("HeadersBKey","HeadersBValue");
//我们需要创建一个map存入键值对,然后绑定队列到交换机上,并且有个要求就是where什么什么加入map需要匹配,我们先看消息发送者
return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(map).match();
}

然后是我们的发送者

1
2
3
4
5
6
7
8
9
10
11
12
public void headersSend(String message){
logger.info("headers send");
//这里的发送者的发送方法需要的是一个交换机名称,还有一个就是Message对象
//这个Message对象存入了真正消息的字符数组和消息的配置类,这里的消息配置类就是MessageProperties
//我们需要对这个MessageProperties设置头,这个头就是键值对,就是我们配置类设置的map
//我们发送消息给队列,这个消息的头被设置为我配置类书写的那两个map才行,因为我设置了whereAll.match,所以我发送消息给交换机,这个交换机会帮我去匹配请求头和我设置的符合的Queue并把消息存入那个队列中
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("HeadersAKey","HeadersAValue");
messageProperties.setHeader("HeadersBKey","HeadersBValue");
Message headersMessage = new Message(message.getBytes(),messageProperties);
amqpTemplate.convertAndSend(RabbitConfig.HEADERS_EXCHANGE, "", headersMessage);
}

最后是我们的接收者

1
2
3
4
5
//这个时候我们接收的就是字符数组了,因为我们再发送者的方法中是将消息变成byte[]再作为参数进行入队的
@RabbitListener(queues = RabbitConfig.HEADERS_QUEUE)
public void headersReceive(byte[] message){
logger.info("headers receive:"+new String(message));
}
-------------本文结束感谢阅读-------------