hello云胜

技术与生活

0%

Docker基础命令

镜像相关

docker pull

docker pull 镜像名字 拉取远端的img,可以加版本,默认是latest

docker login

1
docker login harbor.rrswl.com

docker build

docker build 构建img

1
docker build -t xxx:1.0.0 -f 指定dockerfile文件 .

别漏了最后一个.,指定构建的上下文目录。不一定非要是当前目录。

–no-cache:构建时不要用缓存

docker images

docker images:显示本地有哪些镜像

docker rmi

docker rmi 镜像id 删除img

docker tag

1
docker tag img_id 新的镜像名和版本

docker commit

docker commit -m ‘更新说明’ dockerid 新镜像的名字:可以保存对容器内镜像的修改,保存后会生成个新的image

把容器的改变提交为一个新镜像

底层使用git

docker image prune

删除游离镜像

commit 一个新的同名镜像,以前的镜像就会变为游离镜像

docker push

推送镜像到docker官方镜像仓库

1
docker push docker.io/自己的用户名/镜像名:tag

push之前要先登录

1
docker login -u xxx -p xxxxx harbor-test.my.com

push之前需要将镜像改名,加上用户名才能符合规范

1
docker tag kubesphere/node-exporter:ks-v0.18.1 harbor-test.my.com/kubesphere/node-exporter:ks-v0.18.1

其他镜像仓库,修改docker.io为对应地址

docker export

导出一个容器的文件系统,作为一个tar包

1
docker export -o nginx.tar 容器id

docker import

把导出的tar包再导成一个镜像

这个镜像不能直接启动,需要知道这个镜像的启动命令

用docker inspect查看EntryPoint + CMD

或者docker ps –no-trunc 查看

docker save

把一个或多个镜像保存为tar文件

1
docker save -o xxx.tar 镜像名

docker load

1
docker load -i xxx.tar

加载为镜像

容器运行相关

docker create

只创建,不启动

docker start

启动一个已经创建过的容器,是后台运行

docker pause

暂停一个容器

docker unpause

恢复一个暂停的容器,start不能启动一个暂停的容器

docker stop

退出容器,要用start起来

docker kill

kill也是停掉容器。和stop的区别是,kill是强杀。stop是优雅停机。

docker run

1
docker run [OPTIONS] IMAGE [COMMAND] [ARG...]

run 是create + start

-p

指定端口绑定

-P

随机端口绑定

–link可以用来链接2个容器,使得源容器(被链接的容器)和接收容器(主动去链接的容器)之间可以互相通信,并且接收容器可以获取源容器的一些数据,如源容器的环境变量。

–link 不推荐使用

–link :alias

其中,name和id是源容器的name和id,alias是源容器在link下的别名。

把其他容器联进来使用,比如我们的java应用需要用到redis。可以用先启动一个redis容器,然后link进来。java容器可以ping通这个别名。

但是如果后来redis容器挂了,重启。ip变了之后,java容器不能再联通redis容器。因为java容器的hosts绑定了之前的

-d

-d是后台运行

docker run -p 8080:80 -d nginx 比如运行一个nginx -p的意识是将host主机的8080端口映射到容器的80端口 -d是守护进程的意思

-v 挂载

-v 主机的绝对路径:容器的绝对路径 绑定并挂载。小心空挂载

如果是 -v 不以/开头的路径:容器的绝对路径。 只绑定,不挂载。docker会自动管理,不会把他当成目录,而是当成卷

这种情况下,docker做的事情:

  1. 在主机目录/var/lib/docker/volumes创建指定名字的卷(具名卷)
  2. 把这个卷和容器内部目录绑定
  3. 容器启动以后,容器目录里的内容就在卷里面。都是同一份

建议:

  • 开发测试,使用-v 绝对路径
  • 生产用卷

–restart=always

--rm 用完自动删

docker ps

docker ps 查看当前正在运行的docker容器

docker ps -a 查看所有容器信息,包括之前运行过,已经停止的

docker ps -aq 只显示docker id

docker rm -f $(docker ps -aq) 删除所有容器

排查问题相关

docker logs

docker logs -f 容器id

每个容器的日志默认都会以 json-file 的格式存储于/var/lib/docker/containers/<容器id>/<容器id>-json.log

查看日志,也可以直接去/var/lib/docker/containers/ 具体容器挂载的目录下,直接看日志json格式(不建议这么做)

如果容器一直运行并且一直产生日志,容器日志会导致磁盘空间爆满,

全局设置限制容器日志大小su

1
2
3
4
5
6
7
# vim /etc/docker/daemon.json

{
"registry-mirrors": ["http://f613ce8f.m.daocloud.io"],
"log-driver":"json-file",
"log-opts": {"max-size":"1024m", "max-file":"3"}
}
1
2
3
4
# 重启docker守护进程
systemctl daemon-reload
# 重启docker
systemctl restart docker

注意:设置的日志大小,只对新建的容器有效。

设置完成之后,需要删除容器,并重新启动容器,

docker attach

绑定的是控制台。一般不用。可能导致容器停止。

docker exec

docker exec -it 容器 /bin/sh

-u 0:0 root用户:root组

–privileged 超级权限,特权

docker inspect

查看详情

docker [container] inspect

docker image inspect

操作容器

docker cp

可以往容器里copy,也可以从docker里复制出来

1
docker cp index.html mynginx:/usr/share/nginx/html

1
docker cp mynginx:/etc/nginx/nginx.conf nginx.conf

原文件是文件:

  • 目标文件不存在。则创建目标文件,内容为源文件的内容
  • 目标文件不存在,且以/结尾。则报错
  • 目标文件存在且是文件,则替换
  • 目标文件存在且是目录。则将源文件复制到目录内

源文件是一个目录:

  • 目标目录不存在。则创建目录,复制所有文件
  • 目标目录存在,但是一个文件。报错
  • 目标目录存在,且以/结尾,则将原文件夹内的文件复制
  • 目标目录存在,且不以/结尾,则将源文件目录整个复制

docker diff

查看容器的变动

docker rm

docker rm 容器id 删除一个容器

docker update

更新容器配置

