运用.NetCore实例讲解RabbitMQ死信队列,延时队列

 更新时间:2021-09-22 21:45:25   作者:佚名   我要评论(0)

目录一、死信队列二、延时队列三、延时消息设置不同过期时间四、延时消息用延时插件的方式实现一、死信队列

描述:Q1队列绑定了x-dead-lett

一、死信队列

描述:Q1队列绑定了x-dead-letter-exchange(死信交换机)为X2,x-dead-letter-routing-key(死信路由key)指向Q2(队列2)

       P(生产者)发送消息经X1(交换机1)路由到Q1(队列1),Q1的消息触发特定情况,自动把消息经X2(交换机2)路由到Q2(队列2),C(消费者)直接消息Q2的消息。

特定情况有哪些呢:

  • 1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue=false);
  • 2.当前队列中的消息数量已经超过最大长度(创建队列时指定" x-max-length参数设置队列最大消息数量)。
  • 3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的TTL(Time To Live)时间;

这里演示情况1:

假如场景:Q1中队列数据不完整,就算从新处理也会报错,那就可以不ack,把这个消息转到死信队列另外处理。

生产者

public static void SendMessage()
        {
            //死信交换机
            string dlxexChange = "dlx.exchange";
            //死信队列
            string dlxQueueName = "dlx.queue";

            //消息交换机
            string exchange = "direct-exchange";
            //消息队列
            string queueName = "queue_a";

            using (var connection = RabbitMQHelper.GetConnection())
            {
                using (var channel = connection.CreateModel())
                {

                    //创建死信交换机
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //创建死信队列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信队列绑定死信交换机
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    // 创建消息交换机
                    channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //创建消息队列,并指定死信队列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
                                        new Dictionary<string, object> {
                                             { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机)
                                             { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
                                         });
                    //消息队列绑定消息交换机
                    channel.QueueBind(queueName, exchange, routingKey: queueName);

                    string message = "hello rabbitmq message";
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //发布消息
                    channel.BasicPublish(exchange: exchange,
                                         routingKey: queueName,
                                         basicProperties: properties,
                                         body: Encoding.UTF8.GetBytes(message));
                    Console.WriteLine($"向队列:{queueName}发送消息:{message}");
                }
            }
        }

消费者

public static void Consumer()
        {
            //死信交换机
            string dlxexChange = "dlx.exchange";
            //死信队列
            string dlxQueueName = "dlx.queue";

            //消息交换机
            string exchange = "direct-exchange";
            //消息队列
            string queueName = "queue_a";
            var connection = RabbitMQHelper.GetConnection();
            {
                //创建信道
                var channel = connection.CreateModel();
                {

                    //创建死信交换机
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //创建死信队列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信队列绑定死信交换机
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    // 创建消息交换机
                    channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //创建消息队列,并指定死信队列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
                                        new Dictionary<string, object> {
                                             { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX
                                             { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
                                         });
                    //消息队列绑定消息交换机
                    channel.QueueBind(queueName, exchange, routingKey: queueName);


                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                    consumer.Received += (model, ea) =>
                    {
                        //处理业务
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"队列{queueName}消费消息:{message},不做ack确认");
                        //channel.BasicAck(ea.DeliveryTag, false);
                        //不ack(BasicNack),且不把消息放回队列(requeue:false)
                        channel.BasicNack(ea.DeliveryTag, false, requeue: false);
                    };
                    channel.BasicConsume(queueName, autoAck: false, consumer);
                }
            }
        }

消费者加上channel.BasickNack()模拟消息处理不了,不ack确认。

执行结果:

RabbitMQ管理界面:

看到消息队列为queue_a,特性有DLX(死信交换机),DLK(死信路由)。因为消费端不nack,触发了死信,被转发到了死信队列dlx.queue。

二、延时队列

延时队列其实也是配合死信队列一起用,其实就是上面死信队列的第二中情况。给队列添加消息过时时间(TTL),变成延时队列。

简单的描述就是:P(生产者)发送消息到Q1(延时队列),Q1的消息有过期时间,比如10s,那10s后消息过期就会触发死信,从而把消息转发到Q2(死信队列)。

解决问题场景:像商城下单,未支付时取消订单场景。下单时写一条记录入Q1,延时30分钟后转到Q2,消费Q2,检查订单,支付则不做操作,没支付则取消订单,恢复库存。

生产者代码:

public static void SendMessage()
        {
            //死信交换机
            string dlxexChange = "dlx.exchange";
            //死信队列
            string dlxQueueName = "dlx.queue";

            //消息交换机
            string exchange = "direct-exchange";
            //消息队列
            string queueName = "delay_queue";

            using (var connection = RabbitMQHelper.GetConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //创建死信交换机
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //创建死信队列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信队列绑定死信交换机
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    // 创建消息交换机
                    channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //创建消息队列,并指定死信队列,和设置这个队列的消息过期时间为10s
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
                                        new Dictionary<string, object> {
                                             { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机)
                                             { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
                                             { "x-message-ttl",10000} //设置队列的消息过期时间
                                        });
                    //消息队列绑定消息交换机
                    channel.QueueBind(queueName, exchange, routingKey: queueName);

                    string message = "hello rabbitmq message";
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //发布消息
                    channel.BasicPublish(exchange: exchange,
                                         routingKey: queueName,
                                         basicProperties: properties,
                                         body: Encoding.UTF8.GetBytes(message));
                    Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message}");
                }
            }
        }

