发布时间:2024-02-04 09:00
要实现MQ消息队列的话我们需要准备两个项目(只用于学习演示),一个消息发送者,一个消息接收者。
前期准备:第一步需要安装Erlang和MQ软件,如果不安装Erlang的话我们的MQ是无法安装的,我是在win10下面给大家演示的,我下面给大家准备好了两个安装包,直接下载下来就可以使用了,当然也可以自己去官网下载,但是下载下来一定要注意一下版本对应,不然下载下来Erlang安装好了安装MQ的时候就提示错误(版本太低),如果自己已经下载安装完了发现版本对不上的话就找到安装目录,双击下图选项Uninstall.exe删除,不要使用系统自带删除或者360删除,不然到时候重新安装新版本的Erlang一样还是会报错。(最后提示一下:如果你已经自己安装好了Erlang报错版本太低的话并且已经用360获取其他方式删除了,那么你在将旧版本重新安装一遍,在使用上面正确的方式删除一遍就可以了,或者麻烦点去删注册表了)
下面是给大家准备的安装包,直接下载就好,提取码 :AAAA 忘记是3个A还是4个A了,自己都试一下吧
百度网盘 请输入提取码
第一步:
上面安装完成以后就自行启动MQ了,这里就不演示了,启动完成以后打开浏览器输入
http://127.0.0.1:15672/ 访问MQ管理器,默认账号密码都是guest
第二步:项目里面导入我们下面的依赖包
junit
junit
org.springframework
spring-test
4.3.18.RELEASE
compile
com.alibaba
fastjson
1.2.79
org.springframework.boot
spring-boot-test
org.springframework.boot
spring-boot-starter-data-redis
com.rabbitmq
amqp-client
5.4.3
compile
com.rabbitmq
http-client
2.1.0.RELEASE
compile
true
org.springframework.boot
spring-boot-starter-web
org.springframework.cloud
spring-cloud-starter-task
第三步:准备代码
package com.example.demo.util;
import com.example.demo.entity.MqEbtity;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* MQ链接工具类
*/
public class MqUtil {
public static Channel channelPas = null;
public static Channel getChannel() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
channelPas = channel;
return channel;
}
}
准备消息发布者
package com.example.demo.util;
import java.util.Scanner;
import com.rabbitmq.client.*;
import org.springframework.stereotype.Service;
/**
* 消息发布者
* @title MessagePublisher
* @author HayDen
* @date 2022年09月2日
* @since v1.0.0
*/
@Service
public class MessagePublisher {
public static void main(String[] args) throws Exception {
//初始化消息配置类
Channel channel = MqUtil.getChannel();
/**
* 1、队列名称
* 2、队列是否持久化(磁盘)。默认情况下消息存储在内存中,当此参数为false时我们MQ出现宕机的话队列里面的消息队列将会丢失,所以我们一般给true就行,原先信道里面的消息如果是持久化的话我们改为非持久化会出现异常的,所以我们只能重新创建新的信道。
* 3、改列队只提供一个消费者消费,是否进行消息共享,true可以多个消费者消费 false只能一个消费者消费
* 4、是否自动删除、最后一个消费者断开链接以后 该队是否自动删除 true 自动删除 false 不自动删除
* 5、表示其他参数
*/
channel.queueDeclare("MQ111", true, false, false,null);
//这个表示我们在控制台输入消息内容发送
Scanner scanner = new Scanner(System.in);
//给个死循环吧,我们要一直发
while (true){
String msg = scanner.next();
/**
* 下面开始发消息
* 1、需要发生到那个交换机
* 2、路由的key值是那个 本次队列的名称
* 3、其他参数
* 4、发生消息的消息体
*
*/
channel.basicPublish("", "MQ111", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8"));
System.out.printf("消息发生成功"+msg+"\n");
}
}
}
当我们消息发送完成以后这里面会显示我们发送消息数量的,可以去控制台查看
创建了那些信道
那些消息是持久化的
下面准备消费者
package com.example.demo.config;
import com.example.demo.util.MqUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* MQ消费者
*/
public class Consumer {
public static void main(String[] args) throws Exception{
//初始化消息配置类
Channel channel = MqUtil.getChannel();
//声明 消息的接收
DeliverCallback deliverCallback = (consumerTag, message) ->{
//将消息转换成一个字符串对象(不然消息就是一个地址)
System.out.printf(new String(message.getBody())+":接收到的消息 \n");
};
//取消消息的回调
CancelCallback cancelCallback = consumerTag ->{
System.out.printf("当消息被中断");
};
System.out.printf("C1消息接收中.........");
/**
* 1、消费那个队列
* 2、消费成功以后是否要自动应答 true 自动应答 false表示手动应答
* 3、消费未成功消费的回调
* 4、消费者录取消费的回调
*/
channel.basicConsume("MQ111", true, deliverCallback, cancelCallback);
}
}
上面我们创建了一个简易版的消息发送和处理,下面我们将讲解消息自动应答和手动应答:
消费成功以后是否要自动应答 true 自动应答 false表示手动应答,自动应答的话可以快速处理消息,但是消息如果代码出错了消息将会丢失,但是手动应答不一样,手动应答必须要告诉MQ已经处理完了才会将消息删除掉,否则服务挂了消息将会被自动放回到消息队列,如果服务重新启动或者其他可以处理这个消息的服务还可以重新处理消息 。
package com.example.demo.config;
import com.example.demo.util.MqUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* MQ消费者
*/
public class Consumer02 {
public static void main(String[] args) throws Exception{
//初始化消息配置类
Channel channel = MqUtil.getChannel();
//声明 消息的接收
DeliverCallback deliverCallback = (consumerTag, message) ->{
//将消息转换成一个字符串对象(不然消息就是一个地址)
//沉睡1秒
try {
Thread.sleep(1000 * 20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf(new String(message.getBody())+":接收到的消息 \n","UTF-8");
//下面手动应答
/**
* 1、消息的标记,标记我们需要应答的是那个消息
* 2、是否批量应答,我们默认改为false,如果改为true的话可能有的没有处理完就应答了造成了消息丢失
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//取消消息的回调
CancelCallback cancelCallback = consumerTag ->{
System.out.printf("当消息被中断");
};
System.out.printf("C2消息接收中.........");
/**
* 1、消费那个队列(注意发送消息一定和这个接收名称一致)
* 2、消费成功以后是否要自动应答 true 自动应答 false表示手动应答
* 自动应答的话可以快速处理消息,但是消息如果代码出错了消息将会丢失,但是手动应答不一样,
* 手动应答必须要告诉MQ已经处理完了才会将消息删除掉,否则服务挂了消息将会被自动放回到消
* 息队列,如果服务重新启动或者其他可以处理这个消息的服务还可以重新处理消息
* 3、消费未成功消费的回调
* 4、消费者录取消费的回调
*/
channel.basicConsume("MQ333", false, deliverCallback, cancelCallback);
}
}
消息持久化:我们一般发送消息到队列里面的话消息是存在内存中的,如果出现宕机的话消息一样会丢失,所以我们将消息持久化以后就可以避免这个问题 ,下面代码第三个参数默认为null,现在我们加上 :
MessageProperties.PERSISTENT_TEXT_PLAIN
public static void main(String[] args) throws Exception { //初始化消息配置类 Channel channel = MqUtil.getChannel(); /** * 1、队列名称 * 2、队列里面的是否持久化(磁盘)。默认情况下消息存储在内存中 * 3、改列队只提供一个消费者消费,是否进行消息共享,true可以多个消费者消费 false只能一个消费者消费 * 4、是否自动删除、最后一个消费者断开链接以后 该队是否自动删除 true 自动删除 false 不自动删除 * 5、表示其他参数 */ channel.queueDeclare("MQ444", false, false, false,null); Scanner scanner = new Scanner(System.in); while (true){ String msg = scanner.next(); /** * 下面开始发消息 * 1、需要发生到那个交换机 * 2、路由的key值是那个 本次队列的名称 * 3、消息持久化 * 4、发生消息的消息体 */ channel.basicPublish("", "MQ444", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8")); System.out.printf("消息发生成功"+msg+"\n"); } }
上面就是队列持久化和消息持久化
不公平分发:
如果我们启动多个MQ消费者的时候他是自动负载均衡的,就是你一个我一个你一个我一个...........,当某个消费者处理的较块时就会出现大量的闲置时间,所以我们设置不公平分发,实现能者多劳,这个参数我们应该设置在消费者代码里面。basicQos默认是0,我们给他设置为1即可,如果有多个消费者的话我们都要设置,不然不会生效的。
public static void main(String[] args) throws Exception{
//初始化消息配置类
Channel channel = MqUtil.getChannel();
System.out.printf("C1消息接收中等待时间较短......... \n");
//声明 消息的接收
DeliverCallback deliverCallback = (consumerTag, message) ->{
//将消息转换成一个字符串对象(不然消息就是一个地址)
//沉睡1秒
try {
Thread.sleep(1000 * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf(new String(message.getBody())+":接收到的消息 \n","UTF-8");
//下面手动应答
/**
* 1、消息的标记,标记我们需要应答的是那个消息
* 2、是否批量应答,我们默认改为false,如果改为true的话可能有的没有处理完就应答了造成了消息丢失
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//取消消息的回调
CancelCallback cancelCallback = consumerTag ->{
System.out.printf("当消息被中断");
};
//设置不公平分发
channel.basicQos(1);
/**
* 1、消费那个队列
* 2、消费成功以后是否要自动应答 true 自动应答 false表示手动应答
* 3、消费未成功消费的回调
* 4、消费者录取消费的回调
*/
channel.basicConsume("MQ333", false, deliverCallback, cancelCallback);
}
预取值
什么是预取值,我们服务器有的服务器性能好,有的性能差,我们可以指定某一台服务器处理多少条消息basicQos默认是0,设置的越大处理的消息越多。
public static void main(String[] args) throws Exception{
//初始化消息配置类
Channel channel = MqUtil.getChannel();
System.out.printf("C1消息接收中等待时间较短......... \n");
//声明 消息的接收
DeliverCallback deliverCallback = (consumerTag, message) ->{
//将消息转换成一个字符串对象(不然消息就是一个地址)
//沉睡1秒
try {
Thread.sleep(1000 * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf(new String(message.getBody())+":接收到的消息 \n","UTF-8");
//下面手动应答
/**
* 1、消息的标记,标记我们需要应答的是那个消息
* 2、是否批量应答,我们默认改为false,如果改为true的话可能有的没有处理完就应答了造成了消息丢失
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//取消消息的回调
CancelCallback cancelCallback = consumerTag ->{
System.out.printf("当消息被中断");
};
//设置不公平分发
//channel.basicQos(1);
//预取值,如果开了两个线程,另一个线程设置的是5,这个设置的是2,我们发7条消息,那么这个线程只能处理2条,另一个线程可以处理7条
channel.basicQos(2);
/**
* 1、消费那个队列
* 2、消费成功以后是否要自动应答 true 自动应答 false表示手动应答
* 3、消费未成功消费的回调
* 4、消费者录取消费的回调
*/
channel.basicConsume("MQ333", false, deliverCallback, cancelCallback);
}
消息发布确认
我们发布消息以后我们不知道消息是否发布成功,所以有消息确认,这样能确保我们消息百分之发布成功,加上我们队列持久化和消息持久化,我们就能保证我们消息肯定不会丢失。
单个发布确认:发一条确认一条,同步确认方式,但是速度慢