docker volume

对docker的卷进行管理

/var/lib/docker/volumes

Docker的卷挂载

Volumes(卷) :存储在主机文件系统的一部分中,该文件系统由Docker管理(在Linux上是“ / var / lib / docker / volumes /”)。非Docker进程不应修改文件系统的这一部分。

三种挂载方式:

  1. docker自动在主机创建文件夹,把容器内的目录挂出来,-v
  2. 自己在主机创建文件夹,手动挂载,–mount。
  3. 把数据挂载到内存,基本没用
  • 匿名卷使用

    1
    2
    docker run -dP -v :/etc/nginx nginx
    #docker将创建出匿名卷,并保存容器/etc/nginx下面的内容
  • 具名卷

    1
    2
    docker run -dP -v nginx:/etc/nginx nginx
    #docker将创建出名为nginx的卷,并保存容器/etc/nginx下面的内容

注意:

如果-v 主机的绝对目录:/usr/share/nginx/html,测试如果主机的目录下没有内容,那么容器内的内同也将被覆盖为空。

但是,如果-v 相对目录:/usr/share/nginx/html,docker会进行自动管理,把他当成一个卷,将容器的内容放到卷里

分布式系统核心问题

一致性问题

一致性问题是分布式领域最重要、最基础的问题。

一致性/Consistency,是说在有多个服务节点的情况下,执行一些列操作,在约定协议的保障下,使得他们对外的处理结果,能达到一定程度的协同。

规范的说,理想的分布式系统一致性应该满足:

  • 可终止性(Termination):一致的结果在有限时间内能完成;
  • 共识性(Consensus):不同节点最终完成决策的结果应该相同;
  • 合法性(Validity):决策的结果必须是其它进程提出的提案。

第一点很容易理解

第二点看似容易,但是隐藏了一些潜在信息。算法考虑的是任意的情形,凡事一旦推广到任意情形,就往往有一些惊人的结果。例如现在就剩一张票了,中关村和西单的电影院也分别刚确认过这张票的存在,然后两个电影院同时来了一个顾客要买票,从各自“观察”看来,自己的顾客都是第一个到的……怎么能达成结果的共识呢?记住我们的唯一秘诀:核心在于需要把两件事情进行排序,而且这个顺序还得是大家都认可的

第三点看似绕口,但是其实比较容易理解,即达成的结果必须是节点执行操作的结果。仍以卖票为例,如果两个影院各自卖出去一千张,那么达成的结果就是还剩八千张,决不能认为票售光了。

做过分布式系统的读者应该能意识到,绝对理想的强一致性(Strong Consistency)代价很大。除非不发生任何故障,所有节点之间的通信无需任何时间,这个时候其实就等价于一台机器了。实际上,越强的一致性要求往往意味着越弱的性能。

一般的,强一致性(Strong Consistency)主要包括下面两类:

顺序一致性:限制了各进程内指令的偏序关系,但不在进程间按照物理时间进行全局排序

线性一致性:在顺序一致性前提下加强了进程间的操作排序,形成唯一的全局顺序

强一致的系统往往比较难实现。很多时候,人们发现实际需求并没有那么强,可以适当放宽一致性要求,降低系统实现的难度。例如在一定约束下实现所谓最终一致性(Eventual Consistency),即总会存在一个时刻(而不是立刻),系统达到一致的状态,这对于大部分的 Web 系统来说已经足够了。这一类弱化的一致性,被笼统称为弱一致性(Weak Consistency)。

共识算法

实践中,要保障系统满足不同程度的一致性,往往需要通过共识算法来达成。

共识算法解决的是分布式系统对某个提案(Proposal),大部分节点达成一致意见的过程。

理论上,如果分布式系统中各个节点都能保证以十分强大的性能(瞬间响应、高吞吐)无故障的运行,则实现共识过程并不复杂,简单通过多播过程投票即可。

很可惜的是,现实中这样“完美”的系统并不存在,如响应请求往往存在时延、网络会发生中断、节点会发生故障、甚至存在恶意节点故意要破坏系统。

一般地,把故障(不响应)的情况称为“非拜占庭错误”,恶意响应的情况称为“拜占庭错误”(对应节点为拜占庭节点)。

常见算法

对于非拜占庭错误的情况,已经存在不少经典的算法,包括 Paxos(1990 年)、Raft(2014 年)及其变种等。这类容错算法往往性能比较好,处理较快,容忍不超过一半的故障节点。

对于要能容忍拜占庭错误的情况,包括 PBFT(Practical Byzantine Fault Tolerance,1999 年)为代表的确定性系列算法、PoW(1997 年)为代表的概率算法等。确定性算法一旦达成共识就不可逆转,即共识是最终结果;而概率类算法的共识结果则是临时的,随着时间推移或某种强化,共识结果被推翻的概率越来越小,最终成为事实上结果。拜占庭类容错算法往往性能较差,容忍不超过 1/3 的故障节点。

此外,XFT(Cross Fault Tolerance,2015 年)等最近提出的改进算法可以提供类似 CFT 的处理响应速度,并能在大多数节点正常工作时提供 BFT 保障。

Algorand 算法(2017 年)基于 PBFT 进行改进,通过引入可验证随机函数解决了提案选择的问题,理论上可以在容忍拜占庭错误的前提下实现更好的性能(1000+ TPS)。

FLP不可能原理

FLP 不可能原理告诉我们,不要浪费时间,去试图为异步分布式系统设计面向任意场景的共识算法

异步:导致我们无法判断某个消息迟迟没有被响应是哪里出了问题(节点故障还是传输故障?)

学术研究,往往考虑地是数学和物理意义上理想化的情形,很多时候现实世界要稳定得多(感谢这个世界如此鲁棒!)。例如,上面例子中描述的最坏情形,每次都发生的概率其实并没有那么大。工程实现上某次共识失败,再尝试几次,很大可能就成功了。

科学告诉你什么是不可能的;工程则告诉你,付出一些代价,可以把它变成可行。

这就是科学和工程不同的魅力。FLP 不可能原理告诉大家不必浪费时间去追求完美的共识方案,而要根据实际情况设计可行的工程方案。