消费者代码:

public static void Consumer()
        {
            //死信交换机
            string dlxexChange = "dlx.exchange";
            //死信队列
            string dlxQueueName = "dlx.queue";
            var connection = RabbitMQHelper.GetConnection();
            {
                //创建信道
                var channel = connection.CreateModel();
                {
                    //创建死信交换机
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //创建死信队列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信队列绑定死信交换机
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                    consumer.Received += (model, ea) =>
                    {
                        //处理业务
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{DateTime.Now},队列{dlxQueueName}消费消息:{message}");
                        channel.BasicAck(ea.DeliveryTag, false);
                    };
                    channel.BasicConsume(dlxQueueName, autoAck: false, consumer);
                }
            }
        }

执行代码:

向延时队列发送消息,监听死信队列,发送和收到消息时间刚好是设置的10s。

RabbitMQ管理界面:

三、延时消息设置不同过期时间

上面的延时队列能解决消息过期时间都是相同的场景,能不能解决消息的过期时间是不一样的呢?

例如场景:机器人客服,为了更像人为操作,收到消息后要随机3-10秒回复客户。

  • 1)队列不设置TTL(消息过期时间),把过期时间设置在消息上。

生产者代码:

public static void SendMessage()
        {
            //死信交换机
            string dlxexChange = "dlx.exchange";
            //死信队列
            string dlxQueueName = "dlx.queue";

            //消息交换机
            string exchange = "direct-exchange";
            //消息队列
            string queueName = "delay_queue";

            using (var connection = RabbitMQHelper.GetConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //创建死信交换机
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //创建死信队列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信队列绑定死信交换机
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    // 创建消息交换机
                    channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //创建消息队列,并指定死信队列,和设置这个队列的消息过期时间为10s
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
                                        new Dictionary<string, object> {
                                             { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机)
                                             { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
                                             //{ "x-message-ttl",10000} //设置队列的消息过期时间
                                        });
                    //消息队列绑定消息交换机
                    channel.QueueBind(queueName, exchange, routingKey: queueName);

                    string message = "hello rabbitmq message 10s后处理";
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    properties.Expiration = "10000";//消息的有效期10s

                    //发布消息,延时10s
                    channel.BasicPublish(exchange: exchange,
                                         routingKey: queueName,
                                         basicProperties: properties,
                                         body: Encoding.UTF8.GetBytes(message));
                    Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message},延时:10s");



                    string message2 = "hello rabbitmq message 5s后处理";
                    var properties2 = channel.CreateBasicProperties();
                    properties2.Persistent = true;
                    properties2.Expiration = "5000";//消息有效期5s

                    //发布消息,延时5s
                    channel.BasicPublish(exchange: exchange,
                                         routingKey: queueName,
                                         basicProperties: properties2,
                                         body: Encoding.UTF8.GetBytes(message2));
                    Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message2},延时:5s");


                }
            }
        }

消费者代码还是上面延时队列的不变,先试下效果。

生产者向队列中发送一条延时10s的消息再发一条延时5秒的消息,但消费者却先拿到延时10s的,再拿到延时5秒的,我想要的结果是先拿到延时5s的再拿到延时10s的,是什么原因呢。

原因是:队列是先进先出的,而RabbitMQ只会对首位第一条消息做检测,第一条没过期,那么后面的消息就会阻塞住等待前面的过期。

解决办法:增加一个消费者对延时队列消费,不ack,把第一条消息放到队列尾部。一直让消息在流动,这样就能检测到了。

  • 2)新增消费者代码
