目前常见的应用软件都有消息延迟推送的影子,而且应用广泛,比如:
在上述两种场景下,如果我们使用以下两种传统解决方案,系统的整体性能和吞吐量无疑会大大降低:
在RabbitMQ 3.6.x之前,我们通常用死信队列+TTL过期时间来实现延迟队列,这里就不做过多介绍了。
在RabbitMQ 3.6.x开始的时候,RabbitMQ官方提供了一个延迟队列的插件,可以下载放置在RabbitMQ根目录下的plugins下。
画
首先,我们创建交换机和消息队列,application.properties中的配置与前一篇文章中的相同。
导入org . spring framework . amqp . core . *; 导入org . spring framework . context . annotation . bean; import org . spring framework . context . annotation . configuration; 导入Java . util . hashmap; 导入Java . util . map; @ Configuration public class MQ config { public static final String LAZY _ EXCHANGE = & # 34;《出埃及记》LazyExchange & # 34; public static final String LAZY _ QUEUE = & # 34;MQ。懒人队列& # 34;; public static final String LAZY _ KEY = & # 34;懒惰。#"; @ Bean public topic exchange lazy exchange(){ //Map & lt;字符串,对象& gtpros = new HashMap & lt& gt(); //设置开关支持延迟消息推送 /pros . put(& # 34;x延迟消息& # 34;, "话题& # 34;); topic EXCHANGE EXCHANGE = new topic EXCHANGE(LAZY _ EXCHANGE,true,false,pros); exchange . set delayed(true); 退货换货; } @Bean 公共队列lazyQueue(){ 返回新队列(LAZY_QUEUE,true); } @ Bean public Binding lazy Binding(){ return Binding builder . bind(lazy queue())。到(lazyExchange())。用(LAZY _ KEY); } } 我们可以设置交换。setdelayed (true)在交换的声明中打开延迟队列,或者我们可以把下面的内容设置到switch声明的方法中,因为第一个方法的底层就是这样实现的。
//地图& lt字符串,对象& gtpros = new HashMap & lt& gt(); //设置开关支持延迟消息推送 /pros . put(& # 34;x延迟消息& # 34;, "话题& # 34;); topic EXCHANGE EXCHANGE = new topic EXCHANGE(LAZY _ EXCHANGE,true,false,pros); 发送消息时,我们需要指定延迟推送的时间。这里,我们在发送消息的方法中传入参数new MessagePostProcessor()以获取消息对象,因为我们需要借助消息对象的api来设置延迟时间。
导入com . anqi . MQ . config . MQ config; import org . spring framework . amqp . amqp exception; 导入org . spring framework . amqp . core . message; import org . spring framework . amqp . core . messagedeliverymode; import org . spring framework . amqp . core . messagepostprocessor; 导入org . spring framework . amqp . rabbit . connection . correlation data; 导入org . spring framework . amqp . rabbit . core . rabbit template; import org . spring framework . beans . factory . annotation . auto wired; 导入org . spring framework . stereotype . component; 导入Java . util . date; @ Component public class MQ sender { @ Autowired private rabbit template rabbit template; //确认回调返回回调代码省略,请参考上一篇文章 Public void send lazy(object message){ rabbit template . set mandatory(true); rabbit template . setconfirmcallback(confirm callback); rabbit template . setreturncallback(return callback);[/h //id+全局唯一时间戳 关联数据关联数据=新的关联数据(& # 34;12345678909"+new Date()); //指定发送消息时的头延迟时间 rabbit template . convertandsend(MQ config . lazy _ exchange,& # 34;lazy.boot & # 34,message, new messagepostprocessor(){ @ override public message postprocessmessage(message message)throwsamqpexception { //设置消息持久性 message . getmessageproperties()。setDeliveryMode(MessageDeliveryMode。持久); //message . getmessageproperties()。set header(& # 34;x延迟& # 34;, "6000"); message . getmessageproperties()。set delay(6000); 返回消息; } },correlation data); } } 我们可以观察setDelay(整数I)的底层代码,也可以在头中设置x-delay。相当于手动设置表头。
message.getMessageProperties()。set header(& # 34;x延迟& # 34;, "6000"); /** *设置x延迟标题。 * @param delay延迟。 * @自1.6 */ public void set delay(整数延迟){ if(delay = = null | | delay & lt;0){ this . headers . remove(X _ DELAY); } else { this . headers . put(X _ DELAY,DELAY); } } 消费端消费。
导入com . rabbit MQ . client . channel; 导入org . spring framework . amqp . rabbit . annotation . *; 导入org . spring framework . amqp . support . amqp headers; 导入org . spring framework . stereotype . component; 导入Java . io . io exception; 导入Java . util . map; @ Component public class MQReceiver { @ rabbit listener(queues = & # 34;MQ。懒人队列& # 34;) @ rabbit handler public void onLazyMessage(消息msg,通道Channel)抛出io exception { long delivery tag = msg . getmessage properties()。getDeliveryTag(); channel . basic ack(delivery tag,true); system . out . println(& # 34;懒惰接收& # 34;+new String(msg . getbody()); } ` ` ` ` # #测试结果[#](https://www.cnblogs.com/haixiang/p/10966985.html # 3724485 导入org . JUnit . runner . run with; import org . spring framework . beans . factory . annotation . auto wired; 导入org . spring framework . boot . test . context . spring boot test; 导入org . spring framework . test . context . JUnit 4 . spring runner; @ spring boot test @ run with(spring runner . class) public class MQSenderTest { @ Autowired private MQSender MQSender; @ Test public void send lazy()抛出异常{ String msg = & # 34;hello spring boot & # 34; MQ sender . send lazy(msg+& # 34;:"); } } 果不其然,6秒后我收到了消息lazy receive hello spring boot:
最新评论