发布时间:2022-10-13 16:00
第一部分:延迟消息的实现原理和知识点
使用RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现上述需求。
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。
可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:
当上面的消息扔到队列中后,过了3分钟,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange。
Exchage的概念在这里就不在赘述。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
2. 上面的消息的TTL到了,消息过期了。
3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
延迟任务通过消息的TTL和Dead Letter Exchange来实现。我们需要建立2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。
生产者输出消息到Queue1,并且这个消息是设置有有效时间的,比如3分钟。消息会在Queue1中等待3分钟,如果没有消费者收掉的话,它就是被转发到Queue2,Queue2有消费者,收到,处理延迟任务。
完成延迟任务的实现。
第二部分:具体实现例子
1、新建立消息队列配置文件rabbitmq.properties
1 #rabbitmq消息队列的属性配置文件properties
2 rabbitmq.study.host=192.168.56.101
3 rabbitmq.study.username=duanml
4 rabbitmq.study.password=1qaz@WSX
5 rabbitmq.study.port=5672
6 rabbitmq.study.vhost=studymq
7
8 #Mail 消息队列的相关变量值
9 mail.exchange=mailExchange
10 mail.exchange.key=mail_queue_key
11
12
13 #Phone 消息队列的相关变量值
14 phone.topic.key=phone.one
15 phone.topic.key.more=phone.one.more
16
17 #delay 延迟消息队列的相关变量值
18 delay.directQueue.key=TradePayNotify_delay_2m
19 delay.directMessage.key=TradePayNotify_delay_3m
2、新建立配置文件,申明延迟队列相关的配置信息如:spring-rabbigmq-dlx.xml
1
2
6
7
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
46
47
48
49
50
51
52
58
60
61
62
63
64
65
66
67
73
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
3、新建立延迟队列测试Controller
1 package org.seckill.web;
2
3 import org.seckill.dto.SeckillResult;
4 import org.seckill.entity.Seckill;
5 import org.seckill.utils.rabbitmq.Impl.MQProducerImpl;
6 import org.seckill.utils.rabbitmq.MQProducer;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.amqp.core.Message;
10 import org.springframework.beans.factory.annotation.Autowired;
11 import org.springframework.beans.factory.annotation.Value;
12 import org.springframework.stereotype.Controller;
13 import org.springframework.web.bind.annotation.RequestMapping;
14 import org.springframework.web.bind.annotation.ResponseBody;
15
16 import java.util.Date;
17
18 /**
19 * Title: org.seckill.web
20 * Company:东软集团(neusoft)
21 * Copyright:Copyright(c)2018
22 * User: 段美林
23 * Date: 2018/5/30 17:33
24 * Description: 消息队列测试
25 */
26 @Controller
27 @RequestMapping("/rabbitmq")
28 public class RabbitmqController {
29
30 private final Logger logger = LoggerFactory.getLogger(this.getClass());
31 40
41 @Value("${delay.directQueue.key}")
42 private String delay_directQueue_key;
43
44 @Value("${delay.directMessage.key}")
45 private String delay_directMessage_key;
46 52
53 @Autowired
54 private MQProducerImpl delayMQProducerImpl;111
112 /**
113 * @Description: 消息队列
114 * @Author:
115 * @CreateTime:
116 */
117 @ResponseBody
118 @RequestMapping("/sendDelayQueue")
119 public SeckillResult testDelayQueue() {
120 SeckillResult result = null;
121 Date now = new Date();
122 try {
123 Seckill seckill = new Seckill();
124 //第一种情况,给队列设置消息ttl,详情见配置文件
125 for (int i = 0; i < 2; i++) {
126 seckill.setSeckillId(1922339387 + i);
127 seckill.setName("delay_queue_ttl_" + i);
128 String msgId = delayMQProducerImpl.getMsgId();
129 Message message = delayMQProducerImpl.messageBuil(seckill,msgId);
130 delayMQProducerImpl.sendDataToRabbitMQ(delay_directQueue_key, message);
131 }
132 //第二种情况,给消息设置ttl
133 for (int i = 0; i < 2; i++) {
134 seckill.setSeckillId(1922339287 + i);
135 seckill.setName("delay_message_ttl_" + i);
136 String msgId = delayMQProducerImpl.getMsgId();
137 Message message = delayMQProducerImpl.messageBuil(seckill,msgId);
138 if (message != null) {
139 //给消息设置过期时间ttl,为3分钟
140 message.getMessageProperties().setExpiration("180000");
141 delayMQProducerImpl.sendDataToRabbitMQ(delay_directMessage_key, message);
142 }
143 }
144 result = new SeckillResult(true, now.getTime());
145 } catch (Exception e) {
146 logger.error(e.getMessage(), e);
147 }
148 return result;
149 }
150
151 }
4、编写延迟消息确认类和监听类:
NotifyConfirmCallBackListener.java
1 package org.seckill.rabbitmqListener.notify;
2
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5 import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
6 import org.springframework.amqp.rabbit.support.CorrelationData;
7
8 /**
9 * Title: org.seckill.rabbitmqListener.notify
10 * Company:东软集团(neusoft)
11 * Copyright:Copyright(c)2018
12 * User: 段美林
13 * Date: 2018/6/3 0:27
14 * Description: 延迟任务测试--->消息确认回调类
15 */
16 public class NotifyConfirmCallBackListener implements ConfirmCallback {
17
18 private final Logger logger = LoggerFactory.getLogger(this.getClass());
19
20 /**
21 * Confirmation callback.
22 *
23 * @param correlationData correlation data for the callback.
24 * @param ack true for ack, false for nack
25 * @param cause An optional cause, for nack, when available, otherwise null.
26 */
27 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
28 logger.info("延迟测试---确认消息完成-------->confirm--:correlationData:" + correlationData.getId() + ",ack:" + ack + ",cause:" + cause);
29 }
30 }
NotifyConsumerListener.java
1 package org.seckill.rabbitmqListener.notify;
2
3 import com.alibaba.fastjson.JSONObject;
4 import com.rabbitmq.client.Channel;
5 import org.slf4j.Logger;
6 import org.slf4j.LoggerFactory;
7 import org.springframework.amqp.core.Message;
8 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
9
10 /**
11 * Title: org.seckill.rabbitmqListener.notify
12 * Company:东软集团(neusoft)
13 * Copyright:Copyright(c)2018
14 * User: 段美林
15 * Date: 2018/6/3 0:27
16 * Description: 订单通知队列监听服务
17 * 实现延迟任务的功能
18 */
19 public class NotifyConsumerListener implements ChannelAwareMessageListener {
20
21
22 private final Logger logger = LoggerFactory.getLogger(this.getClass());
23
24 /**
25 * Callback for processing a received Rabbit message.
26 * Implementors are supposed to process the given Message,
27 * typically sending reply messages through the given Session.
28 *
29 * @param message the received AMQP message (never null
)
30 * @param channel the underlying Rabbit Channel (never null
)
31 * @throws Exception Any.
32 */
33 public void onMessage(Message message, Channel channel) throws Exception {
34 try {
35 //将字节流对象转换成Java对象
36 // Seckill seckill=(Seckill) new ObjectInputStream(new ByteArrayInputStream(message.getBody())).readObject();
37
38 String returnStr = new String(message.getBody(),"UTF-8");
39 JSONObject jsStr = JSONObject.parseObject(returnStr);
40
41 logger.info("延迟测试--消费开始:名称为--===>" + jsStr.getString("name") + "----->返回消息:" + returnStr + "||||消息的Properties:--》" + message.getMessageProperties());
42
43 //TODO 进行相关业务操作
44
45 //成功处理业务,那么返回消息确认机制,这个消息成功处理OK
46 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
47
48 } catch (Exception e) {
49 if (message.getMessageProperties().getRedelivered()) {
50 //消息已经进行过一次轮询操作,还是失败,将拒绝再次接收本消息
51 logger.info("消息已重复处理失败,拒绝再次接收...");
52 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
53
54 //TODO 进行相关业务操作
55
56 } else {
57 //消息第一次接收处理失败后,将再此回到队列中进行 再一次轮询操作
58 logger.info("消息即将再次返回队列处理...");
59 //处理失败,那么返回消息确认机制,这个消息没有成功处理,返回到队列中
60 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
61 }
62 }
63 }
64 }
NotifyFailedCallBackListener.java
1 package org.seckill.rabbitmqListener.notify;
2
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5 import org.springframework.amqp.core.Message;
6 import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
7
8 /**
9 * Title: org.seckill.rabbitmqListener.notify
10 * Company:东软集团(neusoft)
11 * Copyright:Copyright(c)2018
12 * User: 段美林
13 * Date: 2018/6/3 0:28
14 * Description: 延迟任务测试----> 消息发送失败回调类
15 */
16 public class NotifyFailedCallBackListener implements ReturnCallback {
17
18 private final Logger logger = LoggerFactory.getLogger(this.getClass());
19
20 /**
21 * Returned message callback.
22 *
23 * @param message the returned message.
24 * @param replyCode the reply code.
25 * @param replyText the reply text.
26 * @param exchange the exchange.
27 * @param routingKey the routing key.
28 */
29 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
30 logger.info("延迟测试------------->return--message:" +
31 new String(message.getBody()) +
32 ",replyCode:" + replyCode + ",replyText:" + replyText +
33 ",exchange:" + exchange + ",routingKey:" + routingKey);
34 }
35 }
5、编写消息队列的操作类和接口:
MQProducer.java
1 package org.seckill.utils.rabbitmq;
2
3 import org.springframework.amqp.core.Message;
4 import org.springframework.amqp.core.MessagePostProcessor;
5 import org.springframework.amqp.rabbit.support.CorrelationData;
6
7 /**
8 * Title: org.seckill.utils.rabbitmq
9 * Company:东软集团(neusoft)
10 * Copyright:Copyright(c)2018
11 * User: 段美林
12 * Date: 2018/5/30 11:49
13 * Description: No Description
14 */
15 public interface MQProducer {
16
17 /**
18 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
19 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
20 * @param message
21 */
22 void sendDataToRabbitMQ(java.lang.Object message);
23
24 /**
25 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
26 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
27 * @param message
28 * @param messagePostProcessor
29 */
30 void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor);
31
32 /**
33 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
34 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
35 * @param message
36 * @param messagePostProcessor
37 * @param correlationData
38 */
39 void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
40
41 /**
42 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
43 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
44 * @param routingKey
45 * @param message
46 */
47 void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message);
48
49 /**
50 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
51 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
52 * @param routingKey
53 * @param message
54 * @param correlationData
55 */
56 void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData);
57
58 /**
59 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
60 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
61 * @param routingKey
62 * @param message
63 * @param messagePostProcessor
64 */
65 void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor);
66
67 /**
68 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
69 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
70 * @param routingKey
71 * @param message
72 * @param messagePostProcessor
73 * @param correlationData
74 */
75 void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
76
77 /**
78 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
79 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
80 * @param exchange
81 * @param routingKey
82 * @param message
83 */
84 void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message);
85
86 /**
87 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
88 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
89 * @param exchange
90 * @param routingKey
91 * @param message
92 * @param correlationData
93 */
94 void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData);
95
96 /**
97 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
98 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
99 * @param exchange
100 * @param routingKey
101 * @param message
102 * @param messagePostProcessor
103 */
104 void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor);
105
106 /**
107 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
108 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
109 * @param exchange
110 * @param routingKey
111 * @param message
112 * @param messagePostProcessor
113 * @param correlationData
114 */
115 void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);
116
117 Message messageBuil(Object handleObject, String msgId);
118
119 String getMsgId();
120 }
MQProducerImpl.java
1 package org.seckill.utils.rabbitmq.Impl;
2
3 import com.alibaba.fastjson.JSONObject;
4 import org.seckill.utils.rabbitmq.MQProducer;
5 import org.slf4j.Logger;
6 import org.slf4j.LoggerFactory;
7 import org.springframework.amqp.AmqpException;
8 import org.springframework.amqp.core.Message;
9 import org.springframework.amqp.core.MessageBuilder;
10 import org.springframework.amqp.core.MessagePostProcessor;
11 import org.springframework.amqp.core.MessageProperties;
12 import org.springframework.amqp.rabbit.core.RabbitTemplate;
13 import org.springframework.amqp.rabbit.support.CorrelationData;
14 import org.springframework.stereotype.Component;
15
16 import java.io.UnsupportedEncodingException;
17 import java.util.UUID;
18
19 /**
20 * Title: org.seckill.utils.rabbitmq.Impl
21 * Company:东软集团(neusoft)
22 * Copyright:Copyright(c)2018
23 * User: 段美林
24 * Date: 2018/6/2 22:54
25 * Description: 消息生产者操作主体类
26 */
27 @Component
28 public class MQProducerImpl implements MQProducer{
29
30 private static final Logger logger = LoggerFactory.getLogger(MQProducerImpl.class);
31
32 private RabbitTemplate rabbitTemplate;
33
34 /**
35 * Sets the rabbitTemplate.
36 *
37 *
You can use getRabbitTemplate() to get the value of rabbitTemplate
38 *
39 * @param rabbitTemplate rabbitTemplate
40 */
41 public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
42 this.rabbitTemplate = rabbitTemplate;
43 }
44
45 /**
46 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
47 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
48 *
49 * @param message
50 */
51 public void sendDataToRabbitMQ(Object message) {
52 try {
53 if (message instanceof Message){
54 Message messageSend = (Message) message;
55 String msgId = messageSend.getMessageProperties().getCorrelationId();
56 CorrelationData correlationData = new CorrelationData(msgId);
57 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,correlationData);
58 }else {
59 rabbitTemplate.convertAndSend(message);
60 }
61 } catch (AmqpException e) {
62 logger.error(e.getMessage(), e);
63 }
64 }
65
66 /**
67 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
68 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
69 *
70 * @param message
71 * @param messagePostProcessor
72 */
73 public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor) {
74 try {
75 if (message instanceof Message){
76 Message messageSend = (Message) message;
77 String msgId = messageSend.getMessageProperties().getCorrelationId();
78 CorrelationData correlationData = new CorrelationData(msgId);
79 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,messagePostProcessor,correlationData);
80 }else {
81 rabbitTemplate.convertAndSend(message, messagePostProcessor);
82 }
83 } catch (AmqpException e) {
84 logger.error(e.getMessage(), e);
85 }
86 }
87
88 /**
89 * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.
90 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
91 *
92 * @param message
93 * @param messagePostProcessor
94 * @param correlationData
95 */
96 public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
97 try {
98 rabbitTemplate.convertAndSend(message, messagePostProcessor, correlationData);
99 } catch (AmqpException e) {
100 logger.error(e.getMessage(), e);
101 }
102 }
103
104 /**
105 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
106 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
107 *
108 * @param routingKey
109 * @param message
110 */
111 public void sendDataToRabbitMQ(String routingKey, Object message) {
112 try {
113 if (message instanceof Message){
114 Message messageSend = (Message) message;
115 String msgId = messageSend.getMessageProperties().getCorrelationId();
116 CorrelationData correlationData = new CorrelationData(msgId);
117 rabbitTemplate.convertAndSend(routingKey,message,correlationData);
118 }else {
119 rabbitTemplate.convertAndSend(routingKey, message);
120 }
121 } catch (AmqpException e) {
122 logger.error(e.getMessage(), e);
123 }
124 }
125
126 /**
127 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
128 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
129 *
130 * @param routingKey
131 * @param message
132 * @param correlationData
133 */
134 public void sendDataToRabbitMQ(String routingKey, Object message, CorrelationData correlationData) {
135 try {
136 rabbitTemplate.convertAndSend(routingKey, message, correlationData);
137 } catch (AmqpException e) {
138 logger.error(e.getMessage(), e);
139 }
140 }
141
142 /**
143 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
144 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
145 *
146 * @param routingKey
147 * @param message
148 * @param messagePostProcessor
149 */
150 public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
151 try {
152 if (message instanceof Message){
153 Message messageSend = (Message) message;
154 String msgId = messageSend.getMessageProperties().getCorrelationId();
155 CorrelationData correlationData = new CorrelationData(msgId);
156 rabbitTemplate.convertAndSend(routingKey,message,messagePostProcessor,correlationData);
157 }else {
158 rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor);
159 }
160 } catch (AmqpException e) {
161 logger.error(e.getMessage(), e);
162 }
163 }
164
165 /**
166 * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
167 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
168 *
169 * @param routingKey
170 * @param message
171 * @param messagePostProcessor
172 * @param correlationData
173 */
174 public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
175 try {
176 rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor, correlationData);
177 } catch (AmqpException e) {
178 logger.error(e.getMessage(), e);
179 }
180 }
181
182 /**
183 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
184 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
185 *
186 * @param exchange
187 * @param routingKey
188 * @param message
189 */
190 public void sendDataToRabbitMQ(String exchange, String routingKey, Object message) {
191 try {
192 if (message instanceof Message){
193 Message messageSend = (Message) message;
194 String msgId = messageSend.getMessageProperties().getCorrelationId();
195 CorrelationData correlationData = new CorrelationData(msgId);
196 rabbitTemplate.convertAndSend(routingKey,message,correlationData);
197 }else {
198 rabbitTemplate.convertAndSend(exchange, routingKey, message);
199 }
200 } catch (AmqpException e) {
201 logger.error(e.getMessage(), e);
202 }
203 }
204
205 /**
206 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
207 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
208 *
209 * @param exchange
210 * @param routingKey
211 * @param message
212 * @param correlationData
213 */
214 public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, CorrelationData correlationData) {
215 try {
216 rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
217 } catch (AmqpException e) {
218 logger.error(e.getMessage(), e);
219 }
220 }
221
222 /**
223 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
224 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
225 *
226 * @param exchange
227 * @param routingKey
228 * @param message
229 * @param messagePostProcessor
230 */
231 public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) {
232 try {
233 rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor);
234 } catch (AmqpException e) {
235 logger.error(e.getMessage(), e);
236 }
237 }
238
239 /**
240 * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.
241 * 由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
242 *
243 * @param exchange
244 * @param routingKey
245 * @param message
246 * @param messagePostProcessor
247 * @param correlationData
248 */
249 public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {
250 try {
251 rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);
252 } catch (AmqpException e) {
253 logger.error(e.getMessage(), e);
254 }
255 }
256
257 /**
258 * 构建Message对象,进行消息发送
259 * @param handleObject
260 * @param msgId
261 * @return
262 */
263 public Message messageBuil(Object handleObject, String msgId) {
264 try {
265 //先转成JSON
266 String objectJSON = JSONObject.toJSONString(handleObject);
267 //再构建Message对象
268 Message messageBuil = MessageBuilder.withBody(objectJSON.getBytes("UTF-8")).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
269 .setCorrelationId(msgId).build();
270 return messageBuil;
271 } catch (UnsupportedEncodingException e) {
272 logger.error("构建Message出错:" + e.getMessage(),e);
273 return null;
274 }
275 }
276
277 /**
278 * 生成唯一的消息操作id
279 * @return
280 */
281 public String getMsgId() {
282 return UUID.randomUUID().toString();
283 }
284
285 }
至此就完成了延迟消息队列的所有代码实现,