查看原文
其他

SpringBoot中使用RabbitMQ

搜云库 搜云库技术团队 2019-04-07

RabbitMQ 简介

RabbitMQ是一个开源的 AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如: PythonRuby、.NETJavaJMSCPHPActionScriptXMPPSTOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

AMQP,即 AdvancedmessageQueuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如: PythonRuby、.NETJavaJMSCPHPActionScriptXMPPSTOMP等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

常用概念

通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者, RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 ( Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

准备

环境安装

任选其一

CentOs7.3 搭建 RabbitMQ 3.6 单机服务与使用

http://www.ymq.io/2017/08/16/rabbit-install/

CentOs7.3 搭建 RabbitMQ 3.6 Cluster 集群服务与使用

http://www.ymq.io/2017/08/17/rabbit-install-cluster/

Github 代码

代码我已放到 Github ,导入 spring-boot-rabbitmq 项目

github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rabbitmq

添加依赖

在项目中添加 spring-boot-starter-amqp 依赖

  1. <dependency>

  2.    <groupId>org.springframework.boot</groupId>

  3.    <artifactId>spring-boot-starter-amqp</artifactId>

  4. </dependency>

参数配置

  1. spring.application.name=ymq-rabbitmq-spring-boot

  2. spring.rabbitmq.host=10.4.98.15

  3. spring.rabbitmq.port=5672

  4. spring.rabbitmq.username=admin

  5. spring.rabbitmq.password=admin

交换机(Exchange)

1.Direct Exchange 根据route key 直接找到队列
2.Topic Exchange 根据route key 匹配队列
3.Topic Exchange 不处理route key 全网发送,所有绑定的队列都发送

Direct Exchange

DirectExchange 是 RabbitMQ默认的交换机模式,也是最简单的模式,根据 key全文匹配去寻找队列。

任何发送到 DirectExchange的消息都会被转发到 RouteKey中指定的 Queue

1.一般情况可以使用 rabbitMQ自带的 Exchange""(该 Exchange的名字为空字符串,下文称其为 defaultExchange)。
2.这种模式下不需要将 Exchange进行任何绑定( binding)操作
3.消息传递时需要一个 RouteKey,可以简单的理解为要发送到的队列名字。
4.如果 vhost中不存在 RouteKey中指定的队列名,则该消息会被抛弃。

配置队列

  1. @Configuration

  2. public class RabbitDirectConfig {

  3.    @Bean

  4.    public Queue helloQueue() {

  5.        return new Queue("hello");

  6.    }

  7.    @Bean

  8.    public Queue directQueue() {

  9.        return new Queue("direct");

  10.    }

  11.    //-------------------配置默认的交换机模式,可以不需要配置以下-----------------------------------

  12.    @Bean

  13.    DirectExchange directExchange() {

  14.        return new DirectExchange("directExchange");

  15.    }

  16.    //绑定一个key "direct",当消息匹配到就会放到这个队列中

  17.    @Bean

  18.    Binding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) {

  19.        return BindingBuilder.bind(directQueue).to(directExchange).with("direct");

  20.    }

  21.    // 推荐使用 helloQueue() 方法写法,这种方式在 Direct Exchange 模式 多此一举,没必要这样写

  22.    //---------------------------------------------------------------------------------------------

  23. }

监听队列

  1. @Component

  2. @RabbitListener(queues = "hello")

  3. public class helloReceiver {

  4.    @RabbitHandler

  5.    public void process(String message) {

  6.        System.out.println("接收者 helloReceiver," + message);

  7.    }

  8. }

  1. @Component

  2. @RabbitListener(queues = "direct")

  3. public class DirectReceiver {

  4.    @RabbitHandler

  5.    public void process(String message) {

  6.        System.out.println("接收者 DirectReceiver," + message);

  7.    }

  8. }

