Skip to content

并发工具类(上)

并发编程的核心问题:

  • 互斥: 同一时刻只允许一个线程访问共享资源==>Lock
  • 同步: 线程之间的通信以及协作==>Condition

隐藏在并发包中的管程

再造管程的理由

再造管程的理由 为啥有了synchronized还要lock 针对死锁问题, 有破坏"不可抢占条件"的方案, 即当线程申请不到资源的时候, 可以释放已占有的资源 但synchronized做不到, 如果申请不到, 线程会直接进入阻塞状态, 如果重新设计一把锁解决此问题, 有三种方案: 1.能够响应中断. 当给阻塞的线程发送中断的信号时, 能够唤醒他, 那他就有机会释放占用的资源; 2.支持超时. 若线程一段时间内没获取到锁, 不是进入阻塞状态, 而是返回一个错误, 那这个线程也有机会释放曾经持有的锁; 3.非阻塞的获取锁. 如果尝试获取锁失败, 并不进入阻塞状态. 以上对应lock接口的三个方法:

// 支持中断的API
void lockInterruptibly() 
  throws InterruptedException;
// 支持超时的API
boolean tryLock(long time, TimeUnit unit) 
  throws InterruptedException;
// 支持非阻塞获取锁的API
boolean tryLock();

如何保证可见性

  • synchronized: synchronized 的解锁 Happens-Before 于后续对这个锁的加锁。
  • Lock: 利用了 volatile 相关的 Happens-Before 规则
  • Java SDK 里面的 ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state 的值(简化后的代码如下面所示)
class SampleLock {
  volatile int state;
  // 加锁
  lock() {
    // 省略代码无数
    state = 1;
  }
  // 解锁
  unlock() {
    // 省略代码无数
    state = 0;
  }
}

如下面示例代码: 1.顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock(); 2.volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作; 3.传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作。 因此, 线程 T1 对 value 进行了 +=1 操作,后续的线程 T2 能够看到 value 的正确结果

class X {
  private final Lock rtl =
  new ReentrantLock();
  int value;
  public void addOne() {
    // 获取锁
    rtl.lock();  
    try {
      value+=1;
    } finally {
      // 保证锁能释放
      rtl.unlock();
    }
  }
}

什么是可重入锁

ReentrantLock, 可重入锁, 线程可以重复获取同一把锁 示例代码: 在1处加锁, 在2处再次加锁

class X {
  private final Lock rtl =
  new ReentrantLock();
  int value;
  public int get() {
    // 获取锁
    rtl.lock();         ②
    try {
      return value;
    } finally {
      // 保证锁能释放
      rtl.unlock();
    }
  }
  public void addOne() {
    // 获取锁
    rtl.lock();  
    try {
      value = 1 + get(); ①
    } finally {
      // 保证锁能释放
      rtl.unlock();
    }
  }
}

可重入函数: 多个线程可以同时调用该函数, 每个线程都能得到正确结果; 同时在一个线程内支持线程切换, 无论被切换多少次, 结果都是正确的. 即可重入函数是线程安全的

公平锁与非公平锁

ReentrantLock的两个构造函数, true表示构造一个公平锁

//无参构造函数:默认非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}
//根据公平策略参数创建锁
public ReentrantLock(boolean fair){
    sync = fair ? new FairSync() 
                : new NonfairSync();
}

管程中谈到入口等待队列,锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。 如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。

用锁的最佳实践 出自Doug Lea《Java 并发编程:设计原则与模式》一书:

  • 永远只在更新对象的成员变量时加锁
  • 永远只在访问可变的成员变量时加锁
  • 永远不在调用其他对象的方法时加锁

示例代码: 存在活锁, A,B两账户相互转账,各自持有自己lock的锁,都一直在尝试获取对方的锁,形成了活锁。这个例子可以稍微改下,成功转账后应该跳出循环。加个随机重试时间避免活锁

class Account {
  private int balance;
  private final Lock lock
          = new ReentrantLock();
  // 转账
  void transfer(Account tar, int amt){
    while (true) {
      if(this.lock.tryLock()) {
        try {
          if (tar.lock.tryLock()) {
            try {
              this.balance -= amt;
              tar.balance += amt;
            } finally {
              tar.lock.unlock();
            }
          }//if
        } finally {
          this.lock.unlock();
        }
      }//if
    }//while
  }//transfer
}

Dubbo如何用管程实现异步转同步

Condition实现了管程模型里的条件变量 Java 语言内置的管程里只有一个条件变量,而 Lock&Condition 实现的管程是支持多个条件变量的

同步与异步 调用方是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步。 同步是Java代码默认的处理方式, 让程序支持异步: 1.调用方创建一个子线程, 在子线程中执行方法调用, 这种调用称为异步调用; 2.方法实现的时候, 创建一个新的线程执行主要逻辑, 主线程直接return, 这种方法我们一般称为异步方法.

