hello云胜

技术与生活

0%

RocketMq高效的原因

1,commitlog的顺序写

Rocketmq存储模块

源码就在store模块下,入口类DefaultMessageStore

该类定义了很多重要的属性,如commitlog文件,index文件,consumequeue文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 存储相关的配置,如存储位置,commitlog文件大小等
private final MessageStoreConfig messageStoreConfig;
// CommitLog
private final CommitLog commitLog;

// 队列,按消息主题分组了
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;

// 消息队列的刷盘线程服务
private final FlushConsumeQueueService flushConsumeQueueService;

// 删除commitlog的线程服务
private final CleanCommitLogService cleanCommitLogService;

// 清楚consumeQueue文件的线程
private final CleanConsumeQueueService cleanConsumeQueueService;

// 索引文件相关服务
private final IndexService indexService;

// 文件内存映射服务
private final AllocateMappedFileService allocateMappedFileService;

//CommitLog消息分发服务,根据CommitLog文件构建ConsumerQueue、IndexFile文件
private final ReputMessageService reputMessageService;

commitlog文件

我们先必须清楚commitlog文件在服务器上是怎么存储的。

代码中的commitlog 对应磁盘上的 commitlog文件夹

代码commitlog中有一个MappedFileQueue 对应磁盘上 commitlog下的文件列表

MappedFileQueue 里的MappedFile 映射 磁盘上具体的一个commitlog文件

入口方法

org.apache.rocketmq.store.DefaultMessageStore#putMessage

开始做了一些校验,校验了broker状态和msg的状态等

然后调用

1
this.commitLog.putMessage(msg);

putMessage方法

1,对消息体计算CRC32

2,获取当前正在写入的mappedFile

mappedFile非常重要。

我们知道RocketMq的消息是存储在commitlog中的,这是从大的方面理解。

继续往下追,commitlog又是基于mappedFileQueue

1
protected final MappedFileQueue mappedFileQueue;

从名字上就能看出,mappedFileQueue是一个队列,它的底层又是

1
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

MappedFile队列。

MappedFile对应着最终磁盘上的存储文件,

1
private MappedByteBuffer mappedByteBuffer;

是MappedByteBuffer的封装,消息存储跟磁盘、内存的交互都是通过它完成。

获取当前正在写入的mappedFile

1
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

如果是null,或者满了。新创建一个文件

1
2
3
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}

3,追加消息

1
result = mappedFile.appendMessage(msg, this.appendMessageCallback);

这里传一个回调接口appendMessageCallback。appendMessageCallback是一个内部类,这个commitlog类接近2000行了,个人认为这里不该用内部类,拆分出去更好。

MappedFile的几个指针

wrotePosition : 文件的写入指针。标记当前写到文件的位置。

committedPosition: 提交位置指针

flushedPosition : 刷盘位置指针

4,回调append

回调commitlog的内部类方法。代码 org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)

4.1 计算msgid
4.2获取consume queue的偏移量
1
2
3
4
5
6
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
4.3 消息的size校验
4.4 消息写入bytebuffer
4.5更新队列的offset
1
2
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);

5,成功后,刷盘线程和同步线程

1
2
3
4
5
6
//释放锁
putMessageLock.unlock();
//刷盘
handleDiskFlush(result, putMessageResult, msg);
//执行HA主从同步
handleHA(result, putMessageResult, msg);

刷盘

消息都是先存到内存中,MappedByteBuffer中。然后根据是同步刷盘还是异步刷盘进行不同的刷盘策略。

文件内存映射

RocketMQ通过使用内存映射文件来提高IO访问性能,无论是CommitLog、ConsumerQueue还是IndexFile,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。

MappedFile类对应commitlog文件夹下的消息文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static final int OS_PAGE_SIZE = 1024 * 4;//操作系统的页大小,默认是4K
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);// 当前JVM实例中MappedFile虚拟内存
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);//当前JVM实例中MappedFile对象个数
protected final AtomicInteger wrotePosition = new AtomicInteger(0);//当前文件的写指针
protected final AtomicInteger committedPosition = new AtomicInteger(0);//当前文件的提交指针
private final AtomicInteger flushedPosition = new AtomicInteger(0);//刷写到磁盘指针
protected int fileSize;//文件大小
protected FileChannel fileChannel;//文件通道
/**
* 消息都是先放到这,然后再放进文件通道
*/
protected ByteBuffer writeBuffer = null;//堆外内存ByteBuffer
protected TransientStorePool transientStorePool = null;//堆外内存池
private String fileName;//文件名称
private long fileFromOffset;//该文件的初始偏移量
private File file;//物理文件
private MappedByteBuffer mappedByteBuffer;//物理文件对应的内存映射Buffer
private volatile long storeTimestamp = 0;//文件最后一次内容写入时间
private boolean firstCreateInQueue = false;//是否是MappedFileQueue队列中第一个文件

consumequeue文件

commitlog文件时顺序写,这样可以极大的提高写性能,但是如果随机读取就会效率很低。所以就有了consumequeue呵index文件

在messageStore初始化的时候,会开启一个线程reputMessageService,进行消息的分发。根据commitlog准实时的更新consumequeue中的偏移量

1
2
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}

DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

可以看到其run方法,不断执行doReput。

