YUKIPEDIA's blog

一个普通的XMUER

《Summer Pockets》久島鴎推し


RocketMQ

目录

[TOC]

消息队列为什么会出现?

消息队列顾名思义就是存放消息的队列,我们需要理解的问题并不是消息队列是什么,而是消息队列为什么会出现?消息队列能用来干什么?会带来哪些好处和副作用?

对于一个分布式应用,必定涉及到各个系统之间的通信问题,这个时候消息队列也应运而生了,可以说分布式的产生是消息队列的基础。

消息队列能用来干什么?

异步

在刚开始学习分布式相关知识时,我们往往不太明白为什么好好的通信为什么中间非要插一个消息队列呢?我们不能直接进行通信吗?

如果我们的应用之间直接进行通信,就会涉及到一个概念:同步通信。比如现在业界使用比较多的 Dubbo 就是一个适用于各个系统之间同步通信的 RPC 框架。

举个例子,我们有一个购票系统,需求是用户在购买完之后能接收到购买完成的短信。

image.png

我们省略中间的网络通信时间消耗,假如购票系统处理需要 150ms ,短信系统处理需要 200ms ,那么整个处理流程的时间消耗就是 150 + 200 = 350ms 。

当然,乍看没什么问题。可是仔细一想你就感觉有点问题,我用户购票在购票系统的时候其实就已经完成了购买,而我现在通过同步调用非要让整个请求拉长时间,而短信系统这玩意又不是很有必要,它仅仅是一个辅助功能增强用户体验感而已。我现在整个调用流程就有点头重脚轻的感觉了,购票是一个不太耗时的流程,而我现在因为同步调用,非要等待发送短信这个比较耗时的操作才返回结果。那我如果再加一个发送邮件呢?

image.png

这样整个系统的调用链又变长了,整体响应时间也会因为应用间的同步调用而拉长。

所以,为了解决这一个问题,我们可以在这个过程中间加一层中间件——消息队列

image.png

这样,我们在将消息存入消息队列之后我们就可以直接返回了。另外举一个例子,你到饭店里吃饭,你只需要告诉服务员(消息队列)你想要吃什么,然后你就可以开始玩手机了。所以根据上图,整个相应耗时:150 + 10 = 160ms 。

解耦

回到最初同步调用的过程,我们写个伪代码简单概括一下。

public void purchaseTicket(Request request) {
	// 校验
	validate(request);
	// 购票
	Result result = purchase(request);
	// 发送短信
	sendMessage(result);
}

那么第二步,我们又添加了一个发送邮件,我们就得重新去修改代码,如果我们又加一个需求:用户购买完还需要给他加积分,这个时候我们是不是又得改代码?

public void purchaseTicket(Request request) {
	// 校验
	validate(request);
	// 购票
	Result result = purchase(request);
	// 发送短信
	sendMessage(result);
	// 添加积分
	addPoint(result);
	// ...其他需求
}

这样改来改去是不是很麻烦,那么此时我们就用一个消息队列在中间进行解耦。需要注意的是,我们后面的发送短信、发送邮件、添加积分等一些操作都依赖于上面的 result ,这东西抽象出来就是购票的处理结果,比如订单号、用户账号等等,也就是说我们后面的一系列服务都是需要同样的消息来进行处理。这样的话,我们就可以通过 “广播消息” 来实现。

上面所讲的 “广播” 并不是真正的广播,而是接下来的系统作为消费者去订阅特定的主题。比如我们这里的主题就可以叫做 订票 ,我们购买系统作为一个生产者去生产这条消息放入消息队列,然后消费者订阅了这个主题,会从消息队列中拉取消息并消费。就比如我们刚刚画的那张图,你会发现,在生产者这边我们只需要关注生产消息到指定主题中,而消费者只需要关注从指定主题中拉取消息就行了。

image.png

假如没有消息队列,每当一个新的业务接入,我们都要在主系统调用新接口;或者当我们取消某些业务,我们也得在主系统删除某些接口调用。

