发布时间:2023-07-23 08:30
同步消息 : 收到响应消息之后才能继续发送
异步消息 : 不需要响应消息就可以继续发送
企业级应用中广泛使用的三种异步消息传递技术
下载地址 : https://activemq.apache.org/components/classic/download/
直接双击第一个 , 启动之后 , 访问 http://127.0.0.1:8161/ ,会显示一个管理后台页面 , 默认用户名密码是admin
添加依赖:
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-activemqartifactId>
dependency>
配置 yml 文件
spring:
activemq:
#指定本机的端口号
broker-url: tcp://localhost:61616
#指定消息的存储空间
jms:
template:
default-destination: sichen
编写实现类 :
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Override
public void sendMessage(String id) {
System.out.println(\"待发送短信的订单已经纳入处理队列 , id:\" + id);
//将id存入消息队列里边 , 先转换在发送
jmsMessagingTemplate.convertAndSend(\"order.queue.id\",id);
//指定消息队列 , 相当于key值
}
@Override
public String doMessage() {
//从消息队列里边取出id , 取出并转换 , 指定为String类型的
String id = jmsMessagingTemplate.receiveAndConvert(\"order.queue.id\",String.class);
System.out.println(\"以完成短信发送业务 id :\" + id);
return id;
}
编写监听器 , 设置只要队列中有消息就消费
@Component
//需要被Spring管控
public class MessageListener {
@JmsListener(destination = \"order.queue.id\")
//指定监听器操作的位置 , 只要消息队列中有消息 , 就会一直运行
@SendTo(\"下一个消息队列\")
//将处理后的返回值 , 发送到下一个消息队列里边
public void receive(String id){
//设置消息的处理业务
System.out.println(\"以完成短信发送业务 id :\" + id);
}
}
设置发布订阅模型 ,
默认的处理模型是点对点的模型 , 一个消息只能有一个消费者消费 , 或超时
切换模型 . 要在配置文件中设置 , 即可
pub-sub-domain: true ;
RabbitMQ是基于Erlang语言编写的 , 需要安装Erlang
下载地址: https://www.erlang.org/downloads
安装一键式安装 , 安装完毕需要重启 , 需要依赖windows组件
环境变量 :
安装RabbitMQ
https://rabbitmq.com/install-windows.html
在安装目录下 , 有一个sbin目录 , 这个目录下就是RabbitMQ的可执行文件
启动命令 :
注意需要要使用管理员权限启动
rabbitmq-service.bat start
关闭使用 :
rabbitmq-service.bat stop
开启管理后台页面 :
在控制台 输入 :可以查看携带的所有插件的状态
rabbitmq-plugins.bat list
使用的是这个插件 :
启动这个插件 :
rabbitmq-plugins.bat enable rabbitmq_management
启动之后 , 前边的括号中会有一个E , 如果是这个插件携带的插件 , 前边的括号中是一个 e
RabbitMQ的后台管理端口
用户名和密码是guest
导入坐标 :
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
编写配置 :
rabbitmq:
host: localhost
port: 5672
需要在编写一个配置类
@Configuration
public class RabbitConfigDriect {
//定义消息队列
@Bean
public Queue driectQueue(){
return new Queue(\"driect queue\");
}
//定义交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange(\"directExchange\");
}
//将消息队列与交换机进行绑定 , 可以同时绑定多个队列
@Bean
public Binding binding(){
return BindingBuilder.bind(directExchange()).to(directExchange()).with(\"driect\");
}
}
编写实现类 :
import com.sichen.service.MessageService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageServiceRabbitMQDirectImpl implements MessageService {
@Autowired
private AmqpTemplate amqpTemplate;
@Override
public void sendMessage(String id) {
//第一个参数: 指定交换机 , 第二个参数:指定交换机和队列绑定键的名称 , 第三个值是你的消息值
amqpTemplate.convertAndSend(\"directExchange\",\"driect\",id);
}
@Override
public String doMessage() {
return null;
}
}
编写监听器类 : 实现自动消费队列中的信息 , 使用多个监听器对消息队列进行监听 , 可以实现消息轮循处理(两个监听器交替处理消息)
@Component
public class MessageListener {
@RabbitListener(queues = \"driect queue\")//需要指定消息队列的名称
public void receive(){
}
}
定义消息队列 : 可以指定绑定键的名称 , 在使用这个队列的时候进行匹配 , 也就是一个消息可以指定多个队列进行消费 ,
下载地址 : https://rocketmq.apache.org/
安装 :解压缩即可
环境变量 :
启动命名服务器 :
双击bin目录下的mqnamesrv.cmd文件 , 启动命名服务器
启动broker服务器
双击bin目录下的mqbroker.cmd , 启动broker服务器
不配置这个命名服务器
配置这个命名服务器之后
导入依赖 :
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-spring-boot-starterartifactId>
<version>2.2.1version>
dependency>
配置yml文件 :
rocket:
name-server: localhost:9876
producer:
group: group_rocketmq #定义消息消费者的组的信息
编写实现类 :
@Service
public class MessageServiceRocketmqImpl implements MessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
//同步的消息处理机制 , 不推荐使用
public void sendMessage(String id) {
System.out.println(\"待发送短信的订单已经纳入处理队列 , id:\"+id);
rocketMQTemplate.convertAndSend(\"rocket_id\",id);
}
@Override
//我们以后常用的是异步的消息处理
public void sendMessage(String id) {
System.out.println(\"待发送短信的订单已经纳入处理队列 , id:\"+id);
//rocketMQTemplate.convertAndSend(\"rocket_id\",id);
SendCallback callback = new SendCallback() {
@Override
//成功执行这个方法
public void onSuccess(SendResult sendResult) {
System.out.println(\"消息发送成功\");
}
@Override
//失败执行这个方法
public void onException(Throwable throwable) {
System.out.println(\"消息发送失败\");
}
};
rocketMQTemplate.asyncSend(\"rocket_id\",id,callback);
}
}
编写监听类 :
@Configuration
@RocketMQMessageListener(consumerGroup = \"group_rocketmq\" , topic = \"rocket_id\")
//只需要加一个注解即可 :
//有两个必填项 :
//consumerGroup : yml配置文件中定义的组的名称
//topic : 定义的队列的命名空间
public class MessageListener implements RocketMQListener<String> {
@Override
//继承的方法
public void onMessage(String s) {
}
}