public static void SendMessage()
        {
            //死信交换机
            string dlxexChange = "dlx.exchange";
            //死信队列
            string dlxQueueName = "dlx.queue";

            //消息交换机
            string exchange = "direct-exchange";
            //消息队列
            string queueName = "delay_queue";

            using (var connection = RabbitMQHelper.GetConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //创建死信交换机
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //创建死信队列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信队列绑定死信交换机
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    // 创建消息交换机
                    channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //创建消息队列,并指定死信队列,和设置这个队列的消息过期时间为10s
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
                                        new Dictionary<string, object> {
                                             { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机)
                                             { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列
                                             //{ "x-message-ttl",10000} //设置队列的消息过期时间
                                        });
                    //消息队列绑定消息交换机
                    channel.QueueBind(queueName, exchange, routingKey: queueName);

                    string message = "hello rabbitmq message 10s后处理";
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    properties.Expiration = "10000";//消息的有效期10s

                    //发布消息,延时10s
                    channel.BasicPublish(exchange: exchange,
                                         routingKey: queueName,
                                         basicProperties: properties,
                                         body: Encoding.UTF8.GetBytes(message));
                    Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message},延时:10s");



                    string message2 = "hello rabbitmq message 5s后处理";
                    var properties2 = channel.CreateBasicProperties();
                    properties2.Persistent = true;
                    properties2.Expiration = "5000";//消息有效期5s

                    //发布消息,延时5s
                    channel.BasicPublish(exchange: exchange,
                                         routingKey: queueName,
                                         basicProperties: properties2,
                                         body: Encoding.UTF8.GetBytes(message2));
                    Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message2},延时:5s");


                }
            }
        }

执行效果:

这会得到了想要的效果。

RabbitMQ管理界面:

四、延时消息用延时插件的方式实现

相比上面第三的延时消息,这里的插件方式会显的更加简单,也推荐用这种。

因为这里只需要一个交换机和一个对队列,生产者向队列发送消息,会直接是延时才会到队列。

安装插件:

地址:https://www.rabbitmq.com/community-plugins.html

找到和自己RabbitMQ一样的版本,下载下来上传到Linux,或F12查看这个文件的地址,直接Linux上下载(这里用这种)

Linux下载插件:

#下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez

已经下载到Linux上

#把文件复制到rabbitmq docker容器下的plugins文件夹
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins

#进入rabbitmq docker容器
docker exec -it rabbitmq bash

#开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

做完上面这些在RabbitMQ管理界面可以看到多了一个延时消息的交换机。

插件装好了,生产者代码

public static void SendMessage()
        {
            //延时消息交换机
            string delayExchange = "delay.exchange";
            //延时消息队列
            string delayQueueName = "delay_queue";

            using (var connection = RabbitMQHelper.GetConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    Dictionary<string, object> args = new Dictionary<string, object>();
                    args.Add("x-delayed-type", "direct"); //x-delayed-type必须加

                    //创建延时交换机,type类型为x-delayed-message
                    channel.ExchangeDeclare(delayExchange, type: "x-delayed-message", durable: true, autoDelete: false,arguments: args);
                    //创建延时消息队列
                    channel.QueueDeclare(delayQueueName, durable: true, exclusive: false, autoDelete: false);
                    //交换机绑定队列
                    channel.QueueBind(delayQueueName, delayExchange, routingKey: delayQueueName);




                    string message = "hello rabbitmq message 10s后处理";
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //延时时间从header赋值
                    Dictionary<string, object> headers = new Dictionary<string, object>();
                    headers.Add("x-delay", 10000);

                    properties.Headers = headers;
                   

                    //发布消息,按时10s
                    channel.BasicPublish(exchange: delayExchange,
                                         routingKey: delayQueueName,
                                         basicProperties: properties,
                                         body: Encoding.UTF8.GetBytes(message));
                    Console.WriteLine($"{DateTime.Now},向队列:{delayQueueName}发送消息:{message},延时:10s");

                   

                    string message2 = "hello rabbitmq message 5s后处理";
                    var properties2 = channel.CreateBasicProperties();
                    properties2.Persistent = true;
                    //延时时间从header赋值
                    Dictionary<string, object> headers2 = new Dictionary<string, object>();
                    headers2.Add("x-delay", 5000);
                    properties2.Headers = headers2;

                    //发布消息,延时5s
                    channel.BasicPublish(exchange: delayExchange,
                                         routingKey: delayQueueName,
                                         basicProperties: properties2,
                                         body: Encoding.UTF8.GetBytes(message2));
                    Console.WriteLine($"{DateTime.Now},向队列:{delayQueueName}发送消息:{message2},延时:5s");
                }
            }
        }