发送消息

  1. package io.ymq.rabbitmq.test;

  2. import io.ymq.rabbitmq.run.Startup;

  3. import org.junit.Test;

  4. import org.junit.runner.RunWith;

  5. import org.springframework.amqp.core.AmqpTemplate;

  6. import org.springframework.beans.factory.annotation.Autowired;

  7. import org.springframework.boot.test.context.SpringBootTest;

  8. import org.springframework.test.context.junit4.SpringRunner;

  9. /**

  10. * 描述: 默认的交换机模式

  11. *

  12. * @author: yanpenglei

  13. * @create: 2017/10/25 1:03

  14. */

  15. @RunWith(SpringRunner.class)

  16. @SpringBootTest(classes = Startup.class)

  17. public class RabbitDirectTest {

  18.    @Autowired

  19.    private AmqpTemplate rabbitTemplate;

  20.    @Test

  21.    public void sendHelloTest() {

  22.        String context = "此消息在,默认的交换机模式队列下,有 helloReceiver 可以收到";

  23.        String routeKey = "hello";

  24.        context = "routeKey:" + routeKey + ",context:" + context;

  25.        System.out.println("sendHelloTest : " + context);

  26.        this.rabbitTemplate.convertAndSend(routeKey, context);

  27.    }

  28.    @Test

  29.    public void sendDirectTest() {

  30.        String context = "此消息在,默认的交换机模式队列下,有 DirectReceiver 可以收到";

  31.        String routeKey = "direct";

  32.        String exchange = "directExchange";

  33.        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

  34.        System.out.println("sendDirectTest : " + context);

  35.        // 推荐使用 sendHello() 方法写法,这种方式在 Direct Exchange 多此一举,没必要这样写

  36.        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);

  37.    }

  38. }

按顺序执行:响应

  1. 接收者 helloReceiver,routeKey:hello,context:此消息在,默认的交换机模式队列下,有 helloReceiver 可以收到

  2. 接收者 DirectReceiver,context:directExchange,routeKey:direct,context:此消息在,默认的交换机模式队列下,有 DirectReceiver 可以收到

Fanout Exchange

任何发送到 FanoutExchange 的消息都会被转发到与该 Exchange绑定 (Binding)的所有 Queue

1.可以理解为路由表的模式
2.这种模式不需要 RouteKey
3.这种模式需要提前将 Exchange与 Queue进行绑定,一个 Exchange可以绑定多个 Queue,一个 Queue可以同多个 Exchange进行绑定。
4.如果接受到消息的 Exchange没有与任何 Queue绑定,则消息会被抛弃。

配置队列

  1. @Configuration

  2. public class RabbitFanoutConfig {

  3.    final static String PENGLEI = "fanout.penglei.net";

  4.    final static String SOUYUNKU = "fanout.souyunku.com";

  5.    @Bean

  6.    public Queue queuePenglei() {

  7.        return new Queue(RabbitFanoutConfig.PENGLEI);

  8.    }

  9.    @Bean

  10.    public Queue queueSouyunku() {

  11.        return new Queue(RabbitFanoutConfig.SOUYUNKU);

  12.    }

  13.    /**

  14.     * 任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有队列上。

  15.     */

  16.    @Bean

  17.    FanoutExchange fanoutExchange() {

  18.        return new FanoutExchange("fanoutExchange");

  19.    }

  20.    @Bean

  21.    Binding bindingExchangeQueuePenglei(Queue queuePenglei, FanoutExchange fanoutExchange) {

  22.        return BindingBuilder.bind(queuePenglei).to(fanoutExchange);

  23.    }

  24.    @Bean

  25.    Binding bindingExchangeQueueSouyunku(Queue queueSouyunku, FanoutExchange fanoutExchange) {

  26.        return BindingBuilder.bind(queueSouyunku).to(fanoutExchange);

  27.    }

  28. }

监听队列

  1. @Component

  2. @RabbitListener(queues = "fanout.penglei.net")

  3. public class FanoutReceiver1 {

  4.    @RabbitHandler

  5.    public void process(String message) {

  6.        System.out.println("接收者 FanoutReceiver1," + message);

  7.    }

  8. }

  1. @Component

  2. @RabbitListener(queues = "fanout.souyunku.com")

  3. public class FanoutReceiver2 {

  4.    @RabbitHandler

  5.    public void process(String message) {

  6.        System.out.println("接收者 FanoutReceiver2," + message);

  7.    }

  8. }

