微服务之RabbitMQ总结(spring整合RabbitMQ)

发布时间:2023-01-02 08:00

添加依赖


    org.springframework.boot
    spring-boot-starter-amqp

核心配置文件(链接虚拟机中的RabbitMQ):

spring:
  rabbitmq:
    host: 192.168.64.140
    port: 5672
    username: admin
    password: admin

简单模式:

Main:

package com.tedu.rabbitmqspring.m1;

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
    @Bean
    public Queue helloworldQueue(){
        return new Queue("helloworld11",true,false,false);
    }
    @Autowired
    private Producer producer;
    @PostConstruct
    public void qwe(){
        producer.send();
    }
}

1. 完成内容:创建队列

2.调用producer方法提交消息

创建队列如果不填写2,3,4参数,则默认队列属性为:持久化,不排它,不自动删除

@postconstruct可能第一次没有执行到,需要再次启动

producer:

package com.tedu.rabbitmqspring.m1;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void send() {
        amqpTemplate.convertAndSend("helloworld11","123");
    }
}

1.把此类对象交给spring进行管理

2.注入AmqpTemplate

Consummer:

package com.tedu.rabbitmqspring.m1;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "helloworld11")
public class Consummer {
    @RabbitHandler
    public void receive(String s){
        System.out.println("收到:"+s);
    }
}

1.@RabbitListener(queues = "队列名")

2.@RabbitHandler 

工作模式:

Main:

package com.tedu.rabbitmqspring.m2;

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
    @Bean
    public Queue helloworldQueue(){

        return new Queue("helloworld12",true,false,false);
    }
    @Autowired
    private Producer producer;
    @PostConstruct
    public void qwe(){
        new Thread(()->producer.send()).start();
    }
}

代码同上,

创建了一个新的线程:

new Thread(()->方法).start(); 

producer:

package com.tedu.rabbitmqspring.m2;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Scanner;

@Component
public class Producer {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void send() {
        while (true) {
            System.out.println("请输入想要发送的消息:");
            String s = new Scanner(System.in).nextLine();
            amqpTemplate.convertAndSend("helloworld12", s);
        }

    }
}

持久化:1.不写队列参数,spring默认产生持久化队列

             2.消息默认持久化

Consummer:

package com.tedu.rabbitmqspring.m2;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "helloworld12")
public class Consummer {
    @RabbitListener(queues = "helloworld12")
    public void receive1(String s){

        System.out.println("消费者2收到:"+s);
    }
    @RabbitListener(queues = "helloworld12")
    public void receive2(String s){

        System.out.println("消费者1收到:"+s);
    }
}

合理分发:1.spring接收消息默认开启手动回执

                2.Qos spring默认250 可以手动配置 prefetch = 1 

 发布订阅模式:

Main:

package com.tedu.rabbitmqspring.m3;

import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
//    @Bean
//    public Queue helloworldQueue(){
//
//        return new Queue("helloworld12",true,false,false);
//    }
    @Bean
    public FanoutExchange logsExchange(){
        return new FanoutExchange("logs1",false,false);
    }
    @Autowired
    private Producer producer;
    @PostConstruct
    public void qwe(){
        new Thread(()->producer.send()).start();
    }
}

 1.创建fanoutExchange 交换机并交给spring容器进行管理

producer:

package com.tedu.rabbitmqspring.m3;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Scanner;

@Component
public class Producer {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void send() {
        while (true) {
            System.out.println("请输入想要发送的消息:");
            String s = new Scanner(System.in).nextLine();
            amqpTemplate.convertAndSend("logs1","", s);
        }

    }
}

发布信息其他不变,参数编程三个:第一个参数为交换机名称 

Consummer:

package com.tedu.rabbitmqspring.m3;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consummer {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "logs1",declare = "false")
    ))
    public void receive1(String s){

        System.out.println("消费者1收到:"+s);
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "logs1",declare = "false")
    ))
    public void receive2(String s){

        System.out.println("消费者2收到:"+s);
    }
}

 发布订阅模式需要改变@RabbitListener的参数

@RabbitListener(bindings=@QueueBinding(value=@queue,exchange=@Exchange(name="交换机名称",declare="是否新建交换机")))

 

 

 

 

 

 

 

ItVuer - 免责声明 - 关于我们 - 联系我们

本网站信息来源于互联网,如有侵权请联系:561261067@qq.com

桂ICP备16001015号