消费者代码

public static void DelayMessageConsumer()
        {
            //延时队列
            string queueName = "delay_queue";
            var connection = RabbitMQHelper.GetConnection();
            {
                //创建信道
                var channel = connection.CreateModel();
                {
                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                    consumer.Received += (model, ea) =>
                    {
                        //处理业务
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{DateTime.Now},接收到消息:{message}");
                        channel.BasicAck(ea.DeliveryTag, false);
                    };
                    channel.BasicConsume(queueName, autoAck: false, consumer);
                }
            }
        }

执行代码:

RabbitMQ管理界面,只有一个队列:

到此这篇关于运用.NetCore实例讲解RabbitMQ死信队列,延时队列的文章就介绍到这了,更多相关.NetCore RabbitMQ死信队列,延时队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
  • 运用.net core中实例讲解RabbitMQ高可用集群构建
  • 运用.net core中实例讲解RabbitMQ
  • .Net RabbitMQ实现HTTP API接口调用
  • 如何用.NETCore操作RabbitMQ
  • .Net使用RabbitMQ即时发消息Demo
  • RabbitMQ .NET消息队列使用详解
  • .net平台的rabbitmq使用封装demo详解

相关文章

  • 运用.NetCore实例讲解RabbitMQ死信队列,延时队列

    运用.NetCore实例讲解RabbitMQ死信队列,延时队列

    目录一、死信队列二、延时队列三、延时消息设置不同过期时间四、延时消息用延时插件的方式实现一、死信队列 描述:Q1队列绑定了x-dead-lett
    2021-09-22
  • Quartz.NET的具体使用

    Quartz.NET的具体使用

    目录一、什么是Quartz.NET?二、Quartz.NET可以做什么?三、ASP.NET Core如何使用Quartz.NET?四、Quartz的cron表达式一、什么是Quartz.NET?
    2021-09-22
  • asp.net core3.1cookie和jwt混合认证授权实现多种身份验证方案

    asp.net core3.1cookie和jwt混合认证授权实现多种身份验证方案

    目录认证授权 身份认证 授权 默认授权选择授权总结 开发了一个公司内部系统,使用asp.net core 3.1。在开发用户认证授权使用的是简单的c
    2021-09-22
  • 详解.NET数据库连接池

    详解.NET数据库连接池

    目录前置知识背景1. .NET数据库连接池的背景2. .NET 数据库连接池的表现3. .NET是如何形成数据库连接池的&#63;4. 连接池中的连接什么时候被移
    2021-09-22
  • .Net Core 之AutoFac的使用

    .Net Core 之AutoFac的使用

    目录Autofac介绍组件的三种注册方式生命周期AutoFac 在asp .net core中的使用本文不介绍IoC和DI的概念,如果你对Ioc之前没有了解的话,建议先
    2021-09-22
  • Asp.net Core 如何设置黑白名单(路由限制)

    Asp.net Core 如何设置黑白名单(路由限制)

    在原有的AspnetMvc中我们会使用到路由访问限制,在AppStart/RouteConfig.cs中写上如下: routes.IgnoreRoute("{resource}.axd/{*pathInfo}
    2021-09-22
  • C#中efcore-ShardingCore呈现“完美”分表

    C#中efcore-ShardingCore呈现“完美”分表

    目录efcore支持情况 数据库支持情况如何开始使用 自定义分表键,自定义分表规则 默认路由 动态添加分表信息 支持select,join,group by等
    2021-09-22
  • .NET Core对象池的应用:编程篇

    .NET Core对象池的应用:编程篇

    目录一、对象的借与还二、依赖注入三、池化对象策略四、对象池的大小五、对象的释放借助于有效的自动化垃圾回收机制,.NET让开发人员不在关心
    2021-09-22
  • .NET Core对象池的应用:设计篇

    .NET Core对象池的应用:设计篇

    目录一、 IPooledObjectPolicy<T>二、ObjectPool<T>DefaultObjectPool<T>DisposableObjectPool<T>三、ObjectPoolProvider《编程篇》已经涉及
    2021-09-22
  • 运用.net core中实例讲解RabbitMQ高可用集群构建

    运用.net core中实例讲解RabbitMQ高可用集群构建

    目录一、集群架构简介二、普通集群搭建2.1 各个节点分别安装RabbitMQ2.2 把节点加入集群2.3 代码演示普通集群的问题三、镜像集群四、HAProxy
    2021-09-22

最新评论