java中无锁cas操作原理分析


[toc]

java中无锁cas操作原理分析

多线程下volatile合AtomicInteger操作实例

实例一:volatile修饰的计数器

volatile无法保证线程的原子性:参考


package com.nemo.thread.automic;

public class Counter1 implements Runnable{

public volatile int count = 0;

public static void main(String[] args) throws InterruptedException {

//同时启动1000个线程,去进行i++计算,看看实际结果 Counter1 count = new Counter1(); for (int i = 0; i < 1000; i++) { Thread thread = new Thread(count,"thread:"+i); thread.start(); } //确保线程都运行完成 Thread.sleep(1000); System.out.println("运行结果:" + count.count); } @Override public void run() { //这里延迟1毫秒,使得结果明显 try { Thread.sleep(1); } catch (InterruptedException e) { } count = count +1; } } //该代执行结果: //运行结果:994

实例二:AtomicInteger修饰的计数器

package com.nemo.thread.automic;

import java.util.concurrent.atomic.AtomicInteger;

public class Counter implements Runnable{ public AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { //同时启动1000个线程,去进行i++计算,看看实际结果 Counter count = new Counter(); for (int i = 0; i < 1000; i++) { Thread thread = new Thread(count,"thread:"+i); thread.start();; } Thread.sleep(1000); //确保线程都运行完成 System.out.println("运行结果:" + count.count.get()); } @Override public void run() { //这里延迟1毫秒,使得结果明显 try { Thread.sleep(1); } catch (InterruptedException e) { } count.incrementAndGet(); } } //结果:运行结果:1000

结果分析

通过上面两个实例分析,可以得出 volatile运行的结果基本上不会得到我们的预期结果值:1000,因为 volatile不能保证操作的原子性。而 AtomicInteger可以保证操作的原子性,因为 AtomicInteger使用了CAS。接下来解释什么是CAS。

cas原理

比较与交换

CAS有三个参数,内存位置V、旧的预期值A、新的预期值B。 当且仅当V符合预期值A的时候,CAS用新值B原子化的更新V的值;否则他什么都不做。在任何情况下都会返回V的真实值。(这个变量称为compare-and-set,无论操作是否成功都会返回。)CAS的意思是,“ 我任务V的值应该是A,如果是A则将其赋值给B,若不是,则不修改,并告诉我应该为多少。”CAS是以项乐观技术--它抱着成功的希望进行更新,并且如果, 另一个线程在上次检查后更新了变量,它能够发现错误。

当多个线程试图使用CAS同时更新相同的变量时,其中一个会胜出,并更新变量的值,而其他的都会失败。失败的线程不会被挂起。

模拟CAS操作

下面的代码用来模拟CAS操作,该操作就是一个竞争,只有一个线程可以竞争成功。

public class SimulatedCAS {
    private int value;
    public synchronized int get() {
        return value;
    }
    public synchronized int compareAndSwap(int expectValue,int newValue){
        int oldValue = value;
        if(oldValue==expectValue){
            value = newValue;
        }
        return oldValue;
    }
    public synchronized boolean compareAndSet(int expectValue,int newValue){
        return (expectValue == compareAndSwap(expectValue,newValue));
    }
}

上述SimulatedCAS简单的模拟了CAS操作。 使用CAS的典型模式是:首先从V中读取A,由A生成新值B,然后使用CAS原子化的把V的值由A变成B,并且期间不能有其他线程改变V的值。因为CAS能够发现来自其他线程的干扰,所以即使不用锁,它也能够解决原子化的实现读-写-改的问题。

使用CAS实现的非阻塞计数器

package com.nemo.thread.cas;

/* * Created by nemo on 15/12/4. / public class CasCounter { private SimulatedCAS value; public CasCounter(){ value = new SimulatedCAS(); } public int getValue(){ return value.get(); } public int increment(){ int v; do{ v = value.get(); }while (v != value.compareAndSwap(v,v+1)); return v+1; } }

客户端调用计数器

public class CasCounterClient implements Runnable{
    CasCounter cas;
    public CasCounterClient(){
        cas = new CasCounter();
    }
    @Override
    public void run() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cas.increment();
    }
    public static void main(String[] args) throws InterruptedException {
        CasCounterClient client = new CasCounterClient();
        for(int i=0;i<1000;i++){
            Thread thead = new Thread(client,"name:"+i);
            thead.start();
        }
        Thread.sleep(1000);
        System.out.println(client.cas.getValue());
    }

}

//结果:输入1000

AutomicInteger源码分析

参考:http://www.blogjava.net/mstar/archive/2013/04/24/398351.html

1、incrementAndGet的实现

public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

首先可以看到他是通过一个无限循环(spin)直到increment成功为止.
循环的内容是 1.取得当前值 2.计算+1后的值 3.如果当前值还有效(没有被)的话设置那个+1后的值 4.如果设置没成功(当前值已经无效了即被别的线程改过了), 再从1开始.

2、compareAndSet的实现

 public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

直接调用的是UnSafe这个类的compareAndSwapInt方法 全称是sun.misc.Unsafe. 这个类是Oracle(Sun)提供的实现. 可以在别的公司的JDK里就不是这个类了

3、compareAndSwapInt的实现

/*
     * Atomically update Java variable to <tt>x</tt> if it is currently
     * holding <tt>expected</tt>.
     * @return <tt>true</tt> if successful
     /
    public final native boolean compareAndSwapInt(Object o, long offset,
                                                  int expected,
                                                  int x);

可以看到, 不是用Java实现的, 而是通过JNI调用操作系统的原生程序

4、compareAndSwapInt的native实现,如果你下载了OpenJDK的源代码的话在hotspot\src\share\vm\prims\目录下可以找到unsafe.cpp

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

可以看到实际上调用Atomic类的cmpxchg方法

5、 Atomic的cmpxchg 这个类的实现是跟操作系统有关, 跟CPU架构也有关, 如果是windows下x86的架构 实现在hotspot\src\os_cpu\windows_x86\vm\目录的atomic_windows_x86.inline.hpp文件里

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  // alternative for InterlockedCompareExchange
  int mp = os::is_MP();
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}

在这里可以看到是用嵌入的汇编实现的, 关键CPU指令是 cmpxchg 到这里没法再往下找代码了. 也就是说CAS的原子性实际上是CPU实现的. 其实在这一点上还是有排他锁的. 只是比起用synchronized, 这里的排他时间要短的多. 所以在多线程情况下性能会比较好. 代码里有个alternative for InterlockedCompareExchange 这个InterlockedCompareExchange是WINAPI里的一个函数, 做的事情和上面这段汇编是一样的 http://msdn.microsoft.com/en-us/library/windows/desktop/ms683560%28v=vs.85%29.aspx

6、最后再贴一下x86的cmpxchg指定

Opcode CMPXCHG

CPU: I486+ Type of Instruction: User

Instruction: CMPXCHG dest, src

Description: Compares the accumulator with dest. If equal the "dest" is loaded with "src", otherwise the accumulator is loaded with "dest".

Flags Affected: AF, CF, OF, PF, SF, ZF

CPU mode: RM,PM,VM,SMM +++++++++++++++++++++++ Clocks: CMPXCHG reg, reg 6 CMPXCHG mem, reg 7 (10 if compartion fails)

构建原子队列

参考:java.util.concurrent.ConcurrentLinkedQueue类

构建原子栈

public class ConcurrentStack<E> {
    AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();
    /
     * 入栈,把当前元素复制为新元素的下一个元素,把新元素赋值为当前元素
     *
     * @param item
     */
    public void push(E item) {
        Node<E> newHead = new Node<E>(item);
        Node<E> oldHead;
        while (true) {
            oldHead = top.get();
            newHead.next = oldHead;
            if (top.compareAndSet(oldHead, newHead)) {
                return;
            }
        }
    }
    /
     * 出栈,获取当前元素,并且把当前元素的下一个元素作为当前元素
     *
     * @return
     */
    public E pop() {
        while (true) {
            Node<E> oldHead = top.get();
            if (oldHead == null) {
                return null;
            }
            Node<E> newHead = oldHead.next;
            if (top.compareAndSet(oldHead, newHead)) {
                return oldHead.item;
            }
        }
    }
    private static class Node<E> {
        public final E item;
        public Node<E> next;
        public Node(E item) {
            this.item = item;
        }
    }
}

缺点

1、ABA问题:CAS操作容易导致ABA问题,也就是在做a++之间,a可能被多个线程修改过了,只不过回到了最初的值,这时CAS会认为a的值没有变。a在外面逛了一圈回来,你能保证它没有做任何坏事,不能!!也许它讨闲,把b的值减了一下,把c的值加了一下等等,更有甚者如果a是一个对象,这个对象有可能是新创建出来的,a是一个引用呢情况又如何,所以这里面还是存在着很多问题的,解决ABA问题的方法有很多,可以考虑增加一个修改计数,只有修改计数不变的且a值不变的情况下才做a++,也可以考虑引入版本号,当版本号相同时才做a++操作等,这和事务原子性处理有点类似! 2、比较花费CPU资源,即使没有任何争用也会做一些无用功。 3、会增加程序测试的复杂度,稍不注意就会出现问题。

使用场合

可以用CAS在无锁的情况下实现原子操作,但要明确应用场合, 非常简单的操作且又不想引入锁可以考虑使用CAS操作,当想要 非阻塞地完成某一操作也可以考虑CAS。 不推荐在复杂操作中引入CAS,会使程序可读性变差,且难以测试,同时会出现ABA问题

Unsafe类常用CAS方法

/
比较并更新对象的某一个整数类型的域
@param obj 被操作的对象
@param fieldoffset 被操作的域在对象中的偏移量
@param expect 域的期望值
@param update 域的更新值
/
boolean compareAndSwapInt(Object obj,long fieldoffset, int expect, int update);

/ 比较并更新对象的某一个对象类型的域 @param obj 被操作的对象 @param fieldoffset 被操作的域在对象中的偏移量 @param expect 域的期望值 @param update 域的更新值 /
boolean compareAndSwapObject(Object obj,long Fieldoffset, Object expect, Object update);

/ 获得对象某个域的偏移量 @param field 要获得偏移量的域 */
long objectFieldOffset (Field field);

/ 使当前线程在指定的等待时间之前一直等待 @param flag 等待时间类型 true代表绝对时间(用相对于历元 (Epoch) 的毫秒数值表示),false代表相对时间 @param time 等待的时间,单位毫秒 */
void park(boolean flag, long time);

/ 取消指定线程的等待 @param thread 被取消等待的线程 */
void unpark(Thread thread)

/ 通过偏移量,获取某个对象Object类型的域 @param obj 被操作的对象 @param fieldoffset 偏移量 /
Object getObject(Object obj,long fieldoffset);

/ 通过偏移量,获取某个对象整数类型的域 @param obj 被操作的对象 @param fieldoffset 偏移量 /
int getInt(Object obj,long fieldoffset);

nemotan /
Published under (CC) BY-NC-SA in categories 高并发  tagged with 高并发