SpringBoot 从入门到光头 第十二章 SpringBoot 与消息


SpringBoot 从入门到光头 —— 第十二章 SpringBoot 与消息


1. 概述

  1. 大多数应用中,可通过消息服务中间件来提升系统异步通信、拓展解耦能力

  2. 消息服务中的两个重要概念:

    • 消息代理(Message broker)

    • 目的地(Destination)

      当消息发送者发送消息后,将由消息代理接管,消息代理保证消息传递到指定目的地

  3. 消息队列主要有两种形式的目的地:

    1. 队列(Queue): 点对点消息通信(Point-To-Point)
    2. 主题(Topic): 发布(Publish)/订阅(Subscribe)消息通信

1.1. 异步通信

AsynchronousProcessing

1.2. 应用解耦

ApplicationDecoupling

1.3. 流量削峰

FlowPeakClipping

1.4. 点对点式

  • 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列
  • 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者

1.5. 发布订阅式

  • 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息

1.6. JMS (Java Message Service) JAVA消息服务

  • 基于 JVM 消息代理的规范。ActiveMQ、HornetMQ 是 JMS 实现

1.7. AMQP (Advanced Message Queuing Protocol)

  • 高级消息队列协议,也是一个消息代理的规范,兼容 JMS
  • RabbitMQ是AMQP的实现

1.8. JMS 与 AMQP 的区别

JMS AMQP
定义 Java API 网络线级协议
跨语言
跨平台
Model 提供两种消息模型:
1. Peer-2-Peer
2. Pub / Sub
提供五种消息模型:
1. Direct Exchange
2. Fanout Exchange
3. Topic Exchange
4. Headers Exchange
5. System Exchange
本质来讲,后四种和 JMS 的 Pub / Sub 模型没有太大差别,仅是在路由机制上做了更详细的划分
支持的消息类型 多种消息类型:
1. Text Message
2. Map Message
3. Bytes Message
4. Stream Message
5. Object Message
6. Message(只有消息头和属性)
Byte[]
当实际应用时,有复杂的消息,可以将消息序列化后发送
综合评价 JMS 定义了 Java API 层面的标准;在 Java 体系中,多个 Cient 均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差 AMQP定义了 Wire-Level 层的协议标准:天然具有跨平台、跨语言特性

1.9. Spring 对消息队列的支持

  • Spring-JMS 提供了对 JMS 的支持
  • Spring-Rabbit 提供了对 AMQP 的支持
  • 需要 ConnectionFactory 的实现;来连接消息代理
  • 提供 JmsTemplateRabbitTemplate 来发送消息
  • @JmsListener(JMS)、@RabbitListener(AMQP)、注解在方法上监听消息代理发布的消息
  • @EbableJms@EnableRabbit 开启支持

1.10. SpringBoot 的自动配置

  • JmsAutoConfiuration
  • RabbitAutoConfiguration

2. RabbitMQ 简介

简介:RabbitMQ 是一个由 Erlang 开发的 AMQP(Advanced Message Queuing Protocol)的开源实现

核心概念:

  • Message: 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 Routing-key(路由键)、Priority(相对于其他消息的优先权)、Delivery-mode(指出消息可能需要持久性储存)等。
  • Publisher: 消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  • Exchange: 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange 有4种类型:Direct(默认)、Fanout、Topic 和 Headers,不同类型的 Exchange 转发消息的策略有所区别。
  • Queue: 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列中。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  • Binding: 绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机的消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定路由构成的路由表。
  • Connection: 网络连接,比如一个 TCP 连接。
  • Channel: 信道。多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMOP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  • Consumer: 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  • Virtual Host: 虛拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 VHost 本质上就是一个 Mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。VHost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 VHost是 /
  • Broker: 表示消息队列服务器实体

RabbitMQBasicFramework

3. RabbitMQ 的运行机制

AMQP 中的消息路由

  • AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 ExchangeBinding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到哪个队列。

RabbitMQBasicFramework

3.1. Exchange 类型

Exchange 分发消息时根据类型的不同分发策略有区别,目前共四种类型: Direct、Fanout、Topic、Headers 。Headers 匹配 AMQP 消息的 Header 而不是路由键,Headers 交换器和 Direct 交换器完全一致,但性能差很多,目前几乎不会用到。

3.1.1. Direct Exchange

消息中的路由键(Routing key)如果和 Binding 中的 Binging key 一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换器的要求路由键为 dog,则只转发 Routing key 标记为 dog 的信息,不会转发 dog.puppy,也不会转发 dog.guard 等等。它是完全匹配,单播的模式

DirectExchange

3.1.2. Fanout Exchange

每个 Fanout 类型交换器的消息都会分到所有绑定的队列上去。Fanout 交换器不处理路由键,只是简单地将队列绑定到交换器上,每个发送的交换器的消息都会被转发到与该交换器绑定的所有队列上。类似于子网广播,美泰滋网内的主机都获得了一份复制的信息。Fanout 类型转发消息是最快的。

FanoutExchange

3.1.3. Topic Exchange

Topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词, 这些单词之间用点隔开 。它同样也会识别两个通配符:符号 # 和符号 *# 匹配 0 或者是多个单词,* 匹配一个单词。

TopicExchange