发送消息

  1. package io.ymq.rabbitmq.test;

  2. import io.ymq.rabbitmq.run.Startup;

  3. import org.junit.Test;

  4. import org.junit.runner.RunWith;

  5. import org.springframework.amqp.core.AmqpTemplate;

  6. import org.springframework.beans.factory.annotation.Autowired;

  7. import org.springframework.boot.test.context.SpringBootTest;

  8. import org.springframework.test.context.junit4.SpringRunner;

  9. /**

  10. * 描述: 广播模式或者订阅模式队列

  11. *

  12. * @author: yanpenglei

  13. * @create: 2017/10/25 1:08

  14. */

  15. @RunWith(SpringRunner.class)

  16. @SpringBootTest(classes = Startup.class)

  17. public class RabbitFanoutTest {

  18.    @Autowired

  19.    private AmqpTemplate rabbitTemplate;

  20.    @Test

  21.    public void sendPengleiTest() {

  22.        String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到";

  23.        String routeKey = "topic.penglei.net";

  24.        String exchange = "fanoutExchange";

  25.        System.out.println("sendPengleiTest : " + context);

  26.        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

  27.        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);

  28.    }

  29.    @Test

  30.    public void sendSouyunkuTest() {

  31.        String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到";

  32.        String routeKey = "topic.souyunku.com";

  33.        String exchange = "fanoutExchange";

  34.        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

  35.        System.out.println("sendSouyunkuTest : " + context);

  36.        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);

  37.    }

  38. }

按顺序执行:响应

  1. 接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到

  2. 接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到

  3. 接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到

  4. 接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到

Topic Exchange

任何发送到 TopicExchange的消息都会被转发到所有关心 RouteKey中指定话题的 Queue

1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个 标题``(RouteKey), Exchange会将消息转发到所有关注主题能与 RouteKey模糊匹配的队列。
2.这种模式需要 RouteKey,也许要提前绑定 Exchange与 Queue
3.在进行绑定时,要提供一个该队列关心的主题,如 #.log.#表示该队列关心所有涉及log的消息(一个RouteKey为 MQ.log.error的消息会被转发到该队列)。
4. #表示0个或若干个关键字, *表示一个关键字。如 topic.*能与 topic.warn匹配,无法与 topic.warn.timeout匹配;但是 topic.#能与上述两者匹配。
5.同样,如果 Exchange没有发现能够与 RouteKey匹配的 Queue,则会抛弃此消息。

配置队列

  1. @Configuration

  2. public class RabbitTopicConfig {

  3.    final static String MESSAGE = "topic.message";

  4.    final static String MESSAGES = "topic.message.s";

  5.    final static String YMQ = "topic.ymq";

  6.    @Bean

  7.    public Queue queueMessage() {

  8.        return new Queue(RabbitTopicConfig.MESSAGE);

  9.    }

  10.    @Bean

  11.    public Queue queueMessages() {

  12.        return new Queue(RabbitTopicConfig.MESSAGES);

  13.    }

  14.    @Bean

  15.    public Queue queueYmq() {

  16.        return new Queue(RabbitTopicConfig.YMQ);

  17.    }

  18.    /**

  19.     * 交换机(Exchange) 描述:接收消息并且转发到绑定的队列,交换机不存储消息

  20.     */

  21.    @Bean

  22.    TopicExchange topicExchange() {

  23.        return new TopicExchange("topicExchange");

  24.    }

  25.    //綁定队列 queueMessages() 到 topicExchange 交换机,路由键只接受完全匹配 topic.message 的队列接受者可以收到消息

  26.    @Bean

  27.    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange topicExchange) {

  28.        return BindingBuilder.bind(queueMessage).to(topicExchange).with("topic.message");

  29.    }

  30.    //綁定队列 queueMessages() 到 topicExchange 交换机,路由键只要是以 topic.message 开头的队列接受者可以收到消息

  31.    @Bean

  32.    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange topicExchange) {

  33.        return BindingBuilder.bind(queueMessages).to(topicExchange).with("topic.message.#");

  34.    }

  35.    //綁定队列 queueYmq() 到 topicExchange 交换机,路由键只要是以 topic 开头的队列接受者可以收到消息

  36.    @Bean

  37.    Binding bindingExchangeYmq(Queue queueYmq, TopicExchange topicExchange) {

  38.        return BindingBuilder.bind(queueYmq).to(topicExchange).with("topic.#");

  39.    }

  40. }