看看doReput都在干嘛

RocketMq设计原理

1,专业术语

  • Producer

消息生产者,负责产生消息,一般由业务系统负责产生消息。

  • Consumer

消息消费者,负责消费消息,一般是后台系统负责异步消费。

  • Push Consumer

Consumer 的一种,推拉模型中的推模式。应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立

刻回调 Listener 接口方法。

  • Pull Consumer

Consumer 的一种,推拉模型中的拉模式。应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。

  • Producer Group

一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。

  • Consumer Group

一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。

  • Broker

消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为 Provider。

  • 广播消费

一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer

Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意

义。

在 CORBA Notification 规范中,消费方式都属于广播消费。

在 JMS 规范中,相当于 JMS publish/subscribe model

  • 集群消费

一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个

Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。

在 CORBA Notification 规范中,无此消费方式。

在 JMS 规范中,JMS point-to-point model 与之类似,但是 RocketMQ 的集群消费功能大等于 PTP 模型。

因为 RocketMQ 单个 Consumer Group 内的消费者类似于 PTP,但是一个 Topic/Queue 可以被多个 Consumer

Group 消费。

  • 顺序消息

消费消息的顺序要同发送消息的顺序一致,在 RocketMQ 中,主要指的是局部顺序,即一类消息为满足顺

序性,必须 Producer 单线程顺序发送,且发送到同一个队列,这样 Consumer 就可以按照 Producer 发送

的顺序去消费消息。

  • 普通顺序消息

顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker 重启,由于队列

总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。

如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方

式比较合适。

  • 严格顺序消息

顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker 集群中只

要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。

如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不

可用。

目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推

荐使用普通的顺序消息。

  • Message Queue

在 RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储

单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100

年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来

删除。

也可以认为 Message Queue 是一个长度无限的数组,offset 就是下标。

2,消息中间件需要解决哪些问题

本节阐述消息中间件通常需要解决哪些问题,在解决这些问题当中会遇到什么困难,RocketMQ 是否可以解决,

规范中如何定义这些问题。

4.1 Publish/Subscribe

发布订阅是消息中间件的最基本功能,也是相对于传统 RPC 通信而言。在此不再详述。

4.2 消息优先级

规范中描述的优先级是指在一个消息队列中,每条消息都有不同的优先级,一般用整数来描述,优先级高的消

息先投递,如果消息完全在一个内存队列中,那么在投递前可以按照优先级排序,令优先级高的先投递。

由于 RocketMQ 所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大,因此 RocketMQ 没有特

意支持消息优先级,但是可以通过变通的方式实现类似功能,即单独配置一个优先级高的队列,和一个普通优先级

的队列, 将不同优先级发送到不同队列即可。

对于优先级问题,可以归纳为 2 类

  1. 只要达到优先级目的即可,不是严格意义上的优先级,通常将优先级划分为高、中、低,或者再多几个级

别。每个优先级可以用不同的 topic 表示,发消息时,指定不同的 topic 来表示优先级,这种方式可以解决

绝大部分的优先级问题,但是对业务的优先级精确性做了妥协。

  1. 严格的优先级,优先级用整数表示,例如 0 ~ 65535,这种优先级问题一般使用不同 topic 解决就非常不合

适。如果要让 MQ 解决此问题,会对 MQ 的性能造成非常大的影响。这里要确保一点,业务上是否确实需

要这种严格的优先级,如果将优先级压缩成几个,对业务的影响有多大?

4.3 消息顺序

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创

建,订单付款,订单完成。消费时,要按照这个顺序消费才能有意义。但是同时订单之间是可以并行消费的。

RocketMQ 可以严格的保证消息有序。

4.4 消息过滤

  • Broker 端消息过滤

在 Broker 中,按照 Consumer 的要求做过滤,优点是减少了对于 Consumer 无用消息的网络传输。

缺点是增加了 Broker 的负担,实现相对复杂。

(1). 淘宝 Notify 支持多种过滤方式,包含直接按照消息类型过滤,灵活的语法表达式过滤,几乎可以满足

最苛刻的过滤需求。

(2). 淘宝 RocketMQ 支持按照简单的 Message Tag 过滤,也支持按照 Message Header、body 进行过滤。

(3). CORBA Notification 规范中也支持灵活的语法表达式过滤。

  • Consumer 端消息过滤

这种过滤方式可由应用完全自定义实现,但是缺点是很多无用的消息要传输到 Consumer 端。

4.5 消息持久化

消息中间件通常采用的几种持久化方式:

(1). 持久化到数据库,例如 Mysql。

(2). 持久化到 KV 存储,例如 levelDB、伯克利 DB 等 KV 存储系统。

(3). 文件记录形式持久化,例如 Kafka,RocketMQ

(4). 对内存数据做一个持久化镜像,例如 beanstalkd,VisiNotify

(1)、(2)、(3)三种持久化方式都具有将内存队列 Buffer 进行扩展的能力,(4)只是一个内存的镜像,作用是当 Broker

挂掉重启后仍然能将之前内存的数据恢复出来。

JMS 与 CORBA Notification 规范没有明确说明如何持久化,但是持久化部分的性能直接决定了整个消息中间件

的性能。

RocketMQ 参考了 Kafka 的持久化方式,充分利用 Linux 文件系统内存 cache 来提高性能。

