# 第3节.Nacos中的生产者消费者多线程模式的使用

Nacos的源码中大量的使用了多线程并发的JUC中的类,很多代码都是基于生产者和消费者的模式,提高了并发请求的效率。

这里分享下相关说明,如何寻找哪些类使用了这种模式呢?可以在全局中搜索LinkedBlockingQueue、ArrayBlockingQueue的引用,就可以知道在哪里使用了。

比如Nacos中的AP模式的一致性处理的时候的核心逻辑类DistroConsistencyServiceImpl中,就使用了,

//调用通知类,向队列中添加变更数据
notifier.addTask(key, DataOperation.CHANGE);

内部类的核心代码如下:

public class Notifier implements Runnable {
    
  	/**
  	* 内存中的服务
  	*/
    private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
    
  	/**
  	* 阻塞队列任务
  	*/
    private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
    
    public void addTask(String datumKey, DataOperation action) {
        
        if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
            return;
        }
        if (action == DataOperation.CHANGE) {
            services.put(datumKey, StringUtils.EMPTY);
        }
      	//放任务到队列中
        tasks.offer(Pair.with(datumKey, action));
    }
    
    public int getTaskSize() {
        return tasks.size();
    }
    
    @Override
    public void run() {
        Loggers.DISTRO.info("distro notifier started");
        
        for (; ; ) {
            try {
              	//线程运行的时候,阻塞从队列中进行取出任务
                Pair<String, DataOperation> pair = tasks.take();
                handle(pair);
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
    
  	//处理任务
    private void handle(Pair<String, DataOperation> pair) {
        try {
            String datumKey = pair.getValue0();
            DataOperation action = pair.getValue1();
            
            services.remove(datumKey);
            
            int count = 0;
            
            if (!listeners.containsKey(datumKey)) {
                return;
            }
            
            for (RecordListener listener : listeners.get(datumKey)) {
                
                count++;
                
                try {
                  	//触发监听器
                    if (action == DataOperation.CHANGE) {
                        listener.onChange(datumKey, dataStore.get(datumKey).value);
                        continue;
                    }
                    
                    if (action == DataOperation.DELETE) {
                        listener.onDelete(datumKey);
                        continue;
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                }
            }
            
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO
                        .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                datumKey, count, action.name());
            }
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}
Last Updated: 7/10/2022, 1:50:01 PM