whitesock 发表于 2013-2-3 11:25:13

Inside AbstractQueuedSynchronizer (2)

Inside AbstractQueuedSynchronizer (1)
Inside AbstractQueuedSynchronizer (2)
Inside AbstractQueuedSynchronizer (3)
Inside AbstractQueuedSynchronizer (4)
 
3 AbstractQueuedSynchronizer
 
3.1 Inheritance
    AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer。AbstractOwnableSynchronizer继承自Object并且实现了Serializable接口,它只有一个成员变量private transient Thread exclusiveOwnerThread以及对应的getter/setter方法。该成员变量用于保存当前拥有排他访问权的线程。需要注意的是,该成员变量没有用volatile关键字修饰。

3.2 State
    AbstractQueuedSynchronizer用一个int(private volatile int state)来保存同步状态,以及对应的getter/setter/compareAndSetState方法。Java6新增了一个AbstractQueuedLongSynchronizer,它用一个long来保存同步状态,貌似目前没有被java.util.concurrent中的其它synchronizer所使用。对于ReentrantLock,state为0则意味着锁没有被任何线程持有;否则state保存了持有锁的线程的重入次数。
 
3.3 WaitQueue
    WaitQueue是AbstractQueuedSynchronizer的核心,它用于保存被阻塞的线程。它的实现是"CLH" (Craig, Landin, and Hagersten) lock queue的一个变种。
 
    标准的CLH lock queue通常被用来实现spin lock,它通过TheadLocal变量pred引用队列中的前一个节点(Node本身没有指向前后节点的引用),以下是标准的CLH lock queue的一个参考实现:
public class ClhSpinLock {    private final ThreadLocal<Node> pred;    private final ThreadLocal<Node> node;    private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());    public ClhSpinLock() {      this.node = new ThreadLocal<Node>() {            protected Node initialValue() {                return new Node();            }      };      this.pred = new ThreadLocal<Node>() {            protected Node initialValue() {                return null;            }      };    }    public void lock() {      final Node node = this.node.get();      node.locked = true;      Node pred = this.tail.getAndSet(node);      this.pred.set(pred);      while (pred.locked) {}    }    public void unlock() {      final Node node = this.node.get();      node.locked = false;      this.node.set(this.pred.get());    }    private static class Node {      private volatile boolean locked;    }}    其逻辑并不复杂:对于lock操作,只需要通过一个CAS操作即可将当前线程对应的节点加入到队列中,并且同时获得了predecessor节点的引用,然后就是等待predecessor释放锁;对于unlock操作,只需要将当前线程对应节点的locked成员变量设置为false。unlock方法中的this.node.set(this.pred.get())主要目的是重用predecessor上的Node对象,这是对GC友好的一个优化。如果不考虑这个优化,那么this.node.set(new Node())也是可以的。跟那些TAS(test and set) spin lock和TTAS(test test and set) spin lock相比,CLH spin lock主要是解决了cache-coherence traffic的问题:每个线程在busy loop的时候,并没有竞争同一个状态,而是只判断其对应predecessor的锁定状态。如果你担心false sharing问题,那么可以考虑将锁定状态padding到cache line的长度。此外,CLH spin lock通过FIFO的队列保证了锁竞争的公平性。
 
    AbstractQueuedSynchronizer的静态内部类Node维护了一个FIFO的等待队列。跟CLH不同的是,Node中包含了指向predecessor和sucessor的引用。predecessor引用的作用是为了支持锁等待超时(timeout)和锁等待回退(cancellation)的功能。sucessor的作用是为了支持线程阻塞:在Inside AbstractQueuedSynchronizer (1)中提到过,AbstractQueuedSynchronizer通过LockSupport实现了线程的block/unblock,因此需要通过successor引用找到后续的线程并将其唤醒(CLH spin lock因为是spin,所以不需要显式地唤醒)。此外,Node中还包括一个volatile int waitStatus成员变量用于控制线程的阻塞/唤醒,以及避免不必要的调用LockSupport的park/unpark方法。需要注意的是,虽然AbstractQueuedSynchronizer在绝大多数情况下是通过LockSupport进行线程的阻塞/唤醒,但是在特定情况下也会使用spin lock,static final long spinForTimeoutThreshold = 1000L这个静态变量设定了使用spin lock的一个阈值。
 
    对WaitQueue进行enqueue操作相关的代码如下:
private Node addWaiter(Node mode) {    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure    Node pred = tail;    if (pred != null) {      node.prev = pred;      if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;      }    }    enq(node);    return node;}private Node enq(final Node node) {    for (;;) {      Node t = tail;      if (t == null) { // Must initialize            if (compareAndSetHead(new Node()))                tail = head;      } else {            node.prev = t;            if (compareAndSetTail(t, node)) {                t.next = node;                return t;            }      }    }}    addWaiter方法中的那个if分支,其实是一种优化,如果失败那么会调用enq方法进行enqueue。需要注意的是,以上代码中对next引用的设定是在enqueue成功之后进行的。这样做虽然没有并发问题,但是在判断一个node是否有sucessor时,不能仅仅通过next == null来判断,因为enqueue和设置next引用这两个步骤不是一个原子操作。
 
    对WaitQueue进行dequeue操作相关的代码如下:
private void setHead(Node node) {    head = node;    node.thread = null;    node.prev = null;}    setHead方法没有用到锁,也没有使用CAS,这样没有并发问题?没有,因为这个方法只会被持有锁的线程所调用,此时只需要将head指向持有锁的线程对应的node即可。
 
页: [1]
查看完整版本: Inside AbstractQueuedSynchronizer (2)