RabbitMq如何确保消息不丢失
上篇写了掌握Rabbitmq几个重要概念,从一条消息说起,这篇来总结关于消息丢失让人头痛的事情。网络故障、服务器重启、硬盘损坏等都会导致消息的丢失。消息从生产到消费主要结果以下几个阶段如下图。
①生产阶段,生产者创建消息,经过网络发送到rabbit服务器
②消息存储阶段,首先被发送到交换器然后经过路由算法,到达队列,等待被拉取消费
③消费阶段,消费者经过网络从rabbit服务器拉取消息进行消费
这三个阶段都有可能消息丢失,下面一一分析。
消息存储阶段
正常情况下,我们使用BasicPublish方法发送消息到交换器上然后路由到队列上面,消费者还没进行消费,此时服务器重启了(队列、交换器使用默认的创建方式),会发生什么?答案是:消息丢失。原因很简单:消息在内存中,没有刷盘,并且,他们默认是非持久化的,服务重启之后,它们需要重新创建,消息自然就丢失!
还好,Rabbit提供持久化的机制,队列、交换器创建的时候,durable属性设置为true,同时消息投递模式(delivery mode)设置为2,则消息标记成持久化。这样可以避免服务器重启消息丢失的情况。
发送阶段
由于发布操作不返回任何信息给生产者,那你怎么知道服务器是否已经持久化了持久消息到硬盘呢?服务器可能在把消息写入磁盘前就宕机了,消息因此而丢失!
有。)
Rabbit提供两中解决方案,事务,但是性能会大打折扣,而且会使生产者应用程序产生同步。生产环境一般不会采用;另外一种方案是确认模式。也很简单,消息路由给所有匹配的订阅队列中,之后会异步的告之生产者。使用channel.ConfirmSelect()方法,使信道开启确认模式。然后注入两个回调函数,ack和nack事件。
channel.BasicAcks += (sender, ev) =>
{
Console.WriteLine("消息已经确认收到" + ev.DeliveryTag);
};
channel.BasicNacks += (sender, ev) =>
{
Console.WriteLine("消息未确认" + ev.DeliveryTag);
};
消费阶段
你可能会问,消费端消息怎么会丢失呢?Rabbitmq提供自动和手动确认消息,然后消息从队列中移除。如果autoAck为true,自动确认模式,服务器就会在消息发给消费端后自动将其出队。如果因为某些原因连接中断了,或者你的消费端应用发生了故障,那么消息就会丢失!
通过把AutoAck设置为false,手工确认,告知服务器,消息已经处理了,可以进行消息出队删除。
channel.BasicConsume(queue: queueName,
autoAck: false,
consumer: consumer);
consumer.Received += (model, ea) =>
{
//dosometing
channel.BasicAck(ea.DeliveryTag, false);//确认
};
小结:如果做了以上的处理,那么消息就不会跟你躲猫猫了。这里有性能的问题,消息持久化,是要刷到磁盘上的会影响投递速度,并且消息确认也会影响到消息投递速度。不基本上能够满足需求了。如果不能满足性能需求,可以使用其他方法,比如 在每次发送消息的时候,都包含应答队列的名称,这样消费者就可以回发应答以确认接受到了。如果消息应答未在合理时间范围内到达,生产者就重新发送消息。
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- ThinkPHP5&5.1实现验证码的生成、使用及点击刷新功能示例
- php反序列化长度变化尾部字符串逃逸(0CTF-2016-piapiapia)
- 对python 命令的-u参数详解
- ThinkPHP5.1+Ajax实现的无刷新分页功能示例
- Python推导式简单示例【列表推导式、字典推导式与集合推导式】
- Python 从相对路径下import的方法
- Python随机生成身份证号码及校验功能
- 对python的bytes类型数据split分割切片方法
- PHP For循环字母A-Z当超过26个字母时输出AA,AB,AC
- python 实现数字字符串左侧补零的方法
- tp5.0框架隐藏index.php入口文件及模块和控制器的方法分析
- laravel框架使用FormRequest进行表单验证,验证异常返回JSON操作示例
- 使用Python实现微信提醒备忘录功能
- 对web.py设置favicon.ico的方法详解
- PHP pthreads v3下同步处理synchronized用法示例