那么,退一步讲,在付出一些代价的情况下,共识能做到多好?

回答这一问题的是另一个很出名的原理:CAP 原理。

CAP原理

该原理被认为是分布式系统领域的重要原理之一,深刻影响了分布式计算与系统设计的发展。

CAP 原理:分布式系统无法同时确保一致性(Consistency)、可用性(Availability)和分区容忍性(Partition),设计中往往需要弱化对某个特性的需求。

一致性、可用性和分区容忍性的具体含义如下:

  • 一致性(Consistency):任何事务应该都是原子的,所有副本上的状态都是事务成功提交后的结果,并保持强一致;
  • 可用性(Availability):系统(非失败节点)能在有限时间内完成对操作请求的应答;
  • 分区容忍性(Partition):系统中的网络可能发生分区故障(成为多个子网,甚至出现节点上线和下线),即节点之间的通信无法保障。而网络故障不应该影响到系统正常服务。

CAP 原理认为,分布式系统最多只能保证三项特性中的两项特性。

比较直观地理解,当网络可能出现分区时候,系统是无法同时保证一致性和可用性的。要么,节点收到请求后因为没有得到其它节点的确认而不应答(牺牲可用性),要么节点只能应答非一致的结果(牺牲一致性)。

由于大部分时候网络被认为是可靠的,因此系统可以提供一致可靠的服务;当网络不可靠时,系统要么牺牲掉一致性(多数场景下),要么牺牲掉可用性。

注意:网络分区是可能存在的,出现分区情况后很可能会导致发生“脑裂”现象。

关键就在于网络,网络大部分情况是可靠的,但也总是不可靠的。

所以,cap可以简单理解为,因为网络总会出故障,当网络故障时,我们保一致性,还是要可用性。

要可用性:例如网站静态页面内容、实时性较弱的查询类数据库等,简单分布式同步协议如 Gossip,以及 CouchDB、Cassandra 数据库等,都为此设计。

要一致性:对结果一致性很敏感的应用,例如银行取款机,当系统故障时候会拒绝服务。MongoDB、Redis、MapReduce 等为此设计。Paxos、Raft 等共识算法,主要处理这种情况。在 Paxos 类算法中,可能存在着无法提供可用结果的情形,同时允许少数节点离线。

ACID 原则与多阶段提交

ACID,即 Atomicity(原子性)、Consistency(一致性)、Isolation(隔离性)、Durability(持久性)四种特性的缩写。

ACID 原则描述了分布式数据库需要满足的一致性需求,同时允许付出可用性的代价。

与 ACID 相对的一个原则是 eBay 技术专家 Dan Pritchett 提出的 BASE(Basic Availability,Soft-state,Eventual Consistency)原则。BASE 原则面向大型高可用分布式系统,主张牺牲掉对强一致性的追求,而实现最终一致性,来换取一定的可用性。

两阶段提交

对于分布式事务一致性的研究成果包括著名的两阶段提交算法(Two-phase Commit,2PC)和三阶段提交算法(Three-phase Commit,3PC)。

两阶段提交算法,其基本思想十分简单,既然在分布式场景下,直接提交事务可能出现各种故障和冲突,那么可将其分解为预提交和正式提交两个阶段,规避冲突的风险。

  • 预提交:协调者(Coordinator)发起提交某个事务的申请,各参与执行者(Participant)需要尝试进行提交并反馈是否能完成;
  • 正式提交:协调者如果得到所有执行者的成功答复,则发出正式提交请求。如果成功完成,则算法执行成功。

在此过程中任何步骤出现问题(例如预提交阶段有执行者回复预计无法完成提交),则需要回退。

两阶段提交算法因为其简单容易实现的优点,在关系型数据库等系统中被广泛应用。当然,其缺点也很明显。整个过程需要同步阻塞导致性能一般较差;同时存在单点问题,较坏情况下可能一直无法完成提交;另外可能产生数据不一致的情况(例如协调者和执行者在第二个阶段出现故障)。

三阶段提交

三阶段提交针对两阶段提交算法第一阶段中可能阻塞部分执行者的情况进行了优化。具体来说,将预提交阶段进一步拆成两个步骤:尝试预提交和预提交。

完整过程如下:

  • 尝试预提交:协调者询问执行者是否能进行某个事务的提交。执行者需要返回答复,但无需执行提交。这就避免出现部分执行者被无效阻塞住的情况。
  • 预提交:协调者检查收集到的答复,如果全部为真,则发起提交事务请求。各参与执行者(Participant)需要尝试进行提交并反馈是否能完成;
  • 正式提交:协调者如果得到所有执行者的成功答复,则发出正式提交请求。如果成功完成,则算法执行成功。

其实,无论两阶段还是三阶段提交,都只是一定程度上缓解了提交冲突的问题,并无法一定保证系统的一致性。首个有效的算法是后来提出的 Paxos 算法。

Paxos 算法

Paxos 问题是指分布式的系统中存在故障(crash fault),但不存在恶意(corrupt)节点的场景(即可能消息丢失或重复,但无错误消息)下的共识达成问题。这也是分布式共识领域最为常见的问题。

Paxos 是首个得到证明并被广泛应用的共识算法,其原理类似 两阶段提交 算法,进行了泛化和扩展,通过消息传递来逐步消除系统中的不确定状态。zookeeper中有使用。

作为后来很多共识算法(如 Raft、ZAB 等)的基础,Paxos 算法基本思想并不复杂。

基本原理

算法中存在三种逻辑角色的节点,在实现中同一节点可以担任多个角色。

  • 提案者(Proposer):提出一个提案,等待大家批准(Chosen)为结案(Value)。系统中提案都拥有一个自增的唯一提案号。往往由客户端担任该角色。(只有是被提案者提出的提案才可能被最终批准)
  • 接受者(Acceptor):负责对提案进行投票,接受(Accept)提案。往往由服务端担任该角色。
  • 学习者(Learner):获取批准结果,并帮忙传播,不参与投票过程。可为客户端或服务端。

基本思路类似两阶段提交:

多个提案者先要争取到提案的权利(得到大多数接受者的支持);

