# 第2节:Nacos中的CAS并发的使用

CAS的全称为Compare-And-Swap,它是一条CPU并发原语。 它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。他会比较当前工作内存中的值和主内存中的值,如果相同则执行规定操作,否则继续比较直到主内存和工作内存中的值一致为止。

在 Java 中,Java 并没有直接实现 CAS,CAS 相关的实现是通过 C++ 内联汇编的形式实现的。Java 代码需通过 JNI 才能调用。

CAS 是一条 CPU 的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe 提供的 CAS 方法(如compareAndSwapXXX)底层实现即为 CPU 指令 cmpxchg

先来看一下Java中关于CAS经常使用的一些类,位于 java.util.concurrent.atomic包下面,常用的如下:

AtomicBoolean
AtomicInteger
AtomicLong
AtomicStampedReference
。。。。。

在看看Nacos中的CAS并发都在哪里用了。

# 1、Nacos寻址机制

AbstractMemberLookup抽象类使用CAS做开始和停止

package com.alibaba.nacos.core.cluster;

public abstract class AbstractMemberLookup implements MemberLookup {

 		protected AtomicBoolean start = new AtomicBoolean(false);
	
		@Override
    public void destroy() throws NacosException {
        if (start.compareAndSet(true, false)) {
            doDestroy();
        }
    }
    
    @Override
    public void start() throws NacosException {
        if (start.compareAndSet(false, true)) {
            doStart();
        }
    }
		
		//子类实现doStart和doDestory
}

可以看到Nacos中这块采用了模板方法设计模式,并基于CAS做了统一的上层控制判断,底层真实实现业务无需关心CAS并发操作问题。

# 2、Nacos中资源的处理

采用CAS的方式优雅的进行控制,比如ThreadPoolManager类的代码实现:

package com.alibaba.nacos.common.executor;

public final class ThreadPoolManager {
		private static final ThreadPoolManager INSTANCE = new ThreadPoolManager();
		private static final AtomicBoolean CLOSED = new AtomicBoolean(false);

		 /**
     * Shutdown thread pool manager.
     */
    public static void shutdown() {
        if (!CLOSED.compareAndSet(false, true)) {
            return;
        }
        Set<String> namespaces = INSTANCE.resourcesManager.keySet();
        for (String namespace : namespaces) {
            INSTANCE.destroy(namespace);
        }
    }

}

同样逻辑的实现还有NotifyCenter、HttpClientBeanHolder、TaskExecuteWorker。

其中TaskExecuteWorker类的设计和使用方式很值得我们学习,尤其在任务类的开发模式中,我们的日常开发中比较常用,优雅的进行了线程的关闭处理。核心代码如下:

public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
    
    /**
     * Max task queue size 32768.
     */
    private static final int QUEUE_CAPACITY = 1 << 15;
    
    private final Logger log;
    
    private final String name;
    
    private final BlockingQueue<Runnable> queue;
    
    private final AtomicBoolean closed;
    
    public TaskExecuteWorker(final String name, final int mod, final int total) {
        this(name, mod, total, null);
    }
    
    public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
        this.name = name + "_" + mod + "%" + total;
        this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
        this.closed = new AtomicBoolean(false);
        this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
        new InnerWorker(name).start();
    }
    
    public String getName() {
        return name;
    }
    
    @Override
    public boolean process(NacosTask task) {
        if (task instanceof AbstractExecuteTask) {
            putTask((Runnable) task);
        }
        return true;
    }
    
    private void putTask(Runnable task) {
        try {
            queue.put(task);
        } catch (InterruptedException ire) {
            log.error(ire.toString(), ire);
        }
    }
    
    public int pendingTaskCount() {
        return queue.size();
    }
    
    /**
     * Worker status.
     */
    public String status() {
        return name + ", pending tasks: " + pendingTaskCount();
    }
    
    @Override
    public void shutdown() throws NacosException {
        queue.clear();
        closed.compareAndSet(false, true);
    }
    
    /**
     * Inner execute worker.
     */
    private class InnerWorker extends Thread {
        
        InnerWorker(String name) {
            setDaemon(false);
            setName(name);
        }
        
        @Override
        public void run() {
            while (!closed.get()) {
                try {
                    Runnable task = queue.take();
                    long begin = System.currentTimeMillis();
                    task.run();
                    long duration = System.currentTimeMillis() - begin;
                    if (duration > 1000L) {
                        log.warn("task {} takes {}ms", task, duration);
                    }
                } catch (Throwable e) {
                    log.error("[TASK-FAILED] " + e.toString(), e);
                }
            }
        }
    }
}

