Java 锁模型

继上一篇文章讲述JAVA同步模型,如没有看过可以先看上一篇。

JAVA中使用锁确是非常的频繁,与此同时,JAVA还提供了多种方式进行同步互斥和协作。

总的来说,JAVA中锁分为两种

# 隐式锁 / 内置锁 / 监视器锁 / 监视器

监视器是由Per Brich Hansen和Tony Hoare提出的概念,Java以不精确的方式采用了它,也就是Java中的每个对象有一个内部的锁和内部条件。如果一个方法用synchronized关键字声明,那么,它表现的就像一个监视器方法。通过wait/notifyAll/nofify来访问条件变量

其实就是 Synchronized 关键字的用法。Synchronized 修饰符利用的其实就是Object对象内置的监视器,通过获得监视器进行同步操作。它允许线程同时互斥和协作,使用了 Wait-Set 思想,亦即是Object 的 Wait and Notify 监视器进行合作。当线程进入了同步代码块后,由于某些原因需要释放锁并等待(wait()),但是它将会继续监视(进入 wait-set ),当有其他获得锁的线程进行notify()时,该线程可以重新获得锁并继续进行。比如解决生产者/消费者问题,就可以通过该用法轻松解决。

如下,模拟了一个生产者每三秒生产一个字符,同时一个消费者每秒消费一个字符,状态模拟了供不应求的时候,这时候消费者会经常在等待生产。

package syn_demo.blog;

import lombok.extern.slf4j.Slf4j;