成功的提案者发送提案给所有人进行确认,得到大部分人确认的提案成为批准的结案。

Raft 算法

Paxos 算法虽然给出了共识设计,但并没有讨论太多实现细节,也并不重视工程上的优化,因此后来在学术界和工程界出现了一些改进工作,包括 Fast Paxos、Multi-Paxos,Zookeeper Atomic Broadcast(ZAB)和 Raft 等。这些算法重点在于改进执行效率和可实现性。

Raft 算法的主要设计思想与 ZAB 类似,通过先选出领导节点来简化流程和提高效率。实现上分解了领导者选举、日志复制和安全方面的考虑,并通过约束减少了不确定性的状态空间。

算法包括三种角色:领导者(Leader)、候选者(Candidate) 和跟随者(Follower),每个任期内选举一个全局的领导者。领导者角色十分关键,决定日志(log)的提交。每个日志都会路由到领导者,并且只能由领导者向跟随者单向复制。

典型的过程包括两个主要阶段:

  • 领导者选举:开始所有节点都是跟随者,在随机超时发生后未收到来自领导者或候选者消息,则转变角色为候选者(中间状态),提出选举请求。最近选举阶段(Term)中得票超过一半者被选为领导者;如果未选出,随机超时后进入新的阶段重试。领导者负责从客户端接收请求,并分发到其他节点;
  • 同步日志:领导者会决定系统中最新的日志记录,并强制所有的跟随者来刷新到这个记录,数据的同步是单向的,确保所有节点看到的视图一致。

此外,领导者会定期向所有跟随者发送心跳消息,跟随者如果发现心跳消息超时未收到,则可以认为领导者已经下线,尝试发起新的选举过程。

拜占庭问题

拜占庭是古代东罗马帝国的首都,由于地域宽广,假设其守卫边境的多个将军(系统中的多个节点)需要通过信使来传递消息,达成某些一致决定。但由于将军中可能存在叛徒(系统中节点出错),这些叛徒将向不同的将军发送不同的消息,试图干扰共识的达成。

拜占庭问题即讨论在此情况下,如何让忠诚的将军们能达成行动的一致。

在大多数的分布式系统中,拜占庭的场景并不多见。然而在特定场景下存在意义,例如允许匿名参与的系统(如比特币),或是出现欺诈可能造成巨大损失的情况。

根据 FLP 不可能原理,这个问题无通用解。

拜占庭问题之所以难解,在于任何时候系统中都可能存在多个提案(因为提案成本很低),并且在大规模场景下要完成最终确认的过程容易受干扰,难以达成共识。

比特币网络在设计时使用了 PoW(Proof of Work)的概率型算法思路,从如下两个角度解决大规模场景下的拜占庭容错问题。

首先,限制一段时间内整个网络中出现提案的个数(通过工作量证明来增加提案成本);其次是丢掉最终确认的约数,约定好始终沿着已知最长的链进行拓展。共识的最终确认是概率意义上的存在。这样,即便有人试图恶意破坏,也会付出相应的经济代价(超过整体系统一半的工作量)。

后来的各种 PoX 系列算法,也都是沿着这个思路进行改进,采用经济博弈来制约攻击者。

使用ShardingSphere-JDBC进行分库分表实践

ZooKeeper

设计目的

zk的设计是为了解决分布式服务领域的问题。

  • 最终一致性:client不论连接到哪个Server,展示给它的都是同一个视图。
  • 可靠性:具有简单、健壮、良好的性能、如果消息被到一台服务器接收,那么消息将被所有服务器接收。
  • 实时性:Zookeeper保证客户端将在一个时间间隔范围内获得服务器的更新信息
  • 原子性:更新只能成功或者失败,没有中间状态。
  • 顺序性:包括全局有序和偏序两种:全局有序是指如果在一台服务器上消息a在消息b前发布,则在所有Server上消息a都将在消息b前被发布;偏序是指如果一个消息a在消息b前被同一个发送者发布,a必将排在b前面。

应用场景

  • 配置管理
  • DNS服务
  • 组成员管理
  • 各种分布式锁

不适合的场景:

  • 大数据量存储

ZooKeeper架构

img

  • 每个server的数据都是一样的,Client的读请求可以请求任意一个Server。
  • ZooKeeper启动时,将从实例中选举一个leader(Paxos协议)。
  • Leader负责处理数据更新等操作(ZAB协议)。(ZooKeeper Atomic Broadcast protocol)
  • 一个更新操作成功,当且仅当大多数Server在内存中成功修改 。

在zookeeper的集群中,各个节点共有下面3种角色和4种状态:
角色:leader、follower、observer
状态:

  • LOOKING:当前Server不知道leader是谁,正在搜寻。
  • LEADING:当前Server即为选举出来的leader。
  • FOLLOWING:leader已经选举出来,当前Server与之同步。
  • OBSERVING:observer的行为在大多数情况下与follower完全一致,但是他们不参加选举和投票,而仅仅接受(observing)选举和投票的结果。

session

session是zk中非常重要的一个概念。zk客户端和zk集群的某个节点直接建立一个session。客户端可以主动关闭session。如果在一定的timeout时间内,客户端没有想zk集群发送消息,zk集群可以主动断开连接。

zk客户端发现可一个zk集群节点连接失败后,会自动同其他节点建立连接。

数据一致性

zk集群中,只有leader节点可以处理写请求。follower节点接收到写请求,会转发给leader处理。

先到达leader的写请求先被处理

zk处理写请求时序图。

节点2是leader

zk处理写请求时序图。

Observer

zk集群中,除了leader和follew之外,还有一种observer角色。

Observer不参加ZooKeeper的事务提交和选举。只是被动的接收leader的通知。所以通过observer节点来提高整个集群的性能。

9140032-9eadb32721740010

数据存储

zk

zxid:每一个对zk datatree的修改都会作为一个事务执行。每个事务都有一个id,就是zxid。zxid是递增的。

zxid是一个8Byte的整数,即java的long型。8个字节分两部分。高四个字节保存的是epoch。低4个字节保存的是递增计数。

