.NETCore基于RabbitMQ实现延时队列的两方法(.netcore razor)学会了吗

随心笔谈12个月前发布 admin
111 0



目录前言实现延时队列的两种方式利用rabbitmq死信队列x-dead-letter-exchange和x-dead-letter-routing-key.NETCore实现方式rabbitmq通过安装插件的形式实现(推荐).NET Core 实现第一种方式的缺陷以及解决方案

此文章用来记录自己学习延时队列过程的文章,并用.NET这两种方式实现了简单的Demo。

延时队列的应用场景 应用下单后,30分钟没有支付的话,则自动取消订单活动开始前30分钟,提醒参赛者参加活动。活动结束后,30分钟后提醒未进行评价的参赛人员进行评价…

上述的场景都可以使用延时队列进行对应的处理。

上面的场景虽说可以通过定时器也可以处理,但有点浪费资源, 而上述的场景时间是不定的,例如有两个活动需要提醒参赛者参加,一个是7点开始 ,另一个是8点开始,那么触发处理的一个是6点半,一个是7点半。

使用Rabbitmq实现延时队列可以让消息持久化,也支持分布式

缺点第一种第一种方式的缺陷以及解决方案第二种这个插件的当前设计并不真正适合具有大量延迟消息(例如成百上千或数百万)的场景。详情信息

实现需要创建两对交换机和队列,其中需要对其中一对的队列进行设置x-dead-letter-exchange和x-dead-letter-routing-key属性,属性指定转发到另一对的交换机,

随后实现流程图如下:

项目:.NET Core 控制台项目

install-package RabbitMQ.Client

生产者代码:

ConnectionFactory connectionFactory=new ConnectionFactory
{
UserName=”guest”,
Password=”guest”,
HostName=”127.0.0.1″
};

//创建连接
var connection=connectionFactory.CreateConnection();

//创建通道
var channl=connection.CreateModel();

//指定队列的x-dead-letter-exchange和x-dead-letter-routing-key
Dictionary<string, object> queueArgs=new Dictionary<string, object>()
{
{ “x-dead-letter-exchange”,”exchange.business.test” },

消费者

ConnectionFactory connectionFactory=new ConnectionFactory
{
UserName=”guest”,
Password=”guest”,
HostName=”127.0.0.1″
};

//创建连接
var connection=connectionFactory.CreateConnection();

var channel=connection.CreateModel();

EventingBasicConsumer consumer=new EventingBasicConsumer(channel);

//给消费时添加一个委托
consumer.Received +=(obj, ea)=>
{
var message=Encoding.UTF8.GetString(ea.Body.ToArray());
//打印消费的消息
Console.WriteLine(message);
channel.BasicAck(ea.DeliveryTag, false);
};

//消费queue.business.test队列的消息
channel.BasicConsume(“queue.business.test”, false, consumer);

Console.ReadKey();
channel.Dispose();
connection.Close();

实现效果:

使用 插件提供的类型的交换机

下载插件的地址:https://www.rabbitmq.com/community-plugins.html

选中rabbitmq_delayed_message_exchange插件

该插件使用只需要声明交换机的时候,指定类型,然后添加参数即可

生产者

ConnectionFactory connectionFactory=new ConnectionFactory()
{
UserName=”guest”,
Password=”guest”,
HostName=”127.0.0.1″
};

var connection=connectionFactory.CreateConnection();

var channel=connection.CreateModel();

Dictionary<string, object> exchangeArgs=new Dictionary<string, object>()
{

消费者:

ConnectionFactory connectionFactory=new ConnectionFactory
{
UserName=”guest”,
Password=”guest”,
HostName=”127.0.0.1″
};

//创建连接
var connection=connectionFactory.CreateConnection();

var channel=connection.CreateModel();

EventingBasicConsumer consumer=new EventingBasicConsumer(channel);

consumer.Received +=(obj, ea)=>
{
var message=Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine(message);
channel.BasicAck(ea.DeliveryTag, false);
};

channel.BasicConsume(“plug.delay.queue”, false, consumer);

Console.ReadKey();
channel.Dispose();
connection.Close();

实现效果:

如果存在A、B消息进入了队列中,A在前,B在后,如果B消息的过期时间比A的过期时间要早,消费的时候,并不会先消费B,再消费A,而是B会等A先消费,即使A要晚过期

举例

生产者代码修改成如下:

ConnectionFactory connectionFactory=new ConnectionFactory
{
UserName=”guest”,
Password=”guest”,
HostName=”127.0.0.1″
};

//创建连接
var connection=connectionFactory.CreateConnection();

//创建通道
var channl=connection.CreateModel();

Dictionary<string, object> queueArgs=new Dictionary<string, object>()
{
{ “x-dead-letter-exchange”,”exchange.business.test” },
;

//延时的交换机和队列绑定
channl.ExchangeDeclare(“exchange.business.dlx”, “direct”, true, false, null);
channl.QueueDeclare(“queue.business.dlx”, true, false, false, queueArgs);
channl.QueueBind(“queue.business.dlx”, “exchange.business.dlx”, “”);

//业务的交换机和队列绑定
channl.ExchangeDeclare(“exchange.business.test”, “direct”, true, false, null);
channl.QueueDeclare(“queue.business.test”, true, false, false, null);
channl.QueueBind(“queue.business.test”, “exchange.business.test”, “businessRoutingkey”, null);

string message1=”Hello Word!1″;
string message2=”Hello Word!2″;
var body1=Encoding.UTF8.GetBytes(message1);
var body2=Encoding.UTF8.GetBytes(message2);
var properties=channl.CreateBasicProperties();
properties.Persistent=true;
//先发送过期时间5秒的消息
properties.Expiration=”5000″;
channl.BasicPublish(“exchange.business.dlx”, “”, properties, body2);

//再发送过期时间3秒的消息
properties.Expiration=”3000″;
channl.BasicPublish(“exchange.business.dlx”, “”, properties, body1);

结果:

这里先发了延时20秒的A消息,然后又发了延时10秒的B消息,但是最终结果并不是先消费了B消息,而是等A消息过期后,立刻再去消费B。

这个会影响什么业务呢?好比两个C、D活动,C活动开始时间是7点,D活动开始时间是5点,那么D活动提醒需要等到C活动提醒后,才会立刻提醒,这明显不符合我们的业务需求。

解决方案 每个活动都是单独的创建自己的交换机和队列使用第二种实现方式,即使用插件的形式。

第一种不太现实,因为如果活动多的话,则会创建很多的队列,而且只会使用一次。

业务上还是推荐使用插件的实现方式。

第二种方式的效果

github地址:

https://github.com/MDZZ3/RabbitmqDelay

到此这篇关于.NETCore基于RabbitMQ实现延时队列的两方法的文章就介绍到这了,更多相关.NETCore RabbitMQ 内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:运用.net core中实例讲解RabbitMQ高可用集群构建运用.NetCore实例讲解RabbitMQ死信队列,延时队列运用.net core中实例讲解RabbitMQ如何用.NETCore操作RabbitMQ.Net?Core和RabbitMQ限制循环消费的方法

© 版权声明

相关文章