4. RabbitMQ 整合

  1. 部署 RabbitMQ 环境(使用 Docker 部署)

  2. 引入 spring-boot-starter-amqp

    pom.xml

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  3. application.yaml 配置

    application.yaml

    spring:
      rabbitmq:
        host: 127.0.0.1
        username: guest
        password: guest
        port: 5672
    
  4. 测试 RabbitMQ

    1. AmqpAdmin管理组件(创建和删除 QueueExchangeBinding
    2. RabbitTemplate消息发送处理组件

自动配置:

  1. 自动配置类: RabbitAutoConfiguration
  2. 自动配置连接工厂:ConnectionFactory
  3. RabbitProperties封装了 RabbitMQ 的配置
  4. AmqpAdminRabbitMQ 系统管理功能组件

示例代码:

com.yourname.amqp.config.MyAmqpConfig

/**
 * @author gregPerlinLi
 * @since 2022-01-19
 */
@Configuration
public class MyAmqpConfig &#123;

    @Bean
    public MessageConverter messageConverter() &#123;
        return new Jackson2JsonMessageConverter();
    &#125;
&#125;

com.yourname.amqp.bean.Book


/**
 * @author gregPerlinLi
 * @since 2022-01-19
 */
public class Book &#123;
    private String bookName;
    private String author;

    public Book() &#123;
    &#125;
    public Book(String bookName, String author) &#123;
        this.bookName = bookName;
        this.author = author;
    &#125;
    public String getBookName() &#123;
        return bookName;
    &#125;
    public void setBookName(String bookName) &#123;
        this.bookName = bookName;
    &#125;
    public String getAuthor() &#123;
        return author;
    &#125;
    public void setAuthor(String author) &#123;
        this.author = author;
    &#125;
    @Override
    public String toString() &#123;
        return "Book&#123;" +
                "bookName='" + bookName + '\'' +
                ", author='" + author + '\'' +
                '&#125;';
    &#125;
&#125;

com.yourname.amqp.service.BookService

/**
 * @author gregPerlinLi
 * @since 2022-01-19
 */
public interface BookService &#123;
    /**
     * receive book
     *
     * @param book book
     */
    void receive(Book book);
    /**
     * receive message
     *
     * @param message message
     */
    void receive2(Message message);
&#125;

com.yourname.amqp.service.impl.BookServiceImpl

/**
 * @author gregPerlinLi
 * @since 2022-01-19
 */
@Service
public class BookServiceImpl implements BookService &#123;
    @Override
    @RabbitListener(queues = "example.news")
    public void receive(Book book) &#123;
        System.out.println("Receive message: " + book);
    &#125;
    @Override
    @RabbitListener(queues = "example")
    public void receive2(Message message) &#123;
        System.out.println(message.getBody());
        System.out.println(message.getMessageProperties());
    &#125;
&#125;

com.yourname.amqp.AmqpApplication

/**
 * Auto configuration<br/>
 * 1. &#123;@code RabbitAutoConfiguration&#125;<br/>
 * 2. The connection factory &#123;@code ConnectionFactory&#125; is automatically configured<br/>
 * 3. &#123;@code RabbitProperties&#125;: Encapsulates all configurations of RabbitMQ<br/>
 * 4. &#123;@code RabbitTemplate&#125;: Send and receive messages to RabbitMQ<br/>
 * 5. &#123;@code AmqpAdmin&#125;: System management component of RabbitMQ<br/>
 * 6. &#123;@code @EnableRabbit&#125; + &#123;@code RabbitListener&#125; Listen for message queue content
 *
 * &#123;@code @EnableRabbit&#125; Enable annotation based RabbitMQ<br/>
 * @author gregperlinli
 */
@EnableRabbit
@SpringBootApplication
public class AmqpApplication &#123;

    public static void main(String[] args) &#123;
        SpringApplication.run(AmqpApplication.class, args);
    &#125;

&#125;

com.yourname.amqp.test.AmqpTest

/**
 * @author gregperlinli 
 */
@SpringBootTest
class AmqpTests &#123;
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * 1. Unicast (Point to point)
     */
    @Test
    void contextLoads() &#123;
        // Messages need to be constructed by yourself, Define message body content and message header
        // rabbitTemplate.send(exchange, routeKey, message);

        // Object is the message body by default, Just pass the object to be sent, automatically serialize, save and send it to RabbitMQ
        // rabbitTemplate.convertAndSend(exchange, routeKey, object);

        Map<String, Object> map = new HashMap<>();
        map.put("msg", "This is the first message");
        map.put("data", Arrays.asList("Hello World", 123, true));
        // The object is serialized by default and sent out
        // rabbitTemplate.convertAndSend("exchange.direct", "example.news", map);
        rabbitTemplate.convertAndSend("exchange.direct", "example.news", new Book("Book1", "author1"));
    &#125;
    /**
     * Receive data
     */
    @Test
    void receive() &#123;
        Object o = rabbitTemplate.receiveAndConvert("example.news");
        System.out.println(o.getClass());
        System.out.println(o);
    &#125;
    /**
     * Broadcast
     */
    @Test
    void sendMsg() &#123;
        rabbitTemplate.convertAndSend("exchange.fanout", "", new Book("Book2", "author2"));
    &#125;
    @Test
    void createTest() &#123;
        amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
        System.out.println("Declare exchange completed...");

        amqpAdmin.declareQueue(new Queue("amqpadmin.queue", true));
        System.out.println("Declare queue completed...");

        amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqpadmin.exchange", "amqp.hello", null));
        System.out.println("Declare binding completed...");
    &#125;
    @Test
    void deleteTest() &#123;
        amqpAdmin.removeBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqpadmin.exchange", "amqp.hello", null));
        System.out.println("Remove bing completed...");

        amqpAdmin.deleteQueue("amqpadmin.queue");
        System.out.println("Delete queue completed...");

        amqpAdmin.deleteExchange("amqpadmin.exchange");
        System.out.println("Delete exchange completed...");
    &#125;
&#125;


文章作者: gregPerlinLi
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 gregPerlinLi !
  目录