4.6 消息可靠性

影响消息可靠性的几种情况:

(1). Broker 正常关闭

(2). Broker 异常 Crash

(3). OS Crash

(4). 机器掉电,但是能立即恢复供电情况。

(5). 机器无法开机(可能是 cpu、主板、内存等关键设备损坏)

(6). 磁盘设备损坏。

(1)、(2)、(3)、(4)四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或

者丢失少量数据(依赖刷盘方式是同步还是异步)。

(5)、(6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ 在这两种情况下,通

过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,

同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。

RocketMQ 从 3.0 版本开始支持同步双写。

4.7 低延迟

在消息不堆积情况下,消息到达 Broker 后,能立刻到达 Consumer。

RocketMQ 使用长轮询 Pull 方式,可保证消息非常实时,消息实时性不低于 Push。

4.8 At least Once

是指每个消息必须投递一次

RocketMQ Consumer 先 pull 消息到本地,消费完成后,才向服务器返回 ack,如果没有消费一定不会 ack 消息,

所以 RocketMQ 可以很好的支持此特性。

4.9 Exactly Only Once

(1). 发送消息阶段,不允许发送重复的消息。

(2). 消费消息阶段,不允许消费重复的消息。

只有以上两个条件都满足情况下,才能认为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环

境下,不可避免要产生巨大的开销。所以 RocketMQ 为了追求高性能,并不保证此特性,要求在业务上进行去重,

也就是说消费消息要做到幂等性。RocketMQ 虽然不能严格保证不重复,但是正常情况下很少会出现重复发送、消

费情况,只有网络异常,Consumer 启停等异常情况下会出现消息重复。

此问题的本质原因是网络调用存在不确定性,即既不成功也不失败的第三种状态,所以才产生了消息重复性问

题。

4.10 Broker Buffer 满了怎么办?

Broker 的 Buffer 通常指的是 Broker 中一个队列的内存 Buffer 大小,这类 Buffer 通常大小有限,如果 Buffer 满

了以后怎么办?

下面是 CORBA Notification 规范中处理方式:

(1). 拒绝接收

拒绝新来的消息,向 Producer 返回 RejectNewEvents 错误码。

(2). 按照特定策略丢弃已有消息

a) AnyOrder - 随机丢弃

b) FifoOrder -先进先出,最早收到的消息最先丢弃

c) LifoOrder - 后进先出,最后收到的消息最先丢弃

d) PriorityOrder - 按照优先级,优先级低的先丢弃

e) DeadlineOrder - 按过期时间,离过期时间最短的最先丢弃

RocketMQ 没有内存 Buffer 概念,RocketMQ 的队列都是持久化磁盘,数据定期清除。

对于此问题的解决思路,RocketMQ 同其他 MQ 有非常显著的区别,RocketMQ 的内存 Buffer 抽象成一个无限

长度的队列,不管有多少数据进来都能装得下,这个无限是有前提的,Broker 会定期删除过期的数据,例如

Broker 只保存 3 天的消息,那么这个 Buffer 虽然长度无限,但是 3 天前的数据会被从队尾删除。

4.11 回溯消费

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向

Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,

恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。

RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯(即跳过一段时间)。

4.12 消息堆积

消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要

求消息中间件具有一定的消息堆积能力,消息堆积分以下两种情况:

(1). 消息堆积在内存 Buffer,一旦超过内存 Buffer,可以根据一定的丢弃策略来丢弃消息,如 CORBA Notification

规范中描述。适合能容忍丢弃消息的业务,这种情况消息的堆积能力主要在于内存 Buffer 大小,而且消息

堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限。

(2). 消息堆积到持久化存储系统中,例如 DB,KV 存储,文件记录形式。

当消息不能在内存 Cache 命中时,要不可避免的访问磁盘,会产生大量读 IO,读 IO 的吞吐量直接决定了

消息堆积后的访问能力。

评估消息堆积能力主要有以下四点:

(1). 消息能堆积多少条,多少字节?即消息的堆积容量。

(2). 消息堆积后,发消息的吞吐量大小,是否会受堆积影响?

(3). 消息堆积后,正常消费的 Consumer 是否会受影响?

(4). 消息堆积后,访问堆积在磁盘的消息时,吞吐量有多大?

4.13 分布式事务

已知的几个分布式事务规范,如 XA,JTA 等。其中 XA 规范被各大数据库厂商广泛支持,如 Oracle,Mysql 等。

其中 XA 的 TM 实现佼佼者如 Oracle Tuxedo,在金融、电信等领域被广泛应用。

分布式事务涉及到两阶段提交问题,在数据存储方面的方面必然需要 KV 存储的支持,因为第二阶段的提交回

滚需要修改消息状态,一定涉及到根据 Key 去查找 Message 的动作。RocketMQ 在第二阶段绕过了根据 Key 去查找

Message 的问题,采用第一阶段发送 Prepared 消息时,拿到了消息的 Offset,第二阶段通过 Offset 去访问消息,

并修改状态,Offset 就是数据的地址。

RocketMQ 这种实现事务方式,没有通过 KV 存储做,而是通过 Offset 方式,存在一个显著缺陷,即通过 Offset

更改数据,会令系统的脏页过多,需要特别关注。

