发布时间:2024-01-19 19:30
为什么使用消息队列?消息队列的优点和缺点?kafka、activemq、rabbitmq、rocketmq 都有什么优缺点?
面试官角度分析:
(1)你知不知道你们系统里为什么要用消息队列这个东西?
(2)既然用了消息队列这个东西,你知不知道用了有什么好处?
(3)既然你用了 MQ,那么当时为什么选用这一款 MQ?
面试官问这个问题的期望之一的回答是,你们公司有什么业务场景,这个业务场景有什么技术挑战,如果不用 MQ 可能会很麻烦,但是再用了之后带来了很多好处。
消息队列的常见使用场景有很多但是核心的有三个:解耦、异步、削峰
解耦
场景描述:A 系统发送个数据到 BCD 三个系统,接口调用发送,那如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?现在 A 系统又要发送第二种数据了呢?A 系统负责人崩溃中... 再来点崩溃的事儿,A 系统要时时刻刻考虑 BCDE 四个系统如果挂了怎么办?那我要不要重发?我要不要把消息存起来?头发都白了啊...
使用了 MQ 之后的解耦场景
面试技巧:你需要考虑下,你负责的系统中是否有类似的场景,就是一个系统或者一个模块,调用了多个系统或者模块,相互之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果 MQ 给他异步化解耦也是可以的,你就需要去考虑在你的项目里是不是可以运用这个 MQ 去进行系统解耦 。
异步
场景描述:系统 A 接受一个请求,需要在自己本地写库,还需要在系统 BCD 三个系统写库,自己本地写库需要 3ms。BCD 分别需要 300ms、450ms、200ms。最终总好时长:953ms,接近 1s。给用户的体验感觉一点也不好。
不用 MQ 的同步高延时请求场景
使用 MQ 异步化之后的接口性能优化
削峰
场景描述:每天 0 点到 11 点,系统 A 风平浪静,每秒并发请求数量就 100 个。结果每一一到 11 点到 1 点,每秒并发请求数量就会暴增大 1 万条 。但是系统最大的处理能力就只能每秒钟处理 1000 个请求。往期面试题汇总:001 期~150 期汇总
没有用 MQ 的时候高峰期系统被打死的场景
使用 MQ 来进行削峰的场景
优点:特殊场景下解耦、异步、削峰。
缺点:
系统可用性降低:系统引入的外部依赖越多,越容易挂掉,本来你就是 A 系统调用 BCD 三个系统的接口就好了,人 ABCD 四个系统好好的没什么问题,你偏加个 MQ 进来,万一 MQ 挂了怎么办,整套系统崩溃了,就完蛋了
系统复杂性提高:硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?
一致性问题:系统 A 处理完了直接返回成功了,人家都认为你这个请求成功了;但问题是,要是 BCD 三个系统哪里 BD 系统成功了,结果 C 系统写库失败了,咋整?数据就不一致了,
所以消息队列是一种非常复杂的架构,引入它有很多好处,但是也得针对他带来的坏处做各种额外的技术方案和架构来规避掉。做好之后你会发现系统复杂度提升了一个数量积,但是关键时刻,用,还是要用的。
特性 |
ActiveMQ |
RabbitMQ |
RocketMQ |
Kafka |
单机吞吐量 |
万级,吞吐量比 RocketMQ 和 Kafka 要低了一个数量级 |
万级,吞吐量比 RocketMQ 和 Kafka 要低了一个数量级 |
10 万级,RocketMQ 也是可以支撑高吞吐的一种 MQ |
10 万级别,这是 kafka 最大的优点,就是吞吐量高。 一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic 数量对吞吐量的影响 |
topic 可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic |
topic 从几十个到几百个的时候,吞吐量会大幅度下降 所以在同等机器下,kafka 尽量保证 topic 数量不要过多。如果要支撑大规模 topic,需要增加更多的机器资源 |
||
时效性 |
ms 级 |
微秒级,这是 rabbitmq 的一大特点,延迟是最低的 |
ms 级 |
延迟在 ms 级以内 |
可用性 |
高,基于主从架构实现高可用性 |
高,基于主从架构实现高可用性 |
非常高,分布式架构 |
非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 |
有较低的概率丢失数据 |
经过参数优化配置,可以做到 0 丢失 |
经过参数优化配置,消息可以做到 0 丢失 |
|
功能支持 |
MQ 领域的功能极其完备 |
基于 erlang 开发,所以并发能力很强,性能极其好,延时很低 |
MQ 功能较为完善,还是分布式的,扩展性好 |
功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准 |
优劣势总结 |
非常成熟,功能强大,在业内大量的公司以及项目中都有应用 偶尔会有较低概率丢失消息 而且现在社区以及国内应用都越来越少,官方社区现在对 ActiveMQ 5.x 维护越来越少,几个月才发布一个版本 而且确实主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用 |
erlang 语言开发,性能极其好,延时很低; 吞吐量到万级,MQ 功能比较完备 而且开源提供的管理界面非常棒,用起来很好用 社区相对比较活跃,几乎每个月都发布几个版本分 在国内一些互联网公司近几年用 rabbitmq 也比较多一些 但是问题也是显而易见的,RabbitMQ 确实吞吐量会低一些,这是因为他做的实现机制比较重。 而且 erlang 开发,国内有几个公司有实力做 erlang 源码级别的研究和定制?如果说你没这个实力的话,确实偶尔会有一些问题,你很难去看懂源码,你公司对这个东西的掌控很弱,基本职能依赖于开源社区的快速维护和修复 bug。 而且 rabbitmq 集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是 erlang 语言本身带来的问题。很难读源码,很难定制和掌控。 |
接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障 日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是 ok 的,还可以支撑大规模的 topic 数量,支持复杂 MQ 业务场景 而且一个很大的优势在于,阿里出品都是 java 系的,我们可以自己阅读源码,定制自己公司的 MQ,可以掌控 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准 JMS 规范走的有些系统要迁移需要修改大量代码 还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用 RocketMQ 挺好的 |
kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展 同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量 而且 kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略 这个特性天然适合大数据实时计算以及日志收集 |
(1)RabbitMQ 的高可用性
RabbitMQ 是比较有代表性的,因为是基于主从做高可用性的,我们就以他为例子讲解第一种 MQ 的高可用性怎么实现。
rabbitmq 有三种模式:单机模式,普通集群模式,镜像集群模式
(1.1) 单机模式
就是 demo 级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式
(1.2)普通集群模式
意思就是在多台机器上启动多个 rabbitmq 实例,每个机器启动一个。但是你创建的 queue,只会放在一个 rabbtimq 实例上,但是每个实例都同步 queue 的元数据。完了你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。
这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。
而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 rabbitmq 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。
所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性可言了,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。
(1.3)镜像集群模式
这种模式,才是所谓的 rabbitmq 的高可用模式,跟普通集群模式不一样的是,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,然后每次你写消息到 queue 的时候,都会自动把消息到多个实例的 queue 里进行消息同步。
这样的话,好处在于,你任何一个机器宕机了,没事儿,别的机器都可以用。坏处在于,第一,这个性能开销也太大了吧,消息同步所有机器,导致网络带宽压力和消耗很重!第二,这么玩儿,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue
那么怎么开启这个镜像集群模式呢?我这里简单说一下,避免面试人家问你你不知道,其实很简单 rabbitmq 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候可以要求数据同步到所有节点的,也可以要求就同步到指定数量的节点,然后你再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
(2)kafka 的高可用性
kafka 一个最基本的架构认识:多个 broker 组成,每个 broker 是一个节点;你创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据。
这就是天然的分布式消息队列,就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据。
实际上 rabbitmq 之类的,并不是分布式消息队列,他就是传统的消息队列,只不过提供了一些集群、HA 的机制而已,因为无论怎么玩儿,rabbitmq 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。
kafka 0.8 以前,是没有 HA 机制的,就是任何一个 broker 宕机了,那个 broker 上的 partition 就废了,没法写也没法读,没有什么高可用性可言。
kafka 0.8 以后,提供了 HA 机制,就是 replica 副本机制。每个 partition 的数据都会同步到吉他机器上,形成自己的多个 replica 副本。然后所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上数据即可。只能读写 leader?很简单,要是你可以随意读写每个 follower,那么就要 care 数据一致性的问题,系统复杂度太高,很容易出问题。kafka 会均匀的将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。
这么搞,就有所谓的高可用性了,因为如果某个 broker 宕机了,没事儿,那个 broker 上面的 partition 在其他机器上都有副本的,如果这上面有某个 partition 的 leader,那么此时会重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。这就有所谓的高可用性了。
写数据的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为)
消费的时候,只会从 leader 去读,但是只有一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。
实际上这块机制,讲深了,是可以非常之深入的,但是我还是回到我们这个课程的主题和定位,聚焦面试,至少你听到这里大致明白了 kafka 是如何保证高可用机制的了,对吧?不至于一无所知,现场还能给面试官画画图。要遇上面试官确实是 kafka 高手,深挖了问,那你只能说不好意思,太深入的你没研究过。
但是大家一定要明白,这个事情是要权衡的,你现在是要快速突击常见面试题体系,而不是要深入学习 kafka,要深入学习 kafka,你是没那么多时间的。你只能确保,你之前也许压根儿不知道这块,但是现在你知道了,面试被问到,你大概可以说一说。然后很多其他的候选人,也许还不如你,没看过这个,被问到了压根儿答不出来,相比之下,你还能说点出来,大概就是这个意思了。
其实这个很常见的一个问题,这俩问题基本可以连起来问。既然是消费消息,那肯定要考虑考虑会不会重复消费?能不能避免重复消费?或者重复消费了也别造成系统异常可以吗?这个是 MQ 领域的基本问题,其实本质上还是问你使用消息队列如何保证幂等性,这个是你架构里要考虑的一个问题。
首先就是比如 rabbitmq、rocketmq、kafka,都有可能会出现消费重复消费的问题,正常。因为这问题通常不是 mq 自己保证的,是给你保证的。然后我们挑一个 kafka 来举个例子,说说怎么重复消费吧。
kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表他的序号,然后 consumer 消费了数据之后,每隔一段时间,会把自己消费过的消息的 offset 提交一下,代表我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧。
但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。重启之后,少数消息会再次消费一次。
其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。
给你举个例子吧。假设你有个系统,消费一条往数据库里插入一条,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下已经消费过了,直接扔了,不就保留了一条数据?
一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性
幂等性,我通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
那所以第二个问题来了,怎么保证消息队列消费的幂等性?
其实还是得结合业务来思考,我这里给几个思路:
(1)比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧
(2)比如你是写 redis,那没问题了,反正每次都是 set,天然幂等性
(3)比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
还有比如基于数据库的唯一键来保证重复数据不会重复插入多条,我们之前线上系统就有这个问题,就是拿到数据的时候,每次重启可能会有重复,因为 kafka 消费者还没来得及提交 offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据
如何保证 MQ 的消费是幂等性的,需要结合具体的业务来看
如何保证消息的幂等性
这个是肯定的,用 mq 有个基本原则,就是数据不能多一条,也不能少一条,不能多,就是刚才说的重复消费和幂等性问题。不能少,就是说这数据别搞丢了。那这个问题你必须得考虑一下。
如果说你这个是用 mq 来传递非常核心的消息,比如说计费,扣费的一些消息,因为我以前设计和研发过一个公司非常核心的广告平台,计费系统,计费系统是很重的一个业务,操作是很耗时的。所以说广告系统整体的架构里面,实际上是将计费做成异步化的,然后中间就是加了一个 MQ。
我们当时为了确保说这个 MQ 传递过程中绝对不会把计费消息给弄丢,花了很多的精力。广告主投放了一个广告,明明说好了,用户点击一次扣费 1 块钱。结果要是用户动不动点击了一次,扣费的时候搞的消息丢了,我们公司就会不断的少几块钱,几块钱,积少成多,这个就对公司是一个很大的损失。
面试题剖析
这个丢数据,mq 一般分为两种,要么是 mq 自己弄丢了,要么是我们消费的时候弄丢了。咱们从 rabbitmq 和 kafka 分别来分析一下
rabbitmq 这种 mq,一般来说都是承载公司的核心业务的,数据是绝对不能弄丢的
RabbitMQ 可能存在的数据丢失问题
(1)rabbitmq
1)生产者弄丢了数据
生产者将数据发送到 rabbitmq 的时候,可能数据就在半路给搞丢了,因为网络啥的问题,都有可能。
此时可以选择用 rabbitmq 提供的事务功能,就是生产者发送数据之前开启 rabbitmq 事务(channel.txSelect),然后发送消息,如果消息没有成功被 rabbitmq 接收到,那么生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重试发送消息;如果收到了消息,那么可以提交事务(channel.txCommit)。但是问题是,rabbitmq 事务机制一搞,基本上吞吐量会下来,因为太耗性能。
所以一般来说,如果你要确保说写 rabbitmq 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 rabbitmq 中,rabbitmq 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 rabbitmq 没能处理这个消息,会回调你一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
事务机制和 cnofirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 rabbitmq 接收了之后会异步回调你一个接口通知你这个消息接收到了。
所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。
2)rabbitmq 弄丢了数据
就是 rabbitmq 自己弄丢了数据,这个你必须开启 rabbitmq 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 rabbitmq 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,rabbitmq 还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。
设置持久化有两个步骤,第一个是创建 queue 的时候将其设置为持久化的,这样就可以保证 rabbitmq 持久化 queue 的元数据,但是不会持久化 queue 里的数据;第二个是发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 rabbitmq 就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,rabbitmq 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。
而且持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,rabbitmq 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。
哪怕是你给 rabbitmq 开启了持久化机制,也有一种可能,就是这个消息写到了 rabbitmq 中,但是还没来得及持久化到磁盘上,结果不巧,此时 rabbitmq 挂了,就会导致内存里的一点点数据会丢失。
3)消费端弄丢了数据
rabbitmq 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,rabbitmq 认为你都消费了,这数据就丢了。
这个时候得用 rabbitmq 提供的 ack 机制,简单来说,就是你关闭 rabbitmq 自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack?那 rabbitmq 就认为你还没处理完,这个时候 rabbitmq 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
(2)kafka
1)消费端弄丢了数据
唯一可能导致消费者弄丢数据的情况,就是说,你那个消费到了这个消息,然后消费者那边自动提交了 offset,让 kafka 以为你已经消费好了这个消息,其实你刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。
这不是一样么,大家都知道 kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。但是此时确实还是会重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。
生产环境碰到的一个问题,就是说我们的 kafka 消费者消费到了数据之后是写到一个内存的 queue 里先缓冲一下,结果有的时候,你刚把消息写入内存 queue,然后消费者会自动提交 offset。
然后此时我们重启了系统,就会导致内存 queue 里还没来得及处理的数据就丢失了
2)kafka 弄丢了数据
这块比较常见的一个场景,就是 kafka 某个 broker 宕机,然后重新选举 partiton 的 leader 时。大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,他不就少了一些数据?这就丢了一些数据啊。
生产环境也遇到过,我们也是,之前 kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了
所以此时一般是要求起码设置如下 4 个参数:
给这个 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本
在 kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧
在 producer 端设置 acks=all:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了
在 producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了
我们生产环境就是按照上述要求配置的,这样配置之后,至少在 kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失
3)生产者会不会弄丢数据
如果按照上述的思路设置了 ack=all,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
其实这个也是用 MQ 的时候必问的话题,第一看看你了解不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的?这个生产系统中常见的问题。
我举个例子,我们以前做过一个 mysql binlog 同步的系统,压力还是非常大的,日同步数据要达到上亿。mysql -> mysql,常见的一点在于说大数据 team,就需要同步一个 mysql 库过来,对公司的业务系统的数据做各种复杂的操作。
你在 mysql 里增删改一条数据,对应出来了增删改 3 条 binlog,接着这三条 binlog 发送到 MQ 里面,到消费出来依次执行,起码得保证人家是按照顺序来的吧?不然本来是:增加、修改、删除;你楞是换了顺序给执行成删除、修改、增加,不全错了么。
本来这个数据同步过来,应该最后这个数据被删除了;结果你搞错了这个顺序,最后这个数据保留下来了,数据同步就出错了。
先看看顺序会错乱的俩场景
(1)rabbitmq:一个 queue,多个 consumer,这不明显乱了
(2)kafka:一个 topic,一个 partition,一个 consumer,内部多线程,这不也明显乱了
(1)rabbitmq:拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;
或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理
(2)kafka:一个 topic,一个 partition,一个 consumer,内部单线程消费,写 N 个内存 queue,然后 N 个线程分别消费一个内存 queue 即可
如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?
你看这问法,其实本质针对的场景,都是说,可能你的消费端出了问题,不消费了,或者消费的极其极其慢。接着就坑爹了,可能你的消息队列集群的磁盘都快写满了,都没人消费,这个时候怎么办?或者是整个这就积压了几个小时,你这个时候怎么办?或者是你积压的时间太长了,导致比如 rabbitmq 设置了消息过期时间后就没了怎么办?
所以就这事儿,其实线上挺常见的,一般不出,一出就是大 case,一般常见于,举个例子,消费端每次消费之后要写 mysql,结果 mysql 挂了,消费端 hang 那儿了,不动了。或者是消费端出了个什么叉子,导致消费速度极其慢。
关于这个事儿,我们一个一个来梳理吧,先假设一个场景,我们现在消费端出故障了,然后大量消息在 mq 里积压,现在事故了,慌了
广告插入:欢迎关注纯干货技术公众号:Java 学习指南,每天阅读技术文章。
几千万条数据在 MQ 里积压了七八个小时,从下午 4 点多,积压到了晚上很晚,10 点多,11 点多
这个是我们真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复 consumer 的问题,让他恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不能在面试的时候说吧。
一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟是 18 万条,1000 多万条
所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来
一般这个时候,只能操作临时紧急扩容了,具体操作步骤和思路如下:
1)先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉
2)新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍或者 20 倍的 queue 数量
3)然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue
4)接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据
5)这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据
6)等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的 consumer 机器来消费消息
假设你用的是 rabbitmq,rabbitmq 是可以设置过期时间的,就是 TTL,如果消息在 queue 中积压超过一定的时间就会被 rabbitmq 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。
这个情况下,就不是说要增加 consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上 12 点以后,用户都睡觉了。
这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。
假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次
如果走的方式是消息积压在 mq 里,那么如果你很长时间都没处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。
1. 如果让你写一个消息队列,该如何进行架构设计啊?说一下你的思路
其实聊到这个问题,一般面试官要考察两块:
(1)你有没有对某一个消息队列做过较为深入的原理的了解,或者从整体了解把握住一个 mq 的架构原理
(2)看看你的设计能力,给你一个常见的系统,就是消息队列系统,看看你能不能从全局把握一下整体架构设计,给出一些关键点出来
其实回答这类问题,说白了,起码不求你看过那技术的源码,起码你大概知道那个技术的基本原理,核心组成部分,基本架构构成,然后参照一些开源的技术把一个系统设计出来的思路说一下就好
比如说这个消息队列系统,我们来从以下几个角度来考虑一下
说实话,我一般面类似问题的时候,大部分人基本都会蒙,因为平时从来没有思考过类似的问题,大多数人就是平时埋头用,从来不去思考背后的一些东西。类似的问题,我经常问的还有,如果让你来设计一个 spring 框架你会怎么做?如果让你来设计一个 dubbo 框架你会怎么做?如果让你来设计一个 mybatis 框架你会怎么做?
其实回答这类问题,说白了,起码不求你看过那技术的源码,起码你大概知道那个技术的基本原理,核心组成部分,基本架构构成,然后参照一些开源的技术把一个系统设计出来的思路说一下就好
比如说这个消息队列系统,我们来从以下几个角度来考虑一下
(1)首先这个 mq 得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统呗,参照一下 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?
(2)其次你得考虑一下这个 mq 的数据要不要落地磁盘吧?那肯定要了,落磁盘,才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka 的思路。
2. 其次你考虑一下你的 mq 的可用性啊?
这个事儿,具体参考我们之前可用性那个环节讲解的 kafka 的高可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。
其实一个 mq 肯定是很复杂的,面试官问你这个问题,其实是个开放题,他就是看看你有没有从架构角度整体构思和设计的思维以及能力。确实这个问题可以刷掉一大批人,因为大部分人平时不思考这些东西。
更多的java课程学习路线,笔记,面试等架构资料,需要的同学可以私信我(资料)即可免费获取!
python刷题笔记6--Valid Parentheses
[BIRT报表]Platform.createFactoryObject returns null
springboot实现mysql主从_基于 SpringBoot,来实现MySQL读写分离技术
Java微服务开发指南 -- 使用Docker和Kubernetes构建可伸缩的微服务
蓝桥杯第十一届单片机国赛真题(CT107D——IAP15F2K61S2)
调整pandas dataframe输出到excel的工作表worksheet顺序
浏览器本地缓存 localstorage/sessionstorage/cookie - 身份认证 cookie/session/token