# 第2节:Nacos中的CAS并发的使用
CAS的全称为Compare-And-Swap,它是一条CPU并发原语。 它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。他会比较当前工作内存中的值和主内存中的值,如果相同则执行规定操作,否则继续比较直到主内存和工作内存中的值一致为止。
在 Java 中,Java 并没有直接实现 CAS,CAS 相关的实现是通过 C++ 内联汇编的形式实现的。Java 代码需通过 JNI 才能调用。
CAS 是一条 CPU 的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe 提供的 CAS 方法(如compareAndSwapXXX)底层实现即为 CPU 指令 cmpxchg
先来看一下Java中关于CAS经常使用的一些类,位于 java.util.concurrent.atomic包下面,常用的如下:
AtomicBoolean
AtomicInteger
AtomicLong
AtomicStampedReference
。。。。。
在看看Nacos中的CAS并发都在哪里用了。
# 1、Nacos寻址机制
AbstractMemberLookup抽象类使用CAS做开始和停止
package com.alibaba.nacos.core.cluster;
public abstract class AbstractMemberLookup implements MemberLookup {
protected AtomicBoolean start = new AtomicBoolean(false);
@Override
public void destroy() throws NacosException {
if (start.compareAndSet(true, false)) {
doDestroy();
}
}
@Override
public void start() throws NacosException {
if (start.compareAndSet(false, true)) {
doStart();
}
}
//子类实现doStart和doDestory
}
可以看到Nacos中这块采用了模板方法设计模式,并基于CAS做了统一的上层控制判断,底层真实实现业务无需关心CAS并发操作问题。
# 2、Nacos中资源的处理
采用CAS的方式优雅的进行控制,比如ThreadPoolManager类的代码实现:
package com.alibaba.nacos.common.executor;
public final class ThreadPoolManager {
private static final ThreadPoolManager INSTANCE = new ThreadPoolManager();
private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
/**
* Shutdown thread pool manager.
*/
public static void shutdown() {
if (!CLOSED.compareAndSet(false, true)) {
return;
}
Set<String> namespaces = INSTANCE.resourcesManager.keySet();
for (String namespace : namespaces) {
INSTANCE.destroy(namespace);
}
}
}
同样逻辑的实现还有NotifyCenter、HttpClientBeanHolder、TaskExecuteWorker。
其中TaskExecuteWorker类的设计和使用方式很值得我们学习,尤其在任务类的开发模式中,我们的日常开发中比较常用,优雅的进行了线程的关闭处理。核心代码如下:
public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
/**
* Max task queue size 32768.
*/
private static final int QUEUE_CAPACITY = 1 << 15;
private final Logger log;
private final String name;
private final BlockingQueue<Runnable> queue;
private final AtomicBoolean closed;
public TaskExecuteWorker(final String name, final int mod, final int total) {
this(name, mod, total, null);
}
public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
this.name = name + "_" + mod + "%" + total;
this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
this.closed = new AtomicBoolean(false);
this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
new InnerWorker(name).start();
}
public String getName() {
return name;
}
@Override
public boolean process(NacosTask task) {
if (task instanceof AbstractExecuteTask) {
putTask((Runnable) task);
}
return true;
}
private void putTask(Runnable task) {
try {
queue.put(task);
} catch (InterruptedException ire) {
log.error(ire.toString(), ire);
}
}
public int pendingTaskCount() {
return queue.size();
}
/**
* Worker status.
*/
public String status() {
return name + ", pending tasks: " + pendingTaskCount();
}
@Override
public void shutdown() throws NacosException {
queue.clear();
closed.compareAndSet(false, true);
}
/**
* Inner execute worker.
*/
private class InnerWorker extends Thread {
InnerWorker(String name) {
setDaemon(false);
setName(name);
}
@Override
public void run() {
while (!closed.get()) {
try {
Runnable task = queue.take();
long begin = System.currentTimeMillis();
task.run();
long duration = System.currentTimeMillis() - begin;
if (duration > 1000L) {
log.warn("task {} takes {}ms", task, duration);
}
} catch (Throwable e) {
log.error("[TASK-FAILED] " + e.toString(), e);
}
}
}
}
}
# 3、一致性协议实现
JRaftProtocol的类,采用了2个AtomicBoolean对象进行控制,核心代码如下:
public class JRaftProtocol extends AbstractConsistencyProtocol<RaftConfig, RequestProcessor4CP>
implements CPProtocol<RaftConfig, RequestProcessor4CP> {
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final AtomicBoolean shutdowned = new AtomicBoolean(false);
@Override
public void init(RaftConfig config) {
if (initialized.compareAndSet(false, true)) {
this.raftConfig = config;
NotifyCenter.registerToSharePublisher(RaftEvent.class);
this.raftServer.init(this.raftConfig);
this.raftServer.start();
// There is only one consumer to ensure that the internal consumption
// is sequential and there is no concurrent competition
NotifyCenter.registerSubscriber(new Subscriber<RaftEvent>() {
。。。。。。
});
}
}
@Override
public void shutdown() {
if (initialized.get() && shutdowned.compareAndSet(false, true)) {
Loggers.RAFT.info("shutdown jraft server");
raftServer.shutdown();
}
}
}
# 4、客户端Naming选择Server
客户端模块中的ServerListManager类AtomicInteger类存储当前服务节点,核心代码如下所示:
package com.alibaba.nacos.client.naming.core;
public class ServerListManager implements ServerListFactory, Closeable {
private final AtomicInteger currentIndex = new AtomicInteger();
private final List<String> serverList = new ArrayList<>();
public ServerListManager(Properties properties, String namespace) {
this.namespace = namespace;
initServerAddr(properties);
if (!serverList.isEmpty()) {
currentIndex.set(new Random().nextInt(serverList.size()));
} else {
throw new NacosLoadException("serverList is empty,please check configuration");
}
}
@Override
public String genNextServer() {
int index = currentIndex.incrementAndGet() % getServerList().size();
return getServerList().get(index);
}
@Override
public String getCurrentServer() {
return getServerList().get(currentIndex.get() % getServerList().size());
}
}
可以看到这个类基于AtomicInteger实现了服务选择,初始化情况为随机选择,下一台服务器时采用+1取余的方式。
# 5、自定义线程工厂
我们知道,在自定义线程工厂的时候都会指定线程的名字,以便于排查问题
Nacos中的NameThreadFactory使用AtomicInteger来维护名字中的自增顺序维护,代码如下:
public class NameThreadFactory implements ThreadFactory {
private final AtomicInteger id = new AtomicInteger(0);
private String name;
public NameThreadFactory(String name) {
if (!name.endsWith(StringUtils.DOT)) {
name += StringUtils.DOT;
}
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
String threadName = name + id.getAndIncrement();
Thread thread = new Thread(r, threadName);
thread.setDaemon(true);
return thread;
}
}
类似的操作类还有指标监控类MetricsMonitor等等