4.14 定时消息

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能

被消费。

如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不

可避免的产生巨大性能开销。

RocketMQ 支持定时消息,但是不支持任意时间精度,支持特定的 level,例如定时 5s,10s,1m 等。

4.15 消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为

有以下几种情况

  1. 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其他消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过 10s 秒后再重试。

  2. 由于依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再消费下一条消息,这样可以减轻 Broker 重试消息的压力。

RocketMQ概览

1 RocketMQ 是什么?

RocketMQ 是什么

image-20200927145619799

  • 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。

  • Producer、Consumer、队列都可以分布式。

  • Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 consumer

实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 topic 对应的队列集合。

  • 能够保证严格的消息顺序

  • 提供丰富的消息拉取模式

  • 高效的订阅者水平扩展能力

  • 实时的消息订阅机制

  • 亿级消息堆积能力

  • 较少的依赖

2 RocketMQ 物理部署结构

image-20200927152657426

RocketMQ 网络部署特点

  • Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

  • Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能

对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId

为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 Name Server 集群中的所有节

点建立长连接,定时注册 Topic 信息到所有 Name Server。

  • Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路

由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可

集群部署。

  • Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路

由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer

既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。

注,consumer是和master/slave均建立长连接,producer只和master建立长连接

3 RocketMQ 逻辑部署结构

image-20200927152910420

  • Producer Group

用来表示一个发送消息应用,一个 Producer Group 下包含多个 Producer 实例,可以是多台机器,也可以

是一台机器的多个进程,或者一个进程的多个 Producer 对象。一个 Producer Group 可以发送多个 Topic

消息,Producer Group 作用如下:

  1. 标识一类 Producer

  2. 可以通过运维工具查询这个发送消息应用下有多个 Producer 实例

  3. 发送分布式事务消息时,如果 Producer 中途意外宕机,Broker 会主动回调 Producer Group 内的任意

一台机器来确认事务状态。

  • Consumer Group

用来表示一个消费消息应用,一个 Consumer Group 下包含多个 Consumer 实例,可以是多台机器,也可

以是多个进程,或者是一个进程的多个 Consumer 对象。一个 Consumer Group 下的多个 Consumer 以均摊

方式消费消息,如果设置为广播方式,那么这个 Consumer Group 下的每个实例都消费全量数据。

RocketMQ 存储特点

1 零拷贝原理

Consumer 消费消息过程,使用了零拷贝,零拷贝包含以下两种方式

  1. 使用 mmap + write 方式

优点:即使频繁调用,使用小块文件传输,效率也很高

缺点:不能很好的利用 DMA 方式,会比 sendfile 多消耗 CPU,内存安全性控制复杂,需要避免 JVM Crash

问题。

  1. 使用 sendfile 方式

优点:可以利用 DMA 方式,消耗 CPU 较少,大块文件传输效率高,无内存安全新问题。

缺点:小块文件效率低于 mmap 方式,只能是 BIO 方式传输,不能使用 NIO。

RocketMQ 选择了第一种方式,mmap+write 方式,因为有小块数据传输的需求,效果会比 sendfile 更好。

关于 Zero Copy 的更详细介绍,请参考以下文章

http://www.linuxjournal.com/article/6345

项目开源主页:https://github.com/alibaba/RocketMQ

6.2 文件系统

RocketMQ 选择 Linux Ext4 文件系统,原因如下:

Ext4 文件系统删除 1G 大小的文件通常耗时小于 50ms,而 Ext3 文件系统耗时约 1s 左右,且删除文件时,磁盘

IO 压力极大,会导致 IO 写入超时。

文件系统层面需要做以下调优措施

文件系统 IO 调度算法需要调整为 deadline,因为 deadline 算法在随机读情况下,可以合并读请求为顺序跳跃

方式,从而提高读 IO 吞吐量。

Ext4 文件系统有以下 Bug,请注意

http://blog.donghao.org/2013/03/20/%E4%BF%AE%E5%A4%8Dext4%E6%97%A5%E5%BF%97%EF%BC

%88jbd2%EF%BC%89bug/

6.3 数据存储结构

14项目开源主页:https://github.com/alibaba/RocketMQ

6.4 存储目录结构

|– abort

|– checkpoint

|– config

| |– consumerOffset.json

| |– consumerOffset.json.bak

| |– delayOffset.json

| |– delayOffset.json.bak

| |– subscriptionGroup.json

| |– subscriptionGroup.json.bak

| |– topics.json

| `– topics.json.bak

|– commitlog

| |– 00000003384434229248

| |– 00000003385507971072

| `– 00000003386581712896

`– consumequeue

|– %DLQ%ConsumerGroupA

| `– 0

| `– 00000000000006000000

|– %RETRY%ConsumerGroupA

| `– 0

| `– 00000000000000000000

|– %RETRY%ConsumerGroupB

| `– 0

| `– 00000000000000000000

|– SCHEDULE_TOPIC_XXXX

| |– 2

| | `– 00000000000006000000

| |– 3

| | `– 00000000000006000000

|– TopicA

| |– 0

| | |– 00000000002604000000

| | |– 00000000002610000000

| | `– 00000000002616000000

| |– 1

| | |– 00000000002610000000

| | `– 00000000002616000000

|– TopicB

| |– 0

| | `– 00000000000732000000

