# 第6节.Nacos中的观察者设计模式的使用

# 一、观察者模式的定义

观察者模式是一种定义对象相互之间依赖关系的一种设计模式,它属于设计模式中的行为模式,通过发布/订阅的方式在对象之间互相传送消息。在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知

设计模式要干的事情就是解耦。创建型模式是将创建和使用代码解耦,结构性模式是将不同功能代码解耦,行为型模式是将不同行为解耦,具体到观察者模式,是将观察者和被观察者代码解耦。

# 二、观察者模式的使用场景

1、一个对象的改变会导致一个或多个对象发生改变,而并不知道具体有多少对象将会发生改变,也不知道这些对象是谁

2、当一个抽象模型有两个方面,其中的一个方面依赖于另一个方面时,可将这两者封装在独立的对象中以使他们可以各自独立地改变和复用

3、需要在系统中创建一个触发链,使得事件拥有跨域通知(跨越两种观察者的类型)

# 三、Nacos中的设计和使用

# 3.1、Nacos中的事件中心处理模型

nacos中NotifyCenter类定义了一套统一的事件处理模型,通过该类可以注册事件、订阅事件、发布事件。

事件抽象类Event如下所示:

/**
* 统一的事件处理模型Event,各个业务模块需要继承这个事件抽象
*/
public abstract class Event implements Serializable {

    private static final long serialVersionUID = -3731383194964997493L;

    private static final AtomicLong SEQUENCE = new AtomicLong(0);

    private final long sequence = SEQUENCE.getAndIncrement();
    
    /**
     * Event sequence number, which can be used to handle the sequence of events.
     *
     * @return sequence num, It's best to make sure it's monotone.
     */
    public long sequence() {
        return sequence;
    }
    
}

Nacos中的常用的事件如下:

事件类 含义作用
ClientChangedEvent 客户端改变事件
ServerConfigChangeEvent 服务端配置文件改变事件
ConfigDataChangeEvent 配置数据改变事件
ServiceChangedEvent 服务改变事件
InstancesChangeEvent 服务实例改变事件

EventPublisher类和DefaultPublisher类定义和实现了事件发布者。

Subscriber类定义了事件订阅者,相关的订阅者需要实现该类。

使用位置1:

比如NacosNamingService类在初始化的时候,注册了一个实例改变的事件的发布者

NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);

同时注册了一个事件的订阅者:

NotifyCenter.registerSubscriber(changeNotifier);

订阅类核心代码如下:

public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
    
    private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>();
    
    private final Object lock = new Object();
    
    /**
     * register listener.
     *
     * @param groupName   group name
     * @param serviceName serviceName
     * @param clusters    clusters, concat by ','. such as 'xxx,yyy'
     * @param listener    custom listener
     */
    public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
        String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
        ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
        if (eventListeners == null) {
            synchronized (lock) {
                eventListeners = listenerMap.get(key);
                if (eventListeners == null) {
                    eventListeners = new ConcurrentHashSet<EventListener>();
                    listenerMap.put(key, eventListeners);
                }
            }
        }
        eventListeners.add(listener);
    }
    
    /**
     * deregister listener.
     *
     * @param groupName   group name
     * @param serviceName serviceName
     * @param clusters    clusters, concat by ','. such as 'xxx,yyy'
     * @param listener    custom listener
     */
    public void deregisterListener(String groupName, String serviceName, String clusters, EventListener listener) {
        String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
        ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
        if (eventListeners == null) {
            return;
        }
        eventListeners.remove(listener);
        if (CollectionUtils.isEmpty(eventListeners)) {
            listenerMap.remove(key);
        }
    }
    
    /**
     * check serviceName,clusters is subscribed.
     *
     * @param groupName   group name
     * @param serviceName serviceName
     * @param clusters    clusters, concat by ','. such as 'xxx,yyy'
     * @return is serviceName,clusters subscribed
     */
    public boolean isSubscribed(String groupName, String serviceName, String clusters) {
        String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
        ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
        return CollectionUtils.isNotEmpty(eventListeners);
    }
    
    public List<ServiceInfo> getSubscribeServices() {
        List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();
        for (String key : listenerMap.keySet()) {
            serviceInfos.add(ServiceInfo.fromKey(key));
        }
        return serviceInfos;
    }
    
    @Override
    public void onEvent(InstancesChangeEvent event) {
        String key = ServiceInfo
                .getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
        ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
        if (CollectionUtils.isEmpty(eventListeners)) {
            return;
        }
        for (final EventListener listener : eventListeners) {
            final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
            if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
                ((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));
            } else {
                listener.onEvent(namingEvent);
            }
        }
    }
    
    private com.alibaba.nacos.api.naming.listener.Event transferToNamingEvent(
            InstancesChangeEvent instancesChangeEvent) {
        return new NamingEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(),
                instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts());
    }
    
    @Override
    public Class<? extends Event> subscribeType() {
        return InstancesChangeEvent.class;
    }
    
}

对于实例变更事件来说,每个订阅者,内部又维护了多个事件监听器。当某个服务实例变动的时候,触发处于该服务实例下的监听处理信息。

Nacos中很多功能的交互,都是通过事件模型来处理,达到一个解耦的效果。这套模型也值得我们学习。

Last Updated: 10/5/2022, 4:38:08 PM