监听队列

  1. @Component

  2. @RabbitListener(queues = "topic.message")

  3. public class TopicReceiver1 {

  4.    @RabbitHandler

  5.    public void process(String message) {

  6.        System.out.println("接收者 TopicReceiver1," + message);

  7.    }

  8. }

  1. @Component

  2. @RabbitListener(queues = "topic.message.s")

  3. public class TopicReceiver2 {

  4.    @RabbitHandler

  5.    public void process(String message) {

  6.        System.out.println("接收者 TopicReceiver2," + message);

  7.    }

  8. }

  1. @Component

  2. @RabbitListener(queues = "topic.ymq")

  3. public class TopicReceiver3 {

  4.    @RabbitHandler

  5.    public void process(String message) {

  6.        System.out.println("接收者 TopicReceiver3," + message);

  7.    }

  8. }

发送消息

  1. package io.ymq.rabbitmq.test;

  2. import io.ymq.rabbitmq.run.Startup;

  3. import org.junit.Test;

  4. import org.junit.runner.RunWith;

  5. import org.springframework.amqp.core.AmqpTemplate;

  6. import org.springframework.beans.factory.annotation.Autowired;

  7. import org.springframework.boot.test.context.SpringBootTest;

  8. import org.springframework.test.context.junit4.SpringRunner;

  9. /**

  10. * 描述: 配置转发消息模式队列

  11. *

  12. * @author: yanpenglei

  13. * @create: 2017/10/25 1:20

  14. */

  15. @RunWith(SpringRunner.class)

  16. @SpringBootTest(classes = Startup.class)

  17. public class RabbitTopicTest {

  18.    @Autowired

  19.    private AmqpTemplate rabbitTemplate;

  20.    @Test

  21.    public void sendMessageTest() {

  22.        String context = "此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到";

  23.        String routeKey = "topic.message";

  24.        String exchange = "topicExchange";

  25.        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

  26.        System.out.println("sendMessageTest : " + context);

  27.        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);

  28.    }

  29.    @Test

  30.    public void sendMessagesTest() {

  31.        String context = "此消息在,配置转发消息模式队列下,有  TopicReceiver2 TopicReceiver3 可以收到";

  32.        String routeKey = "topic.message.s";

  33.        String exchange = "topicExchange";

  34.        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

  35.        System.out.println("sendMessagesTest : " + context);

  36.        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);

  37.    }

  38.    @Test

  39.    public void sendYmqTest() {

  40.        String context = "此消息在,配置转发消息模式队列下,有 TopicReceiver3 可以收到";

  41.        String routeKey = "topic.ymq";

  42.        String exchange = "topicExchange";

  43.        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

  44.        System.out.println("sendYmqTest : " + context);

  45.        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);

  46.    }

  47. }

按顺序执行:响应

  1. 接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到

  2. 接收者 TopicReceiver1,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到

  3. 接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到

  4. 接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置转发消息模式队列下,有  TopicReceiver2 TopicReceiver3 可以收到

  5. 接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置转发消息模式队列下,有  TopicReceiver2 TopicReceiver3 可以收到

  6. 接收者 TopicReceiver3,context:topicExchange,routeKey:topic.ymq,context:此消息在,配置转发消息模式队列下,有 TopicReceiver3 可以收到

代码我已放到 Github ,导入 spring-boot-rabbitmq 项目

github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rabbitmq

Contact

  • 作者:鹏磊

  • 出处:http://www.ymq.io/2017/10/26/rabbitmq-spring-boot-example

  • Email:admin@souyunku.com

  • 版权归作者所有,转载请注明出处

  • Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存