发布时间:2023-09-13 19:30
前言:
RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现
负责接收并存储消息
,同时提供Push/Pull接口来将消息发送给Consumer。Broker同时提供消息查询的功能,可以通过MessageID和MessageKey来查询消息。Borker会将自己的Topic配置信息实时同步到NameServerProducer/Consumer
:通过查询接口获取Topic对应的Broker的地址信息和Topic-Queue的路由配置Broker
: 注册配置信息到NameServer, 实时更新Topic信息到NameServer一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每一个Consumer都消费一次。
//设置广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
一个Consumer Group中的所有Consumer平均分摊消费消息(组内负载均衡)
//设置集群模式,也就是负载均衡模式
consumer.setMessageModel(MessageModel.CLUSTERING);
rocketMq使用轻量级的NameServer服务进行服务的协调和治理工作,NameServer多节点部署时相互独立互不干扰。每一个rocketMq服务节点(broker节点)启动时都会遍历配置的NameServer列表并建立长链接,broker节点每30秒向NameServer发送一次心跳信息、NameServer每10秒会检查一次连接的broker是否存活。消费者和生产者会随机选择一个NameServer建立长连接,通过定期轮训更新的方式获取最新的服务信息。架构简图如下:
NameServer:启动,监听端口,等待producer,consumer,broker连接上来
Broker:启动,与nameserver保持长链接,定期向nameserver发送心跳信息,包含broker的ip,端口,当前broker上topic的信息
producer:启动,随机选择一个NameServer建立长连接,拿到broker的信息,然后就可以给broker发送消息了
consumer:启动,随机选择一个NameServer建立长连接,拿到broker的信息,然后就可以建立通道,消费消息
RocketMQ 存储用的是本地文件存储系统,将所有topic的消息全部写入同一个文件中(commit log),这样保证了IO写入的绝对顺序性,最大限度利用IO系统顺序读写带来的优势提升写入速度。
由于消息混合存储在一起,需要将每个消费者组消费topic最后的偏移量记录下来。这个文件就是consumer queue(索引文件)。所以消息在写入commit log 文件的同时还需将偏移量信息写入consumer queue文件。在索引文件中会记录消息的物理位置、偏移量offse,消息size等,消费者消费时根据上述信息就可以从commit log文件中快速找到消息信息。
Broker 存储结构如下:
整个流程简介:
Producer 使用轮询的方式分别向每个 Queue 中发送消息。
Consumer 启动的时候会在 Topic,Consumer group 维度发生负载均衡,为每个客户端分配需要处理的 Queue。负载均衡过程中每个客户端都获取到全部的的 ConsumerID 和所有 Queue 并进行排序,每个客户端使用相同负责均衡算法,例如平均分配的算法,这样每个客户端都会计算出自己需要消费那些 Queue,每当 Consumer 增加或减少就会触发负载均衡,所以我们可以通过 RocketMQ 负载均衡机制实现动态扩容,提升客户端收发消息能力。客户端负责均衡为客户端分配好 Queue 后,客户端会不断向 Broker 拉取消息,在客户端进行消费。
这里有个小问题:
可以一直增加客户端的数量提升消费能力吗?当然不可以,因为 Queue 数量有限,客户端数量一旦达到 Queue 数量,再扩容新节点无法提升消费能力,因为会有节点分配不到 Queue 而无法消费。
topic 在创建之处可以设置 comsumer queue数量。而 comsumer 在启动时会和comsumer queue绑定,这个绑定策略是咋样的?
天然弊端:
RocketMQ 采用一个 consumer 绑定一个或者多个 Queue 模式,假如某个消费者服务器挂了,则会造成部分Queue消息堆积
RocketMQ 底层对 commitLog、consumeQueue 之类的磁盘文件的读写操作都采用了 mmap 技术。
传统缓存 IO:
传统 I/O 的工作方式是,数据读取和写入是从用户空间到内核空间来回复制,而内核空间的数据是通过操作系统层面的 I/O 接口从磁盘读取或写入。
传统IO发生了 4 次用户态与内核态的上下文切换,因为发生了两次系统调用,一次是 read()
,一次是 write()
,每次系统调用都得先从用户态切换到内核态,等内核完成任务后,再从内核态切换回用户态。
其次,还发生了 4 次数据拷贝,其中两次是 DMA 的拷贝,另外两次则是通过 CPU 拷贝的
传统IO,write() 过程是怎样?
wirte() 写请求 和 read(),需要先写入用户缓存区,然后通过系统调用,CPU 拷贝数据从用户缓存区到内核缓存区,再从内核缓存区拷贝到磁盘文件!
简述上述过程:
因为文件传输的应用场景中,在用户空间我们并不会对数据「再加工」,所以数据实际上可以不用搬运到用户空间,因此用户的缓冲区是没有必要存在的。
Mmap(内存映射):
read()
系统调用的过程中会把内核缓冲区的数据拷贝到用户的缓冲区里,于是为了减少这一步开销,我们可以用 mmap()
替换 read()
系统调用函数。
mmap()
系统调用函数会把文件磁盘地址「映射」到内核缓存区(page cache),而内核缓存区会 「映射」到用户空间(虚拟地址)。这样,操作系统内核与用户空间就不需要再进行任何的数据拷贝。
注意,这里用户空间(虚拟地址)不是直接映射到文件磁盘地址,而是文件对应的 page cache,用户虚拟地址一般是和用户内存地址「映射」的,如果使用内存映射技术,则用户虚拟地址可以和内核内存地址「映射」。
根据维基百科给出的定义:在大多数操作系统中,映射的内存区域实际上是内核的page cache,这意味着不需要在用户空间创建副本。多个进程之间也可以通过同时映射 page cache,来进行进程通信)
mmap() 函数简介:
void * mmap(void *start, size_t length, int prot , int flags, int fd, off_t offset)
从磁盘拷贝到内核空间的页缓存 (page cache),然后将用户空间的虚拟地址映射到内核的page cache,这样不需要再将页面从内核空间拷贝到用户空间了。
简述上述过程:
使用 mmap() 写数据到磁盘文件会怎样?
mmap() 将用户虚拟地址映射内核缓存区(内存物理地址)后,写数据直接将数据写入内核缓存区,只需要经过一次CPU拷贝,将数据从内核缓存区拷贝到磁盘文件;比传统 IO 的 write() 操作少了一次数据拷贝的过程!
在传统IO过程中,其中第一步都是先需要先把磁盘文件数据拷贝「内核缓冲区」里,这个「内核缓冲区」实际上是磁盘高速缓存(PageCache)。
接着给大家说几个Broker针对上述的磁盘文件高性能读写机制做的一些优化:
内存预映射机制:Broker 会针对磁盘上的各种 CommitLog、ConsumeQueue 文件预先分配好MappedFile,也就是提前对一些可能接下来要读写的磁盘文件,提前使用 MappedByteBuffer 执行 mmap() 函数完成内存映射,这样后续读写文件的时候,就可以直接执行了(减少一次 CPU 拷贝)。
文件预热:在提前对一些文件完成内存映射之后,因为内存映射不会直接将数据从磁盘加载到内存里来,那么后续在读,取尤其是 CommitLog、ConsumeQueue 文件时候,其实有可能会频繁的从磁盘里加载数据到内存中去。所以,在执行完 mmap() 函数之后,还会进行 madvise() 系统调用,就是提前尽可能将磁盘文件加载到内存里去。(读磁盘 -> 读内存)
// TODO
为了突破单个机器容量上限和单个机器读写性能,RocketMQ 支持 topic 数据分片
架构图如下:
详细请看这篇文章: 零拷贝(Zero-copy)和mmap
【kaldi】chain-model的TCP server部署
@FeignClient注解中属性contextId的使用说明
android-19_如何在Android上关闭COVID-19曝光跟踪和通知
看看谷歌如何在目标检测任务使用预训练权值 | CVPR 2022
NanoMQ Newsletter 2022-06|规则引擎正式发布 & NanoSDK 支持 MQTT over QUIC
[JavaWeb开发中]HTTP 协议的基本格式和Fiddler抓包工具
红帽官宣新任总裁兼 CEO!转型关键人物 Paul Cormier “退而不休”
Python 项目管理与构建工具:CookieCutter, PyScaffold, PyBuilder, Poetry