epoch用来标识leader关系是否改变,每次一个leader被选出来,它都会有一个新的epoch,标识当前属于那个leader的统治时期。

epoch文件只有在集群模式下才会产生。accepedEpoch和currentEpoch

epoch标识了当前Leader周期,集群机器相互通信时,会带上这个epoch以确保彼此在同一个Leader周期中

事务日志

zk将所有的变更存到日志文件里。zk在DataDir下创建version-2目录,下面会存放log文件和snapshot文件。

zk提供的一个查看log的工具:zkTxnLogToolkit.sh

数据快照

数据快照是Zookeeper数据存储中非常核心的运行机制,数据快照用来记录Zookeeper服务器上某一时刻的全量内存数据内容,并将其写入指定的磁盘文件中。

zk生成快照文件的时机:

  1. 重启

  2. 使用snapCount参数来配置每次数据快照之间的事务操作次数,即ZooKeeper会在snapCount次事务日志记录后进行一个数据快照。

    但实际上,数据快照对于ZooKeeper所在机器的整体性能的影响,需要尽量避免ZooKeeper集群中的所有机器在同一时刻进行数据快照。因此ZooKeeper在具体的实现中,并不是严格的按照这个策略执行的,而是采取“过半随机”策略。

一个单独的异步线程来进行数据快照。

zk查看你快照文件,可以通过一个zk的类:org.apache.zookeeper.server.SnapshotFormatter

数据模型

zk的数据模型是树结构的层次模型。层次模型和KV模型是两种主流的数据模型。层次模型常见于文件系统。Zk选择使用文件系统模型。

  • 文件系统的树形结构,方便表达数据之间的层次关系
  • 文件系统的树形结构可以位不通应用分配独立的命名空间

zk的树形结构成为data tree。data tree的每个节点称为znode。和文件系统不一样,每个znode都是可以存储数据的。每个znode都有一个版本version,version从0开始。znode中的数据可以有多个版本,比如某一个znode下存有多个数据版本,那么查询这个路径下的数据需带上版本信息。

img

zk是一个内存数据库。在内存里保存了整棵树的内容,Zookeeper会定时将这个数据存储到磁盘上。

zk是是使用CurrentHashMap保存这颗树,map的key就是路径

ZNode的类型

znode可以分为持久性(persistent)的和临时性(ephemeral)的。

  • 持久性的。这种znode在创建之后,即使zk集群宕机或者client宕机都不会丢失
  • 临时性的。这种znode在client宕机或者client在指定的timeout时间内没有想zk发送消息,这种node就会消失。

znode还可以设置位顺序性和非顺序性的。顺序性的znode会关联一个唯一的单调递增整数。这个整数会作为znode名字的后缀。

以上两两组合就可以出来4中znode

Zookeeper原生API

watch机制

watch是zk中非常好用的机制。客户端在读取一个数据时,可以同时给这个数据设置一个watch。当这个数据有变化时,客户端就会收到一个事件通知。这样就避免了客户端不断轮询来查询最新的数据。

zk的watch采用了一种推拉结合的模式。一旦服务端感知数据变了,那么只会发送一个事件类型和节点信息给关注的客户端,而不会包括具体的变更内容,所以事件本身是轻量级的,这就是所谓的”推”部分,然后收到变更通知的客户端需要自己去拉变更的数据,这就是”拉”部分。

注意,watch是一次性的。

条件更新

对数据的更新操作,可以设置version,就行有条件更新。即只有客户端请求的version和实际数据的version相同时,才进行更新操作。否则不能操作。

如果客户端的请求中将version设置为-1,表示无条件更新。

multi操作

zk提供了一种multi api。可以把多个对znode的操作作为一个事务进行提交,要么全部成功,要么全部失败。

实现分布式队列

使用持久有序节点

实现分布式锁

使用临时有序节点

注意解决羊群效应:watch前一个锁请求,而不是watch锁本身

这样是一个公平锁

性能比较低,代码里取出所有节点,排序,查看前面是否有锁请求,有则watch,否则获取锁。这整个过程是加锁进行的,所以效率比较慢。

实现分布式选举

使用临时顺序znode

设计上和分布式锁很像。

Apache Curator

简化zookeeper代码的开发。

有很多写好的分布式服务可以使用

服务发现

curator有一个扩展curator-x-discovery,基于zk实现了服务发现。

基本设计:一个服务注册的根目录,服务目录,服务实例(临时节点)

container 节点

container节点是zk的一种特殊节点。引入他的目的是为了下挂子节点。当子节点都被删除时,这个container节点会被zk自动删除。

服务注册的根目录和服务目录都是container类型节点。

Zookeeper的运维

最重要的三个配置项

  1. clientPort
  2. dataDir。zk保存的是数据的一个个快照文件
  3. dataLogDir。保存事务日志文件的目录。zk在提交一个事务之前,必须保证事务日志的落盘。

硬件要求,zk应分配一个独占的服务器

  1. 内存,zk需要在内存中保存data tree。一般data tree也不会特别大。8G以上。
  2. CPU。zk不是计算密集型,2核以上
  3. 存储。存储设备的写延迟严重影响事务提交的效率。所以建议给dataLogDir分配一个独占的SSD盘

ZooKeeper常用命令

Zookeeper服务端命令

启动ZK服务: sh bin/zkServer.sh start
查看ZK服务状态: sh bin/zkServer.sh status
停止ZK服务: sh bin/zkServer.sh stop
重启ZK服务: sh bin/zkServer.sh restart

Zookeeper客户端命令

客户端登录Zookeeper: sh bin/zkCli.sh -server 127.0.0.1:2181

数据清理

手动清理

可以使用zk提供的bin/zkCleanup.sh脚本进行快照文件的清理

1
bin/zkCleanup.sh  -n 5

表示保留最近的5个快照

自动清理

在conf文件中可以配置

1
2
autopurge.snapRetainCount=10
autopurge.purgeInterval=1

autopurge.purgeInterval: 这个参数指定了清理频率,单位是小时,需要填写一个1或更大的整数,默认是0,表示不开启自动清理功能。