有了消息队列,我们只需要关心消息是否送达了消息队列,至于谁订阅,接下来消息如何处理,是下游的事情,无疑极大地减少了开发和联调的工作量。

削峰

我们再次回到一开始我们使用同步调用系统的情况,并且思考一下,如果此时有大量用户请求购票整个系统会变成什么样?

image.png

如果,此时有一万的请求进入购票系统,我们知道运行我们主业务的服务器配置一般会比较好,所以这里我们假设购票系统能承受这一万的用户请求,那么也就意味着我们同时也会出现一万调用发短信服务的请求。而对于短信系统来说并不是我们的主要业务,所以我们配备的硬件资源并不会太高,那么你觉得现在这个短信系统能承受这一万的峰值么,且不说能不能承受,系统会不会直接崩溃了?

短信业务又不是我们的主业务,我们能不能折中处理呢?如果我们把购买完成的信息发送到消息队列中,而短信系统尽自己所能地去消息队列中取消息进行消费,即使处理速度慢一点也无所谓,只要系统没崩溃就行。

RocketMQ 是什么?

RocketMQ 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式的特点。它是一个采用 Java 语言开发的分布式的消息系统,由阿里巴巴团队开发,在 2016 年底贡献给 Apache,成为了 Apache 的一个顶级项目。 在阿里内部,RocketMQ 很好地服务了集团大大小小上千个应用,在每年的双十一当天,更有不可思议的万亿级消息通过 RocketMQ 流转。

队列模型和主题模型?

首先,消息队列为什么要叫消息队列?顾名思义,这玩意就是存放消息的队列。在早期的消息中间件开发中,很多都是通过队列这一模型来实现的,可能是历史原因,现在开发者都习惯把消息中间件叫成消息队列。

但是,如今例如 RocketMQKafka 这些优秀的消息中间件不仅仅是通过一个队列实现消息存储的。

队列模型

就像我们理解队列一样,消息中间件的队列模型就真的只是一个队列。

image.png

在前文提到了一个 “广播” 的概念,也就是说如果我们此时我们需要将一个消息发送给多个消费者(比如此时我需要将信息发送给短信系统和邮件系统),这个时候单个队列即不能满足需求了。

当然你可以让 Producer 生产消息放入多个队列中,然后每个队列去对应每一个消费者。问题是可以解决,创建多个队列并且复制多份消息是会很影响资源和性能的。而且,这样子就会导致生产者需要知道具体消费者个数然后去复制对应数量的消息队列,这就违背我们消息中间件的解耦这一原则。

主题模型

为了解决队列模型的不足,主题模型应运而生,我们也可以把主题模型成为发布订阅模型。

在主题模型中,消息的生产者称为发布者(Publisher) ,消息的消费者称为订阅者(Subscriber) ,存放消息的容器称为主题(Topic)

其中,发布者将消息发送到指定主题中,订阅者需要提前订阅主题才能接受特定主题的消息。

image.png

RocketMQ 中的消息模型

RocketMQ 中的消息模型就是按照主题模型所实现的。

对于主题模型的实现来说每个消息中间件的底层设计都是不一样的,就比如 Kafka 中的分区RocketMQ 中的队列RabbitMQ 中的 Exchange 。我们可以理解为主题模型/发布订阅模型就是一个标准,那些中间件只不过照着这个标准去实现而已。

所以,RocketMQ 中的主题模型到底是如何实现的呢?

image.png

在上图中,有 Producer GroupTopicConsumer Group 三个角色:

  • Producer Group 生产者组:代表某一类的生产者,比如我们有多个秒杀系统作为生产者,这多个合在一起就是一个 Producer Group 生产者组,它们一般生产相同的消息。
  • Consumer Group 消费者组:代表某一类的消费者,比如我们有多个短信系统作为消费者,这多个合在一起就是一个 Consumer Group 消费者组,它们一般消费相同的消息。
  • Topic 主题:代表一类消息,比如订单消息,物流消息等等。

