# 第4节.Nacos中策略设计模式的使用
# 一、策略模式的定义
策略模式属于行为型模式,是使用最多的设计模式之一,策略模式定义了算法族,分别封装起来,让它们之间可以互相替换,此模式让算法的变化独立于使用算法的客户。
# 二、策略模式的使用场景
策略设计模式一般使用的场景是,多种可互相替代的同类行为,在具体的运行过程中根据不同的情况,选择其中一种行为来执行。
# 三、Nacos中的使用
# 3.1、Distro协议组件维护对象相关DistroComponentHolder
在Nacos中,Distro协议的处理包括了:
存储:DataStorage
请求传输代理:TransportAgent
任务失败处理器:TaskFailedHandler
数据一致性服务:ConsistencyService
可以参考DistroHttpRegistry(1.x)类和DistroClientComponentRegistry(2.x)的注册设计,在这两个类中,对于如上几点的处理,分别注册了对应的处理策略,以DistroTransportAgent这个传输请求代理为例,该接口定义了请求的基本方法,
public interface DistroTransportAgent {
boolean supportCallbackTransport();
boolean syncData(DistroData data, String targetServer);
void syncData(DistroData data, String targetServer, DistroCallback callback);
boolean syncVerifyData(DistroData verifyData, String targetServer);
void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback);
DistroData getData(DistroKey key, String targetServer);
DistroData getDatumSnapshot(String targetServer);
}
其中HTTP的请求方式代码如下:
public class DistroHttpAgent implements DistroTransportAgent {
private final ServerMemberManager memberManager;
public DistroHttpAgent(ServerMemberManager memberManager) {
this.memberManager = memberManager;
}
@Override
public boolean supportCallbackTransport() {
return false;
}
@Override
public boolean syncData(DistroData data, String targetServer) {
if (!memberManager.hasMember(targetServer)) {
return true;
}
byte[] dataContent = data.getContent();
return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}
@Override
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
throw new UnsupportedOperationException("Http distro agent do not support this method");
}
@Override
public boolean syncVerifyData(DistroData verifyData, String targetServer) {
if (!memberManager.hasMember(targetServer)) {
return true;
}
NamingProxy.syncCheckSums(verifyData.getContent(), targetServer);
return true;
}
@Override
public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
throw new UnsupportedOperationException("Http distro agent do not support this method");
}
//.....省略部分代码
}
对于Grpc的方式核心代码如下:
public class DistroClientTransportAgent implements DistroTransportAgent {
private final ClusterRpcClientProxy clusterRpcClientProxy;
private final ServerMemberManager memberManager;
public DistroClientTransportAgent(ClusterRpcClientProxy clusterRpcClientProxy,
ServerMemberManager serverMemberManager) {
this.clusterRpcClientProxy = clusterRpcClientProxy;
this.memberManager = serverMemberManager;
}
@Override
public boolean supportCallbackTransport() {
return true;
}
@Override
public boolean syncData(DistroData data, String targetServer) {
if (isNoExistTarget(targetServer)) {
return true;
}
DistroDataRequest request = new DistroDataRequest(data, data.getType());
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
return false;
}
try {
Response response = clusterRpcClientProxy.sendRequest(member, request);
return checkResponse(response);
} catch (NacosException e) {
Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);
}
return false;
}
@Override
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
if (isNoExistTarget(targetServer)) {
callback.onSuccess();
return;
}
DistroDataRequest request = new DistroDataRequest(data, data.getType());
Member member = memberManager.find(targetServer);
try {
clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
} catch (NacosException nacosException) {
callback.onFailed(nacosException);
}
}
@Override
public boolean syncVerifyData(DistroData verifyData, String targetServer) {
if (isNoExistTarget(targetServer)) {
return true;
}
// replace target server as self server so that can callback.
verifyData.getDistroKey().setTargetServer(memberManager.getSelf().getAddress());
DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
Loggers.DISTRO.warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy", targetServer);
return false;
}
try {
Response response = clusterRpcClientProxy.sendRequest(member, request);
return checkResponse(response);
} catch (NacosException e) {
Loggers.DISTRO.error("[DISTRO-FAILED] Verify distro data failed! ", e);
}
return false;
}
@Override
public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
if (isNoExistTarget(targetServer)) {
callback.onSuccess();
return;
}
DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
Member member = memberManager.find(targetServer);
try {
DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
verifyData.getDistroKey().getResourceKey(), callback, member);
clusterRpcClientProxy.asyncRequest(member, request, wrapper);
} catch (NacosException nacosException) {
callback.onFailed(nacosException);
}
}
//.....省略部分代码
}
当以上不同的策略定义好以后,Registry类将该实现类注册到DistroComponentHolder类中的Map对象了,并在使用的时候,根据不同的类型,进行了对象的读取,实现了不同策略的选择。
在 Distro 协议的设计思想下,每个 Distro 节点都可以接收到读写请求。所有的 Distro 协议的请求场景主要分为三种情况:
1、当该节点接收到属于该节点负责的实例的写请求时,直接写入。
2、当该节点接收到不属于该节点负责的实例的写请求时,将在集群内部路由,转发给对应的节点,从而完成读写。
3、当该节点接收到任何读请求时,都直接在本机查询并返回(因为所有实例都被同步到了每台机器上)。