autopurge.snapRetainCount: 这个参数和上面的参数搭配使用,这个参数指定了需要保留的快照文件数目,默认是保留3个。

zk的监控

4字监控命令。

通过telnet或者ncat向zk发出命令。

1
echo ruok | ncat *.*.*.* 2181

jmx

zk很好的支持了jmx,大量的监控和管理工作可以通过jmx来做。可以把zk的JMX数据集成到prometheus,使用prometheus来做zk的监控管理。

1
jconsole 连接jmx

默认jmx只能本地连接。要配置远程可访问。需要先开放JMX端口

1
export JMXPORT=8081

然后再启动zk

跨数据中心部署

利用Observer节点

img

比如业务需要部署北京和香港两地都使用的 ZooKeeper服务。要求北京和香港的客户端请求的延迟都低。因此,需要再北京和香港都部署zk节点。
假设leader节点在北京。如果香港的节点也是follower,那么每个来自香港的写请求要需要在北京的leader和每个香港的follower节点之间进行propose、ack、和commit跨广域网的消息确认。
解决方案就是把香港的节点都设成observer。这些propose、ack和commit消息都变成同步一个leader的inform消息。

指定节点为observer

只需要在conf文件中,节点后面加上

1
server.3=x.x.x.x:2222:2223:observer

集群节点调整

手动调整

可以采取更改配置文件的方式调整。

  1. 停掉集群
  2. 修改conf文件的server.n
  3. 启动节点

缺点:

1. 服务中断
2. 可能导致已经提交的数据被覆盖

动态配置

3.5.0的新特性

在配置文件中开启动态配置

配置digest

使用命令进行节点的动态修改

Chubby vs Zookeeper

Chubby是一个分布式锁系统,非开源,广泛应用在Google的基础架构中,比如GFS和Bigtable中都是用chubby做协同服务

zookeeper借鉴了很多chubby的设计思想。所以他们之间有很多相似之处。

分布式任务调度框架XXL-Job

xxljob以quartz为基础

Service Mesh概念入门

Service Mesh译为服务网格,是微服务的升级进化。那么为什么会出现Service Mesh?就应该先分析当前以SpringCloud为代表的微服务框架有什么问题?个人认为有以下三点:

  • 框架服务。比如使用SpringCloud,需要对SC多个组件有充分的了解。同时需要有能力排查SC组件中的问题。
  • 微服务框架通常仅支持一种或特定的集中框架。然鹅,这和当初微服务定义的一个重要特性:语言无关性。显然是背道而驰的。
  • 在代码层面。微服务框架和业务代码强耦合在一起。框架的版本设计,依赖库的升级替换等,会对业务代码产生直接的影响。

所以出现了以Linkerd,Envoy,Ngixmesh为代表的代理模式(边车模式)应运而生。

啥是边车?就这个

img

很形象,对应到软件设计中,就是边车设计模式。个人认为和代理模式一样。

![img](Service Mesh概念入门.assets/6-a.png)

所谓边车模式,意为在业务代码之外,单独部署一个实现了负载均衡、服务发现、认证授权、监控追踪、流量控制等分布式系统所需要的功能的模块,作为一个和服务对等的代理服务,和业务服务部署在一起,接管服务的流量。由这个边车来控制微服务之间的发现调用和监控管理的功能。

站在一个更高的视角,当服务非常多时,便是下面这样的一个服务网格。绿色节点表示业务服务,蓝色为对应的边车。

![img](Service Mesh概念入门.assets/mesh1.png)

这就是第一代Service Mesh。

这个网格看起来错综复杂,运维困难。所以继续优化,出现了以以Istio为代表的第二代Service Mesh。

![img](Service Mesh概念入门.assets/6-b.png)

每个边车服务都会将自己的信息注册到控制台。控制台可以全局监控服务,对某个服务下发控制命令。

鸟瞰图

![img](Service Mesh概念入门.assets/mesh3.png)

服务依然在网格间不断穿梭,我们从统一的控制台掌控全局。

再来一张功能架构图

img

Service Mesh对我打动最深的是两点,

1,语言无关性。我们可以根据业务和开放人员能力特点选择合适的开发语言。并且各个小组之间可以根据情况自行选择。

2,框架代码和业务代码的分离。因为大量的微服务底层的功能由边车代理模块提供。所以业务代码可以更简单,甚至于从复杂的SpringCloud中退回到SpringBoot。再加上服务容器化。将来可能业务开发人员和ServiceMesh开发运维人员会更加的泾渭分明。

个人看法,以Spring Cloud为代表的微服务是实现服务治理平台的现状,而Service Mesh却是未来。

参考链接

https://philcalcado.com/2017/08/03/pattern_service_mesh.html

https://www.nginx.com/blog/what-is-a-service-mesh/

消费者启动后究竟从哪里开始消费消息

问题:

一个新的消费组订阅一个已存在的topic,是从哪一条消息开始消费呢?

(注意是新的消费组,消费组重启这不是新的消费组)

代码验证

我们知道在消费时,可以设置一个字段ConsumeFromWhere,从哪开始消费。可选参数,去掉Deprecated的,剩下的就是

1
2
3
4
5
public enum ConsumeFromWhere {
CONSUME_FROM_LAST_OFFSET,
CONSUME_FROM_FIRST_OFFSET,
CONSUME_FROM_TIMESTAMP,
}

CONSUME_FROM_LAST_OFFSET:从最后的偏移量开始消费,

CONSUME_FROM_FIRST_OFFSET:从最小偏移量开始消费,

CONSUME_FROM_TIMESTAMP:从某个时间开始消费。

貌似看起来,CONSUME_FROM_LAST_OFFSET就是说会从消费者启动后,开始消费发送给broker的消息。

真的是这样吗?写个简单代码验证下