利用两个条件变量快速实现阻塞队列:

public class BlockedQueue<T>{
  final Lock lock =new ReentrantLock();
  // 条件变量:队列不满  
  final Condition notFull =lock.newCondition();
  // 条件变量:队列不空  
  final Condition notEmpty =lock.newCondition();

  // 入队
  void enq(T x) {
    lock.lock();
    try {
      while (队列已满){
        // 等待队列不满
        notFull.await();
      }  
      // 省略入队操作...
      //入队后,通知可出队
      notEmpty.signal();
    }finally {
      lock.unlock();
    }
  }
  // 出队
  void deq(){
    lock.lock();
    try {
      while (队列已空){
        // 等待队列不空
        notEmpty.await();
      }  
      // 省略出队操作...
      //出队后,通知可入队
      notFull.signal();
    }finally {
      lock.unlock();
    }  
  }
}

Lock 和 Condition 实现的管程,线程等待和通知需要调用 await()、signal()、signalAll(),它们的语义和 wait()、notify()、notifyAll() 是相同的。 但是不一样的是,Lock&Condition 实现的管程里只能使用前面的 await()、signal()、signalAll(),而后面的 wait()、notify()、notifyAll() 只有在 synchronized 实现的管程里才能使用。

Dubbo的异步转同步: 1.通过对调用时等待结果的堆进行快照分析, 与DefaultFuture.get()有关 1744875416025 2.分析get()方法

public class DubboInvoker{
  Result doInvoke(Invocation inv){
    // 下面这行就是源码中108行
    // 为了便于展示,做了修改
    return currentClient 
      .request(inv, timeout)
      .get();
  }
}
// 创建锁与条件变量
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();

// 调用方通过该方法等待结果
Object get(int timeout){
  long start = System.nanoTime();
  lock.lock();
  try {
  while (!isDone()) {
    done.await(timeout);
      long cur=System.nanoTime();
    if (isDone() || cur-start > timeout){
      break;
    }
  }
  } finally {
  lock.unlock();
  }
  if (!isDone()) {
  throw new TimeoutException();
  }
  return returnFromResponse();
}
// RPC结果是否已经返回
boolean isDone() {
  return response != null;
}
// RPC结果返回时调用该方法   
private void doReceived(Response res) {
  lock.lock();
  try {
    response = res;
    if (done != null) {
      done.signalAll();
    }
  } finally {
    lock.unlock();
  }
}

本质: 当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。

Lock&Condition 实现的管程相对于 synchronized 实现的管程来说更加灵活、功能也更丰富。

Semaphore: 如何快速实现一个限流器

Semaphore, 信号量 1744875509071 描述代码化:

class Semaphore{
  // 计数器
  int count;
  // 等待队列
  Queue queue;
  // 初始化操作
  Semaphore(int c){
    this.count=c;
  }
  // 
  void down(){
    this.count--;
    if(this.count<0){
      //将当前线程插入等待队列
      //阻塞当前线程
    }
  }
  void up(){
    this.count++;
    if(this.count<=0) {
      //移除等待队列中的某个线程T
      //唤醒线程T
    }
  }
}

补充: 信号量模型里面,down()、up() 这两个操作历史上最早称为 P 操作和 V 操作,所以信号量模型也被称为 PV 原语。另外,还有些人喜欢用 semWait() 和 semSignal() 来称呼它们,虽然叫法不同,但是语义都是相同的。在 Java SDK 并发包里,down() 和 up() 对应的则是 acquire() 和 release()。

如何使用信号量

static int count;
//初始化信号量
static final Semaphore s = new Semaphore(1);
//用信号量保证互斥    
static void addOne() {
  s.acquire();
  try {
    count+=1;
  } finally {
    s.release();
  }
}

理解: 当T1和T2两个线程访问addOne()方法时, 1.由于acquire()是一个原子性操作, 假设T1线程将计数器-1, 计数器变为0; T2线程再-1变成-1, T2线程将阻塞, 进入等待队列; 2.T1执行完后release(), 将计数器加1变为0, 即将T2从等待队列中移除, T2将被唤醒接着执行.

快速实现一个限流器

Semaphore 可以允许多个线程访问一个临界区。

需求: 池化资源, 例如线程池, 连接池等, 在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的. 实现对象池, 一次性创建出 N 个对象,之后所有的线程重复利用这 N 个对象,当然对象在被释放前,也是不允许其他线程使用的。限流,指的是不允许多于 N 个线程同时进入临界区. 信号量的计数器,在上面的例子中,我们设置成了 1,这个 1 表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数 N,就能完美解决对象池的限流问题了。

