# 第3节.Nacos中的生产者消费者多线程模式的使用
Nacos的源码中大量的使用了多线程并发的JUC中的类,很多代码都是基于生产者和消费者的模式,提高了并发请求的效率。
这里分享下相关说明,如何寻找哪些类使用了这种模式呢?可以在全局中搜索LinkedBlockingQueue、ArrayBlockingQueue的引用,就可以知道在哪里使用了。
比如Nacos中的AP模式的一致性处理的时候的核心逻辑类DistroConsistencyServiceImpl中,就使用了,
//调用通知类,向队列中添加变更数据
notifier.addTask(key, DataOperation.CHANGE);
内部类的核心代码如下:
public class Notifier implements Runnable {
/**
* 内存中的服务
*/
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
/**
* 阻塞队列任务
*/
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
//放任务到队列中
tasks.offer(Pair.with(datumKey, action));
}
public int getTaskSize() {
return tasks.size();
}
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
//线程运行的时候,阻塞从队列中进行取出任务
Pair<String, DataOperation> pair = tasks.take();
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
//处理任务
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
//触发监听器
if (action == DataOperation.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}