生产者组中的生产者会向主题发送消息,而主题中存在多个队列,生产者每次生产消息之后都是向指定主题中的某个队列发送消息的

每个主题中都有多个队列(分布在不同的 Broker 中,如果是集群的话,Broker 又分布在不同的服务器中),集群消费模式下,一个消费者集群多台机器共同消费一个 topic 的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。就像上图中 Consumer1Consumer2 分别对应着两个队列,而 Consumer3 是没有队列对应的,所以一般来讲要控制 消费者组中的消费者个数和主题中队列个数相同

在上图中,每个消费组在每个队列上都维护了一个消费位置,这是为什么?

因为图上仅仅有一个消费者组,我们知道在发布订阅模式中一般会涉及到多个消费者组,而每个消费者组在每个队列中的消费位置都是不同的。如果此时有多个消费者组,那么消息被一个消费者组消费完之后是不会删除的。

image.png

上图的主题中维护了多个队列,那么我们可能会有这样的问题:为什么一个主题需要维护多个队列?

答案是提高并发能力。如果每个主题只存在一个队列,这个队列也维护着每个消费者组的消费位置,确实可以做到发布订阅模式。但这样的话,生产者是不是只能向一个队列发送消息?又因为需要维护消费位置,所以一个队列只能对应一个消费者组的消费者,这样其他的消费者就没有用武之地了,并发度一下子就小了很多。

所以总结来说,RocketMQ 通过使用在一个 Topic 中配置多个队列并且每个队列维护每个消费者组的消费位置实现了主题模式/发布订阅模式

RocketMQ 的架构图

RocketMQ 技术架构中有四大角色 NameServerBrokerProducerConsumer

  • Broker:主要负责消息的存储、投递和查询以及服务高可用保证。说白了就是消息队列服务器嘛,生产者生产消息到 Broker ,消费者从 Broker 拉取消息并消费。
    • 这里还需要理清楚 BrokerTopic 和队列的关系,我们知道,一个 Topic 中存在多个队列,那么这个 Topic 和队列存放在哪里呢?
    • 一个 Topic 分布在多个 Broker上,一个 Broker 可以配置多个 Topic ,它们是多对多的关系
    • 如果某个 Topic 消息量很大,应该给它多配置几个队列(上文中提到了提高并发能力),并且尽量多分布在不同 Broker 上,以减轻某个 Broker 的压力
    • Topic 消息量都比较均匀的情况下,如果某个 broker 上的队列越多,则该 broker 压力越大。

image.png

  • NameServerZooKeeperSpring Cloud 中的 Eureka ,它其实也是一个注册中心 ,主要提供两个功能:Broker 管理路由信息管理 。说白了就是 Broker 会将自己的信息注册到 NameServer 中,此时 NameServer 就存放了很多 Broker 的信息(Broker 的路由表),消费者和生产者就从 NameServer 中获取路由表然后照着路由表的信息和对应的 Broker 进行通信(生产者和消费者定期会向 NameServer 去查询相关的 Broker 的信息)。
  • Producer:消息发布的角色,支持分布式集群方式部署。说白了就是生产者。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以 push 推,pull 拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制。说白了就是消费者。

image.png

看到这里,可能会发现一个问题,这 NameServer 干啥用的,这不多余吗?直接 ProducerConsumerBroker 直接进行生产消息,消费消息不就好了么?

但是,如上文提到的,Broker 是需要保证高可用的,如果整个系统仅仅靠一个 Broker 来维持的话,那么这个 Broker 的压力会不会很大?所以我们需要使用多个 Broker 来保证负载均衡

如果说,我们的消费者和生产者直接和多个 Broker 相连,那么当 Broker 修改的时候必定会牵连着每个生产者和消费者,这样就会产生耦合问题,而 NameServer 注册中心就是用来解决这个问题的。