class ObjPool<T, R> {
  final List<T> pool;
  // 用信号量实现限流器
  final Semaphore sem;
  // 构造函数
  ObjPool(int size, T t){
    pool = new Vector<T>(){};
    for(int i=0; i<size; i++){
      pool.add(t);
    }
    sem = new Semaphore(size);
  }
  // 利用对象池的对象,调用func
  R exec(Function<T,R> func) {
    T t = null;
    sem.acquire();
    try {
      t = pool.remove(0);
      return func.apply(t);
    } finally {
      pool.add(t);
      sem.release();
    }
  }
}
// 创建对象池
ObjPool<Long, String> pool = 
  new ObjPool<Long, String>(10, 2);
// 通过对象池获取t,之后执行  
pool.exec(t -> {
    System.out.println(t);
    return t.toString();
});

ReadWriteLock:如何快速实现一个完备的缓存?

什么是读写锁?

读写锁,并不是 Java 语言特有的,而是一个广为使用的通用技术,所有的读写锁都遵守以下三条基本原则: ● 允许多个线程同时读共享变量; ● 只允许一个线程写共享变量; ● 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。

快速实现一个缓存

用 ReadWriteLock 快速实现一个通用的缓存工具类。

class Cache<K,V> {
  final Map<K, V> m =new HashMap<>();
  final ReadWriteLock rwl =new ReentrantReadWriteLock();
  // 读锁
  final Lock r = rwl.readLock();
  // 写锁
  final Lock w = rwl.writeLock();
  // 读缓存
  V get(K key) {
    r.lock();
    try { return m.get(key); }
    finally { r.unlock(); }
  }
  // 写缓存
  V put(K key, V value) {
    w.lock();
    try { return m.put(key, v); }
    finally { w.unlock(); }
  }
}

实现缓存的按需加载

class Cache<K,V> {
  final Map<K, V> m =new HashMap<>();
  final ReadWriteLock rwl = new ReentrantReadWriteLock();
  final Lock r = rwl.readLock();
  final Lock w = rwl.writeLock();
 
  V get(K key) {
    V v = null;
    //读缓存
    r.lock();         ①
    try {
      v = m.get(key); ②
    } finally{
      r.unlock();     ③
    }
    //缓存中存在,返回
    if(v != null) {   ④
      return v;
    }  
    //缓存中不存在,查询数据库
    w.lock();         ⑤
    try {
      //再次验证
      //其他线程可能已经查询过数据库
      v = m.get(key); ⑥
      if(v == null){  ⑦
        //查询数据库
        v=省略代码无数
        m.put(key, v);
      }
    } finally{
      w.unlock();
    }
    return v; 
  }
}

读写锁的升级与降级

如下面代码, ①处获取读锁,在②处如果缓存不存在则升级为写锁, 更新缓存后再释放写锁, 最后在③处释放读锁

//读缓存
r.lock();         ①
try {
  v = m.get(key); ②
  if (v == null) {
    w.lock();
    try {
      //再次验证并更新缓存
      //省略详细代码
    } finally{
      w.unlock();
    }
  }
} finally{
  r.unlock();     ③
}

这称为锁的升级。可惜 ReadWriteLock 并不支持这种升级。在上面的代码示例中,读锁还没有释放,此时获取写锁,会导致写锁永久等待,最终导致相关线程都被阻塞,永远也没有机会被唤醒。锁的升级是不允许的,这个一定要注意。

但是允许锁的降级, 以下代码来源自 ReentrantReadWriteLock 的官方示例,略做了改动。


class CachedData {
  Object data;
  volatile boolean cacheValid;
  final ReadWriteLock rwl =
    new ReentrantReadWriteLock();
  // 读锁  
  final Lock r = rwl.readLock();
  //写锁
  final Lock w = rwl.writeLock();
  
  void processCachedData() {
    // 获取读锁
    r.lock();
    if (!cacheValid) {
      // 释放读锁,因为不允许读锁的升级
      r.unlock();
      // 获取写锁
      w.lock();
      try {
        // 再次检查状态  
        if (!cacheValid) {
          data = ...
          cacheValid = true;
        }
        // 释放写锁前,降级为读锁
        // 降级是可以的
        r.lock(); ①
      } finally {
        // 释放写锁
        w.unlock(); 
      }
    }
    // 此处仍然持有读锁
    try {use(data);} 
    finally {r.unlock();}
  }
}

补充: 写锁支持条件变量,读锁是不支持条件变量的,读锁调用 newCondition() 会抛出 UnsupportedOperationException 异常。

如何解决缓存数据与源头数据的同步问题?

  • 超时机制, 为缓存中的数据设置生存时间
  • 源头数据发生变化时反馈给缓存
  • 操作数据时, 同时写数据库和缓存