| |– 1

| | `– 00000000000732000000

| |– 2

| | `– 00000000000732000000

项目开源主页:https://github.com/alibaba/RocketMQ

6.5 数据可靠性

RocketMq的Producer配置

DefaultMQProducer

image-20201021161520883

所有的消息发送都通过DefaultMQProducer作为入口。DefaultMQProducer继承ClientConfig。ClientConfig中是Producer和Consumer都会用到的配置。

所以先看ClientConfig中的配置

ClientConfig

namesrvAddr

namesrv的地址,一般我们都会设置。多个的话用;隔开。

不设置的话,会从环境变量中取。

clientIP

客户端ip。默认值是获取本机ip。

这个值还是很有用的。会保存到消息的bornHost字段中,标记消息是哪台机器发出的。

这里有获取本机ip的代码,getLocalAddress可以参考使用。

instanceName

客户端实例名

默认值是从环境变量rocketmq.client.name取,或者使用DEFAULT

虽然这里设置了默认值DEFAULT

但是在客户端启动时,会判断如果是DEFAULT,会用进程号PID来作为实例名

1
2
3
4
5
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}

RocketMq源码解析(一)

源码下载

从github官网下载https://github.com/apache/rocketmq

image-20201009091752934

我这里下载的是当前最新代码,4.7.1的版本

导入idea

image-20201009092657602

各个代码包的功能

  • acl:权限控制模块
  • broker: broker 模块(broke 启动进程)
  • client :消息客户端,包含消息生产者、消息消费者相关类
  • common :公共包
  • dev :开发者信息(非源代码)
  • distribution :部署实例文件夹(非源代码)
  • docs:很多官方文档。虽然不是源码,但很重要。强烈建议通读一遍。
  • example: RocketMQ 例代码
  • filter :消息过滤相关基础类
  • logappender:日志实现相关类
  • namesrv:NameServer实现相关类(NameServer启动进程)
  • openmessageing:消息开放标准
  • remoting:远程通信模块,基于Netty。
  • srcutil:服务工具类
  • store:消息存储实现相关类
  • style:checkstyle相关实现
  • test:测试相关类
  • tools:工具类,监控命令相关实现类

启动测试环境

编译打包

1
mvn clean package -Dmaven.test.skip=true

启动NameSrv

先启动nameSrv

先建一个conf目录,存放需要的配置文件。

image-20201010151316718

distribution目录中的broker.conflogback_broker.xmllogback_namesrv.xml复制过去一份

再配置一下Rocketmq_home环境参数,用来找到配置文件。如果不配置,会启动失败

Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation

image-20201010151438237

启动

成功会打印

1
The Name Server boot success. serializeType=JSON

详细的日志,可以去日志文件中看。在logback的配置文件中可以找到。

启动Broker

修改conf文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

# namesrvAddr地址
namesrvAddr=127.0.0.1:9876
# 自动创建topic,方便测试
autoCreateTopicEnable=true
# 存储路径
storePathRootDir=D:\\github\\rocketmq-master\\dataDir
# commitLog路径
storePathCommitLog=D:\\github\\rocketmq-master\\dataDir\\commitlog

配置启动参数

image-20201010153314946

注意,要配置一个-c参数,执行broker的配置文件

启动成功

