SpringBoot-消息队列

发布时间:2023-07-23 08:30

消息 :

  • 消息的发送方
    • 生产者
  • 消息接收方
    • 消费者

同步消息 : 收到响应消息之后才能继续发送

异步消息 : 不需要响应消息就可以继续发送


企业级应用中广泛使用的三种异步消息传递技术

  • JMS: 一个规范 , 消息开发的API
    • \"SpringBoot-消息队列_第1张图片\"
  • AMQP: 一个协议 , 规范了数据的传输格式
    • \"SpringBoot-消息队列_第2张图片\"
  • MQTT:
    • \"image-20220409184718434\"

SpringBoot整合各种消息技术 :


ActiveMQ:

下载安装:

下载地址 : https://activemq.apache.org/components/classic/download/

\"SpringBoot-消息队列_第3张图片\"

  • 第一个是启动服务
  • 第二个是安装服务
  • 第三个是卸载服务

直接双击第一个 , 启动之后 , 访问 http://127.0.0.1:8161/ ,会显示一个管理后台页面 , 默认用户名密码是admin

\"SpringBoot-消息队列_第4张图片\"


使用:

  • 添加依赖:

  • <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-activemqartifactId>
    dependency>
    
  • 配置 yml 文件

  • spring:
      activemq:
        #指定本机的端口号
        broker-url: tcp://localhost:61616
      #指定消息的存储空间
      jms:
        template:
          default-destination: sichen
    
  • 编写实现类 :

  • @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    @Override
    public void sendMessage(String id) {
        System.out.println(\"待发送短信的订单已经纳入处理队列 , id:\" + id);
        //将id存入消息队列里边 , 先转换在发送
        jmsMessagingTemplate.convertAndSend(\"order.queue.id\",id);
        //指定消息队列 , 相当于key值
    }
    
    @Override
    public String doMessage() {
        //从消息队列里边取出id , 取出并转换 , 指定为String类型的
    String id = jmsMessagingTemplate.receiveAndConvert(\"order.queue.id\",String.class);
        System.out.println(\"以完成短信发送业务 id :\" + id);
        return id;
    }
    
  • 编写监听器 , 设置只要队列中有消息就消费

  • @Component
    //需要被Spring管控 
    public class MessageListener {
        @JmsListener(destination = \"order.queue.id\") 
        //指定监听器操作的位置 , 只要消息队列中有消息 , 就会一直运行
        @SendTo(\"下一个消息队列\")
        //将处理后的返回值 , 发送到下一个消息队列里边
        public void receive(String id){
            //设置消息的处理业务
            System.out.println(\"以完成短信发送业务 id :\" + id);
        }
    }
    

设置发布订阅模型 ,

默认的处理模型是点对点的模型 , 一个消息只能有一个消费者消费 , 或超时

切换模型 . 要在配置文件中设置 , 即可

pub-sub-domain: true ;

RabbitMQ:

下载安装:

RabbitMQ是基于Erlang语言编写的 , 需要安装Erlang

下载地址: https://www.erlang.org/downloads

安装一键式安装 , 安装完毕需要重启 , 需要依赖windows组件

环境变量 :

  • ERLANG_HOME = 安装路径
  • PATH = %ERLANG_HOME%\\bin

安装RabbitMQ

https://rabbitmq.com/install-windows.html


启动:

在安装目录下 , 有一个sbin目录 , 这个目录下就是RabbitMQ的可执行文件

\"SpringBoot-消息队列_第5张图片\"

启动命令 :

注意需要要使用管理员权限启动

rabbitmq-service.bat start

\"image-20220409201126336\"

关闭使用 :

rabbitmq-service.bat stop

\"image-20220409201211099\"

开启管理后台页面 :

在控制台 输入 :可以查看携带的所有插件的状态

rabbitmq-plugins.bat list

使用的是这个插件 :

\"image-20220409201904434\"

启动这个插件 :

rabbitmq-plugins.bat enable rabbitmq_management 

启动之后 , 前边的括号中会有一个E , 如果是这个插件携带的插件 , 前边的括号中是一个 e

RabbitMQ的后台管理端口

  • 服务端口号为 5672 ,
  • 访问地址是http://localhost:15672

用户名和密码是guest


使用direct(直连交换机模式) :

  • 导入坐标 :

  • <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
        
    dependency>
    
  • 编写配置 :

  • rabbitmq:
      host: localhost
      port: 5672
    
  • 需要在编写一个配置类

  • @Configuration
    public class RabbitConfigDriect {
    
        //定义消息队列
        @Bean
        public Queue driectQueue(){
            return new Queue(\"driect queue\");
        }
    
        //定义交换机
        @Bean
        public DirectExchange directExchange(){
            return new DirectExchange(\"directExchange\");
        }
    
        //将消息队列与交换机进行绑定 , 可以同时绑定多个队列
        @Bean
        public Binding binding(){
            return BindingBuilder.bind(directExchange()).to(directExchange()).with(\"driect\");
        }
    }
    
  • 编写实现类 :

  • import com.sichen.service.MessageService;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    @Service
    public class MessageServiceRabbitMQDirectImpl implements MessageService {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        @Override
        public void sendMessage(String id) {
            //第一个参数: 指定交换机 , 第二个参数:指定交换机和队列绑定键的名称 , 第三个值是你的消息值
            amqpTemplate.convertAndSend(\"directExchange\",\"driect\",id);
        }
    
        @Override
        public String doMessage() {
            return null;
        }
    }
    
  • 编写监听器类 : 实现自动消费队列中的信息 , 使用多个监听器对消息队列进行监听 , 可以实现消息轮循处理(两个监听器交替处理消息)

  • @Component
    public class MessageListener {
        @RabbitListener(queues = \"driect queue\")//需要指定消息队列的名称
        public void receive(){
            
        }
    }
    

使用 Topic (主题交换机模式) :

  • 配置类 :

定义消息队列 : 可以指定绑定键的名称 , 在使用这个队列的时候进行匹配 , 也就是一个消息可以指定多个队列进行消费 ,

\"SpringBoot-消息队列_第6张图片\"

\"SpringBoot-消息队列_第7张图片\"

  • 实现类 : 可以指定对应的消息队列的表达式 , 使消息到不同的消息队列中

  • \"SpringBoot-消息队列_第8张图片\"

  • 监听器类 : 实现指定的消息队列的自动消费

  • \"SpringBoot-消息队列_第9张图片\"


RocketMQ:

下载安装 :

下载地址 : https://rocketmq.apache.org/

安装 :解压缩即可

  • 默认端口号是 : 9876

环境变量 :

  • ROCKETMQ_HOME:
  • PATH
  • NAMESRV_ADDR : 127.0.0.1:9876 (建议配置)

启动命名服务器 :

双击bin目录下的mqnamesrv.cmd文件 , 启动命名服务器

启动broker服务器

双击bin目录下的mqbroker.cmd , 启动broker服务器


  • 不配置这个命名服务器

    • 在每次进行连接的时候 , 生产者和消费者每次都要连接所有的业务服务器 , 这就造成了程序很繁琐 ,
  • \"SpringBoot-消息队列_第10张图片\"

  • 配置这个命名服务器之后

    • 生产者和消费者只用去访问这个命名服务器即可 , 由命名服务器去进行业务服务器的管理
  • \"SpringBoot-消息队列_第11张图片\"


  • 导入依赖 :

    • <dependency>
          <groupId>org.apache.rocketmqgroupId>
          <artifactId>rocketmq-spring-boot-starterartifactId>
          <version>2.2.1version>
          
      dependency>
      
  • 配置yml文件 :

    • rocket: 
        name-server: localhost:9876
        producer:
        	group: group_rocketmq #定义消息消费者的组的信息
      
  • 编写实现类 :

    • @Service
      public class MessageServiceRocketmqImpl implements MessageService {
      
          @Autowired
          private RocketMQTemplate rocketMQTemplate;
      
          @Override
          //同步的消息处理机制 , 不推荐使用
          public void sendMessage(String id) {
              System.out.println(\"待发送短信的订单已经纳入处理队列 , id:\"+id);
              rocketMQTemplate.convertAndSend(\"rocket_id\",id);
          }
          
          @Override
          //我们以后常用的是异步的消息处理
          public void sendMessage(String id) {
              System.out.println(\"待发送短信的订单已经纳入处理队列 , id:\"+id);
              //rocketMQTemplate.convertAndSend(\"rocket_id\",id);
              SendCallback callback = new SendCallback() {
                  @Override
                  //成功执行这个方法
                  public void onSuccess(SendResult sendResult) {
                      System.out.println(\"消息发送成功\");
                  }
      
                  @Override
                  //失败执行这个方法
                  public void onException(Throwable throwable) {
                      System.out.println(\"消息发送失败\");
                  }
              };
              rocketMQTemplate.asyncSend(\"rocket_id\",id,callback);
          }
      }
      
  • 编写监听类 :

    • @Configuration
      @RocketMQMessageListener(consumerGroup = \"group_rocketmq\" , topic = \"rocket_id\")
      //只需要加一个注解即可 : 
      //有两个必填项 :
      //consumerGroup : yml配置文件中定义的组的名称
      //topic : 定义的队列的命名空间
      public class MessageListener implements RocketMQListener<String> {
      
          @Override
          //继承的方法
          public void onMessage(String s) {
          }
      }
      

ItVuer - 免责声明 - 关于我们 - 联系我们

本网站信息来源于互联网,如有侵权请联系:561261067@qq.com

桂ICP备16001015号