hello云胜

技术与生活

0%

Redis5集群配置参数问题

今天有人让我帮忙看,他的redis集群切换要15秒多。

之前我用的是redis4,切换基本是秒级的,业务都是无感的。这个15秒似乎有点慢。

看了下redis5的配置参数

Redis集群的参数配置

在redis.conf中的一些参数说明:

cluster-enabled <yes/no>:

如果配置”yes”则开启集群功能,此redis实例作为集群的一个节点,否则,它是一个普通的单一的redis实例。

cluster-config-file :

注意:虽然此配置的名字叫“集群配置文件”,但是此配置文件不能人工编辑,它是集群节点自动维护的文件,主要用于记录集群中有哪些节点、他们的状态以及一些持久化参数等,方便在重启时恢复这些状态。通常是在收到请求之后这个文件就会被更新。

cluster-node-timeout :

这是集群中的节点能够失联的最大时间,超过这个时间,该节点就会被认为故障。

如果主节点超过这个时间还是不可达,则用它的从节点将启动故障迁移,升级成主节点。

注意,任何一个节点在这个时间之内如果还是没有连上大部分的主节点,则此节点将停止接收任何请求。

默认时15秒

cluster-replica-validity-factor :

从节点数据有效因子。

这个参数是控制什么样的从节点可以进行faiover(故障转移),变成master节点。

假设有多个从节点,我们当然希望选一个数据最新的节点进行failover。如果某个从节点的数据太旧,那就不要让他failover了。

这个参数就是控制这个的,默认是10。

比如前面的cluster-node-timeout用默认值15秒,那么如果某个从节点的交互时间>15*10 + ping心跳时间(10秒)

那么这个从节点就不会被选为failover的节点。

所以可能出现由于某主节点失联却没有从节点能顶上的情况,从而导致集群不能正常工作,在这种情况下,只有等到原来的主节点重新回归到集群,集群才恢复运作。

如果设置成0,则无论从节点与主节点失联多久,从节点都会尝试升级成主节点。

默认是10

总之,这个从节点数据有效因子如果太大,可能导致比较旧的从节点failover成主节点。如果太小,又可能导致没有从节点能failover成主节点。

看redis.conf文件中说

For maximum availability, it is possible to set the replica-validity-factor
to a value of 0, which means, that replicas will always try to failover the
master regardless of the last time they interacted with the master.
(However they’ll always try to apply a delay proportional to their
offset rank).

Zero is the only value able to guarantee that when all the partitions heal
the cluster will always be able to continue.

为了保持最高的高可用性,可以将replica-validity-factor设置为0.这样备节点总会尝试进行failover

所以,配置为

1
2
cluster-node-timeout 5000
cluster-replica-validity-factor 0

cachecloud源码参考

ssh登录

关键技术点之一就是搞定代码通过ssh同服务器交互。

前置条件

  1. redis机器放开cachecloud代码所在服务器到redis部署机器的ssh登录。默认开放端口22

  2. 所有redis机器配置统一的用户名和密码。并且建议密码永不过期。

    数据库表system_config

SSHUtil

统一的工具类。

com.sohu.cache.ssh.SSHUtil#execute(java.lang.String, int, java.lang.String, java.lang.String, java.lang.String)

通过SSHUtil执行命令

com.sohu.cache.ssh.SSHTemplate#execute(java.lang.String, int, java.lang.String, java.lang.String, com.sohu.cache.ssh.SSHTemplate.SSHCallback)

先建立连接

com.sohu.cache.ssh.SSHTemplate#getConnection

这就到了最后的依赖

1
2
3
4
5
<dependency>
<groupId>ch.ethz.ganymed</groupId>
<artifactId>ganymed-ssh2</artifactId>
<version>build210</version>
</dependency>

执行cmd

com.sohu.cache.ssh.SSHTemplate.SSHSession#executeCommand(ch.ethz.ssh2.Session, java.lang.String, int, com.sohu.cache.ssh.SSHTemplate.LineProcessor)

redis部署

入口 com.sohu.cache.web.controller.AppManageController#doAddAppDeploy

传到service层com.sohu.cache.stats.app.impl.AppDeployCenterImpl#allocateResourceApp

重点看部署cluster模式com.sohu.cache.stats.app.impl.AppDeployCenterImpl#deployCluster

com.sohu.cache.redis.impl.RedisDeployCenterImpl#deployClusterInstance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 循环填写的部署信息。每一行是主节点:最大内存:备节点
for (RedisClusterNode node : clusterNodes) {
...
// 前面的是主节点
boolean isMasterRun = runInstance(appDesc, masterHost, masterPort, maxMemory, true);
...
// 再启动备节点
boolean isSlaveRun = runInstance(appDesc, slaveHost, slavePort, maxMemory, true);

}

