# 第4节.Nacos中策略设计模式的使用

# 一、策略模式的定义

策略模式属于行为型模式,是使用最多的设计模式之一,策略模式定义了算法族,分别封装起来,让它们之间可以互相替换,此模式让算法的变化独立于使用算法的客户。

# 二、策略模式的使用场景

策略设计模式一般使用的场景是,多种可互相替代的同类行为,在具体的运行过程中根据不同的情况,选择其中一种行为来执行。

# 三、Nacos中的使用

# 3.1、Distro协议组件维护对象相关DistroComponentHolder

在Nacos中,Distro协议的处理包括了:

存储:DataStorage

请求传输代理:TransportAgent

任务失败处理器:TaskFailedHandler

数据一致性服务:ConsistencyService

可以参考DistroHttpRegistry(1.x)类和DistroClientComponentRegistry(2.x)的注册设计,在这两个类中,对于如上几点的处理,分别注册了对应的处理策略,以DistroTransportAgent这个传输请求代理为例,该接口定义了请求的基本方法,

public interface DistroTransportAgent {
    
    boolean supportCallbackTransport();
    
    boolean syncData(DistroData data, String targetServer);
    
    void syncData(DistroData data, String targetServer, DistroCallback callback);
    
    boolean syncVerifyData(DistroData verifyData, String targetServer);
    
    void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback);
    
    DistroData getData(DistroKey key, String targetServer);
    
    DistroData getDatumSnapshot(String targetServer);
}

其中HTTP的请求方式代码如下:

public class DistroHttpAgent implements DistroTransportAgent {
    
    private final ServerMemberManager memberManager;
    
    public DistroHttpAgent(ServerMemberManager memberManager) {
        this.memberManager = memberManager;
    }
    
    @Override
    public boolean supportCallbackTransport() {
        return false;
    }
    
    @Override
    public boolean syncData(DistroData data, String targetServer) {
        if (!memberManager.hasMember(targetServer)) {
            return true;
        }
        byte[] dataContent = data.getContent();
        return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
    }
    
    @Override
    public void syncData(DistroData data, String targetServer, DistroCallback callback) {
        throw new UnsupportedOperationException("Http distro agent do not support this method");
    }
    
    @Override
    public boolean syncVerifyData(DistroData verifyData, String targetServer) {
        if (!memberManager.hasMember(targetServer)) {
            return true;
        }
        NamingProxy.syncCheckSums(verifyData.getContent(), targetServer);
        return true;
    }
    
    @Override
    public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
        throw new UnsupportedOperationException("Http distro agent do not support this method");
    }
    
    //.....省略部分代码
}

对于Grpc的方式核心代码如下:

public class DistroClientTransportAgent implements DistroTransportAgent {
    
    private final ClusterRpcClientProxy clusterRpcClientProxy;
    
    private final ServerMemberManager memberManager;
    
    public DistroClientTransportAgent(ClusterRpcClientProxy clusterRpcClientProxy,
            ServerMemberManager serverMemberManager) {
        this.clusterRpcClientProxy = clusterRpcClientProxy;
        this.memberManager = serverMemberManager;
    }
    
    @Override
    public boolean supportCallbackTransport() {
        return true;
    }
    
    @Override
    public boolean syncData(DistroData data, String targetServer) {
        if (isNoExistTarget(targetServer)) {
            return true;
        }
        DistroDataRequest request = new DistroDataRequest(data, data.getType());
        Member member = memberManager.find(targetServer);
        if (checkTargetServerStatusUnhealthy(member)) {
            Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
            return false;
        }
        try {
            Response response = clusterRpcClientProxy.sendRequest(member, request);
            return checkResponse(response);
        } catch (NacosException e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);
        }
        return false;
    }
    
    @Override
    public void syncData(DistroData data, String targetServer, DistroCallback callback) {
        if (isNoExistTarget(targetServer)) {
            callback.onSuccess();
            return;
        }
        DistroDataRequest request = new DistroDataRequest(data, data.getType());
        Member member = memberManager.find(targetServer);
        try {
            clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
        } catch (NacosException nacosException) {
            callback.onFailed(nacosException);
        }
    }
    
    @Override
    public boolean syncVerifyData(DistroData verifyData, String targetServer) {
        if (isNoExistTarget(targetServer)) {
            return true;
        }
        // replace target server as self server so that can callback.
        verifyData.getDistroKey().setTargetServer(memberManager.getSelf().getAddress());
        DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
        Member member = memberManager.find(targetServer);
        if (checkTargetServerStatusUnhealthy(member)) {
            Loggers.DISTRO.warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy", targetServer);
            return false;
        }
        try {
            Response response = clusterRpcClientProxy.sendRequest(member, request);
            return checkResponse(response);
        } catch (NacosException e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] Verify distro data failed! ", e);
        }
        return false;
    }
    
    @Override
    public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
        if (isNoExistTarget(targetServer)) {
            callback.onSuccess();
            return;
        }
        DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
        Member member = memberManager.find(targetServer);
        try {
            DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
                    verifyData.getDistroKey().getResourceKey(), callback, member);
            clusterRpcClientProxy.asyncRequest(member, request, wrapper);
        } catch (NacosException nacosException) {
            callback.onFailed(nacosException);
        }
    }
		//.....省略部分代码
    
}    

当以上不同的策略定义好以后,Registry类将该实现类注册到DistroComponentHolder类中的Map对象了,并在使用的时候,根据不同的类型,进行了对象的读取,实现了不同策略的选择。

在 Distro 协议的设计思想下,每个 Distro 节点都可以接收到读写请求。所有的 Distro 协议的请求场景主要分为三种情况:

1、当该节点接收到属于该节点负责的实例的写请求时,直接写入。

2、当该节点接收到不属于该节点负责的实例的写请求时,将在集群内部路由,转发给对应的节点,从而完成读写。

3、当该节点接收到任何读请求时,都直接在本机查询并返回(因为所有实例都被同步到了每台机器上)。

Last Updated: 10/3/2022, 5:50:00 PM