当然,RocketMQ 中的技术架构肯定不止前面那么简单,因为上面图中的四个角色都是需要做集群的。

image.png

第一:我们的 Broker做了集群并且还进行了主从部署 ,由于消息分布在各个 Broker 上,一旦某个 Broker 宕机,则该 Broker 上的消息读写都会受到影响。所以 Rocketmq 提供了 master/slave 的结构,salve 定时从 master 同步数据(同步刷盘或者异步刷盘),如果 master 宕机,slave 提供消费服务,但是不能写入消息

第二:为了保证 HA(高可用) ,我们的 NameServer 也做了集群部署,但是请注意它是去中心化的。也就意味着它没有主节点,你可以很明显地看出 NameServer 的所有节点是没有进行 Info Replicate 的,在 RocketMQ 中是通过 单个 Broker 和所有 NameServer 保持长连接 ,并且在每隔 30 秒 Broker 会向所有 Nameserver 发送心跳,心跳包含了自身的 Topic 配置信息,这个步骤就对应这上面的 Routing Info

第三:在生产者需要向 Broker 发送消息的时候,需要先从 NameServer 获取关于 Broker 的路由信息,然后通过轮询的方法去向每个队列中生产数据以达到负载均衡的效果。

第四:消费者通过 NameServer 获取所有 Broker 的路由信息后,向 Broker 发送 Pull 请求来获取消息数据。Consumer 可以以两种模式启动—— 广播(Broadcast)和集群(Cluster)。广播模式下,一条消息会发送给同一个消费组中的所有消费者 ,集群模式下消息只会发送给一个消费者。

RocketMQ 的原生事务消息

在我的面试中,面试官常常会问这样的问题:消息中间件为什么选择 RocketMQ 而不是其他的消息队列?选型上有什么考虑吗?

我们可以从 RocketMQ 的优点逐一说明:

  • RocketMQ 吞吐量非常大,支持百万级 TPS ,相较于其他两种消息队列更符合本项目的高并发需求
  • RocketMQ 支持原生事务消息,适用于分布式事务,Kafka 和 RabbitMQ 都没有事务支持
  • RocketMQ 延迟是毫秒级,相较于另外两种消息队列响应更迅速

其中,RocketMQ 支持的原生事务消息是实现分布式事务的一种方法,类似于数据库的 XA 事务,它采用 “两阶段提交” 机制,保证分布式一致性。

事务消息的流程:

  1. 第一阶段(Prepare 阶段)
    • 生产者先发送半消息(half message)到 RocketMQ ,此时这条消息并不会被消费
    • RocketMQ 返回确认,表示消息已存储,但不会投递给消费者
  2. 执行本地事务
    • 生产者执行本地事务(比如扣减库存,记录订单状态)
  3. 第二阶段(提交/回滚)
    • 如果本地事务执行成功,生产者提交消息,RocketMQ 正式投递消息给消费者
    • 如果本地事务失败,生产者回滚消息,RocketMQ 删除该消息

补偿机制(事务回查):

RocketMQ 在某些情况下可能无法确认事务的最终状态,比如在生产者发送半消息后宕机了,此时 RocketMQ 就获取不到生产者本地事务的执行情况,因此引入了 “补偿机制”

在上面事务消息的流程中,如果消息队列没有收到生产者的事务消息(生产者宕机、网络问题),他就会回查生产者,确认事务状态

如果 RocketMQ 多次回查后,仍然不知道事务的状态,那么可能会丢弃消息,或者之后定期回查。同时,由于可能会出现多次回查,我们必须保证回查方法的幂等性

RocketMQ 性能为什么不如 Kafka?

零拷贝

如果用户想要将数据从磁盘发送到网络,会发生下面几件事:

程序发起系统调用 read() ,尝试读取磁盘数据,

  • 磁盘数据从设备 拷贝 到内核空间的缓冲区
  • 再从内核空间的缓冲区 拷贝 到用户空间