先发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("TopicTest",
("NewMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}

模拟的场景是,topic一直存在。我们现在是有新的业务接入。

启动消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class PushConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("FromWhereConsumerGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("ReceiveMessages: %s%n", new String(msgs.get(0).getBody()));

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}

发现直接是从第一条消息开始的。

image-20201020161159655

这就很尴尬了,明明设置了CONSUME_FROM_LAST_OFFSET,为什么还是从第一条开始消费呢?

原因

先说原因,原因其实也简单。其实在ConsumeFromWhere的源码注释中已经写了。

image-20201020161759144

CONSUME_FROM_LAST_OFFSET是从该消费者上次消费到的位置开始消费。

但是,如果是一个新的消费者。就要根据这个client所属的消费组的情况来判断。

如果所属的消费者组是新上线的,订阅的消息,最早的消息都没有过期。rocketmq的设计者认为,你这是一个新上线的业务,会强制从第一条消息开始消费。

相反,如果订阅的消息,已经产生了过期消息。那么才会从我们这个client启动的时间点开始消费。

所以说,ConsumeFromWhere这个参数只对一个新的消费者第一次启动时有效。

就是说,如果是一个消费者重启,他只会从自己上次消费到的offset,继续消费。这个参数是没用的。

这里最关键的是什么叫一个新的消费者?

如何判断新的消费者

先说一下,对于cluster消费模式,

消费者消费到哪的记录是在broker上的。是在数据目录下的config/consumerFilter.json文件

1
2
3
4
5
6
7
8
{
"offsetTable":{
"%RETRY%FromWhereConsumerGroup@FromWhereConsumerGroup":{0:0
},
"TopicTest@FromWhereConsumerGroup":{0:32,1:32,2:32,3:32
}
}
}

可以看出,broker记录消费位点是 topic名字+消费者组+队列

如上,现在是队列1消费到32等等

那么,当我们启动一个新的消费者时,broker首先回到这个文件中查询。如果找到,broker把这个消费位点返回给client,作为client第一次拉取消息的参数。

如果没找到,那么就是上面的逻辑了。

所以,最后强调一下。如果你的消费者组以前监听过某个topic,setConsumeFromWhere这个参数不管你设置什么都是不起效的。只要broker找的到消费位点,就是按照broker的来。就是这么霸道。

源码分析

org.apache.rocketmq.broker.processor.ConsumerManageProcessor#queryConsumerOffset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
final QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.readCustomHeader();
final QueryConsumerOffsetRequestHeader requestHeader =
(QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

// 用topic+consumergroup+queueId去查
long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());

// 如果查到了,且大于-1。就是在broker这边找到了。返回,按照broker的来
if (offset >= 0) {
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
// 没找到,看看现在消息的最小offset
long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
// 如果topic的消息最小偏移量还是0,并且最早的这条消息也存于内存中。那么强制从最早的消息开始消费
requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
// 这个就是说最早的消息已经过期了。才能按照客户端设置的ConsumeFromWhere策略来消费
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
}
}

return response;
}

checkInDiskByConsumeOffset这个方法的作用。查看这个消息是否不在内存中了。

1
2
3
4
5
6
7
8
9
/**
* Check if the given message has been swapped out of the memory.
*
* @param topic topic.
* @param queueId queue ID.
* @param consumeOffset consume queue offset.
* @return true if the message is no longer in memory; false otherwise.
*/
boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset);

消息中间件运维总结

分领域部署

不应该搭建太大的集群。因为一旦集群出故障,影响范围会很大

核心业务要按业务域进行规划

集群扩容

新扩容进原集群的broker是不会有流量进来的,因为没有创建任何Topic

为了快速让新broker上拥有集群内其他节点中的topic,我们通常可以拷贝集群内其他节点的主题定义文件,具体要复制的文件路径为:{ROCKETMQ_HOME}/store/config/topics.json 文件

如果 Broker 关闭了自动创建消费组(autoCreateSubscriptionGroup=false)(生产一般都关掉了),还需要拷贝 subscriptionGroup.json 文件。

再次重启新加入的broker,就可以承担读写流量,实现负载均衡了

集群缩容

将节点从集群中移除的基本原则是,存储在这些节点上的消息必须完成消费,否则会造成消息消费丢失。

关闭节点的写权限。避免新的数据再写入该节点,然后等消息过期再下线。

具体命令如下:

1
sh ./mqadmin updateBrokerConfig -b 127.0.0.1:10911 -n 127.0.0.1:9876 -k brokerPermission -v 4

为了保守起见,通常要等待消息过期后,再关闭 Broker。如果消息的存储时间为 72 小时,那要在关闭写权限 3 天之后才可以下线该节点。在此期间,该节点还是可以提供读取服务,也就是说,存在这个节点的消息仍然可以被消费端消费。

topic扩容

topic扩容就是扩队列数

1
sh ./mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t dw_test_01 -r 8 -w 8

-w 、-r 分别指定扩容后的队列数。其中 -w 表示写队列个数,-r 表示读队列个数,在进行Topic扩容时,它们必须一致。

topic缩容

分两步,先缩写队列

1
sh ./mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t dw_test_01 -r 8 -w 4

等消息达到过期时间后,再将读队列数量变更为缩容后的队列

1
sh ./mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t dw_test_01 -r 4 -w 4

重置位点

1
sh ./mqadmin resetOffsetByTime -g dw_test_mq_consuemr_test_01 -n 127.0.0.1:9876 -t dw_zms_test_topic -s '2022-07-10#10:00:00:000'

-s 参数,它表示回溯时间。其中:

now 或者 currentTimeMillis 表示当前时间;

yyyy-MM-dd#HH:mm:ss:SSS 表示具体的时间戳。在执行命令时,需要严格按照格式,否则会抛出空指针异常

nameserver扩容

更新两台 Broker 的配置文件,让 Broker 能够感知新 NameServer 的存在,具体的配置项:

1
namesrvAddr=192.168.3.100:9876;192.168.3.101:9876;192.168.3.107:9876

紧接着,依次重启 Broker。

一定要注意的是,**集群内的 Broker 没有全部重启时,新加入集群的 NameServer 地址是不能让消息发送 / 消息消费客户端使用的。**因为这时候新的 NameServer 上的路由信息会和集群内其他 NamServer 存储的信息不一致。

NameServer 的下线就比较简单了。直接先 kill 掉 NameServer 进程,这时,无论是 Broker、还是消息发送、消息消费客户端都会抛出错误,但这个错误不影响使用。