// 前面的步骤只是启动了多个单独的redis实例,并没有配置为集群
// 配置为集群
isCluster = startCluster(appId, clusterMap);
// 收尾工作。保存信息到instance_info表
// 部署定时任务收集redis运行数据

1,获取可用端口

com.sohu.cache.machine.impl.MachineCenterImpl#getAvailablePort

MachineCenter是所有和服务器进行交互的接口

先通过shell到服务器查询服务器已用的最大port。

执行的命令是

1
ps -ef | grep redis | grep -v 'grep'

经过一些列的解析代码,拿到当前服务器上redis的最大端口号。

(从这里我们也可以看到,redis的机器上最好只部署redis,不要搞上其他的服务)

再去数据库表,验证这个port确实从来没有用过。

2,启动实例

com.sohu.cache.redis.impl.RedisDeployCenterImpl#runInstance

组装redis的config配置

com.sohu.cache.redis.impl.RedisDeployCenterImpl#handleCommonConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 获取redis 基础配置
*
* @param port
* @param maxMemory
* @return
*/
public List<String> handleCommonConfig(int port, int maxMemory) {
List<String> configs = null;
try {
configs = redisConfigTemplateService.handleCommonConfig(port, maxMemory);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
if (CollectionUtils.isEmpty(configs)) {
configs = redisConfigTemplateService.handleCommonDefaultConfig(port, maxMemory);
}
return configs;
}

组装的是一个String列表

默认的配置项是保存在数据库表instance_config里

image-20220105153533957

最后组合起来的list,写到服务器上的conf文件

image-20220105160904507

创建配置文件

com.sohu.cache.machine.impl.MachineCenterImpl#createRemoteFile

先将config配置写到本地的/tmp/cachecloud/redis-cluster-xxxx.conf

然后通过scp将本地的配置文件推到redis服务器上

1
SSHUtil.scpFileToRemote(host, localAbsolutePath, MachineProtocol.CONF_DIR);

组装的redis的启动命令

1
redis-server %s > " + MachineProtocol.LOG_DIR + "redis-%d-%s.log 2>&1 &

启动实例

1
boolean isMasterShell = machineCenter.startProcessAtPort(host, port, runShell);

3.配置集群

这是重点

代码就是这个com.sohu.cache.redis.impl.RedisDeployCenterImpl#startCluster方法

1
2
3
4
5
6
7
8
9
10
11
12
// 先随便选一个redis实例,后面的cluster meet命令只发给一个redis就行了。只要一个redis有了全部redis的连接信息,通过gossip协议自动会组建成一个cluster
final Jedis jedis = new ArrayList<Jedis>(clusterMap.keySet()).get(0);
// 对redis实例循环执行ClusterMeet连接其他redis
boolean isMeet = clusterMeet(jedis, appId, master.getClient().getHost(), master.getClient().getPort());
// 分配slot
String response = masterJedis.clusterAddSlots(slotArr);
// 设置从节点
// 先获取主节点的nodeID,通过cluster nodes命令,取出myself
final String nodeId = getClusterNodeId(masterJedis);
// 再设置为备节点cluster replicate命令
response = slaveJedis.clusterReplicate(nodeId);

clusterMeet实际就是发送cluster meet命令。CLUSTER MEET命令被用来连接不同的开启集群支持的 Redis 节点,以进入集群。

分配slot用的是cluster addslots命令,将16384个slot分给对应的主节点

redis命令:https://redis.io/commands/cluster-addslots,注意版本

jedis客户端

通过查看源码发现,cachecloud提供的jedis客户端,完全是sohu自己实现的。不是常用的开源的那个jedis。

jedis封装了redis指令。

通过jedis客户端连接redis实例,并发送配置redis指令。

连接的代码redis.clients.jedis.Connection#connect

image-20220105175314592

用java的socket直接连接

Redis使用规范建议

Redis使用规范建议

key命名规范

  1. 把业务名作为前缀, 然后用冒号分隔,再加上具体的业务数据名

    比如,存储页面1991的uv。key设计为:uv:page:1991

  2. key不要太长。保证可读性的情况下,尽量简写

    减少内存的损耗

避免bigkey

Redis 是使用单线程读写数据,bigkey 的读写操作会阻塞线程,降低 Redis 的处理效率。

  1. 对于string类型的value,尽量保持10k以下
  2. 对于集合类型。尽量将集合的元素个数控制在1万以下

如果业务层的 String 类型数据确实很大,我 们还可以通过数据压缩来减小数据大小;

如果集合类型的元素的确很多,我们可以将一个 大集合拆分成多个小集合来保存。

使用整数对象共享池

整数是常用的数据类型,Redis 内部维护了 0 到 9999 这 1 万个整数对象,并把这些整数 作为一个共享池使用。

换句话说,如果一个键值对中有 0 到 9999 范围的整数,Redis 就不会为这个键值对专门 创建整数对象了,而是会复用共享池中的整数对象。

基于这个特点,在满足业务数据需求的前提下,能用整数时就尽量用整数,这 样可以节省内存

使用redis保持热数据

不同业务实例隔离

数据保存时尽量设置过期时间

控制单个 Redis 实例的容量

Redis 单实例的内存大小都不要太大,根据我自己的经验值,建议你设置在 2~6GB

线上禁用不安全命令

一些涉及大量操作、耗时长的命令,就会严 重阻塞主线程,导致其它请求无法得到正常处理,主要是这三个:

keys,flushall,flushdb

管理员用 rename-command 命令在配置文件中对这些命令进行重命名。

慎用monitor命令

慎用全量操作命令

对于集合类型的数据来说,如果想要获得集合中的所有元素,一般不建议使用全量操作的 命令(例如 Hash 类型的 HGETALL、Set 类型的 SMEMBERS)。

这些操作会对 Hash 和 Set 类型的底层数据结构进行全量扫描,如果集合类型数据较多的话,就会阻塞 Redis 主 线程。

  1. 可以使用 SSCAN、HSCAN 命令分批返回集合中的数据
  2. 可以化整为零,把一个大的 Hash 集合拆分成多个小的 Hash 集合。

不要把redis当成数据库来用

Redis使用规范建议

key命名规范

  1. 把业务名作为前缀, 然后用冒号分隔,再加上具体的业务数据名

    比如,存储页面1991的uv。key设计为:uv:page:1991

  2. key不要太长。保证可读性的情况下,尽量简写

    减少内存的损耗

避免bigkey

Redis 是使用单线程读写数据,bigkey 的读写操作会阻塞线程,降低 Redis 的处理效率。

  1. 对于string类型的value,尽量保持10k以下
  2. 对于集合类型。尽量将集合的元素个数控制在1万以下

如果业务层的 String 类型数据确实很大,我 们还可以通过数据压缩来减小数据大小;

如果集合类型的元素的确很多,我们可以将一个 大集合拆分成多个小集合来保存。

使用整数对象共享池

整数是常用的数据类型,Redis 内部维护了 0 到 9999 这 1 万个整数对象,并把这些整数 作为一个共享池使用。

换句话说,如果一个键值对中有 0 到 9999 范围的整数,Redis 就不会为这个键值对专门 创建整数对象了,而是会复用共享池中的整数对象。

基于这个特点,在满足业务数据需求的前提下,能用整数时就尽量用整数,这 样可以节省内存

使用redis保持热数据

不同业务实例隔离

数据保存时尽量设置过期时间

控制单个 Redis 实例的容量

Redis 单实例的内存大小都不要太大,根据我自己的经验值,建议你设置在 2~6GB

线上禁用不安全命令

一些涉及大量操作、耗时长的命令,就会严 重阻塞主线程,导致其它请求无法得到正常处理,主要是这三个:

keys,flushall,flushdb

管理员用 rename-command 命令在配置文件中对这些命令进行重命名。

慎用monitor命令

慎用全量操作命令

对于集合类型的数据来说,如果想要获得集合中的所有元素,一般不建议使用全量操作的 命令(例如 Hash 类型的 HGETALL、Set 类型的 SMEMBERS)。

这些操作会对 Hash 和 Set 类型的底层数据结构进行全量扫描,如果集合类型数据较多的话,就会阻塞 Redis 主 线程。

  1. 可以使用 SSCAN、HSCAN 命令分批返回集合中的数据
  2. 可以化整为零,把一个大的 Hash 集合拆分成多个小的 Hash 集合。

不要把redis当成数据库来用

总体方案

接到需求,有一个业务表数据量达到2亿。查询变慢。及时加上索引效果依旧达不到要求。

进行架构优化,对业务表按月份进行分表。

分表之后,计划保存3年之内的数据。那么每个月进行一次数据备份,备份方案如下

  1. 使用mongodump进行数据的导出。
  2. 编写脚本指定要进行导出的collection
  3. 使用cron定时执行
  4. 如果有业务需求,使用mongorestore导入数据进行还原

具体实现

mongodump 是 MongoDB 自带的备份工具,可以备份指定的数据库或集合中的数据到本地文件系统,方便进行数据恢复和迁移等操作。

我们现在需要备份指定的collection

1
mongodump --host <hostname> --port <port> --db <database_name> --collection <collection_name> --out <backup_directory>

另外,如果需要具有特定查询条件的文档,可以输入以下命令:

1
mongodump --host <hostname> --port <port> --db <database_name> --collection <collection_name> --query '{field: value}' --out <backup_directory>

其中,{field: value} 表示备份时可使用的查询条件,如可以输入 {date: { $gte: new ISODate("2023-04-20T00:00:00Z")}} 查找某一时间点之后的数据备份。

测试数据准备

创建了一个新库paas,一个新collection:log202303。

image-20230406154736648

用户paasAdmin,角色必须给readWrite和dbAdmin都有

1
2
3
4
5
6
7
8
9
const data = [];
for (let i = 1; i <= 100; i++) {
data.push({ name: `User ${i}`, age: Math.floor(Math.random() * 50) + 20 });
}

db.log202303.insertMany(data, (err, res) => {
if (err) throw err;
console.log(res.insertedCount + ' documents inserted');
});

使用代码直接插入了100条数据。

image-20230406144940844

可以看到其对应的wt文件

image-20230406145014625

看起来collection和wt文件是一一对应的。

找了生产的数据库看。实际上,存在大于2G的数据文件的

image-20230406152017870

备份数据

1
mongodump --host x.x.x.x --port 20000 -u paasAdmin -p 123456 --authenticationDatabase paas --db paas --collection log202303 --out /home/mongo/backup

image-20230406154847584

恢复数据

删除log202303这个collection。

模拟又新增了其他月份的数据log202304,并插入了一些数据。

1
2
3
4
midplat:PRIMARY> use paas
switched to db paas
midplat:PRIMARY> show collections
log202304

下面使用mongorestore进行log202303的恢复

1
mongorestore --host x.x.x.x --port 20000 -u paasAdmin -p 123456 --authenticationDatabase paas --db paas --collection log202303 --dir=/home/mongo/backup/paas/log202303.bson

image-20230406160246099

查看数据已经恢复

1
2
3
4
5
6
7
MongoDB server version: 4.0.26
midplat:PRIMARY> use paas
switched to db paas
midplat:PRIMARY> show collections
log202303
log202304
midplat:PRIMARY>

可以说是very good了

TODO

下面的工作就是写定时任务,用脚本把bson文件保存到对象存储上去

Predixy安装部署

Predixy是一款高性能的redis代理。支持redis集群模式。目前redis官方是在计划出自己的redis-cluster-proxy,但是目前进展是还没有发布生产版本。很尴尬,现在还是只能在非官方的软件中选择,根据网上其他人的测试,Predixy是其中性能最好的一款。也是大厂有在使用的。

步骤

Redis 集群故障切换

总结写在前面:

  1. 使用cluster failover命令进行手动主从切换
  2. cluster failover命令的参数
  3. redis集群的切换步骤(理论)

redis集群的故障切换可以分成自动和手动两种。例如部署的3主3从的集群,如果一台master节点宕机,redis集群有能力自动选举新的主节点,完成故障迁移,保障系统的高可用性。

本文记录的故障切换是手动的操作,意义在于

  1. 当集群完全正常时,因为某些原因需要主从进行切换。例如我们需要下线某台服务器进行服务器升级。
  2. 当集群中多数master节点宕机,集群已不可用时的快速恢复。

操作

命令部分很简单,要注意的是这个命令必须在备节点上执行

1
cluster failover

或者

1
cluster failover force

或者

1
cluster failover takeover

第一种情况,集群正常时

这种情况是我们按计划进行服务器升级时,经常遇到的情况。

要升级的服务器上可能跑着很多个master节点,我们应该在变更前将该服务器上的master节点进行主备切换,这样下线服务器时可以做到业务应用无感知。

操作很简单,这种情况下只要在备节点上执行

1
cluster failover
具体redis做的切换步骤
  1. slave节点告知其master节点停止处理来自业务的请求
  2. master 节点将当前replication offset 回复给该slave节点
  3. salve节点在master节点的变更没有全部同步到自己时(追上replication offset),不会进行主备切换动作。
  4. salve节点追上replication offset,开始进行主备切换的工作:从集群中其他master处获取最新的epoch,然后广播自己的配置
  5. 原master节点收到配置更新:解除客户端的访问阻塞,回复重定向信息,以便客户端可以和新master通信。

第二种情况,集群已宕机

第二种情况就比较紧急了,可能遇到了意外情况导致多台或者全部master宕机,或者遇到网络分区隔离。测试redis集群已经是不可用状态。如果备节点都在,我们可以使用failover快速恢复redis集群服务。

此时在备节点上执行

1
cluster failover takeover

在上面第一种情况时,主备切换的第四步中需要从集群中其他master处获取最新的epoch。所以如果多数master已经宕机的情况下,是会获取失败的。

加上参数takeover,备节点会自己生成epoch。如果epoch不是最大的,则取当前有效epoch值中的最大值并自增作为新的配置epoch。

然后将原master节点管理的所有哈希槽分配给自己,接着就广播配置。

如此可以快速恢复集群能力。

有得必有失

TAKEOVER 违反Redis群集的last-failover-wins 原则,因为这种情况下epoch是备节点自己产生的,并没有同集群进行协商,所以可能存在冲突。

所以非紧急情况下不要使用takeover参数

没有用到的force

failover还有一个参数是force。

加上force的切换步骤是,不可master节点进行协商,直接开启第四步从集群中其他master处获取最新的epoch,然后广播自己的配置

所以,使用force的前提是多数master依然活着。

那么,这种情况下,如果master节点宕机,redis集群可以自动选出主节点切换。也不需要使用failover手动切换

所以,我没用过这个参数,没有什么场景适合使用。

1,理论理解

微服务全家桶。

image-20200725153030522

image-20200725153307412

image-20200725153622675

(题外话,架构设计上,业务无关的,剥离出来做公共服务。业务相关的沉淀下来做基础服务。)

cloud组件升级

image-20200726223538485

版本匹配

springcloud和springboot的版本匹配

image-20200726224047806

更具体的版本可以查看:https://start.spring.io/actuator/info

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
"spring-cloud": {
"Finchley.M2": "Spring Boot >=2.0.0.M3 and <2.0.0.M5",
"Finchley.M3": "Spring Boot >=2.0.0.M5 and <=2.0.0.M5",
"Finchley.M4": "Spring Boot >=2.0.0.M6 and <=2.0.0.M6",
"Finchley.M5": "Spring Boot >=2.0.0.M7 and <=2.0.0.M7",
"Finchley.M6": "Spring Boot >=2.0.0.RC1 and <=2.0.0.RC1",
"Finchley.M7": "Spring Boot >=2.0.0.RC2 and <=2.0.0.RC2",
"Finchley.M9": "Spring Boot >=2.0.0.RELEASE and <=2.0.0.RELEASE",
"Finchley.RC1": "Spring Boot >=2.0.1.RELEASE and <2.0.2.RELEASE",
"Finchley.RC2": "Spring Boot >=2.0.2.RELEASE and <2.0.3.RELEASE",
"Finchley.SR4": "Spring Boot >=2.0.3.RELEASE and <2.0.999.BUILD-SNAPSHOT",
"Finchley.BUILD-SNAPSHOT": "Spring Boot >=2.0.999.BUILD-SNAPSHOT and <2.1.0.M3",
"Greenwich.M1": "Spring Boot >=2.1.0.M3 and <2.1.0.RELEASE",
"Greenwich.SR6": "Spring Boot >=2.1.0.RELEASE and <2.1.17.BUILD-SNAPSHOT",
"Greenwich.BUILD-SNAPSHOT": "Spring Boot >=2.1.17.BUILD-SNAPSHOT and <2.2.0.M4",
"Hoxton.SR6": "Spring Boot >=2.2.0.M4 and <2.3.3.BUILD-SNAPSHOT",
"Hoxton.BUILD-SNAPSHOT": "Spring Boot >=2.3.3.BUILD-SNAPSHOT and <2.4.0.M1",
"2020.0.0-SNAPSHOT": "Spring Boot >=2.4.0.M1"
},
"spring-cloud-alibaba": {
"2.2.1.RELEASE": "Spring Boot >=2.2.0.RELEASE and <2.3.0.M1"
},

可以看到,Hoxton版本的springCloud,要求的SpringBoot版本在2.2.0和2.3.3之间

2,注册中心

eureka

已经闭源,不再使用

zookeeper

使用临时节点存储服务信息

个人认为并不是一个很好的注册中心,只是大家被dubbo带了节奏。注册中心只是存储一下服务信息,通过心跳管理服务状态,并没有什么高级的。

zookeeper的服务名区分大小写,并且不能有下划线

consul

go写的。服务发现+配置管理

异同点

image-20200730101909961

image-20200730101858106

在现在的分布式环境下,p一定要被保证。所以不是cp就是ap

image-20200730102153537

Ribbon负载均衡

客户端负载均衡

项目目前已经进入维护模式,但是依然很多人在用,ribbon还是比较优秀的

替代方案:spring自己出的loadbalance,但是还不稳定

功能相当于负载均衡+restTemplate调用

IRule切换负载均衡算法

注意不要放在主启动类能扫描的包下,会导致所有的ribbon都使用切换后的算法。

openfeign

feign已经放弃,现在使用openfeign

对ribbon的进一步封装,解决使用ribbon需要自己包装restTemplate的问题

使用起来有点像dubbo,但是依然不如dubbo封装的好,跟人感觉有点不伦不类

可以设定超时时间

服务降级

避免一个服务故障时,引起级联故障,导致整个服务崩溃。

当某个服务故障时,断路器能够监控到故障,进而向调用方返回一个可处理的备选响应(即一个兜底方案),而不是长时间的等待或者爆出异常,从而调用方的线程被长时间、不必要的占用。即使释放资源,从而避免故障在分布式系统中蔓延。

Hystrix 豪猪

已经停止更新,但是设计思想需要了解

功能:服务降级、服务熔断、接近实时的监控

服务降级、服务熔断、服务限流什么区别?

服务降级:fallback。不可用情况下的兜底服务响应。

服务熔断:break。达到最大服务量时,直接拒绝访问,然后以服务降级的方式返回兜底响应。

服务限流:flowlimit。在秒杀等高并发场景下,控制流量的进入。

解决:

服务降级:

服务端,启动类注解启动断路器。在方法上加上备选方法

消费端,也可以加上服务降级的备选方法。

看来服务降级挺好用的,在我们的springboot项目里也可使用。

优化:

1,配置全局fallback方法
2,在feignclient中配置fallback
服务熔断

熔断机制是应对雪崩效应的一种微服务链路保护机制。当链路服务不可用时们进行服务降级,进而熔断该节点微服务的调用,快速返回错误信息。并且当检测到该节点微服务恢复正常后,恢复链路的调用。

当失败达到一定的阈值,默认是5秒内20次失败,触发熔断机制。

网关

zuul

废弃了,zuul2也出不来了。

gateway

spring社区自己出的

基于webflux,webflux又是基于netty。异步非阻塞。

servlet2.5 vs servlet 3.0

image-20200801174421801

路由

断言,匹配条件

过滤器

配置中心

springcloud config

使用git存储

要做到自动刷新,需要手动向client端发一个post刷新请求actuator/refresh

改进:引入消息总线bus

消息总线bus

支持rabbitmq和kafka

原理就是所有configclient订阅消息topic

两种设计:

1,触发任意一个configclient端

2,触发configserver端

第二种设计更好

还是要手动post刷新一次,好处是只需要向configserver刷新一次

可以配置webhook

可以全局通知和定点通知

cloud stream

作用是屏蔽底层消息中间件的差异,统一编程模型

SpringCloud Gateway内置过滤器

基于2.2.4Release。Spring Cloud Gateway

SpringCloud Gateway的filter从作用范围上分有两种

  1. 针对于单个路由的gateway filter,它在配置文件中的写法同predict类似
  2. 针对于所有路由的global filer。不需要在配置文件中配置,即对所有路由生效。

GatewayFilter

Spring Cloud Gateway

为了方便起见,springcloud已经内置了很多常用的过滤器,我们应该知晓这些官方过滤器的存在,避免重复造轮子。具体每个过滤器的用法细节用到时再细究。

过滤器名 作用
AddRequestHeader 为原始请求添加Header
AddRequestParameter 为原始请求添加请求参数
AddResponseHeader 为原始响应添加Header
DedupeResponseHeader 剔除响应头中重复的值
Hystrix 为路由引入Hystrix的断路器保护
FallbackHeaders 为fallbackUri的请求头中添加具体的异常信息
PrefixPath 为原始请求路径添加前缀
PreserveHostHeader 为请求添加一个preserveHostHeader=true的属性,路由过滤器会检查该属性以决定是否要发送原始的Host
RequestRateLimiter 用于对请求限流,限流算法为令牌桶
RedirectTo 将原始请求重定向到指定的URL
RemoveHopByHopHeadersFilter 为原始请求删除IETF组织规定的一系列Header 默认就会启用,可以通过配置指定仅删除哪些Header
RemoveRequestHeader 为原始请求删除某个Header
RemoveResponseHeader 为原始响应删除某个Header
RewritePath 重写原始的请求路径
RewriteResponseHeader 重写原始响应中的某个Header
SaveSession 在转发请求之前,强制执行WebSession::save操作
SecureHeaders 为原始响应添加一系列起安全作用的响应头
SetPath 修改原始的请求路径
SetResponseHeader 修改原始响应中某个Header的值
SetStatus 修改原始响应的状态码
StripPrefix 用于截断原始请求的路径
Retry 针对不同的响应进行重试
RequestSize 设置允许接收最大请求包的大小。如果请求包大小超过设置的值,则返回 413 Payload Too Large 请求包大小,单位为字节,默认值为5M
ModifyRequestBody 在转发请求之前修改原始请求体内容
ModifyResponseBody 修改原始响应体的内容
Default 为所有路由添加过滤器 也就是说通过Default Filter所配置的过滤器工厂会作用到所有的路由上
MapRequestHeader 添加一个新的header,其值为一个已经存在的header的值。 fromHeader和toHeader两个参数

注意:每个过滤器工厂都对应一个实现类,并且这些类的名称必须以GatewayFilterFactory结尾,这是Spring Cloud Gateway的一个约定,例如AddRequestHeader对应的实现类为AddRequestHeaderGatewayFilterFactory。对源码感兴趣的小伙伴就可以按照这个规律拼接出具体的类名,以此查找这些内置过滤器工厂的实现代码

使用方法举例

1
2
3
4
5
6
7
8
spring:
cloud:
gateway:
routes:
- id: add_request_header_route
uri: <https://example.org>
filters:
- AddRequestHeader=My-Header, Hello

这样即可为原始请求添加名为 My-Header ,值为 Hello的请求头

配置默认过滤器,即Default的用法

1
2
3
4
5
spring:
cloud:
gateway:
default-filters:
- PrefixPath=/base

GlobalFilter

Spring Cloud Gateway

不需要进行配置,默认已经生效了。

但我们必须了解其功能,以备不时之需。

ForwardRoutingFilter

这个过滤器会根据formord模式规则将情况转发给DispatcherHandler,然后转发给gateway网关自己的服务中

处理uri的scheme是forward的请求

forward模式就是forward:///localendpoint这种形式

举个例子来理解

1
2
3
4
5
6
7
- id: forward_routing_filter
uri: forward:///app
order: 10000
predicates:
- Path=/forwardFilterTest
filters:
- PrefixPath=/gateway

还要在gateway项目中创建个服务

1
2
3
4
5
6
7
8
@RestController
@RequestMapping("gateway")
public class FowardRoutingFilterController {
@RequestMapping("app")
public String globalFilters() {
return "Forward跳转成功";
}
}

我们输入http://localhost:8080/forwardFilterTest进行测试。这个请求最终会进到/gateway/app这个服务中

因为在spring-cloud-gateway服务收到请求之后,会执行以下步骤

  1. 根据请求路径/forwardFilterTest匹配到路由forward_routing_filter,并将请求跳转为:http://localhost:8080/app

  2. filters里面的PrefixPath配置请求改写为:http://localhost:8080/gateway/app

  3. ForwardRoutingFilter过滤器中判断路由中有foroward://前缀,将请求转发给DispatcherHandler

  4. DispatcherHandler匹配并转到spring-cloud-gateway服务中的contoller匹配的路径

总之,这种转发的重点在于转发给了gateway本身项目的服务。

使用场景不多。

LoadBalancerClientFilter

负载均衡过滤器。这个是非常常用到的过滤器。

因为网关后面的服务可以启动多个实例,使用负载均衡过滤器可以自动根据负载均衡规则路由到某台服务实例上面。

处理uri的scheme是lb的请求

使用这个过滤器的规则就是匹配到负载均衡的模式,即lb://xxx

举个例子

1
2
3
4
5
6
7
8
spring:
cloud:
gateway:
routes:
- id: myRoute
uri: lb://service
predicates:
- Path=/service/**

uri写为lb://service,即可在转发时使用ReactiveLoadBalancerClientFilter 。

service的名字即为spring.application.name的值

注意:默认情况下,如果LoadBalancer根据配置的实例名找不到有效的服务实例,返回状态码503,如果你想配置返回404,可以这样设置:

1
spring.cloud.gateway.loadbalancer.use404=true

NettyRoutingFilter

这是一个优先级最低的过滤器。

1
2
3
4
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}

处理uri的sechme说http或者https的请求。

因为他的优先级最低,也就是pre流程的最后一个filter。它将使用Netty的HttpClient创建向下执行的请求代理。

前面说过,请求分为pre过程和post过程。那么springcloud-gateway是怎么区分一个请求目前是处在那个过程的呢?

答案就在NettyRoutingFilter里。

NettyRoutingFilter是请求进来进行处理的最后一个filter,所以在NettyRoutingFilter的代码中,

1
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

会加上CLIENT_RESPONSE_CONN_ATTR一个属性。

之后的请求都带上这个属性,springcloud-gateway判断有这个属性的就是post类型的filter。比如下面的NettyWriteResponseFilter

NettyWriteResponseFilter

它的优先级是最高的。所以他是在前面的filter。

并且他是post类型的过滤器。也就相当于是进行响应处理的最后一个filter

经过他之后,就将响应的数据发送给网关的客户端了。

1
2
3
4
5
public static final int WRITE_RESPONSE_FILTER_ORDER = -1;
@Override
public int getOrder() {
return WRITE_RESPONSE_FILTER_ORDER;
}

为什么他是post类型的过滤器?

上面说到springcloud-gateway判断有CLIENT_RESPONSE_CONN_ATTR属性的就是post类型的filter

看源码

1
2
3
4
5
Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);

if (connection == null) {
return Mono.empty();
}

NettyWriteResponseFilter会判断CLIENT_RESPONSE_CONN_ATTR属性,如果没有,直接返回不做任何处理。

所以,因为NettyWriteResponseFilter的order是-1。

一般会排在最前面,所以请求首先会进来,但是因为当前没有CLIENT_RESPONSE_CONN_ATTR属性,所以就算经过NettyWriteResponseFilter也不会做任何处理。

等到过了NettyRoutingFilter,加上了CLIENT_RESPONSE_CONN_ATTR属性,再回来的时候就会得到NettyWriteResponseFilter的处理了。这也就是post过程了。

WebsocketRoutingFilter

很明显处理websocket的过滤器。

处理uri的scheme是ws或wss的请求。

1
2
3
4
- id: websocket_route
uri: ws://localhost:3001
predicates:
- Path=/websocket/**

也可以对ws进行负载均衡,比如:lb:ws://serviceid

RouteToRequestUrlFilter

处理的是请求的属性中有gatewayRoute这个属性的请求。

它的作用是根据route的uri配置,重新修改请求的URL地址

比如浏览器请求网关的URL是:http://localhost:8080/app-a/app/balance,

路由的URI配置是:uri: lb://app-a

那么修改之后的路由的URI是:lb://app-a/app/balance

filter的执行顺序

路由的filter可以配置多个。

如果是GatewayFilter,根据其在路由中的配置确定其执行顺序

1
2
3
4
5
6
7
8
9
10
11
12
spring:
cloud:
gateway:
routes:
- id: path_route
uri: http://www.baidu.com
predicates:
- Path=/get
filters:
- AddRequestHeader=X-Request-Foo, Bar
- AddResponseHeader=X-Response-Foo, Bar
- DedupeResponseHeader=Foo

又因为一个请求分为进pre和出post两个方向。放在前面的filter在pre过程中先执行,在post过程中后执行。

Spring Cloud Gateway Diagram

然后再加上GlobalFilter,和GatewayFilter的顺序会怎么执行呢?

根据Ordered接口

我们自定义GlobalFilter时,都会同时实现Ordered接口。

他只有一个方法getOrder(),返回优先级。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Bean
public GlobalFilter customFilter() {
return new CustomGlobalFilter();
}

public class CustomGlobalFilter implements GlobalFilter, Ordered {

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("custom global filter");
return chain.filter(exchange);
}

@Override
public int getOrder() {
return -1;
}
}

数字越大,优先级越低,这个filter越靠后。

所以,源码中最高优先级的值就是Integer的最小值。

1
2
3
4
5
6
7
8
9
10
11
12

/**
* Useful constant for the highest precedence value.
* @see java.lang.Integer#MIN_VALUE
*/
int HIGHEST_PRECEDENCE = Integer.MIN_VALUE;

/**
* Useful constant for the lowest precedence value.
* @see java.lang.Integer#MAX_VALUE
*/
int LOWEST_PRECEDENCE = Integer.MAX_VALUE;

做个测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
@Configuration
public class GatewayTest {
@Bean
@Order(-1)
public GlobalFilter a() {
return (exchange, chain) -> {
log.info("first pre filter");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
log.info("third post filter");
}));
};
}

@Bean
@Order(0)
public GlobalFilter b() {
return (exchange, chain) -> {
log.info("second pre filter");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
log.info("second post filter");
}));
};
}

@Bean
@Order(1)
public GlobalFilter c() {
return (exchange, chain) -> {
log.info("third pre filter");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
log.info("first post filter");
}));
};
}
}

触发一个请求,打印日志

1
2
3
4
5
6
7
first pre filter
second pre filter
third pre filter

first post filter
second post filter
third post filter

也可以看出,“pre”类型的过滤器是在(exchanges,chain)->{}中执行的,

而“post”类型的过滤器是在chain.filter(exchange).then(Mono.fromRunnable(()->{}))中执行的。

以上代码,如果不喜欢lambda形式的写法,等价于以下写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
@Component
public class CorsResponseHeaderFilter implements GlobalFilter, Ordered {

@Override
public int getOrder() {
return 1;
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("third pre filter");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
log.info("first post filter");
}));
}