• 第03篇_并发编程

    第01章_线程基本操作

    第一节 线程的基本概念

    1. 创建和启动线程

    1) 直接自定义线程

    可以通过继承java.lang.Thread类,重写其run方法来直接定义一个线程。

     

    2) 定义Runnable实现类

    由于Java只支持单继承,因此一般都是定义java.lang.Runnable接口的实现类,然后创建Runnable的对象传给Thread对象去执行。

     

     

    2. 线程基本属性

     

    3. 线程状态

    线程状态为Thread.State枚举,有如下六种状态:

    线程状态状态说明
    NEW新建。线程创建但未启动。
    RUNNABLE可运行。线程在运行或具备运行条件只是在等待操作系统调度。
    BLOCKED阻塞。等待锁或IO等资源。
    WAITING等待。线程在等待某个条件,调用wait()/join()方法会进入该状态。
    TIMED_WAITING超时等待。线程在等待某个条件或超时,调用wait(long timeout)/join(long millis)/sleep(long millis)会进入该状态。
    TERMINATED终止。线程已结束。

    注意:

    1. RUNNABLE不代表CPU一定在执行该线程的代码,也可能在等待操作系统分配时间片,只是它没有在等待其他条件。

     

    4. 线程基本方法

     

    5. 线程共享内存

    多个线程可以对同一个变量进行读写操作,但由于读写是非原子性的以及CPU缓存等原因,可能形成竞态条件或出现内存可见性等问题。

     

    1) 竞态条件

    竞态条件(race condition)指当多个线程读写同一个对象时,最终执行结果与执行时序有关,可能正确也可能不正确。

    为什么会这样呢?因为counter++不是原子操作,它先取counter的当前值,然后在当前值的基础上加1,再赋值回counter,而在它取到当前值到赋新值之间,当前值也可能被其它线程取走。

     

    2) 内存可见性

    内存可见性指一个线程对共享变量的修改,另一个线程不一定马上就能看到,甚至永远也看不到。

    为什么会这样呢?由于CPU缓存等原因,可能修改没有及时同步到内存,也可能另一个线程根本就没从内存读。

     

     

    6. 线程的优点及成本

    多线程可以充分利用多核CPU的计算能力以及相关的硬件资源,因为一个线程在占用CPU时,另一个线程可以同时进行IO等操作。同理,由于多个执行流,对保持程序的响应性也十分有帮助,如GUI程序中,一个线程处理慢任务时,另一个线程还能够向用户反馈进度信息。

    但是,创建和启动线程需要消耗操作系统的资源,并且,线程调度和切换也是有成本的。因此,如果执行的任务都是CPU密集型的,即主要消耗的都是CPU,那创建超过CPU数量的线程就是没有必要的,并不会加快程序的执行。

     

     

    第二节 线程竞争

    1. 线程竞争及解决方案(synchronized)

    前面说到,当多个线程读写同一个变量时,可能形成竞态条件或出现内存可见性问题,这就是线程之间的竞争,在Java中,最简单的解决方式是使用synchronized关键字。

     

    1) 修饰实例方法

    上节我们介绍竞态条件时,了解到counter++是非原子操作,可能造成结果比预期偏小,现在我们可以对counter其进行包装,通过synchronized关键字修饰的实例方法来对其进行自增和读取操作,这样就始终和预期一致了。

    这里,synchronized到底做了什么呢?在进入synchronized实例方法前,会尝试对this对象进行加锁操作,当前线程不能获得锁的时候,它会进入等待队列(此时线程状态变为BLOCKED),这样来确保只有一个线程执行。

    注意:

    1. synchronized是对this对象加锁,而非对方法加锁

      • 当this对象不同时,是可以执行同一个synchronized实例方法的;

      • 当this对象相同时,即使是其它synchronized实例方法也是不可以执行的。

    2. synchronized机制不能防止非synchronized方法被同时执行,如Counter类添加一个非synchronized的decr方法,进行count--操作,是无法同步的,因此,一般在保护变量时,需要在所有访问该变量的方法上加上synchronized

     

    2) 修饰静态方法

    synchronized同样可以用于静态方法,比如:

    当修饰静态方法时,不能对this对象加锁了,它将对类对象(StaticCounter.class)进行加锁,加锁逻辑基本一致。

    注意:

    1. synchronized静态方法和synchronized实例方法对不同对象进行加锁,也就是说,不同的两个线程,可以一个执行synchronized静态方法,另一个执行synchronized实例方法。

     

    3) 修饰代码块

    除了用于修饰方法外,synchronized还可以用于修饰代码块,比如对于前面的Counter类,等价的代码可以为:

    此时,加锁的对象就是小括号里面的lock变量,它可以是任意对象。但是一般来说,在使用synchronized修饰代码块时,习惯使用this(实例方法中)或类对象(静态方法中)作为锁对象。

     

     

    2. synchronized的特性

    1) 可重入性

    synchronized方法或代码块是可重入的,即某个线程获得锁之后,再次调用synchronized方法可直接调用,它是通过记录锁的持有线程和持有数量来实现的

     

    2) 内存可见性

    synchronized还可以保证共享变量的内存可见性,在获得锁后,会始终从内存读取最新数据,在释放锁后,所有写入都会写回内存。

    但如果只需要保证内存可见性,而无需保证原子性,可以使用更轻量级的方式,那就是给共享变量加修饰符volatile

    加了volatile之后,Java会在操作对应变量时插入特殊的指令,保证读写到内存最新值,而非缓存的值。

     

    3) 可能死锁

    synchronized需进行加锁,当修饰代码块时,如果以不同顺序对多个对象进行加锁时,可能造成死锁。

    死锁可通过jstack -pid命令查看,我们在写程序时,应该尽量避免在持有一个锁的同时去申请另一个锁,如果确实需要多个锁,所有代码都应该按照相同的顺序去申请锁。在Java中,也有一些显式锁支持tryLock以及加锁超时的设定,也可以一定程度解决死锁问题。

     

    4) 无法响应中断

    获取synchronized锁时,如果未获取到,将进入BLOCK状态,该状态无法响应中断请求,而显式锁Lock则支持以响应中断的方式获取锁。

     

    5) 只有一个条件队列

    synchronized的协作机制wait/notify,只支持一个条件队列,如果有多个等待条件,只能共用,并且在通知时必须通知所有等待的线程。

     

    3. synchronized容器

    1) 基本使用

    Collections类有一些方法可以修饰普通容器,返回线程安全的同步容器,它们是给所有容器方法都加上synchronized来实现安全的。

    这里线程安全针对的是容器对象,指的是当多个线程并发访问同一个容器对象时,不需要额外的同步操作,也不会出现错误的结果。

     

    2) 复合操作与伪同步

    加了synchronized,所有方法调用变成了原子操作,客户端在调用时,是不是就绝对安全了呢?不是的,至少有如下情况需要注意:

    上述案例中的原子操作显然是不能实现的,虽然map.get(key)和map.put(key, value)是原子的,但是它们组合后就不是原子的了

    那给putIfAbsent方法加上synchronized呢?也同样也不能实现原子操作,因为putIfAbsent方法的synchronized是对当前的EnhancedMap对象加锁,而map.get(key)和map.put(key, value)方法是对map加锁,这是一种伪同步

    但我们可以在putIfAbsent方法中使用synchronized代码块,并且使用map进行加锁。

     

    3) 并发修改异常

    此外,还需注意,虽然同步容器的单个操作是安全的,但是在增删元素时进行迭代操作依然可能会抛出ConcurrentModificationException

    同步容器并没有解决这个问题,如果要避免这个异常,需要在遍历的时候给整个容器对象加锁,如startIteratorThread可以改为:

     

    4) 性能问题

    除了以上这些注意事项,同步容器的性能也是比较低的,当并发访问量比较大的时候性能很差。在高并发场景,可以使用CopyOnWriteArrayList、ConcurrentHashMap、ConcurrentLinkedQueue、ConcurrentSkipListSet等专为并发设计的容器类,它们都是线程安全的,但都没有使用synchronized、没有迭代问题、直接支持一些复合操作、性能也高得多。

     

     

    第三节 线程协作

    多线程之间除了竞争,还经常需要相互协作,本节就来介绍下多线程协作的基本机制wait/notify

     

    1. 线程协作的场景

    多线程之间需要协作的场景有很多,比如说:

    我们会探讨如何实现这些协作场景,在此之前,我们先来了解协作的基本方法wait/notify

     

    2. wait/notify机制

    在Java中,任意对象都可以参与线程协作,它们都有waitnotify方法,可以在synchronized代码块中调用:

    前面提到,当线程等待synchronized锁时,会将线程加入到锁对象的等待队列中。而线程在synchronized代码块内主动调用wait方法时,会将线程加入到另一个条件队列,并进行阻塞。

    调用wait方法后,将释放持有的锁,线程状态变为WAITING/TIMED_WAITING,当被唤醒后,需重新竞争锁。

    被阻塞的线程需要其它线程调用该对象的notify方法来将它唤醒。调用notify后,不会立即释放持有的锁,而是等待synchronzied代码块执行完后才释放。这意味着wait方法必须等到调用notify的那段同步代码执行结束后才可能获得锁响应通知。

    注意:

    1. wait/notify方法只能在synchronized代码块内被调用,否则将抛出IllegalMonitorStateException。

    2. join方法也可进行等待,如前面介绍的主线程等待子线程结束案例,当子线程运行结束时,由java系统调用notifyAll来通知。

    在使用wait/notify时,一定要明确wait等待的是什么?在满足什么条件后调用notify?并且需注意,从wait返回后,不一定表示等待的条件就满足了,仍需进行条件检查,因此,wait方法的一般调用模式为:

    接下来,我们通过一些场景来进一步理解wait/notify的应用。

     

    3. 生产者/消费者模式

    在生产者/消费者模式中,协作的共享变量是队列,生产者往队列上放数据,如果满了就wait,而消费者从队列上取数据,如果队列为空也wait。我们将队列作为单独的类进行设计,代码如下:

    在上面代码中,生产者和消费者都调用了wait方法,但它们等待的条件是不一样的。生产者在队列为满时等待,而消费者在队列为空的时候等待,它们等待条件不同但又使用相同的条件队列,所以要调用notifyAll而不能调用notify,因为notify可能唤醒的恰好是同类线程。

    类似的,它们也都调用了notifyAll方法,但是需满足的条件也不一致,生产者在“有数据了”的条件下通知消费者,消费者在“有空位了”的条件下通知生产者。

    注意:synchronized的局限性

    1. synchronized获取锁的

    2. wait/notify协作机制,只能有一个条件等待队列,这是它的局限性,使的分析变得复杂,后面将会介绍支持多个条件的显式锁。

    3. 使用synchronized关键字获取锁的过程中不响应中断请求,这也是synchronized的局限性。

     

    4. 同时开始

    在同时开始模式中,协作的共享变量是一个开始信号。我们用一个类FireFlag来表示这个协作对象:

    子线程应该调用waitForFire()等待枪响,而主线程应该调用fire()发射比赛开始信号。代码如下所示:

     

    5. 等待结束

    在等待结束模式中,主线程与各个子线程协作的共享变量是一个数,这个数表示未结束线程个数。

    应用代码示例如下:

    注意:

    1. 可将线程计数初始值设置为1,由子线程调用await(),主线程调用countDown(),还可实现上面的“同时开始”模式。

     

    7. 集合点

    在集合点模式,协作的共享变量依然是一个数,这个数表示未到集合点的线程个数。

    多个游客线程,各自先独立运行,然后使用该协作对象到达集合点进行同步的示例代码如下:

     

     

    6. 异步结果

    异步结果模式依赖异步调用框架,主要由调用者执行器异步任务异步结果四个部分组成。

    其中异步任务和异步结果代码表示如下:

    执行器用于执行子任务并返回异步结果,使用执行器后调用者就无需创建并管理子线程了,其代码如下:

    调用者只需创建执行器,然后执行异步任务,即可得到异步结果对象。

     

     

    第四节 线程中断

    在Java中,停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,而是一种协作机制,是给线程传递一个取消信号,但是由线程来决定如何以及何时退出,本节我们主要就是来理解Java的中断机制。

     

    1. 线程中断的场景

    通过线程的start方法启动一个线程后,线程开始执行run方法,run方法运行结束后线程退出,但有些场景需要对其进行中断:

     

    2. 线程中断相关方法

    每个线程都有一个中断标志位,表示该线程是否被中断,可通过如下方式获取和设置:

    注意:

    1. 线程中还定义了stop()等一些方法,但它们已经过时了,不应该再继续使用。

     

    3. 线程对中断的反应

    线程被中断时,如何进行响应与线程的状态进行的IO操作有关。

    线程状态或IO操作响应
    NEW/TERMINATE状态调用interrupt()对它没有任何效果,中断标志位也不会被设置
    RUNNABLE状态(未执行IO操作)只会设置线程的中断标志位,没有其它任何作用。
    WAITING/TIMED_WAITING状态抛出InterruptedException,同时清空线程的中断标志位。
    BLOCKED状态
    (如进入synchronized代码块时未获取到锁)
    设置线程的中断标志位,但线程依然会处于BLOCKED状态。
    也就是说,interrupt()并不能使一个在等待锁的线程真正"中断"。
    等待IO操作,且IO通道是可中断的
    (即实现了InterruptibleChannel接口)
    抛出ClosedByInterruptException,同时设置线程的中断标志位。
    线程阻塞于Selector调用设置线程的中断标志位,同时,阻塞的调用会立即返回。

    此外,重点介绍下InputStream的read调用,在流中没有数据时,read会阻塞 ,但线程依然是RUNNABLE状态。如果此时被中断,只会设置线程中断标志,并不能真正“中断”它。

    子线程启动后调用read方法,此时标准输入流无数据,会进行阻塞,虽然主线程设置了中断标记,但子线程并不能响应中断。如果输入一个字符,read方法执行通过,随后进入while循环的中断标志判断逻辑,此时发现被中断才会退出。

     

    4. 如何处理线程中断

    线程中断时,可能抛出InterruptedException异常,也可能只设置中断标识位。

    注意:

    1. BLOCKED状态的线程虽然会被设置中断标识位,但依然保持阻塞状态,无法对其进行处理。

     

    5. 如何取消/关闭线程

    线程中断只是一种协作机制,而不一定会真正"中断"线程,如果不明白线程在做什么,不应该贸然的调用线程的interrupt方法。

    那如何取消或关闭线程呢?一般而言,对于以线程提供服务的程序模块,它应该封装取消或关闭的操作,从而使调用者能正确关闭线程

    如上节的InterruptReadDemo,可以在子线程中提供如下cancel方法,调用后关闭流(注意,流在关闭时返回-1),并进行中断退出。

    同样的,Java并发库的一些代码也提供了单独的取消/关闭方法:

    第02章_并发协作工具

    第一节 原子变量

    1. 基础用法

    原子变量可以保证更新操作的原子性,且无需加锁以及上下文切换,效率更高。常用基本类型的原子变量及相关变体如下:

    基本类型原子变量原子数组原子字段更新器
    BooleanAtomicBoolean  
    IntegerAtomicIntegerAtomicIntegerArrayAtomicIntegerFieldUpdater
    LongAtomicLongAtomicLongArrayAtomicLongFieldUpdater
    ReferenceAtomicReferenceAtomicReferenceArrayAtomicReferenceFieldUpdater

     

    1) AtomicInteger

    AtomicInteger可以用作计数器等场景,常用方法如下:

    AtomicBoolean可以用来在程序中表示一个标志位,它的原子操作方法有:

    AtomicLong可以用来在程序中生成唯一序列号,它的方法与AtomicInteger类似。

     

    2) AtomicReference

    AtomicReference用来以原子方式更新引用类型,它有一个类型参数,使用时需要指定引用的类型。以下代码演示了其基本用法:

     

    3) AtomicArray

    原子数组方便以原子的方式更新数组中的每个元素,我们以AtomicIntegerArray为例来简要介绍下。

     

    4) FieldUpdater

    FieldUpdater方便以原子方式更新对象中的字段,字段不需要声明为原子变量,FieldUpdater是基于反射机制实现的,看代码:

     

    2. 实现原理

    原子变量的组合操作都依赖一个特殊的方法,称之为CAS,比较并设置。如AtomicInteger中的CAS方法为:

    它在内部调用了Unsafe类的CAS方法,而该方法依赖底层计算机系统在硬件层次上直接支持的CAS指令

    有了CAS方法后,就可以通过死循环+CAS实现原子更新了,如AtomicInteger的incrementAndGet方法:

     

    3. ABA问题

    使用CAS方式更新有一个ABA问题,指一个线程刚开始看到的值是A,随后使用CAS进行更新,它实际期望的是没有其他线程修改过才更新,但普通的CAS做不到,因为可能在这个过程中,已经有其他线程修改过了,比如先改为了B,然后又改回为了A。

    一般来说,这对业务没啥影响,如果确实需要解决该问题,可以使用AtomicReference的增强类AtomicStampedReference,它在修改值的同时附加一个时间戳,只有值和时间戳都相同才进行修改。

    此外,还可以使用AtomicMarkableReference,它多关联了一个boolean类型的标志位,只有值和标志位都相同的情况下才进行修改

     

    4. 对比synchronized

    原子变量的更新方式与synchronized方式的相比,代表一种不同的思维方式。

    对于大部分比较简单的操作,无论是在低并发还是高并发情况下,这种乐观非阻塞方式的性能都要远高于悲观阻塞式方式。

    实际上,CAS也可以实现悲观阻塞式算法,Java并发包中的所有阻塞式工具、容器、算法都是使用了CAS (当然,也需要一些别的支持)。

    上例中的这种阻塞方式过于消耗CPU,有更为高效的方式,我们后续章节介绍。

     

     

    第二节 显式锁

    Java并发包中的显式锁接口和类位于包java.util.concurrent.locks下,主要接口和类有:

    本节主要介绍Lock接口和实现类ReentrantLock,关于读写锁,我们后续章节介绍。

     

    1. LOCK接口

    显式锁接口Lock的定义为:

    可以看出,相比synchronized,显式锁支持以非阻塞方式获取锁可以响应中断可以限时,这使得它灵活的多。

     

    2. ReentrantLock

    1) 加锁/解锁

    Lock接口的主要实现类是ReentrantLock,它的基本方法lock/unlock实现了与synchronized一样的语义,包括:

    注意:是否需要保证公平

    1. 保证公平会让活跃线程得不到锁,进入等待状态,引起上下文切换,降低整体效率,因此一般都是不保证公平的。

    2. 通常情况下,谁先运行关系不大,而且长时间运行,从统计角度而言,虽然不保证公平,也基本是公平的。

    3. 而且,即使fair参数为true,不带参数的tryLock方法也是不保证公平的,它不会检查是否有其他等待时间更长的线程。

     

    2) 避免死锁(tryLock)

    如果以不同顺序获取多个锁,可能造成死锁,可以使用tryLock()以尝试的方式获取锁,如果获取不到,还可以先释放已持有的锁,给其他线程获取锁的机会,然后再重试获取所有锁。

    注意:

    1. 做转账业务时,需在同事务内增加A账户余额并减少B账户余额,要保证AB账户按一致的顺序更新,否则相互转账会死锁。

     

    3) 获取锁信息

    除了实现Lock接口中的方法,ReentrantLock还有一些其他方法,可以获取关于锁的一些信息,这些信息可以用于监控和调试目的,比如:

     

    4) 对比synchronized

    synchronized是一种声明式编程,使用更为简单,不易出错,而且Java编译器和虚拟机可以不断优化synchronized的实现。但是,它不能响应中断,没有尝试加锁或限时加锁的方式避免死锁,并且协作时只有一个条件队列。

    显式锁是一种命令式编程,需程序员实现所有细节,但是更为灵活,可以解决上述一些synchronized的局限。

    简单总结,能用synchronized就用synchronized,不满足要求,再考虑ReentrantLock。

     

    3. ReentrantLock的实现基础

    1) CAS + LockSupport

    ReentrantLock在最底层依赖于前面介绍的CAS方法,另外,还依赖了LockSupport中的一些方法,它的基本方法有:

    这些park/unpark方法是怎么实现的呢?与CAS方法一样,它们也调用了Unsafe类中的对应方法,Unsafe类最终调用了操作系统的API,从程序员的角度,我们可以认为LockSupport中的这些方法就是基本操作

     

    2) AQS (AbstractQueuedSynchronizer)

    AbstractQueuedSynchronizer是Java提供的一个抽象类,它封装了CAS和LockSupport,简化了并发工具的实现。

     

    4. ReentrantLock的实现代码

    ReentrantLock内部定义了一个继承自AQS的抽象类Sync,它有两个实现类NonfairSyncFairSync

     

    1) 实现lock方法

    我们先来看下ReentrantLock的lock方法:

    其中tryAcquire必须被子类重写,NonfairSync的实现为:

    如果tryAcquire返回false,即被其它线程锁定,则AQS会调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg),其中addWaiter会新建一个节点Node,代表当前线程,然后加入到内部的等待队列中。

    放入等待队列中后,再调用acquireQueued尝试获得锁,代码为:

    以上就是lock方法的基本过程,能获得锁就立即获得,否则加入等待队列,被唤醒后检查自己是否是第一个等待的线程,如果是且能获得锁,则返回,否则继续等待,这个过程中如果发生了中断,lock会记录中断标志位,但不会提前返回或抛出异常。

     

    2) 实现unlock方法

    ReentrantLock的unlock方法的代码为:

     

     

    第三节 显式条件

    显式条件基于显式锁的线程协作机制,它们之间的关系类似于synchronized与wait/notify协作机制的关系。

     

    1. Condition接口

    创建显式条件需要通过显式锁,Lock接口定义了创建方法:

    其中Condition为显式条件的接口,它的定义为:

    与wait/notify协作机制类似,await()/signal()也具有如下一些特性:

     

    2. 生产者/消费者模式案例

    生产者/消费者模式存在一个与队列满有关的条件,还存在一个与队列空有关的条件,而在前面通过wait/notify机制实现时,不得不共用同一个条件队列,而使用显式锁,则可以分别创建对应的条件队列。

    这样,代码更为清晰易读,同时避免了不必要的唤醒和检查,提高了效率。

     

    3. 显式条件的实现代码

    1) ConditionObject

    ConditionObject是AQS中定义的一个成员内部类,它可以直接访问AQS中的数据,比如AQS中定义的锁等待队列。它通过显式锁创建:

    它内部也有一个条件等待队列,其成员声明为:

     

    2) await实现分析

     

    3) awaitNanos实现分析

    awaitNanos与await的实现是基本类似的,区别主要是会限定等待的时间,如下所示:

     

    4) signal实现分析

     

     

    第四节 线程本地变量(ThreadLocal)

    1. 基本概念和用法

    线程本地变量指与线程绑定的变量,即每个线程都有同一个变量的独有拷贝,在Java中,用ThreadLocal表示。

     

    2. 使用场景

    1) SimpleDateFormat

    ThreadLocal是实现线程安全的一种方案。如前面介绍的SimpleDateFormat类,它是线程不安全的,除了使用锁进行同步或每次都创建新对象外,更好的方式是使用ThreadLocal,每个线程使用自己的DateFormat,就不存在安全问题了,在线程的整个使用过程中,只需要创建一次,又避免了频繁创建的开销。

    注意:

    1. 一般来说,ThreadLocal对象都定义为static,以便于引用。

     

    2) ThreadLocalRandom

    ThreadLocal可以降低线程竞争,提高性能。如前面介绍的Random类,即使它是线程安全的,但如果并发访问竞争激烈的话,性能会下降,因此Java并发包提供了ThreadLocalRandom类。

    其中current方法为静态方法,用于获取线程本地的Random实例:

     

    3) 上下文信息

    ThreadLocal的典型用途是提供上下文信息。如在Web服务器中,一个线程执行用户的请求时,多个方法都会用到请求信息、用户身份信息、数据库连接、当前事务等全局信息,如果作为参数传递则很不方便,这时就可以存放在线程本地变量中。

    在首次获取到信息时,调用set方法进行设置,然后就可以在代码的任意其他地方调用get相关方法进行获取了。

     

     

    3. 基本实现原理

    ThreadLocal对同一个变量是如何实现线程之间的隔离的呢?其实很简单,每个线程(Thread)都有一个ThreadLocalMap,当该线程第一次对某个ThreadLocal变量的操作时,就使用该变量引用作为key,存储值作为value在当前线程的Map中建立一个条目(Entry),后续get/set都将操作该条目,这样,就实现了每个线程都有自己的独立的存储空间。

    其中ThreadLocalMap是一个专门用于ThreadLocal的内部类,它的键类型为WeakReference<ThreadLocal>,我们没有提过WeakReference,它与Java的垃圾回收机制有关,使用它,便于回收内存,具体我们就不探讨了。

     

    1) set方法实现

    每个线程都有一个ThreadLocalMap,调用set实际上是在线程自己的Map里设置了一个条目,键为当前的ThreadLocal对象,值为value。

     

    2) get方法实现

     

    3) remove方法实现

     

     

    4. ThreadLocal与线程池

    1) 未清理数据

    线程池中的线程是会重用的,如果异步任务使用了ThreadLocal,并且未进行清理操作,会将修改后的值带到了下一个异步任务

    上述代码输出为0 0 1,第1个任务将修改后的结果带到了第3个任务,造成了数据混乱。

     

    2) 解决方法

    第一种方法,在Task的run方法开始处,添加set或remove代码,如下所示:

    第二种方法,将Task的run方法包裹在try/finally中,并在finally语句中调用remove,如下所示:

    第三种方法,扩展线程池ThreadPoolExecutor,重写beforeExecute方法(在线程池将任务交给线程执行之前,会在线程中先调用该方法),通过反射清空所有ThreadLocal变量,这种方式无需修改异步任务的代码,推荐使用。

     

     

    第五节 并发容器

    1. CopyOnWrite的List和Set

    1) 写时复制

    写时复制是指每次修改都创建一个新数组,然后复制所有内容,但如果数组比较大,修改操作又比较频繁,那么性能是非常低的。

    写时复制是解决线程安全问题的一种重要思维,用于各种计算机程序中,比如操作系统内部的进程管理和内存管理等。

    注意:

    1. 解决线程安全问题还可以通过加锁(synchronized或ReentrantLock)循环CAS的方式实现。

    2. 写时复制的特点是写写互斥、读写兼容(和读写锁最大的差别)、读读兼容,只能保证最终一致性,不能保证实时一致性。

     

    2) CopyOnWriteArrayList

    CopyOnWriteArrayList实现了List接口,用法与ArrayList基本一致,主要区别有:

    CopyOnWriteArrayList是基于写时复制机制实现的,内部有一个普通数组,在读的时候,直接访问该数组,在写的时候,则会复制一个新数组,在新数组进行修改操作,修改完后再以原子方式设置内部的数组引用。

    如果读的过程中,发生了写操作,可能内部的数组引用已经被修改了,但不会影响读操作,它依旧访问原数组内容。

    换句话说,数组内容是只读的,写操作都是通过新建数组,然后原子性的修改数组引用来实现的

    在CopyOnWriteArrayList中,读不需要锁,可以并行,读和写也可以并行,但多个线程不能同时写,每个写操作都需要先获取锁。

    下面是一些常用方法的代码实现:

    注意:

    1. 在JDK8以前,CopyOnWriteArrayList返回的迭代器不支持修改相关的操作,如listIterator.set(1)、Collections.sort(list)等。

     

    3) CopyOnWriteArraySet

    CopyOnWriteArraySet实现了Set接口,不包含重复元素,用法非常简单,就不再赘述。它是基于CopyOnWriteArrayList实现的,因此性能比较低,不适用于元素个数特别多的集合。

    下面是一些常用方法的代码实现:

    注意:

    1. Java并发包中没有与HashSet对应的并发容器,但可以使用Collections.newSetFromMap方法基于ConcurrentHashMap构建一个

     

     

    2. ConcurrentHashMap

    1) ConcurrentHashMap

    ConcurrentHashMap是HashMap的并发版本,主要区别如下:

    扩展:

    1. HashMap在并发更新的情况下,链表结构可能形成环(在多个线程同时扩容哈希表的时候),出现死循环,占满CPU。

     

    2) 高并发性

    ConcurrentHashMap是如何实现高并发的呢?一般的同步容器使用synchronized,所有方法都竞争同一个锁。而ConcurrentHashMap采用分段锁技术,将数据分为多个段,而每个段有一个独立的锁,每一个段相当于一个独立的哈希表,分段的依据也是哈希值,无论是保存键值对还是根据键查找,都先根据键的哈希值映射到段,再在段对应的哈希表上进行操作。

    采用分段锁,可以大大提高并发度,多个段之间可以并行读写。默认情况下,段是16个,可通过构造方法进行设置:

    在对每个段的数据进行读写时,ConcurrentHashMap也不是简单的使用锁进行同步,内部使用了CAS,对一些写采用原子方式,实现比较复杂,我们就不介绍了,实现的效果是,对于写操作,需要获取锁,不能并行,但是读操作可以,多个读可以并行,写的同时也可以读,这使得ConcurrentHashMap的并行度远远大于同步容器。

     

    3) 弱一致性

    ConcurrentHashMap的迭代器创建后,就会按照哈希表结构遍历每个元素,但在遍历过程中,内部元素可能会发生变化,如果变化发生在已遍历过的部分,迭代器就不会反映出来,而如果变化发生在未遍历过的部分,迭代器就会发现并反映出来,这就是弱一致性

    类似的情况还会出现在ConcurrentHashMap的另一个方法:

    该方法并非原子操作,而是调用put方法逐个元素进行添加的,在该方法没有结束的时候,部分修改效果就会体现出来。

     

     

    3. 基于SkipList的Map和Set

    1) SkipList

    SkipList称为跳表跳跃表,是一种基于链表的数据结构,在链表的基础上加了多层索引,使其更易于实现高效并发算法。

    下面是一个包含3, 6, 7, 9, 12, 17, 19, 21, 25, 26元素的跳表结构,两条线展示了查找值19和8的过程:

    img

     

    2) ConcurrentSkipListMap

    ConcurrentSkipListMap实现了ConcurrentMapSortedMap等接口,默认按键自然有序,可以传递比较器自定义排序,用法与TreeMap类似,主要区别如下:

    注意:

    1. ConcurrentSkipListMap的size方法不是常量操作,需要遍历所有元素,且遍历结束后size可能已改变,因此一般用处不大。

     

    3) ConcurrentSkipListSet

    TreeSet是基于TreeMap实现的,与此类似,ConcurrentSkipListSet也是基于ConcurrentSkipListMap实现的。

     

     

    5. 并发队列

    Java并发包中提供了丰富的队列类,简单列举如下:

    队列类型队列类名
    无锁非阻塞并发队列ConcurrentLinkedQueue、ConcurrentLinkedDeque
    普通阻塞队列ArrayBlockingQueue、LinkedBlockingQueue、LinkedBlockingDeque
    优先级阻塞队列PriorityBlockingQueue
    延时阻塞队列DelayQueue
    其他阻塞队列SynchronousQueue、LinkedTransferQueue

    这些队列迭代都不会抛出ConcurrentModificationException,都是弱一致的,后面就不单独强调了。

    注意:

    1. 无锁非阻塞是指这些队列不使用锁,所有操作总是可以立即执行,主要通过循环CAS实现并发安全。

    2. 阻塞队列是指使用锁和条件,很多操作都需要先获取锁或满足特定条件,获取不到锁或等待条件时,会等待(即阻塞)。

     

    1) 无锁非阻塞并发队列
    队列类名实现接口数据结构基本特性
    ConcurrentLinkedQueueQueue(先进先出队列,尾进头出)单向链表无界(没有限制大小)、szie方法需遍历
    ConcurrentLinkedDequeDeque(双端队列)双向链表无界(没有限制大小)、szie方法需遍历

    这两个类最基础的原理是循环CAS,ConcurrentLinkedQueue的算法基于一篇论文Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms,ConcurrentLinkedDeque扩展了ConcurrentLinkedQueue的技术,具体实现都非常复杂,我们就不探讨了。

     

    2) 普通阻塞队列

    除了刚介绍的两个队列,其他队列都是阻塞队列,都实现了接口BlockingQueue,在入队/出队时可能等待,主要方法有:

     

    普通阻塞队列是最常用的队列,可用于生产者/消费者模式等。

    队列类名实现接口数据结构基本特性
    ArrayBlockingQueueQueue(先进先出队列,尾进头出)循环数组有界(创建时指定大小)
    LinkedBlockingQueueQueue(先进先出队列,尾进头出)单向链表默认为无界,可以在创建时指定最大长度
    LinkedBlockingDequeDeque(双端队列)双向链表默认为无界,可以在创建时指定最大长度

    注意:

    1. ArrayBlockingQueue的大小在运行过程中不会改变,而同样基于循环数组实现的ArrayDeque是无界的,会自动扩展。

    上述普通阻塞式队列都是使用显式锁ReentrantLock显式条件Condition实现的。

     

    3) 优先级阻塞队列
    队列类名实现接口数据结构基本特性
    PriorityBlockingQueueBlockingQueue(队列为空时,take阻塞)堆(要求元素可比较)无界(没有限制大小),优先级高的先出队

    PriorityBlockingQueue使用了一个锁ReentrantLock保护所有访问,使用了一个条件协调阻塞等待。

     

    4) 延时阻塞队列
    队列类名实现接口数据结构基本特性
    DelayQueue<E extends Delayed>BlockingQueue堆(要求元素可比较)无界(没有限制大小),按元素的延时时间出队

    DelayQueue是一种特殊的优先级队列要求每个元素都实现Delayed接口,该接口的声明为:

    只有元素的延迟时间到期后,才能被取走,也就是说,take方法总是返回第一个过期的元素,如果没有,则阻塞等待。

    DelayQueue内部是基于PriorityQueue实现的,它使用一个锁ReentrantLock保护所有访问,使用一个条件available表示头部是否有元素,当头部元素的延时未到时,take操作会根据延时计算需睡眠的时间,然后睡眠,如果在此过程中有新的元素入队,且成为头部元素,则阻塞睡眠的线程会被提前唤醒然后重新检查。以上是基本思路,DelayQueue的实现有一些优化,以减少不必要的唤醒,具体我们就不探讨了。

     

    5) 实时队列

    SynchronousQueue不是传统意义上的存储队列,它不存储元素,它的入队操作要等待另一个线程的出队操作,反之亦然。

    如果没有其他线程在等待从队列中接收元素,put操作就会等待,take操作需要等待其他线程往队列中放元素,如果没有,也会等待。

    适用于两个线程之间直接传递信息、事件或任务

     

    6) 传递队列

    LinkedTransferQueue实现了TransferQueue接口,TransferQueue是BlockingQueue的子接口,但增加了一些额外功能,生产者在往队列中放元素时,可以等待消费者接收后再返回适用于一些消息传递类型的应用中。TransferQueue的接口定义为:

    LinkedTransferQueue是基于链表实现的,无界的TransferQueue,具体实现比较复杂,我们就不探讨了。

     

     

    第六节 其它协作工具

    在一些特定的同步协作场景中,相比使用最基本的wait/notify或显示锁/条件,使用并发同步协作工具更为方便,效率更高。

     

    1. ReentrantReadWriteLock

    1) 基本概念

    前面介绍的synchronized锁或ReentrantLock锁,不区分读操作或写操作,在访问前都需要获取锁。但在一些场景中,多个线程的读操作完全可以并行,在读多写少时,可以明显提高性能,这就是读写锁。

    在Java并发包中,接口ReadWriteLock表示读写锁,主要实现类是可重入读写锁ReentrantReadWriteLock

     

    2) 基本示例

     

    3) 基本原理

    读写锁内部有一个等待队列存放等待的线程,有一个整数变量表示锁的状态,读锁和写锁各用16位表示。

    在获取写锁时,必须确保当前没有其他线程持有任何锁,否则就等待。

    在写锁释放时,会将等待队列中的第一个线程唤醒,唤醒的可能是等待读锁的,也可能是等待写锁的。

    在获取读锁时,只要写锁没有被其它线程持有,就可以获取成功。此外,在获取到读锁后,它会检查等待队列,逐个唤醒最前面的等待读锁的线程,直到第一个等待写锁的线程为止。

    在读锁释放时,会检查读锁和写锁数是否都变为了0,如果是,则会唤醒等待队列中的下一个线程。

     

     

    2. 信号量(Semaphore)

    1) 基本概念

    前面介绍的锁一般只允许一个线程同时访问资源,但是部分场景需要允许指定数目的线程访问,类似于限制最大并发数

    信号量类Semaphore就是用来解决这类问题的,它可以限制对资源的并发访问数:

     

    2) 基本示例

    注意:

    1. Semaphore是不可重入的,即使在同一个线程,每一次的acquire调用都会消耗一个许可。

    2. 一般锁只能由持有锁的线程释放,而Semaphore表示的只是一个许可数,任意线程都可以调用其release方法

    3. 因此,即使将permits设置为1,它和一般的锁还是有本质的不同。

     

    3) 基本原理

    Semaphore的基本原理比较简单,也是基于AQS实现的,permits表示共享的锁个数,acquire方法就是检查锁个数是否大于0,大于则减一,获取成功,否则就等待,release就是将锁个数加一,唤醒第一个等待的线程。

     

     

    3. 倒计时门栓(CountDownLatch)

    1) 基本概念

    倒计时门栓CountDownLatch类似于一次性开关,一开始是关闭的,所有希望通过该门的线程都需要等待。它有一个倒计时,当变为0后,门栓打开,等待的所有线程都可以通过,并且开关是一次性的,打开后就不能再关上了。

     

    2) 基本示例

    前面介绍过门栓的两种应用场景,一种是同时开始,另一种是等待结束,它们都有两类线程,互相需要同步,重写代码如下:

    注意:

    1. countDown的调用应该放到finally语句中,确保在工作线程发生异常的情况下也会被调用,使主线程能够从await调用中返回。

     

     

    4. 循环栅栏(CyclicBarrier)

    1)基本概念

    循环栅栏CyclicBarrier类似于一个集合点,只有全部线程都到达后才能进行下一步,它是循环的,可以用作重复的同步,特别适用于并行迭代计算,每个线程负责一部分计算,然后集合点等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。

    注意:

    1. 只要有一个线程抛出BrokenBarrierException,就会导致所有在调用await的线程都抛出BrokenBarrierException。

    2. 此外,如果栅栏动作(集合点动作)抛出了异常,也会破坏栅栏。

     

    2) 基本示例

     

    3) 对比CountDownLatch

    CyclicBarrier与CountDownLatch可能容易混淆,它们主要有两点区别:

    第03章_异步执行服务

    第一节 异步任务执行服务

    异步任务执行服务是一套框架,将要执行的并发任务线程的管理相分离。对于使用者,只需关注任务本身,如提交任务、获取结果、取消任务等,而由服务提供者执行任务,如创建线程、任务调度、关闭线程等,大大简化了并发业务的开发。

     

    1. 基本接口

    1) Runnable和Callable

    RunnableCallable表示要执行的异步任务,其中Runnable没有返回结果,且不可以抛异常,而Callable有返回结果,也允许抛异常。

     

    2) Executor和ExecutorService

    Executor表示最简单的执行服务(执行器),可以执行一个Runnable,没有返回结果。

    注意:

    1. Executor接口并没有规定如何执行任务,它可以创建新的线程,复用线程池中的线程或在调用者线程执行。

     

    ExecutorService扩展了Executor,支持提交Runnable及Callable等多种类型的异步任务,并用Future封装返回结果。

     

    3) Future

    Future表示异步任务的结果,通过Future可以查询异步任务的状态、获取最终结果、取消任务等。

    关于Futrue的get方法,在调用后有四种可能情形:

    注意:

    1. Future是一个重要的概念,是实现"任务的提交"与"任务的执行"相分离的关键,是其中的"纽带",使用者和服务提供者通过它隔离各自的关注点,同时进行协作。

     

     

    2. ExecutorService的使用用法

    1) 基本用法

     

    2) invokeAll示例

     

     

    3. ExecutorService的实现原理

    ExecutorService的主要实现类是ThreadPoolExecutor,它是基于线程池实现的,关于线程池我们下节再介绍。ExecutorService有一个抽象实现类AbstractExecutorService,本节,我们简要分析其原理,并基于它实现一个简单的ExecutorService。

     

    1) AbstractExecutorService

    ExecutorService最基本的方法是submit,它的实现代码如下:

    其中RunnableFuture是一个接口,既扩展了Runnable,又扩展了Future。作为Runnable,它表示要执行的任务,传递给execute方法进行执行;作为Future,它又表示任务执行的异步结果。

     

    2) FutureTask

    FutureTask是RunnableFuture的实现类,它有如下一些变量:

    在构造时,初始化待执行任务callable和任务状态state:

    任务执行服务会使用一个线程执行FutureTask的run方法,run()代码为:

    对于任务提交者,它通过FutureTask的get方法获取结果,限时等待的get方法代码如下:

    其中report根据状态返回结果或抛出异常:

    FutureTask的cancel方法可以取消任务,代码如下:

     

    3) invokeAll和invokeAny

    AbstractExecutorService的invokeAll,实现逻辑很简单,对每个任务,创建一个FutureTask,并调用execute执行,然后等待所有任务结束。

    invokeAny的实现稍微复杂些,它利用了ExecutorCompletionService,关于这个类及invokeAny的实现,我们后续章节再介绍。

     

     

    第二节 线程池服务

    线程池实现资源共享的一种方式,主要由任务队列工作线程两个概念组成,它可以重用线程,减少线程创建的开销。

     

    1. 线程池的基本使用

    1) 创建线程池

    Java中的线程池实现类为ThreadPoolExecutor->AbstractExecutorService->ExecutorService->Executor,构造方法如下:

     

    2) 获取线程池信息

     

    3) 配置线程池大小

    除了在创建时配置线程池大小外,还可以通过getter/setter方法获取和设置线程池大小。

     

    4) 选择任务队列

    ThreadPoolExecutor要求的队列类型必须是阻塞队列BlockingQueue,可以是:

    注意:

    1. 如果使用无界队列,则线程个数最大只能达到corePoolSize,设置maximumPoolSize参数将无意义。

     

    5) 配置任务拒绝策略

    线程池一般使用有界队列maximumPoolSize是有限的,当两者都达到上限时,就会拒绝任务的提交(execute/submit/invokeAll),可以通过构造方法或setter方法设置拒绝策略

    ThreadPoolExecutor内部实现了四种拒绝策略可供选择:

    注意:

    1. 拒绝策略只有在队列有界,且最大线程数有限的情况下才会触发,让拒绝策略有机会执行对保证系统稳定非常重要。

    2. 在任务量非常大的场景中,如果队列无界,可能会导致请求处理队列积压过多任务,消费非常大的内存;如果队列有界但不限制最大线程数,可能会创建过多的线程,占满CPU和内存。

     

    6) 自定义线程工厂

    线程工厂可以对创建的线程进行一些配置,它的接口如下:

    默认实现类为Executors类中的静态内部类DefaultThreadFactory,它主要就是创建一个线程,给线程设置一个名称( pool-<线程池编号>-thread-<线程编号>),设置daemon属性为false,设置线程优先级为标准默认优先级等。

     

    7) 线程池的其他配置

     

     

    2. 线程池工厂类

    线程池工厂类Executors提供了一些静态工厂方法,可以方便的创建一些预配置的线程池,主要方法有:

    在系统负载可能极高的情况下,newFixedThreadPool的问题是队列过长,而newCachedThreadPool的问题是线程过多,这时,应根据具体情况自定义ThreadPoolExecutor,传递合适的参数。

     

     

    3. 线程池的死锁

    如果提交的任务之间存在依赖,在线程池打满时,可能会出现死锁。

    上述问题可以将线程池类型替换为newCachedThreadPool来解决,让创建的线程不再受限。

    另外,创建线程池时使用SynchronousQueue队列也可解决上述问题:

    对于普通队列,入队只是把任务放到了队列中,而对于SynchronousQueue来说,入队成功就意味着已有线程接受处理,如果入队失败,可以创建更多线程直到maximumPoolSize,如果达到了maximumPoolSize,会触发拒绝机制,不管怎么样,都不会死锁。

     

     

    第三节 CompletionService

    CompletionService适用于提交多个异步任务,然后按完成顺序逐个处理任务结果的场景,不同于invokeAll,它无需等待所有异步任务全部完成才开始处理。

     

    1. 基本用法

    1) 接口

     

    2) 实现类

    主要实现类为ExecutorCompletionService,它对Executor进行了装饰,并额外添加了一个阻塞队列来负责结果的排队和处理

     

    3) 基本示例

    前面通过ExecutorService的invokeAll方法实现并发下载并分析URL标题的示例中,必须等到所有任务都完成才开始处理结果,而使用CompletionServiceDemo可以在第一个任务完成后就开始处理结果

     

     

    2. 实现原理

    ExecutorCompletionService是如何实现结果实时按序处理的呢?它主要依赖内部的一个阻塞队列以及重写了FutureTask的done()方法

    ExecutorCompletionService的take/poll方法就可以从该队列中获取已结束任务的结果,如下所示:

     

     

    3. 实现invokeAny

    通过ExecutorCompletionService,可以实现invokeAny方法,基本思路是:在提交任务后,通过take方法获取结果,获取到第一个有效结果后,取消所有其他任务。

     

     

    第四节 定时任务

    1. 应用场景

    定时任务的应用场景是非常多的,比如:

    在Java中,可以使用java.util包中的TimerTimerTask实现,也可以使用并发包中的ScheduledExecutorService实现。

    注意:

    1. 上述两者都不能胜任复杂的定时任务调度,如每周一和周三晚上18:00到22:00,每半小时执行一次。 对于类似这种需求,可以使用更为强大的第三方类库,比如Quartz(http://www.quartz-scheduler.org/)。

     

    2. Timer和TimerTask

    1) 基本用法

    TimerTask表示一个定时任务,它是一个抽象类,实现了Runnable接口,具体的定时任务需要继承该类,实现run方法。

    Timer表示一个定时器,负责定时任务的调度和执行,它有如下主要方法:

     

    2) 基本原理

    Timer内部主要由任务队列Timer线程两部分组成。

    Timer线程主体是一个循环,从队列中拿任务,如果队列中有任务且计划执行时间小于等于当前时间,就执行它,如果队列中没有任务或第一个任务延时还没到,就睡眠。如果睡眠过程中队列上添加了新任务且新任务是第一个任务,Timer线程会被唤醒,重新进行检查。

    在执行任务之前,Timer线程判断任务是否为周期任务,如果是,就设置下次执行的时间并添加到优先级队列中,对于固定延时的任务,下次执行时间为当前时间加上period,对于固定频率的任务,下次执行时间为上次计划执行时间加上period。

    需要强调是,下次任务的计划是在执行当前任务之前就做出了的,对于固定延时的任务,延时相对的是任务执行前的当前时间,而不是任务执行后,这与后面讲到的ScheduledExecutorService的固定延时计算方法是不同的,后者的计算方法更合乎一般的期望。

    另一方面,对于固定频率的任务,它总是基于最先的计划计划的,所以,很有可能会出现前面例子中一下子执行很多次任务的情况。

     

    3) 基本示例

     

    4) 注意事项

    一个Timer对象只有一个Timer线程,这意味着,定时任务不能耗时太长,更不能是无限循环,看个例子:

    Timer线程在执行任何一个任务的run方法时,一旦run抛出异常,Timer线程就会退出,从而所有定时任务都会被取消

    所以,如果希望各个定时任务不互相干扰,一定要在run方法内捕获所有异常

     

     

    2. ScheduledExecutorService

    1) 接口

    由于Timer/TimerTask的一些问题,Java并发包引入了ScheduledExecutorService,它是一个接口:

    返回类型都是ScheduledFuture,它也是一个接口,实现了Future和Delayed,没有定义额外方法。

    注意:

    1. 与Timer不同,ScheduledExecutorService不支持以绝对时间作为首次运行的时间。

     

    2) 实现类

    主要实现类为ScheduledThreadPoolExecutor,它是线程池ThreadPoolExecutor的子类,是基于线程池实现的,构造方法如下:

    参数含义与ThreadPoolExecutor一样,我们就不赘述了。

    工厂类Executors也提供了一些方便的方法,以方便创建ScheduledThreadPoolExecutor,如下所示:

    注意:

    1. 它的任务队列是一个无界的优先级队列,所以最大线程数对它没有作用,即使corePoolSize设为0,它也会至少运行一个线程。

     

    3) 基本示例

    由于可以有多个线程执行定时任务,一般任务就不会被某个长时间运行的任务所延迟了,比如,对于前面的TimerFixedDelay,如果改为:

    再次执行,第二个任务就不会被第一个任务延迟了。

    另外,与Timer不同,单个定时任务的异常不会再导致整个定时任务被取消了,即使背后只有一个线程执行任务,我们看个例子:

    TaskA和TaskB都是每秒执行一次,TaskB两秒后执行,但一执行就抛出异常,屏幕的输出类似如下:

    这说明,定时任务TaskB被取消了,但TaskA不受影响,即使它们是由同一个线程执行的。不过,需要强调的是,与Timer不同,没有异常被抛出来,TaskB的异常没有在任何地方体现。所以,与Timer中的任务类似,应该捕获所有异常

     

    3) 对比Timer/TimerTask

    ScheduledThreadPoolExecutor的实现思路与Timer基本是类似的,都有一个基于堆的优先级队列,保存待执行的定时任务,主要不同有:

     

     

    第五节 组合式异步编程

    组合式异步编程,可以方便地将多个有一定依赖关系的异步任务以流水线的方式组合在一起,大大简化了多异步任务的开发。

    在Java中的实现类为CompletableFuture,它是Future接口的扩展和增强,表示一个可主动完成的异步任务结果,同时实现了CompletionStage接口,表示某个完成阶段,支持直接对多个任务进行链式和组合处理。

    每个完成阶段都有一个执行任务,它可能需要等待其它一个或多个阶段完成才能开始,在完成后也可能会触发其他阶段开始运行。

     

    1. 基本使用

    1) 创建和执行

    下面是一些简单示例:

    注意:

    1. Async后缀的函数表示该任务会被单独提交到线程池中,从而相对前置任务来说是异步运行的,下同。

    2. executor参数表示执行该任务的线程池,如果省略,将使用ThreadPerTaskExecutor(单核cpu)或ForkJoinPool.commonPool()执行。

     

    2) 完成和取消任务

     

    3) 获取任务状态

     

    4) 获取任务结果

     

    5) 任务完成回调

    任务完成回调方法可以接收前一个任务正常结束时的结果值,或前面链路中的任务异常结束时的异常(抛出异常后,链路中的后续任务将不会继续执行),无返回值,不会改变原结果或覆盖原异常

    注意:

    1. 同步处理函数(不带Async)被注册后,如果任务线程未结束,将会使用任务线程执行,如果已结束,将会由当前注册线程执行。

    2. 注册的异常处理逻辑对前面链路中的任务都生效,可参考下方“顺序任务流”案例。

     

    6)结果或异常处理

    结果或异常处理方法也可以接收正常结束时的结果值,或异常结束时的异常,但可以修改任务结果,且会覆盖原异常

     

    2.构建任务流

    1) 依赖前一阶段正常完成

    下面一些方法可以用来构建依赖单一阶段的任务流,当前一个阶段正常完成时,自动触发所有依赖该阶段的下一阶段任务,如果前一个阶段发生了异常,所有后续阶段都不会执行,结果会被设为相同的异常,调用join会抛出运行时异常CompletionException

    简单示例如下:

    注意:

    1. 以run、accept、apply开头的方法,参数类型一般为Runnable、Consumer、Function类型。

     

    2) 依赖前两阶段都正常完成

    当一个阶段正常完成,且指定的另一个阶段也正常完成时,才触发下一阶段。注意,这两个阶段可以并行执行,并且没有依赖关系。

     

    3) 依赖前两阶段之一正常完成

    当前阶段和指定的另一个阶段,只要其中一个正常完成,就会启动下一阶段任务。

     

    4) 构建依赖多个阶段的任务流

    如果依赖的阶段不止两个,可以使用如下静态方法,基于多个CompletableFuture构建了一个新的CompletableFuture。

     

    3. 其它方法

     

     

    第六节 Fork/Join框架

    1. Fork/Join框架简介

    Fork/Join框架是一个并行任务执行框架,它把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果,适用于多核环境的可分割计算再合并结果的计算密集型任务。

    img

    注意:

    1. ForkJoinPool不是为了替代ExecutorService,而是它的补充,在某些应用场景下(计算密集型任务)性能比ExecutorService更好。

    2. ForkJoinPool主要用于实现分而治之的算法,特别是分治之后递归调用的函数,例如QuickSort等;

    3. ForkJoinPool的宗旨是使用少量的线程来处理大量的任务,多个线程获取到多个处理器的时间分片,并行的执行子任务。

     

     

    2. Fork/Join框架原理

    Fork/Join框架其实就是指由ForkJoinPool作为线程池、ForkJoinTask作为异步任务、ForkJoinWorkerThread作为执行任务的线程这三者构成的任务调度机制

     

    1) ForkJoinPool

    ForkJoinPool是ExecutorService的一个实现,用于管理工作线程,以及提供获取线程池状态和性能信息的相关方法。

    ForkJoinPool中的每个工作线程都有自己的双端队列(WorkQueue)用于存储任务(ForkJoinTask),并且使用了一种名为工作窃取(work-stealing)算法来平衡线程的工作负载。

    注意:

    1. 工作窃取(work-stealing)算法指空闲的线程试图从繁忙线程的队列中(队首)窃取任务。

     

    2) ForkJoinWorkerThread

    ForkJoinWorkerThread直接继承了Thread,是被ForkJoinPool管理的工作线程,由它来执行ForkJoinTask。

     

    3) ForkJoinTask<V>

    ForkJoinTask抽象类表示一个可分割计算再合并结果的异步任务,常用方法如下:

    它有两个子抽象类RecursiveActionRecursiveTask,分别表示无返回结果和有返回结果的异步任务,我们一般使用它们。

     

     

    3. Fork/Join的使用

    注意:

    1. 如果拆分逻辑比计算逻辑还要复杂时,ForkJoinPool并不会带来性能的提升,反而可能会起到负面作用。

     

     

    第七节 并发篇总结

    多线程开发有两个核心问题,一个是竞争,另一个是协作

     

    1. 线程安全的机制

    线程表示一条单独的执行流,每个线程有自己的执行计数器,有自己的栈,不过可以共享内存,共享内存是实现线程协作的基础,但共享内存有两个问题,竞态条件内存可见性,之前章节探讨了如下一些解决这些问题的思路。

     

    1) synchronized

    它是一个关键字,简单易用,大部分情况下,直接加在方法的声明上即可,既可以解决竞态条件问题,也可以解决内存可见性问题。

    需要理解的是,它保护的是对象,而不是代码,只有对同一个对象的synchronized方法调用,才能保证它们被顺序调用。对于实例方法,这个对象是this,对于静态方法,这个对象是类对象,对于代码块,需要指定哪个对象。

    另外,需要注意,它不能尝试获取锁,也不响应中断,还可能会死锁。不过,相比显式锁,synchronized简单易用,JVM也可以不断优化它的实现,应该被优先使用。

     

    2) 显式锁

    显式锁是相对于synchronized隐式锁而言的,它可以实现synchronzied同样的功能,但需要程序员自己创建锁,调用锁相关的接口,主要接口是Lock,主要实现类是ReentrantLock。

    相比synchronized,它支持以非阻塞方式获取锁、可以响应中断、可以限时、可以指定公平性、可以解决死锁问题,这使得它灵活的多。

    在读多写少且读操作可以完全并行的场景中,可以使用读写锁(ReadWriteLock/ReentrantReadWriteLock)以提高并发度。

     

    3) volatile

    synchronized和显式锁都是锁,使用锁可以实现安全,但使用锁是有成本的,获取不到锁的线程还需要等待,会有线程的上下文切换开销等。保证安全不一定需要锁。如果共享的对象只有一个,操作也只是进行最简单的get/set操作,set也不依赖于之前的值,那就不存在竞态条件问题,而只有内存可见性问题,这时,在变量的声明上加上volatile就可以了。

     

    3) 原子变量和CAS

    使用volatile,set的新值不能依赖于旧值,但很多时候,set的新值与原来的值有关,这时,也不一定需要锁,如果需要同步的代码比较简单,可以考虑原子变量,它们包含了一些以原子方式实现组合操作的方法,适用于并发环境中的计数、产生序列号等需求。

    原子变量的基础是CAS,比较并设置,一般的计算机系统都在硬件层次上直接支持CAS指令。通过循环CAS的方式实现原子更新是一种重要的思维,相比synchronized,它是乐观的,而synchronized是悲观的,它是非阻塞式的,而synchronized是阻塞式的。CAS是Java并发包的基础,基于它可以实现高效的、乐观、非阻塞式数据结构和算法,它也是并发包中锁、同步工具和各种容器的基础

     

    4) 写时复制

    之所以会有线程安全的问题,是因为多个线程并发读写同一个对象,如果每个线程读写的对象都是不同的,或者,如果共享访问的对象是只读的,不能修改,那也就不存在线程安全问题了。

    我们在介绍容器类CopyOnWriteArrayList和CopyOnWriteArraySet时介绍了写时复制技术,写时复制就是将共享访问的对象变为只读的,写的时候,再使用锁,保证只有一个线程写,写的线程不是直接修改原对象,而是新创建一个对象,对该对象修改完毕后,再原子性地修改共享访问的变量,让它指向新的对象。

     

    5) ThreadLocal

    ThreadLocal让每个线程对同一个变量都有自己的独有拷贝,每个线程实际访问的对象都是自己的,自然也就不存在线程安全问题了。

     

     

    2. 线程的协作机制

    多线程之间的核心问题,除了竞争,就是协作

     

    1) wait/notify

    wait/notify与synchronized配合一起使用,是线程的基本协作机制,每个对象都有一把锁和两个等待队列,一个是锁等待队列,放的是等待获取锁的线程,另一个是条件等待队列,放的是等待条件的线程,wait将自己加入条件等待队列,notify从条件等待队列上移除一个线程并唤醒,notifyAll移除所有线程并唤醒。

    需要注意的是,wait/notify方法只能在synchronized代码块内被调用,调用wait时,线程会释放对象锁,被notify/notifyAll唤醒后,要重新竞争对象锁,获取到锁后才会从wait调用中返回,返回后,不代表其等待的条件就一定成立了,需要重新检查其等待的条件。

    wait/notify方法看上去很简单,但往往难以理解wait等的到底是什么,而notify通知的又是什么,只能有一个条件等待队列,这也是wait/notify机制的局限性,这使得对于等待条件的分析变得复杂。

     

    2) 显式条件

    显式条件与显式锁配合使用,与wait/notify相比,可以支持多个条件队列,代码更为易读,效率更高,使用时注意不要将signal/signalAll误写为notify/notifyAll。

     

    3) 线程的中断

    Java中取消/关闭一个线程的方式是中断,中断并不是强迫终止一个线程,它是一种协作机制,是给线程传递一个取消信号,但是由线程来决定如何以及何时退出,线程在不同状态和IO操作时对中断有不同的反应,作为线程的实现者,应该提供明确的取消/关闭方法,并用文档清楚描述其行为,作为线程的调用者,应该使用其取消/关闭方法,而不是贸然调用interrupt。

     

    4) 协作工具类

    除了基本的显式锁和条件,针对常见的协作场景,Java并发包提供了多个用于协作的工具类。

     

    5) 阻塞队列

    阻塞队列有普通的先进先出队列,包括基于数组的ArrayBlockingQueue和基于链表的LinkedBlockingQueue/LinkedBlockingDeque,也有基于堆的优先级阻塞队列PriorityBlockingQueue,还有可用于定时任务的延时阻塞队列DelayQueue,以及用于特殊场景的阻塞队列SynchronousQueue和LinkedTransferQueue。

    对于最常见的生产者/消费者协作模式,可以使用阻塞队列,阻塞队列封装了锁和条件,生产者线程和消费者线程只需要调用队列的入队/出队方法就可以了,不需要考虑同步和协作问题。

     

    6) Future/FutureTask

    在常见的主从协作模式中,主线程往往是让子线程异步执行一项任务,获取其结果,手工创建子线程的写法往往比较麻烦,常见的模式是使用异步任务执行服务,不再手工创建线程,而只是提交任务,提交后马上得到一个结果,但这个结果不是最终结果,而是一个Future,Future是一个接口,主要实现类是FutureTask。

    Future封装了主线程和执行线程关于执行状态和结果的同步,对于主线程而言,它只需要通过Future就可以查询异步任务的状态、获取最终结果、取消任务等,不需要再考虑同步和协作问题。

     

     

    3. 容器类

    线程安全的容器有两类,一类是同步容器,另一类是并发容器

     

    1) 同步容器

    Collections类中有一些静态方法,可以基于普通容器返回线程安全的同步容器,比如:

    它们是给所有容器方法都加上synchronized来实现安全的。同步容器的性能比较低,另外,还需要注意一些问题,比如复合操作和迭代,需要调用方手工使用synchronized同步,并注意不要同步错对象。

    而并发容器是专为并发而设计的,线程安全、并发度更高、性能更高、迭代不会抛出ConcurrentModificationException、很多容器以原子方式支持一些复合操作。

     

    2) 写时拷贝的List和Set

    CopyOnWriteArrayList基于数组实现了List接口,CopyOnWriteArraySet基于CopyOnWriteArrayList实现了Set接口,它们采用了写时拷贝,适用于读远多于写,集合不太大的场合。不适用于数组很大,且修改频繁的场景。它们是以优化读操作为目标的,读不需要同步,性能很高,但在优化读的同时就牺牲了写的性能。

     

    3) ConcurrentHashMap

    HashMap不是线程安全的,在并发更新的情况下,HashMap的链表结构可能形成环,出现死循环,占满CPU。ConcurrentHashMap是并发版的HashMap,通过分段锁和其他技术实现了高并发,读操作完全并行,写操作支持一定程度的并行,以原子方式支持一些复合操作,迭代不用加锁,不会抛出ConcurrentModificationException。

     

    4) 基于SkipList的Map和Set

    ConcurrentHashMap不能排序,容器类中可以排序的Map和Set是TreeMap和TreeSet,但它们不是线程安全的。Java并发包中与TreeMap/TreeSet对应的并发版本是ConcurrentSkipListMap和ConcurrentSkipListSet。

    ConcurrentSkipListMap是基于SkipList实现的,SkipList称为跳跃表或跳表,是一种数据结构,主要操作复杂度为O(log(N)),并发版本采用跳表而不是树,是因为跳表更易于实现高效并发算法。

    ConcurrentSkipListMap没有使用锁,所有操作都是无阻塞的,所有操作都可以并行,包括写。与ConcurrentHashMap类似,迭代器不会抛出ConcurrentModificationException,是弱一致的,也直接支持一些原子复合操作。

     

    5) 并发队列

    各种阻塞队列主要用于协作,非阻塞队列适用于多个线程并发使用一个队列的场合,有两个非阻塞队列,ConcurrentLinkedQueue和ConcurrentLinkedDeque,ConcurrentLinkedQueue实现了Queue接口,表示一个先进先出的队列,ConcurrentLinkedDeque实现了Deque接口,表示一个双端队列。它们都是基于链表实现的,都没有限制大小,是无界的,这两个类最基础的实现原理是循环CAS,没有使用锁。

     

     

    4. 任务执行服务

    1) 基本概念

    任务执行服务大大简化了执行异步任务所需的开发,它引入了一个"执行服务"的概念,将"任务的提交"和"任务的执行"相分离,"执行服务"封装了任务执行的细节,对于任务提交者而言,它可以关注于任务本身,如提交任务、获取结果、取消任务,而不需要关注任务执行的细节,如线程创建、任务调度、线程关闭等。

    任务执行服务主要涉及以下接口:

    使用者只需要通过ExecutorService提交任务,通过Future操作任务和结果即可,不需要关注线程创建和协调的细节。

     

    2) 主要实现方式 - 线程池

    任务执行服务的主要实现机制是线程池,实现类是ThreadPoolExecutor,线程池主要由两个概念组成,一个是任务队列,另一个是工作者线程。任务队列是一个阻塞队列,保存待执行的任务。工作者线程主体就是一个循环,循环从队列中接受任务并执行。

    ThreadPoolExecutor实现了生产者/消费者模式,工作者线程就是消费者,任务提交者就是生产者,线程池自己维护任务队列。当我们碰到类似生产者/消费者问题时,应该优先考虑直接使用线程池,而非重新发明轮子,自己管理和维护消费者线程及任务队列。

     

    3) CompletionService

    在异步任务程序中,一种场景是,主线程提交多个异步任务,然后希望有任务完成就处理结果,并且按任务完成顺序逐个处理,对于这种场景,Java并发包提供了一个方便的方法,使用CompletionService,这是一个接口,它的实现类是ExecutorCompletionService,它通过一个额外的结果队列,方便了对于多个异步任务结果的处理。

     

    4) 定时任务

    异步任务中,常见的任务是定时任务。在Java中,有两种方式实现定时任务:

    Timer有一些需要特别注意的事项:

    ScheduledExecutorService的主要实现类是ScheduledThreadPoolExecutor,它没有Timer的问题:

    所以,实践中建议使用ScheduledExecutorService。