# 3、一致性协议实现

JRaftProtocol的类,采用了2个AtomicBoolean对象进行控制,核心代码如下:

public class JRaftProtocol extends AbstractConsistencyProtocol<RaftConfig, RequestProcessor4CP>
        implements CPProtocol<RaftConfig, RequestProcessor4CP> {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    
    private final AtomicBoolean shutdowned = new AtomicBoolean(false);
        
    @Override
    public void init(RaftConfig config) {
        if (initialized.compareAndSet(false, true)) {
            this.raftConfig = config;
            NotifyCenter.registerToSharePublisher(RaftEvent.class);
            this.raftServer.init(this.raftConfig);
            this.raftServer.start();
            
            // There is only one consumer to ensure that the internal consumption
            // is sequential and there is no concurrent competition
            NotifyCenter.registerSubscriber(new Subscriber<RaftEvent>() {
              。。。。。。
            });
        }
    }
    
    @Override
    public void shutdown() {
        if (initialized.get() && shutdowned.compareAndSet(false, true)) {
            Loggers.RAFT.info("shutdown jraft server");
            raftServer.shutdown();
        }
    }
}        

# 4、客户端Naming选择Server

客户端模块中的ServerListManager类AtomicInteger类存储当前服务节点,核心代码如下所示:

package com.alibaba.nacos.client.naming.core;
public class ServerListManager implements ServerListFactory, Closeable {
	 
	 private final AtomicInteger currentIndex = new AtomicInteger();
    
   private final List<String> serverList = new ArrayList<>();
   
   public ServerListManager(Properties properties, String namespace) {
        this.namespace = namespace;
        initServerAddr(properties);
        if (!serverList.isEmpty()) {
            currentIndex.set(new Random().nextInt(serverList.size()));
        } else {
            throw new NacosLoadException("serverList is empty,please check configuration");
        }
    }
		
  	 @Override
    public String genNextServer() {
        int index = currentIndex.incrementAndGet() % getServerList().size();
        return getServerList().get(index);
    }
    
    @Override
    public String getCurrentServer() {
        return getServerList().get(currentIndex.get() % getServerList().size());
    }
    
}

可以看到这个类基于AtomicInteger实现了服务选择,初始化情况为随机选择,下一台服务器时采用+1取余的方式。

# 5、自定义线程工厂

我们知道,在自定义线程工厂的时候都会指定线程的名字,以便于排查问题

Nacos中的NameThreadFactory使用AtomicInteger来维护名字中的自增顺序维护,代码如下:

public class NameThreadFactory implements ThreadFactory {
    
    private final AtomicInteger id = new AtomicInteger(0);
    
    private String name;
    
    public NameThreadFactory(String name) {
        if (!name.endsWith(StringUtils.DOT)) {
            name += StringUtils.DOT;
        }
        this.name = name;
    }
    
    @Override
    public Thread newThread(Runnable r) {
        String threadName = name + id.getAndIncrement();
        Thread thread = new Thread(r, threadName);
        thread.setDaemon(true);
        return thread;
    }
}

类似的操作类还有指标监控类MetricsMonitor等等

Last Updated: 7/1/2022, 1:08:38 PM