import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Slf4j
@SuppressWarnings("InfiniteLoopStatement")
public class SynDemo {
    private static final int MAX_SIZE = 10;
    private static final LinkedList<Character> list = new LinkedList<>();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture
                .allOf(CompletableFuture.runAsync(new Producer()),
                        CompletableFuture.runAsync(new Consumer()))
                .get();
    }

    static class Producer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    synchronized (list) {
                        while (list.size() >= MAX_SIZE) {
                            log.info("Waiting to add. ");
                            list.wait();
                        }

                        Character ch = (char) (int) (Math.random() * 26 + 97);
                        list.add(ch);
                        log.info("Successfully add: " + ch);
                        list.notify();
                    }
                    Thread.sleep(3000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    synchronized (list) {
                        while (list.isEmpty()) {
                            log.info("Waiting to remove. ");
                            list.wait();
                        }

                        log.info("Successfully remove: " + list.remove());
                        list.notify();
                    }
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

在隐式锁中最主要的几个关键字是synchronizedwait()notify()notifyAll()Thread.sleep()

  • synchronized就是获取并进入监视器的声明
  • wait()方法使得已进入监视器的线程放弃锁并等待唤醒
  • notify()方法会唤醒一个正在等待的线程
  • notifyAll()方法会唤醒所有等待的线程并开始竞争锁
  • Thread.sleep()暂停当前线程一定时间,不释放锁,暂停结束后进入就绪状态,也就是运行时间不是精确的。

notify()notifyAll()看上去差不多,但是一个线程和所有线程开始竞争的意义是不同的,所以使用的效果相去甚远。一个新的例子说明区别

package syn_demo.temp;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NotifyEtNotifyAll {
    public static void main(String[] args) throws InterruptedException {
        Object object = new Object();

        // 新建10进程, 并全部进入等待
        // 注意! 这里并不通知等待进程
        int i = 10;
        while (i > 0) {
            int j = i;
            new Thread(() -> {
                synchronized (object) {
                    try {
                        object.wait();
                        log.info("Thread start : " + j);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            i--;
        }

        // 第二个进程获得了锁, 并睡眠三秒
        // notifyAll 然所有进程开始竞争
        new Thread(() -> {
            synchronized (object) {
                try {
                    log.info("thread 1 start sleep");
                    Thread.sleep(1000);
                    log.info("finish");
                    object.notify();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

//18:12:56.445 [Thread-11] INFO  syn_demo.temp.NotifyEtNotifyAll - thread 1 start sleep
//18:12:57.449 [Thread-11] INFO  syn_demo.temp.NotifyEtNotifyAll - finish
//18:12:57.449 [Thread-2] INFO  syn_demo.temp.NotifyEtNotifyAll - Thread start : 9

新建了十个等待线程,同时用另外一个线程notify(),会随机唤醒一个等待的线程,同时其他线程仍旧等待,因为没有线程唤醒他们了。如果换成notifyALL()

package syn_demo.temp;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NotifyEtNotifyAll {
    public static void main(String[] args) throws InterruptedException {
        Object object = new Object();

        // 新建10进程, 并全部进入等待
        // 注意! 这里并不通知等待进程
        int i = 10;
        while (i > 0) {
            int j = i;
            new Thread(() -> {
                synchronized (object) {
                    try {
                        object.wait();
                        log.info("Thread start : " + j);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            i--;
        }

        // 第二个进程获得了锁, 并睡眠三秒
        // notifyAll 然所有进程开始竞争
        new Thread(() -> {
            synchronized (object) {
                try {
                    log.info("thread 1 start sleep");
                    Thread.sleep(1000);
                    log.info("finish");
                    object.notifyAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

//18:20:03.180 [Thread-11] INFO  syn_demo.temp.NotifyEtNotifyAll - thread 1 start sleep
//18:20:04.183 [Thread-11] INFO  syn_demo.temp.NotifyEtNotifyAll - finish
//18:20:04.183 [Thread-10] INFO  syn_demo.temp.NotifyEtNotifyAll - Thread start : 1
//18:20:05.184 [Thread-9] INFO  syn_demo.temp.NotifyEtNotifyAll - Thread start : 2
//18:20:06.185 [Thread-8] INFO  syn_demo.temp.NotifyEtNotifyAll - Thread start : 3
//18:20:07.185 [Thread-7] INFO  syn_demo.temp.NotifyEtNotifyAll - Thread start : 4
//18:20:08.186 [Thread-6] INFO  syn_demo.temp.NotifyEtNotifyAll - Thread start : 5
//18:20:09.187 [Thread-5] INFO  syn_demo.temp.NotifyEtNotifyAll - Thread start : 6
//18:20:10.188 [Thread-4] INFO  syn_demo.temp.NotifyEtNotifyAll - Thread start : 7
//18:20:11.189 [Thread-3] INFO  syn_demo.temp.NotifyEtNotifyAll - Thread start : 8
//18:20:12.190 [Thread-2] INFO  syn_demo.temp.NotifyEtNotifyAll - Thread start : 9
//18:20:13.190 [Thread-1] INFO  syn_demo.temp.NotifyEtNotifyAll - Thread start : 10

可见全部进程都被唤醒了参与到锁的竞争。

# 显式锁

显式锁说的就是 JAVA 中的 Lock接口的一系列内容。JAVA 在 JDK5.0 引入了 Lock 及其子类(如ReentrantLock, ReadWriteLock……),以实现比内置锁更丰富和更细粒度的控制。

package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;

public interface Lock {

    void lock();

    void lockInterruptibly() throws InterruptedException;

    boolean tryLock();
    
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    void unlock();

    Condition newCondition();
}

从定义上就能够看到基本的Lock能够实现synchronized所实现不了的中断锁、尝试锁、条件唤醒和等待。

再来看几个常见的类。

  • ReentrantLock

    ReentrantLock为可重入锁的实现,synchronizedReentrantLock都具有可重入性。可重入,指的是以线程为单位,当一个线程获取对象锁之后,这个线程可以再次获取本对象上的锁,而其他的线程是不可以的。

    用一个例子演示

    package syn_demo.blog;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j
    public class LockDemo {
       private ReentrantLock reentrantLock = new ReentrantLock();
    
       public static void main(String[] args) {
           LockDemo lockDemo = new LockDemo();
           lockDemo.out();
       }
    
       private void out() {
           reentrantLock.lock();
           log.info("enter out.");
           in();
           reentrantLock.unlock();
       }
    
       private void in() {
           reentrantLock.lock();
           log.info("enter in.");
           reentrantLock.unlock();
       }
    }
    
    // 14:03:20.091 [main] INFO  syn_demo.blog.LockDemo - enter out.
    // 14:03:20.094 [main] INFO  syn_demo.blog.LockDemo - enter in.
    
  • ReentrantReadWriteLock

    ReentrantReadWriteLock 继承了ReadWriteLock,而ReadWriteLock 有两个Lock类型成员readLockwriteLock

    顾名思义该锁是可重入读写锁,内部又读锁和写锁两个锁。多个线程可以同时获得读锁,但是不允许读线程和写线程同时获得锁,和写线程之间同时获得锁。相对于只有一个排它锁,这种方式大大提高了并发性,在读操作远多于写操作时候,使用读写锁是非常高效的。

    package syn_demo.blog;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.LinkedList;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    @Slf4j
    public class LockDemo {
        private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        private LinkedList<Object> linkedList = new LinkedList<>();
    
        public static void main(String[] args) {
    
        }
    
        private Object read() {
            reentrantReadWriteLock.readLock().lock();
            try {
                linkedList.remove();
            } finally {
                reentrantReadWriteLock.readLock().unlock();
            }
        }
    
        private void write(Object o) {
            reentrantReadWriteLock.writeLock().lock();
            try {
                linkedList.add(o);
            } finally {
                reentrantReadWriteLock.readLock().unlock();
            }
        }
    }
    

# 区别

显式锁需要手动在finally块中释放锁,而synchronized可以自动释放,所以显式锁在编程上会比隐式锁稍微麻烦。

lock.lock();
try {
    // do something
} finally {
    lock.unlock();
}

显式锁可以非阻塞加锁,通过tryLock()方法,一定情境下可以提高很大的并发性能。

package syn_demo.blog;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

@Slf4j
public class LockDemo {
    private ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        LockDemo lockDemo = new LockDemo();
        CompletableFuture
                .allOf(IntStream
                        .range(0, 10)
                        .mapToObj(i -> CompletableFuture.runAsync(lockDemo::tryyLock))
                        .toArray(CompletableFuture[]::new))
                .get();
    }

    private void tryyLock() {
        if (!lock.tryLock()) {
            log.info("failed");
            return;
        }

        log.info("success");
        try {
            Thread.sleep(1000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

//15:01:23.542 [ForkJoinPool.commonPool-worker-11] INFO  syn_demo.blog.LockDemo - failed
//15:01:23.542 [ForkJoinPool.commonPool-worker-8] INFO  syn_demo.blog.LockDemo - failed
//15:01:23.542 [ForkJoinPool.commonPool-worker-6] INFO  syn_demo.blog.LockDemo - failed
//15:01:23.542 [ForkJoinPool.commonPool-worker-9] INFO  syn_demo.blog.LockDemo - success
//15:01:23.542 [ForkJoinPool.commonPool-worker-13] INFO  syn_demo.blog.LockDemo - failed
//15:01:23.542 [ForkJoinPool.commonPool-worker-15] INFO  syn_demo.blog.LockDemo - failed
//15:01:23.542 [ForkJoinPool.commonPool-worker-2] INFO  syn_demo.blog.LockDemo - failed
//15:01:23.542 [ForkJoinPool.commonPool-worker-1] INFO  syn_demo.blog.LockDemo - failed
//15:01:23.542 [ForkJoinPool.commonPool-worker-10] INFO  syn_demo.blog.LockDemo - failed
//15:01:23.542 [ForkJoinPool.commonPool-worker-4] INFO  syn_demo.blog.LockDemo - failed

显式锁可以响应中断,使用lockInterruptibly当线程被中断将会抛出InterruptedException,可以进行其他操作。而synchronized尝试获取锁的时候无法响应中断。

如图,b线程尝试获取锁的时候,主线程无法中断b线程,b线程依旧阻塞。

  • synchronized

    package syn_demo.blog;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class LockDemo {
        private final Object object = new Object();
    
        public static void main(String[] args) throws InterruptedException {
            LockDemo lockDemo = new LockDemo();
            new Thread(lockDemo::a).start();
    
            Thread.sleep(1000);
            Thread thread = new Thread(lockDemo::b);
            thread.start();
    
            log.info("try interrupt");
            thread.interrupt();
        }
    
        private void a() {
            log.info("run a");
            synchronized (object) {
                log.info("get lock");
                try {
                    Thread.sleep(1000000000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void b() {
            log.info("run b");
            synchronized (object) {
                log.info("b");
            }
            log.info("exit b");
        }
    }
    
    //14:53:31.240 [Thread-1] INFO  syn_demo.blog.LockDemo - run a
    //14:53:31.243 [Thread-1] INFO  syn_demo.blog.LockDemo - get lock
    //14:53:32.235 [main] INFO  syn_demo.blog.LockDemo - try interrupt
    //14:53:32.235 [Thread-2] INFO  syn_demo.blog.LockDemo - run b
    
  • lock

    package syn_demo.blog;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j
    public class LockDemo {
        private final ReentrantLock reentrantLock = new ReentrantLock();
    
        public static void main(String[] args) throws InterruptedException {
            LockDemo lockDemo = new LockDemo();
            new Thread(lockDemo::a).start();
    
            Thread.sleep(1000);
            Thread thread = new Thread(lockDemo::b);
            thread.start();
    
            log.info("try interrupt");
            thread.interrupt();
        }
    
        private void a() {
            log.info("run a");
            reentrantLock.lock();
            log.info("get lock");
            try {
                Thread.sleep(1000000000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                reentrantLock.unlock();
            }
        }
    
        private void b() {
            log.info("run b");
            try {
                reentrantLock.lockInterruptibly();
            } catch (InterruptedException e) {
                log.info("exit b");
            }
            try {
                log.info("b");
            } finally {
                reentrantLock.unlock();
                log.info("exit b");
            }
        }
    }
    
    //15:06:15.887 [Thread-1] INFO  syn_demo.blog.LockDemo - run a
    //15:06:15.889 [Thread-1] INFO  syn_demo.blog.LockDemo - get lock
    //15:06:16.884 [main] INFO  syn_demo.blog.LockDemo - try interrupt
    //15:06:16.884 [Thread-2] INFO  syn_demo.blog.LockDemo - run b
    //15:06:16.885 [Thread-2] INFO  syn_demo.blog.LockDemo - exit b
    

显式锁具有Condition,通常译作条件对象,Condition的作用类似监视器锁中的wait()/notify() ,不过粒度更加细,功能更丰富。

  • 粒度

    还是拿消费者/生产者例子说明,上文中只有生产者和消费者分别都只有一个,使用notify()唤醒的其确实对方,而如果情况转变为有多个生产者和消费者,就有可能唤醒的不是想唤醒的对象,比如生产者线程想要唤醒消费者线程,但是synchronized它只能够唤醒想要获取该锁的线程,却不能分得清消费者和生产者。这时候Condition就可以做得到如此粒度,还可以更加精细。

    package syn_demo.blog;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.LinkedList;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    @Slf4j
    @SuppressWarnings("InfiniteLoopStatement")
    public class SynDemo {
        private static final int MAX_SIZE = 10;
        private static final LinkedList<Character> list = new LinkedList<>();
        private static ReentrantLock reentrantLock = new ReentrantLock();
        private static Condition canConsume = reentrantLock.newCondition();
        private static Condition canProduce = reentrantLock.newCondition();
    
        public static void main(String[] args) {
            CompletableFuture.allOf(
                    CompletableFuture.runAsync(new Producer()),
                    CompletableFuture.runAsync(new Producer()),
                    CompletableFuture.runAsync(new Producer()),
                    CompletableFuture.runAsync(new Consumer()),
                    CompletableFuture.runAsync(new Consumer()),
                    CompletableFuture.runAsync(new Consumer()))
                    .join();
        }
    
        static class Producer implements Runnable {
            @Override
            public void run() {
                try {
                    while (true) {
                        reentrantLock.lock();
                        while (list.size() >= MAX_SIZE) {
                            log.info("Waiting to add. ");
                            canProduce.await();
                        }
    
                       Character ch = (char) (int) (Math.random() * 26 + 97);
                        list.add(ch);
                        log.info("Successfully add: " + ch);
                        canConsume.signalAll();
                        reentrantLock.unlock();
                        Thread.sleep(3000);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        static class Consumer implements Runnable {
            @Override
            public void run() {
                try {
                    while (true) {
                        reentrantLock.lock();
                        while (list.isEmpty()) {
                            log.info("Waiting to consume. ");
                            canConsume.await();
                        }
    
                       log.info("Successfully consume: " + list.remove());
                        canProduce.signalAll();
                        reentrantLock.unlock();
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    //16:41:01.773 [ForkJoinPool.commonPool-worker-9] INFO  syn_demo.blog.SynDemo - Successfully add: c
    //16:41:01.778 [ForkJoinPool.commonPool-worker-2] INFO  syn_demo.blog.SynDemo - Successfully add: c
    //16:41:01.779 [ForkJoinPool.commonPool-worker-11] INFO  syn_demo.blog.SynDemo - Successfully add: u
    //16:41:01.779 [ForkJoinPool.commonPool-worker-13] INFO  syn_demo.blog.SynDemo - Successfully consume: c
    //16:41:01.780 [ForkJoinPool.commonPool-worker-4] INFO  syn_demo.blog.SynDemo - Successfully consume: c
    //16:41:01.781 [ForkJoinPool.commonPool-worker-6] INFO  syn_demo.blog.SynDemo - Successfully consume: u
    //16:41:02.780 [ForkJoinPool.commonPool-worker-13] INFO  syn_demo.blog.SynDemo - Waiting to consume. 
    //16:41:02.781 [ForkJoinPool.commonPool-worker-4] INFO  syn_demo.blog.SynDemo - Waiting to consume. 
    //16:41:02.781 [ForkJoinPool.commonPool-worker-6] INFO  syn_demo.blog.SynDemo - Waiting to consume. 
    //16:41:04.778 [ForkJoinPool.commonPool-worker-9] INFO  syn_demo.blog.SynDemo - Successfully add: t
    //16:41:04.779 [ForkJoinPool.commonPool-worker-13] INFO  syn_demo.blog.SynDemo - Successfully consume: t
    //16:41:04.779 [ForkJoinPool.commonPool-worker-4] INFO  syn_demo.blog.SynDemo - Waiting to consume. 
    //16:41:04.779 [ForkJoinPool.commonPool-worker-6] INFO  syn_demo.blog.SynDemo - Waiting to consume. 
    //16:41:04.779 [ForkJoinPool.commonPool-worker-2] INFO  syn_demo.blog.SynDemo - Successfully add: r
    //16:41:04.779 [ForkJoinPool.commonPool-worker-11] INFO  syn_demo.blog.SynDemo - Successfully add: p
    //16:41:04.780 [ForkJoinPool.commonPool-worker-4] INFO  syn_demo.blog.SynDemo - Successfully consume: r
    //16:41:04.780 [ForkJoinPool.commonPool-worker-6] INFO  syn_demo.blog.SynDemo - Successfully consume: p
    //16:41:05.779 [ForkJoinPool.commonPool-worker-13] INFO  syn_demo.blog.SynDemo - Waiting to consume. 
    //16:41:05.780 [ForkJoinPool.commonPool-worker-4] INFO  syn_demo.blog.SynDemo - Waiting to consume. 
    //16:41:05.780 [ForkJoinPool.commonPool-worker-6] INFO  syn_demo.blog.SynDemo - Waiting to consume. 
    //16:41:07.779 [ForkJoinPool.commonPool-worker-9] INFO  syn_demo.blog.SynDemo - Successfully add: a
    //16:41:07.779 [ForkJoinPool.commonPool-worker-13] INFO  syn_demo.blog.SynDemo - Successfully consume: a
    //16:41:07.779 [ForkJoinPool.commonPool-worker-4] INFO  syn_demo.blog.SynDemo - Waiting to consume. 
    //16:41:07.779 [ForkJoinPool.commonPool-worker-6] INFO  syn_demo.blog.SynDemo - Waiting to consume. 
    //16:41:07.780 [ForkJoinPool.commonPool-worker-2] INFO  syn_demo.blog.SynDemo - Successfully add: z
    //16:41:07.780 [ForkJoinPool.commonPool-worker-11] INFO  syn_demo.blog.SynDemo - Successfully add: h
    
  • awaitUntil(Date deadline):若指定时间内有其它线程中断该线程,则抛出InterruptedException并清除当前线程的打断状态(响应中断)。若指定时间内未收到通知,则返回0或负数。

  • awaitUninterruptibly():调用该方法后,结束等待的唯一方法是其它线程调用该条件对象的signal()signalALL()方法。等待过程中如果当前线程被中断,该方法仍然会继续等待,同时保留该线程的中断状态(不响应中断) 。

显式锁可以指定公平策略,而隐式锁是不公平策略而且不可改变的。亦即是synchronized 是可以插队的,当某个申请锁的线程如果锁恰好可用,则直接进入监视器。非公平策略可以一定程度上节省了上下文的切换,在锁竞争激烈的时候,非公平策略可以提高吞吐量。

要注意的是Condition一样有waitsignalAllsignal方法,使用方法如同waitnotifynotifyAll

同时在 JDK1.6 以后的版本中synchronized改进了不少,一样条件下是比Lock会块的。但是如果你需要读写分离又或者粒度控制的话,就可以用Lock锁实现比synchronized更高效的代码。

# 总结

本篇讲的是Java 中的同步机制的语法以及使用,但是我觉得这个东西我越学越难=。=,因为再往下去涉及到类似AQS、CAS之类的算法,自己还没有能力完全分析和解读。等以后还有时间通读,再写下篇吧。