1
The broker[broker-a, x.x.x.x:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

此时会产生下列文件

image-20201110173130142

发送消息

在org.apache.rocketmq.example.quickstart包下,有最简单的测试代码。设置topic名为TopicTest

1
2
3
4
5
6
DefaultMQProducer producer = new DefaultMQProducer("my_produce_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);

执行main方法,可以看到消息发送成功的日志

查看datadir目录,又生成了文件。最重要的就是commitlog和consumequeue

commitlog下的文件,直接就是占1G空间的文件。

image-20201010155153719

并且commitlog文件的命名就是以文件的数据偏移量命名,1G = 1073741824byte

image-20201110174730262

image-20201110175259130

consumequeue下的是以topic名字分组的消费队列进度。文件大小默认是5860KB。

image-20201110175108593

index下是消息索引文件,目的是为了快速定位到消息。大小默认是400M。名字默认是时间戳。

消费消息

1
2
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consume_group_name");
consumer.setNamesrvAddr("127.0.0.1:9876")

运行main方法,可以看到消息消息掉了。

RocketMq源码知识点学习

RandomAccessFile 和 FileLock

在看store源码时,DefaultMessageStore中定义了一个FileLock属性

1
2
private RandomAccessFile lockFile;
private FileLock lock;

以前没有用过。学习一下

这两个属性用在start方法中,用来对一个文件或文件夹加锁。

1
2
3
4
5
6
7
8
9
public void start() throws Exception {

lock = lockFile.getChannel().tryLock(0, 1, false);
if (lock == null || lock.isShared() || !lock.isValid()) {
throw new RuntimeException("Lock failed,MQ already started");
}

lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
lockFile.getChannel().force(true);

RandomAccessFile还比较常用,但是一般用来读写文件,因为可以指定位置的读写所以比较方便。

而这里是通过对文件进行加锁,保证只有一个store在启动。

哪个文件呢?在前面的代码中可以找到,就是store存储目录

1
2
3
File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
MappedFile.ensureDirOK(file.getParent());
lockFile = new RandomAccessFile(file, "rw");

我们重点要学一下tryLock方法。

看注释说明,tryLock的特点是非阻塞的,尝试获取文件锁会立即返回。

获得的FileLock是进程级别的,不是线程级别的。换句话说文件锁可以解决多个进程并发访问、修改同一个文件的问题,但不能解决多线程并发访问、修改同一文件的问题。就是说同一进程内(同一个JVM)的多个线程,可以同时访问、修改此文件。但是其他JVM不可以。

文件锁是当前程序所属的JVM实例持有的,一旦获取到文件锁(对文件加锁),要调用release(),或者关闭对应的FileChannel对象,或者当前JVM退出,才会释放这个锁。

代码验证:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class FileLockTest {
public static void main(String[] args) throws IOException {
File file = new File("D:\\tmp\\lock.txt");
RandomAccessFile lockFile = new RandomAccessFile(file, "rw");

FileLock lock = lockFile.getChannel().tryLock(0, 1, false);
if (lock == null || lock.isShared() || !lock.isValid()) {
System.out.println("get locked failed");
} else {
System.out.println("get locked:" + lock.isValid());
}

Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
System.out.println("running");
}, 10, 60, TimeUnit.SECONDS);
}
}

启动第一个JVM获得锁,但是后面启动的获取文件锁失败。

image-20201106152919444

image-20201106152941556

这还是挺神奇的,怎么做到的?

依赖于底层操作系统,是由操作系统底层来实现的(如在 Win 的进程间不能同时读写一个文件,而在 Linux 的不同进程可以同时读写一个文件,称为询问式文件锁,所以要求程序按照规范判断文件是否加锁。)

文件锁分为2类:

  • 排它锁:又叫独占锁。对文件加排它锁后,该进程可以对此文件进行读写,该进程独占此文件,其他进程不能读写此文件,直到该进程释放文件锁。
  • 共享锁:某个进程对文件加共享锁,其他进程也可以访问此文件,但这些进程都只能读此文件,不能写。线程是安全的。只要还有一个进程持有共享锁,此文件就只能读,不能写。

有4种获取文件锁的方法:

  • lock() //对整个文件加锁,默认为排它锁。
  • lock(long position, long size, booean shared) //自定义加锁方式。前2个参数指定要加锁的部分(可以只对此文件的部分内容加锁),第三个参数值指定是否是共享锁。
  • tryLock() //对整个文件加锁,默认为排它锁。
  • tryLock(long position, long size, booean shared) //自定义加锁方式。

如果指定为共享锁,则其它进程可读此文件,所有进程均不能写此文件,如果某进程试图对此文件进行写操作,会抛出异常。

lock与tryLock的区别:

  • lock是阻塞式的,如果未获取到文件锁,会一直阻塞当前线程,直到获取文件锁
  • tryLock和lock的作用相同,只不过tryLock是非阻塞式的,tryLock是尝试获取文件锁,获取成功就返回锁对象,否则返回null,不会阻塞当前线程。

自旋锁与可重入锁

在RocketMq的broker落盘消息的store代码中,append数据之前加了锁。默认是使用自旋锁。

1
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config

那么,在mq这种超高并发的情况下,怎么保证线程安全且兼顾效率?

spinlock自旋锁

为什么叫自旋锁呢?

是因为这种获取锁的方式是当一个线程在获取锁时,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁。就像不断的在门口转圈,查看锁开了没。

好处是节省有上下文的切换。坏处是,如果竞争过多,会空耗cpu。

非自旋锁在获取不到锁的时候会进入阻塞状态,从而进入内核态,当获取到锁的时候需要从内核态恢复,需要线程上下文切换。 (线程被阻塞后便进入内核态,这样导致系统在用户态与内核态之间来回切换,会影响锁的性能)

可以这样理解,内核态相当于一个休息室,非自旋锁发现锁着,就到休息室去了,一会再从休息室过来。自旋锁就一直在门口转悠。

rocketmq中自旋锁的实现,可以拿来参考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class PutMessageSpinLock {
//true: 可以获取锁, false : 已经被锁住了.
private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

@Override
public void lock() {
boolean flag;
do {
flag = this.putMessageSpinLock.compareAndSet(true, false);
}
while (!flag);
}

@Override
public void unlock() {
this.putMessageSpinLock.compareAndSet(false, true);
}
}

原理就是不断的尝试compareAndSet,直到set成功。

可以看出这个实现是不可重入的。

ReentrantLock可重入锁

rocketmq直接使用jdk提供的ReentrantLock

ReentrantLock可以根据构造参数选择是公平锁还是非公平锁。非公平锁的优点在于吞吐量比公平锁大。

RocketMq使用的是非公平锁。

CopyOnWriteArrayList

RocketMq在存储MappedFile时,用到了CopyOnWriteArrayList

1
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

在多线程编程中经常会用到CopyOnWriteArrayList

CopyOnWriteArrayList要理解关键词CopyOnWrite。

CopyOnWrite的意思是写时复制。即当有修改动作时,会复制一份list数据。完成修改后,将

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

完成修改后,将指针指向新的数据

1
2
3
4
5
6
/**
* Sets the array.
*/
final void setArray(Object[] a) {
array = a;
}

注意CopyOnWriteArrayList的数据使用了transient 和 volatile两个关键字

1
2
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;

volatile

volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。而且,当成员变量发生变 化时,强迫线程将变化值回写到共享内存。这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。

transient

在对象序列化的过程中,标记为transient的变量不会被序列化。

对CopyOnWriteArrayList来说,volatile修饰很重要,保证了多个线程,读取的时同一份数据。

应用场景

CopyOnWriteArrayList适合于读多写少的场景。CopyOnWriteArrayList是完全不加锁的。因此效率很高。

缺点

CopyOnWriteArrayList的缺点主要是内存占用和数据一致性问题。

内存占用是在写时会复制一份数据,因此多占了一份内存。如果这个list非常大,可能引起gc问题。但是因为写时加锁的,最多也只会多存在一个复制备份。

一致性问题是,CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。在修改操作中,读操作依然读取的是旧数据。

CRC32校验

CRC的全称是循环冗余校验。

在RocketMq存储数据时,会计算数据的CRC32值。并存储。

当消费端收到数据后,会计算这个值。保证保证数据传输过程中没有损坏。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static int crc32(byte[] array) {
if (array != null) {
return crc32(array, 0, array.length);
}

return 0;
}

public static int crc32(byte[] array, int offset, int length) {
CRC32 crc32 = new CRC32();
crc32.update(array, offset, length);
return (int) (crc32.getValue() & 0x7FFFFFFF);
}

这是rocketmq计算crc的源码,就是用了jdk提供的CRC32。

但是mq这里按位与0x7FFFFFFF,取了31位。原因还不理解。

ByteBuffer和MappedByteBuffer

都是jdk nio提供的缓冲区类。ByteBuffer和MappedByteBuffer操作的都是堆外内存

slice方法,切片。将一个大缓冲区的一部分切出来,作为一个单独的缓冲区,但是它们共用同一个内部数组。切片从原缓冲区的position位置开始,至limit为止。原缓冲区和切片各自拥有自己的属性。

在RocketMq的store流程中,将缓冲区的切片返回给commitlog的回调函数

msgID的生成

RocketMq源码分析–NameSrv

先来分析NameSrv的源码。

在整个RocketMq中NameSrv还是相对简单的。

NameSrv的功能作用

NameSrv在rocketmq的部署中,是

RocketMq消息的删除

image-20201021144644125

问题:

RocketMq的文档中说,消息的过期时间是72小时。那么是说消息72小时之后就会被删除吗?

解答:

先说结论,不是。

原因很简单,rocketmq的消息是存在于CommitLog中,而CommitLog是以文件为单位(而非消息)存在的。

而且CommitLog的设计是只允许顺序写,且每个消息大小不定长,所以这决定了消息文件几乎不可能按照消息为单位删除(否则性能会极具下降,逻辑也非常复杂)。

所以消息被消费了,消息所占据的物理空间也不会立刻被回收。但消息既然一直没有删除,那RocketMQ怎么知道应该投递过的消息就不再投递?——答案是客户端自身维护——客户端拉取完消息之后,在响应体中,broker会返回下一次应该拉取的位置,PushConsumer通过这一个位置,更新自己下一次的pull请求。这样就保证了正常情况下,消息只会被投递一次。

什么时候清理物理消息文件?

那消息文件到底删不删,什么时候删?

消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息文件(CommitLog):

  1. 消息文件过期(默认72小时),且到达清理时点(默认是凌晨4点),删除过期文件。
  2. 消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。
  3. 磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。

注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。

这样设计带来的好处

消息的物理文件一直存在,消费逻辑只是听客户端的决定而搜索出对应消息进行,这样做,笔者认为,有以下几个好处:

  1. 一个消息很可能需要被N个消费组(设计上很可能就是系统)消费,但消息只需要存储一份,消费进度单独记录即可。这给强大的消息堆积能力提供了很好的支持——一个消息无需复制N份,就可服务N个消费组。
  2. 由于消费从哪里消费的决定权一直都是客户端决定,所以只要消息还在,就可以消费到,这使得RocketMQ可以支持其他传统消息中间件不支持的回溯消费。即我可以通过设置消费进度回溯,就可以让我的消费组重新像放快照一样消费历史消息;或者我需要另一个系统也复制历史的数据,只需要另起一个消费组从头消费即可(前提是消息文件还存在)。
  3. 消息索引服务。只要消息还存在就能被搜索出来。所以可以依靠消息的索引搜索出消息的各种原信息,方便事后排查问题。

注:在消息清理的时候,由于消息文件默认是1GB,所以在清理的时候其实是在删除一个大文件操作,这对于IO的压力是非常大的,这时候如果有消息写入,写入的耗时会明显变高。这个现象可以在凌晨4点(默认删时间时点)后的附近观察得到。

RocketMQ官方建议Linux下文件系统改为Ext4,对于文件删除操作,相比Ext3有非常明显的提升。

集合统计模式

聚合统计

所谓的聚合统计,就是指统计多个集合元素的聚合结果,包括:统计多个集合的共有元素 (交集统计);把两个集合相比,统计其中一个集合独有的元素(差集统计);统计多个 集合的所有元素(并集统计)

场景,统计手机 App 每天的新增用户数和第二天的留存用户数。

要完成这个统计任务,我们可以用一个集合记录所有登录过 App 的用户 ID,同时,用另 一个集合记录每一天登录过 App 的用户 ID。然后,再对这两个集合做聚合统计。

具体操作

记录所有登录过 App 的用户 ID 还是比较简单的,我们可以直接使用 Set 类型,把 key 设 置为 user:id,表示记录的是用户 ID,value 就是一个 Set 集合,里面是所有登录过 App 的用户 ID,我们可以把这个 Set 叫作累计用户 Set。

把每一天登录的用户 ID,记录到一个新集合中,我们把这个集合叫作每 日用户 Set,它有两个特点: 1. key 是 user:id 以及当天日期,例如 user:id:20220114; 2. value 是 Set 集合,记录当天登录的用户 ID。

差集计算

1
SDIFFSTORE user:new user:id:20220114 user:id

把在当日登录set中的,而不在累计用户中的,记到user:new中,就是新增用户

做并集

1
SUNIONSTORE user:id user:id user:id:20220114

每天将前一天的当日登录set和累计用户 Set进行并集运算,存到累计用户set中

交集运算

求留存用户,指的是20220113登录了,20220114又登录了,就是20220114的留存用户据

1
SINTERSTORE user:id:rem user:id:20220113 user:id:20220114

风险

Set 的差集、并集和交集的计算复杂度较高,在数据量较大的情况下,如果直接执行这些计 算,会导致 Redis 实例阻塞。

所以,我给你分享一个小建议:你可以从主从集群中选择一 个从库,让它专门负责聚合计算,或者是把数据读取到客户端,在客户端来完成聚合统 计,这样就可以规避阻塞主库实例和其他从库实例的风险了。

排序统计

List 和 Sorted Set 就属于有序集合。

List 是按照元素进入 List 的顺序进行排序的,而 Sorted Set 可以根据元素的权重来排 序,我们可以自己来决定每个元素的权重值。比如说,我们可以根据元素插入 Sorted Set 的时间确定权重值,先插入的元素权重小,后插入的元素权重大。

List的操作

LPUSH 命令把它插入 List 的队头。

LRANGE 命令获取部分元素。通过index位置来取。

Sorted Set

ZRANGEBYSCORE 命令就可以按权重排序后返回元素

在面对需要展示最新列表、排行榜等场景时,如果数据更新频繁或者需要分页显 示,建议你优先考虑使用 Sorted Set。

二值状态统计

二值状态就是指集合元素的取 值就只有 0 和 1 两种。在签到打卡的场景中,我们只用记录签到(1)或未签到(0),所 以它就是非常典型的二值状态

在签到统计时,每个用户一天的签到用 1 个 bit 位就能表示,一个月(假设是 31 天)的签 到情况用 31 个 bit 位就可以,而一年的签到也只需要用 365 个 bit 位,根本不用太复杂 的集合类型。这个时候,我们就可以选择 Bitmap。这是 Redis 提供的扩展数据类型。

Bitmap 本身是用 String 类型作为底层数据结构实现的一种统计二值状态的数据类型。你可以把 Bitmap 看作是一个 bit 数组

Bitmap 提供了 GETBIT/SETBIT 操作

Bitmap 还提供了 BITCOUNT 操作,用来统计这个 bit 数组中所有“1”的个数。

Bitmap 支持用 BITOP 命令对多个 Bitmap 按位 做“与”“或”“异或”的操作,操作的结果会保存到一个新的 Bitmap 中。

场景:统计 1 亿个用户连续 10 天的签到情况

可以把每天的日期作为 key,每个 key 对应一个 1 亿位的 Bitmap,每一个 bit 对应一个用户当天的签到情况。

1 个 1 亿位的 Bitmap,大约占 12MB 的内存(10^8/8/1024/1024),10 天的 Bitmap 的内存开销约 为 120MB,内存压力不算太大。

对 10 个 Bitmap 做“与”操作,只有 10 天都签到的用户对应的 bit 位上的值才会是 1。最后,我们可以用 BITCOUNT 统计下 Bitmap 中的 1 的个数,这就是连续签到 10 天的用户总数了。

基数统计

基数统计就是指统计一个集合中不重复的元 素个数。对应到场景中,就是统计网页的 UV。

这个问题的问题点就是在去重,在 Redis 的集合类型中,Set 类型默认支持去重,所以看到有去重需求时,我们可能 第一时间就会想到用 Set 类型。

SADD page1:uv user1

SCARD 命令,返回一个集合中的元素个数。

也可以用hash,利用key不会重复的特性。

但是上面这两个方案,都消耗很大的内存空间

HyperLogLog 是一种用于统计基数的数据集合类型,它的最大优势就在于,当集合元素数 量非常多时,它计算基数所需的空间总是固定的,而且还很小。

在 Redis 中,每个 HyperLogLog 只需要花费 12 KB 内存,就可以计算接近 2^64 个元素 的基数。你看,和元素越多就越耗费内存的 Set 和 Hash 类型相比,HyperLogLog 就非 常节省空间。

PFADD

PFCOUNT

不过,有一点需要你注意一下,HyperLogLog 的统计规则是基于概率完成的,所以它给出 的统计结果是有一定误差的,标准误算率是 0.81%。

image-20220114143203348