然后依次更新 Broker 配置文件中的 namesrvAddr,移除已下线的 NameServer 地址并依次重启。

在生产实践中,NameServer 的扩容还是比较少见的,更多的是更换机器。举个例子,192.168.3.100 这台机器由于内存、磁盘等故障,需要被下线。但为了保证 NameServer 节点数量不受影响,我们通常还会在一台新机器上部署一台新的 NameServer。同时,为了避免客户端或 Broker 需要更新 NameServer 列表,更换机器时还要 IP 保持不变。

运维命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
sh ./mqadmin
# 该命令的输出结果如下:
The most commonly used mqadmin commands are:
updateTopic Update or create topic
deleteTopic Delete topic from broker and NameServer.
updateSubGroup Update or create subscription group
deleteSubGroup Delete subscription group from broker.
updateBrokerConfig Update broker's config
updateTopicPerm Update topic perm
topicRoute Examine topic route info
topicStatus Examine topic Status info
topicClusterList get cluster info for topic
brokerStatus Fetch broker runtime status data
queryMsgById Query Message by Id
queryMsgByKey Query Message by Key
queryMsgByUniqueKey Query Message by Unique key
queryMsgByOffset Query Message by offset
QueryMsgTraceById query a message trace
printMsg Print Message Detail
printMsgByQueue Print Message Detail
sendMsgStatus send msg to broker.
brokerConsumeStats Fetch broker consume stats data
producerConnection Query producer's socket connection and client version
consumerConnection Query consumer's socket connection, client version and subscription
producerConnectionAll Query all producer's socket connection and client version
consumerProgress Query consumers's progress, speed
consumerStatus Query consumer's internal data structure
cloneGroupOffset clone offset from other group.
clusterList List all of clusters
topicList Fetch all topic list from name server
updateKvConfig Create or update KV config.
deleteKvConfig Delete KV config.
wipeWritePerm Wipe write perm of broker in all name server
resetOffsetByTime Reset consumer offset by timestamp(without client restart).
updateOrderConf Create or update or delete order conf
cleanExpiredCQ Clean expired ConsumeQueue on broker.
cleanUnusedTopic Clean unused topic on broker.
startMonitoring Start Monitoring
statsAll Topic and Consumer tps stats
allocateMQ Allocate MQ
checkMsgSendRT check message send response time
clusterRT List All clusters Message Send RT
getNamesrvConfig Get configs of name server.
updateNamesrvConfig Update configs of name server.
getBrokerConfig Get broker config by cluster or special broker!
queryCq Query cq command.
sendMessage Send a message
consumeMessage Consume message
updateAclConfig Update acl config yaml file in broker
deleteAccessConfig Delete Acl Config Account in broker
clusterAclConfigVersion List all of acl config version information in cluster
updateGlobalWhiteAddr Update global white address for acl Config File in broker
getAccessConfigSubCommand List all of acl config information in cluster

查看每一个命令的具体使用方法,可以使用如下命令:

1
sh ./mqadmin updateTopic -h

消息ACK机制

RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度。

如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。

每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度。

但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值,如下图:

img

这钟方式和传统的一条message单独ack的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。

在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才会一下子更新到2200。

在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。

对于这个场景,3.2.6之前的RocketMQ无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度。

实际上,从源码的角度上看,RocketMQ可能是考虑过这个问题的,截止到3.2.6的版本的源码中,可以看到为了缓解这个问题的影响面,DefaultMQPushConsumer中有个配置consumeConcurrentlyMaxSpan

1
2
3
4
/**
* Concurrently max span offset.it has no effect on sequential consumption
*/
private int consumeConcurrentlyMaxSpan = 2000;

这个值默认是2000,当RocketMQ发现本地缓存的消息的最大值-最小值差距大于这个值(2000)的时候,会触发流控——也就是说如果头尾都卡住了部分消息,达到了这个阈值就不再拉取消息。

但作用实际很有限,像刚刚这个例子,2101的消费是死循环,其他消费非常正常的话,是无能为力的。一旦退出,在不人工干预的情况下,2101后所有消息全部重复。

Ack卡进度解决方案

对于这个卡消费进度的问题,最显而易见的解法是设定一个超时时间,达到超时时间的那个消费当作消费失败处理。

后来RocketMQ显然也发现了这个问题,而RocketMQ在3.5.8之后也就是采用这样的方案去解决这个问题。

  1. 在pushConsumer中 有一个consumeTimeout字段(默认15分钟),用于设置最大的消费超时时间。消费前会记录一个消费的开始时间,后面用于比对。
  2. 消费者启动的时候,会定期扫描所有消费的消息,达到这个timeout的那些消息,就会触发sendBack并ack的操作。这里扫描的间隔也是consumeTimeout(单位分钟)的间隔。

核心源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
//ConsumeMessageConcurrentlyService.java
public void start() {
this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
cleanExpireMsg();
}

}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
//ConsumeMessageConcurrentlyService.java
private void cleanExpireMsg() {
Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, ProcessQueue> next = it.next();
ProcessQueue pq = next.getValue();
pq.cleanExpiredMsg(this.defaultMQPushConsumer);
}
}

//ProcessQueue.java
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
return;
}

int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
for (int i = 0; i < loop; i++) {
MessageExt msg = null;
try {
this.lockTreeMap.readLock().lockInterruptibly();
try {
if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
msg = msgTreeMap.firstEntry().getValue();
} else {

break;
}
} finally {
this.lockTreeMap.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);
}

try {

pushConsumer.sendMessageBack(msg, 3);
log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
try {
msgTreeMap.remove(msgTreeMap.firstKey());
} catch (Exception e) {
log.error("send expired msg exception", e);
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);
}
} catch (Exception e) {
log.error("send expired msg exception", e);
}
}
}

通过源码看这个方案,其实可以看出有几个不太完善的问题:

  1. 消费timeout的时间非常不精确。由于扫描的间隔是15分钟,所以实际上触发的时候,消息是有可能卡住了接近30分钟(15*2)才被清理。
  2. 由于定时器一启动就开始调度了,中途这个consumeTimeout再更新也不会生效。