程序再发起系统调用 write() ,将读到的数据发到网络:

  • 数据从用户空间 拷贝 到 socket 发送缓冲区
  • 再从 socket 发送缓冲区 拷贝 到网卡

最终数据就会经过网络到达消费者。

image.png

整个过程,本机内发生了 2 次系统调用,对应 4 次用户空间和内核空间的切换,以及 4 次数据拷贝

如何优化这个过程?可以采用 零拷贝 技术,常见的方案有两种,分别是 mmapsendfile

什么是 mmap?

mmap 是操作系统内核提供的一个方法,可以将内核空间的缓冲区 映射 到用户空间。用了它,整个发送流程就会发生一些变化。

程序发起系统调用 mmap() ,尝试读取磁盘数据,具体情况如下:

  • 磁盘数据从设备 拷贝 到内核空间的缓冲区
  • 内核空间的缓冲区 映射 到用户空间,此处不需要拷贝

程序再发起 系统调用 write() ,将读到的数据发到网络:

  • 数据从内核空间缓冲区 拷贝 到 socket 发送缓冲区
  • 再从 socket 发送缓冲区 拷贝 到网卡

image.png

整个过程发生了 2 次系统调用,对应 4 次用户空间和内核空间的切换,以及 3 次数据拷贝,对比之前,省下了一次拷贝。

注意,mmap 作为一种零拷贝技术,指的是 用户空间到内核空间 这个过程不需要拷贝,不是说数据从磁盘到网卡这个过程零拷贝。

什么是 sendfile?

sendfile 也是内核提供的一个方法,程序发起系统调用 sendfile() 后,内核会尝试读取磁盘数据然后发送,具体过程如下:

  • 磁盘数据从设备 拷贝 到内核空间的缓冲区
  • 内核空间缓冲区里的设备 可以直接拷贝到网卡

image.png

整个过程发生了 1 次系统调用,对应 2 次用户空间和内核空间的切换,以及 2 次数据拷贝

这里的零拷贝,指的是 零 CPU 拷贝,也就是说 sendfile 场景下需要的两次拷贝,都不是 CPU 直接参与的拷贝,而是其他硬件设备做的拷贝,不耽误 CPU 跑程序。

Kafka 为什么性能比 RocketMQ 好?

这是因为 RocketMQ 使用的是 mmap 零拷贝技术,而 Kafka 使用的是 sendfile。Kafka 以更少的拷贝次数以及系统调用切换次数为代价,获得了更高的性能。

那为什么 RocketMQ 不使用 sendfile 呢?

sendfile() 函数:

ssize_t sendfile(int out_fd, int in_fd, off_t* offset, size_t count);
// num = sendfile(xxx);

mmap() 函数:

void *mmap(void *addr, size_t length, int prot, int flags,
           int fd, off_t offset);
// buf = mmap(xxx)

可以看出,mmap 返回的是数据的 具体内容,应用层能获取到消息内容并进行一些逻辑处理,而 sendfile 返回的是成功发送了几个字节数,具体发送了什么内容,应用层是无法获知的

而 RocketMQ 的一些功能,需要了解消息的具体内容,方便进行二次投递等操作,比如将消费失败的消息扔到死信队列。如果使用的是 sendfile ,就没法获取到消息内容长什么样子,也就没法实现一些好用的功能了。

Kafka 则没有这些功能特性,追求极致的性能,正好可以使用 sendfile。

那 Kafka 和 RocketMQ 到底怎么选呢?

如果是大数据场景,比如 spark、flink 这些,就用 Kafka;除此之外,尽量用 RocketMQ。

参考:https://golangguide.top/%E4%B8%AD%E9%97%B4%E4%BB%B6/rocketmq/%E6%A0%B8%E5%BF%83%E7%9F%A5%E8%AF%86%E7%82%B9/RocketMQ%E4%B8%BA%E4%BB%80%E4%B9%88%E6%80%A7%E8%83%BD%E4%B8%8D%E5%A6%82Kafka.html