Java 中有多种锁类型,用于实现并发控制,确保多个线程在访问共享资源时能够安全地协调和同步。以下是一些常见的 Java 锁的类型:

  1. 内置锁(Intrinsic Lock)/ 互斥锁(Mutex Lock)

    • synchronized 关键字:Java 中最基本的锁机制之一,用于实现线程同步。它可以用来修饰方法或代码块,确保同一时间只有一个线程可以访问被锁定的方法或代码块。
  2. 重入锁(Reentrant Lock)

    • ReentrantLock 类:提供了更灵活的锁定机制,相对于 synchronized,它具有更多的功能,如可中断性、公平性等。开发者可以显式地锁定和解锁,以更精细地控制并发访问。
  3. 读写锁(Read-Write Lock)

    • ReentrantReadWriteLock 类:用于实现读多写少的场景,允许多个线程同时读取共享资源,但只允许一个线程写入。这可以提高读取操作的性能。
  4. 条件锁(Condition Lock)

    • Condition 接口:通常与重入锁一起使用,它允许线程等待某个条件的满足,然后再继续执行。ReentrantLock 提供了与条件锁相关的方法。
  5. 倒计数器锁(CountDownLatch)

    • CountDownLatch 类:用于等待一组线程完成某个操作。它允许一个或多个线程等待其他线程执行完特定操作后再继续执行。
  6. 信号量(Semaphore)

    • Semaphore 类:用于控制同时访问某个资源的线程数量。它允许限制并发线程的数量,适用于一些资源有限的场景。
  7. 栅栏(CyclicBarrier)

    • CyclicBarrier 类:用于在多个线程之间设置一个栅栏,所有线程必须在栅栏处等待,直到所有线程都到达后,栅栏才会打开,允许它们继续执行。
  8. 锁分段(Lock Striping)

    • 在一些高并发的数据结构中,锁分段技术用于将共享数据分成多个段,每个段有自己的锁。这可以降低锁的争用,提高并发性能。例如,ConcurrentHashMap 就使用了锁分段技术。
  9. 乐观锁(Optimistic Locking)

    • 乐观锁不是基于传统的锁机制,而是基于版本控制。在并发更新时,线程首先读取数据的版本号,然后在更新时检查版本号是否一致。如果一致,才执行更新操作,否则需要重新尝试。
  10. 自旋锁(Spin Lock)

    • 自旋锁是一种忙等待锁,线程在尝试获取锁时不会立即阻塞,而是反复检查锁是否可用。自旋锁通常用于低竞争情况下,以减少线程上下文切换的开销。

悲观锁(Synchronized、ReentrantLock)

在 Java 中,悲观锁是一种并发控制机制,它假设在并发环境下会发生竞争,并且默认情况下会将资源锁住,阻止其他线程访问该资源,直到当前线程完成操作。悲观锁的实现主要依赖于以下几种方式:

  1. synchronized 关键字

    Java 中最基本的悲观锁实现是使用 synchronized 关键字。通过将关键代码块或方法标记为synchronized,只有一个线程能够获得锁并执行代码,其他线程将被阻塞。

    synchronized (lockObject) {
        // 悲观锁保护的临界区代码
    }
    
  2. ReentrantLock 类

    ReentrantLockjava.util.concurrent 包中提供的一个可重入锁的实现,它允许更精细的控制和灵活性,相对于 synchronized 更为强大。

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    Lock lock = new ReentrantLock();
    
    lock.lock(); // 获取锁
    try {
        // 悲观锁保护的临界区代码
    } finally {
        lock.unlock(); // 释放锁
    }
    

synchronized

同步代码块

    public void m0() {
        synchronized (this) {
            System.out.println("方法0");
        }
    }

字节码:

 0 aload_0
 1 dup
 2 astore_1
 3 monitorenter
 4 getstatic #7 <java/lang/System.out : Ljava/io/PrintStream;>
 7 ldc #13 <方法0>
 9 invokevirtual #15 <java/io/PrintStream.println : (Ljava/lang/String;)V>
12 aload_1
13 monitorexit
14 goto 22 (+8)
17 astore_2
18 aload_1
19 monitorexit
20 aload_2
21 athrow
22 return
  • 从字节码的角度来看,一般一个同步代码块将会有一个monitorenter和两个monitorexit
  • 同步代码块中,如果发生了异常,将会释放锁,这就是第二个monitorexit所起的作用

普通的同步方法

    public synchronized void m1() {
        System.out.println("方法1");
    }

字节码:

  public synchronized void m1();
    descriptor: ()V
    flags: (0x0021) ACC_PUBLIC, ACC_SYNCHRONIZED
    Code:
      stack=2, locals=1, args_size=1
         0: getstatic     #7                  // Field java/lang/System.out:Ljava/io/PrintStream;
         3: ldc           #21                 // String 方法1
         5: invokevirtual #15                 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
         8: return
         3: ldc           #23                 // String 方法2
         5: invokevirtual #15                 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
         8: return
      LineNumberTable:
        line 13: 0
        line 14: 8

  • 可以看到有一个ACC_SYNCHRONIZED的标记

静态的同步方法:

    public synchronized static void m2() {
        System.out.println("方法2");
    }

公平锁和非公平锁

在 Java 中,锁可以分为两种类型:公平锁(Fair Lock)和非公平锁(Non-fair Lock)。它们的主要区别在于线程在尝试获取锁时是否考虑等待队列中其他线程的等待时间。以下是这两种锁的特点和区别:

1. 公平锁(Fair Lock):

  • 特点:公平锁是一种按照线程请求锁的顺序来分配锁的锁机制。当多个线程按顺序请求锁时,公平锁会按照请求的顺序分配锁,确保较早请求锁的线程会较早地获得锁。

  • 优点:公平锁能够避免线程饥饿(Starvation),即某个线程长时间等待锁的情况。

  • 缺点:由于要维护一个等待队列以保证公平性,因此公平锁的实现可能会引入一些额外的开销,可能降低性能。

  • 使用方式:在 Java 中,ReentrantLockReentrantReadWriteLock 可以通过构造函数的参数来创建公平锁。

    // 创建公平锁
    Lock fairLock = new ReentrantLock(true);
    

2. 非公平锁(Non-fair Lock):

  • 特点:非公平锁是一种不考虑线程请求锁的顺序,而是允许新请求的线程直接竞争锁的锁机制。在非公平锁下,新请求的线程有机会在等待队列中排在已等待的线程前面获取锁。

  • 优点:非公平锁通常比公平锁具有更高的吞吐量,因为它不需要维护严格的等待队列顺序,能够充分利用CPU的时间片。

  • 缺点:非公平锁可能导致已经等待较长时间的线程被插队,可能会引发线程饥饿问题。

  • 使用方式:在 Java 中,默认情况下,ReentrantLockReentrantReadWriteLock 创建的是非公平锁。可以显式地将其设置为公平锁,如上面的示例所示。

在实际开发中,要根据具体的需求选择公平锁还是非公平锁。如果需要确保线程请求锁的顺序,可以选择公平锁。如果追求更高的性能并可以接受一些线程插队,可以选择非公平锁。在某些场景下,根据具体需求灵活选择锁的类型可以最大程度地提高系统的性能。

乐观锁

Java 中的乐观锁是一种并发控制机制,与悲观锁不同,它假设并发冲突的发生概率较低,因此不会立即阻塞线程,而是在更新操作前检查资源版本或状态,如果没有冲突则执行操作,否则进行回滚或重试。乐观锁通常适用于读多写少的场景,可以提高系统的并发性能。在 Java 中,常见的乐观锁实现方式包括:

  1. 版本号机制

    乐观锁的常见实现方式之一是使用版本号机制。在数据表中添加一个版本号字段,每次更新操作都伴随着版本号的递增。在读取数据时,记录当前版本号,在更新数据时,检查记录的版本号是否与当前数据库中的版本号匹配。如果匹配,表示没有冲突,可以执行更新操作,否则需要处理冲突。

    // 伪代码示例
    int expectedVersion = currentVersionFromDatabase;
    int newVersion = expectedVersion + 1;
    if (updateRecordWithVersionCheck(expectedVersion, newVersion)) {
        // 更新成功
    } else {
        // 处理冲突,可能进行回滚或重试
    }
    
  2. CAS(Compare and Swap)操作

    Java 提供了 java.util.concurrent.atomic 包,其中包含了一组原子操作类,用于实现乐观锁。最常用的类是 AtomicIntegerAtomicLongAtomicReference。这些类使用 CAS 操作来实现线程安全的自增、自减、设置等操作。

    import java.util.concurrent.atomic.AtomicInteger;
    
    AtomicInteger counter = new AtomicInteger(0);
    int expectedValue = counter.get();
    int newValue = expectedValue + 1;
    if (counter.compareAndSet(expectedValue, newValue)) {
        // 更新成功
    } else {
        // 处理冲突,可能进行回滚或重试
    }
    

可重入锁

可重入锁(Reentrant Lock)是一种多线程编程中的锁机制,它允许同一个线程多次获得同一个锁,而不会导致死锁。可重入锁通常用于解决线程中嵌套调用同步方法的问题,以确保线程在调用其自身已经拥有锁的方法时不会被锁阻塞。

以下是可重入锁的一些关键特点和用法:

  1. 重入性:可重入锁具有重入性,也就是说同一个线程可以多次获取同一个锁,而不会被阻塞。这使得线程可以在拥有锁的情况下继续访问其他同步代码块。

  2. 互斥性:可重入锁保证了多个线程之间的互斥性,只有一个线程可以拥有锁并执行临界区代码。

  3. lock() 和 unlock() 方法:可重入锁通常提供了 lock()unlock() 方法来手动获取和释放锁。线程需要在锁外部显式地调用这些方法。

  4. tryLock() 方法:可重入锁通常还提供了 tryLock() 方法,允许线程尝试获取锁,如果锁已被其他线程持有,则立即返回失败,而不会阻塞。

  5. 可中断性:可重入锁通常允许线程在等待锁的过程中被中断,以避免长时间的等待。

  6. 超时获取锁:一些可重入锁还支持在一定时间内获取锁,如果在指定时间内无法获取锁,则可以执行其他操作,避免无限等待。

  7. 公平性:可重入锁可以是公平锁(按照线程请求锁的顺序分配锁)或非公平锁(允许新请求的线程直接竞争锁)。

在 Java 中,java.util.concurrent.locks.ReentrantLocksynchronized都是可重入锁的实现。

package cn.yanshangan;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyLock {
    Lock lock = new ReentrantLock();

    public synchronized void m1() {
        System.out.println("m1开始");
        m2();
        System.out.println("m1结束");
    }

    public synchronized void m2() {
        System.out.println("m2开始");
        System.out.println("m2结束");
    }

    public synchronized void b1() {
        synchronized (this) {
            System.out.println("b1开始");
            b2();
            System.out.println("b1结束");
        }
    }

    public synchronized void b2() {
        synchronized (this) {
            System.out.println("b2开始");
            System.out.println("b2结束");
        }
    }

    public void r1() {
        lock.lock();
        try {
            System.out.println("r1开始");
            r2();
            System.out.println("r1结束");
        } finally {
            lock.unlock();
        }
    }

    public void r2() {
        lock.lock();
        try {
            System.out.println("r2开始");
            System.out.println("r2结束");
        } finally {
            lock.unlock();
        }
    }
}



class Pro {
    public static void main(String[] args) {
        MyLock myLock = new MyLock();
        myLock.m1();
        myLock.b1();
        myLock.r1();
    }
}

可重入锁是多线程编程中的重要工具,它允许线程在递归调用和多层嵌套中正确地管理锁,避免了死锁和其他并发问题。它通常在需要更精细的控制和灵活性的并发情况下使用。

中断

线程中断是一种机制,允许一个线程通知另一个线程以停止或终止其执行,一个线程不应该由其他线程强制中断或者停止,应该由线程自身停止。线程中断的主要机制是通过调用 Thread.interrupt() 方法来实现的。

线程中断可以用于以下情况:

  1. 通知线程停止执行:一个线程可以调用另一个线程的 interrupt() 方法,通知它停止执行。被通知的线程需要周期性地检查自身是否被中断,并在适当的时候终止执行。
  2. 解除阻塞:如果一个线程因为等待某些条件而阻塞,可以通过中断来解除阻塞状态,让线程能够继续执行或退出。

主要方法包括:

  • void interrupt():用于协商中断线程,设置线程的中断状态为 true。被中断的线程可以通过检查 Thread.isInterrupted()Thread.currentThread().isInterrupted()不断的检查状态或捕获 InterruptedException 异常来响应中断。
  • boolean isInterrupted():检查线程的中断状态,返回 true 如果线程已经被中断。
  • static boolean interrupted():检查当前线程的中断状态,并清除中断状态(将中断状态重置为 false)。

线程中断是一种协作式的机制,需要线程自己来处理中断请求,通常是通过检查中断状态和进行清理工作来终止线程的执行。

线程中断的实现方式

方式1:volatile + 静态变量

    static volatile boolean flag = false;
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            while (!flag) {
                System.out.println("正在执行");
            }
        });
        thread.start();
        Thread.sleep(20);
        flag = true;
    }

方式2:使用原子类

    static AtomicBoolean flag = new AtomicBoolean(false);
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            while (!flag.get()) {
                System.out.println("正在执行");
            }
        });
        thread.start();
        Thread.sleep(20);
        flag.set(true);
    }

方式3:调用中断方法

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("正在执行");
            }
            System.out.println(Thread.currentThread().getName() + "已被中断");
        });
        thread.start();
        Thread.sleep(20);
        thread.interrupt();
        System.out.println("已停止:" + thread.isInterrupted());
    }

interrupt() 方法调用时,被调用者在 wait()sleep()join() 方法下,那么被等待的线程将收到 InterruptedException 异常,并且线程的中断状态会被清除(重置为 false)。被中断的线程可以在捕获 InterruptedException 异常后,根据需要终止执行或执行其他逻辑,这也** **。

解决方案:可以在捕获异常后再进行一次中断方法的调用

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                System.out.println("收到中断请求。。。");
                Thread.currentThread().interrupt();
            }
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("正在执行");
                System.out.println("Thread.interrupted() = " + Thread.interrupted());
            }
            System.out.println(Thread.currentThread().getName() + "已被中断");
        });
        thread.start();
        Thread.sleep(20);
        thread.interrupt();
        System.out.println("已停止:" + thread.isInterrupted());
    }

LockSupport

LockSupport 是 Java 并发包中的一个工具类,用于线程的阻塞和唤醒操作。它提供了一种更灵活和可靠的线程控制方式,相对于传统的 Object.wait()Object.notify() 方法,LockSupport 具有以下特点:

  1. 无需持有对象监视器锁:与 wait()notify() 方法不同,LockSupport 方法不需要线程持有对象监视器锁(即不需要在 synchronized 块内调用),因此可以更灵活地控制线程。

  2. 可唤醒性LockSupport 提供了一种可唤醒线程的机制,可以随时唤醒一个等待的线程,而不需要像 notify() 方法一样等待某个条件。

  3. 线程中断响应LockSupport 方法可以响应线程的中断请求,即当一个线程在等待时被中断,它会立即返回,并不会抛出 InterruptedException 异常,但线程的中断状态会被设置。

LockSupport 类主要提供以下两个方法:

  • static void park():使当前线程进入阻塞状态,等待许可。如果许可可用,它会立即返回,否则会一直阻塞,直到被中断或其他线程调用 unpark() 方法唤醒它。

  • static void unpark(Thread thread):唤醒指定线程,使其进入可运行状态。如果线程已经在 park() 方法中等待,它会在 unpark() 调用后立即返回。如果线程尚未调用 park(),那么下次调用 park() 时也会立即返回。

以下是一个简单的示例,演示了 LockSupport 的基本用法:

import java.util.concurrent.locks.LockSupport;

public class LockSupportExample {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            System.out.println("Child thread: Parking...");
            LockSupport.park(); // 阻塞当前线程
            System.out.println("Child thread: Unparked!");
        });

        thread.start();

        try {
            Thread.sleep(2000); // 主线程休眠2秒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Main thread: Unpark child thread.");
        LockSupport.unpark(thread); // 唤醒子线程
    }
}

在上述示例中,主线程创建了一个子线程,子线程调用了 LockSupport.park() 进入阻塞状态。主线程休眠2秒后,调用 LockSupport.unpark(thread) 唤醒了子线程,使其继续执行。

unpark()可以在park()之前、之后调用,作用效果相同,即使多次调用,只能生效一次

JMM

Java 内存模型(Java Memory Model,简称 JMM)是一种规范,定义了多线程程序中内存访问的行为规则,确保不同线程之间能够正确地共享变量并保持一致性屏蔽各种硬件和操作系统的内存访问差别。JMM 规定了程序中的操作如何在多线程环境下表现,并为程序员提供了一些关键概念和工具来管理多线程程序的内存访问。

以下是 Java 内存模型的一些关键概念和原则:

  1. 主内存和工作内存:Java 内存模型将内存划分为主内存(Main Memory)和每个线程的工作内存(Thread’s Working Memory)。主内存是共享的,用于存储所有共享变量。每个线程都有自己的工作内存,用于存储变量的本地副本。线程只能直接操作工作内存中的变量,而不是主内存中的变量。

  2. 内存可见性:Java 内存模型确保在一个线程修改共享变量后,其他线程能够看到该变量的最新值。可以通过使用 volatile 关键字、synchronized 关键字以及锁机制等来实现的。JMM规定,Java中所有的变量都存在于内存中

  3. 原子性:JMM 保证一些基本操作的原子性,如读写操作,对 volatile 变量的读写、synchronized 块的执行等。这意味着这些操作在多线程环境下是不可分割的,要么全部执行成功,要么全部不执行。

  4. 有序性:JMM 确保程序中的操作按照一定的顺序进行,尽管在多线程环境下可能会对指令进行重排序。有序性由 volatilesynchronized 提供的内存屏障来保证。

  5. happens-before 规则:JMM 引入了 happens-before 规则来定义操作之间的顺序关系。如果一个操作 happens-before 另一个操作,那么第一个操作的结果对于第二个操作是可见的。

  6. 线程间通信:线程之间通过共享变量进行通信。JMM 确保线程在读写共享变量时的可见性和顺序性。

  7. 同步机制:JMM 提供了 synchronized 关键字、volatile 关键字、wait()notify() 方法等同步机制来协调多线程的执行,以确保线程之间的互斥和协作。

主内存和本地内存

在 Java 内存模型(Java Memory Model,JMM)中,有两个重要的概念:主内存(Main Memory)和本地内存(Thread’s Working Memory),它们用于管理线程之间的内存访问和数据共享。

  1. 主内存(Main Memory)

    • 主内存是多个线程共享的内存区域,用于存储所有的共享变量和对象实例

    • 主内存是整个程序的内存抽象,它包含了所有线程需要访问的共享数据。

    • 主内存是线程之间数据的真正存储地点,是线程之间数据同步的参考点。在主内存中的数据可以被任意线程访问和修改。

    • 主内存的操作是线程安全的,任何一个线程对主内存的写操作都会对其他线程可见

  2. 本地内存(Thread’s Working Memory)

    • 每个线程都有自己的本地内存,本地内存是线程私有的,用于存储变量的本地副本以及线程私有的数据

    • 当线程访问一个变量时,它首先会从主内存中加载变量的副本到本地内存中,然后对该本地副本进行操作

    • 线程只能直接操作本地内存中的数据,而不是直接操作主内存中的数据。这样可以提高线程访问数据的速度。

    • 本地内存中的数据在不同线程之间不可见,每个线程都有自己的本地内存副本

    • 如果一个线程修改了本地内存中的变量副本,必须将其刷新到主内存,以便其他线程能够看到修改。

总之,主内存是多个线程共享的内存区域,用于存储共享变量和对象实例,是线程之间数据同步的参考点。本地内存是每个线程私有的,用于存储变量的本地副本,线程只能直接操作本地内存中的数据。线程之间通过主内存来实现数据共享和通信,但需要注意的是,主内存中的数据修改必须通过特定的同步机制来确保线程之间的可见性和一致性。

指令重排序

Java 中的指令重排序是一种编译器和处理器的优化技术,旨在提高程序的性能和效率。指令重排序可以改变程序中指令的执行顺序,但必须保证程序的最终执行结果与顺序执行时一致

指令重排序可能会导致以下三种类型的重排序:

  1. 编译器重排序:编译器在生成字节码或机器码时可能会重新排列指令,以提高代码执行效率。这种重排序通常不会改变程序的语义,因为编译器必须确保最终执行结果与源代码一致

  2. 处理器重排序:现代处理器通常具有多级流水线和多个执行单元,它们可以根据可用资源和数据依赖关系来重新安排指令的执行顺序,以提高并行度和性能。处理器重排序可以在不改变程序语义的情况下重新排列指令。

  3. 内存系统重排序:内存系统(包括缓存、写缓冲区等)也可以导致指令的执行顺序发生变化。内存系统的重排序可能会导致读取的数据不一致或写入的数据丢失。

Java 内存模型(Java Memory Model,JMM)引入了 happens-before 规则来约束指令重排序,以确保程序的可见性和一致性。根据 happens-before 规则,以下情况被认为是有序的:

  • 在程序中,每个操作都 happens-before 其后的所有操作。
  • volatile 变量的写操作 happens-before 该变量的后续读操作。
  • synchronized 块的解锁操作 happens-before 后续对同一块 synchronized 块的加锁操作。

这些规则确保了程序在不同线程之间的变量读写操作的一致性和可见性。

要注意的是,虽然编译器和处理器可以对指令进行重排序,但在多线程编程中,程序员通常不需要担心指令重排序问题,因为 Java 的同步机制(如 synchronizedvolatile)以及 JMM 规则已经处理了大部分重排序问题。然而,在某些特殊情况下,如果需要细粒度的控制和优化,程序员可能需要使用 volatilesynchronized 和其他同步工具来确保正确性。

Happens Before

“Happens-before” 是 Java 内存模型(Java Memory Model,JMM)中的一个关键概念,用于定义操作之间的顺序关系。它规定了一组规则,确保在多线程程序中,不同线程之间的操作能够按照一定的顺序进行,以保障多线程程序的可预测性和正确性。

想象你有两个操作,A 和 B:“happens-before” 规则告诉我们,如果操作 A “happens-before” 操作 B,那么操作 A 的效果一定会在操作 B 之前变得可见。

举个例子:

假设你有一个共享变量 x,操作 A 给 x 赋值为 42,然后在操作 B 时读取 x

如果操作 A “happens-before” 操作 B,那么在操作 B 读取 x 的时候,它一定会看到 x 的值是 42,因为操作 A 已经在之前将 42 赋值给了 x

// 操作A
x = 42;
// 操作b
b = x + 10;

“happens-before” 规则确保了在多线程程序中,操作之间有一定的顺序性和可见性,这样你就可以更容易地理解和控制多线程程序的行为,避免出现意外的错误。这个规则对于确保多线程程序的正确性非常重要。

下面是 “happens-before” 的一些重要规则和概念:

Java 内存模型(Java Memory Model,JMM)中定义了一系列 “happens-before” 规则,用于确保多线程程序中操作之间的顺序关系和可见性。以下是 JMM 中的 “happens-before” 规则的八个基本原则:

  1. 程序顺序规则(Program Order Rule)一个线程内,写在前面的代码一定比写在后边的代码先执行前面的操作 happens-before 于后面的操作。也就是说,线程内的操作顺序是有序的,不会发生重排序。

  2. 监视器锁规则(Monitor Lock Rule):一个解锁操作 unlock happens-before 后续的对同一把锁的加锁操作 lock。这确保了对共享资源的互斥访问,保证了锁的顺序性,也就是先解锁、再加锁

  3. volatile 变量规则:对 volatile 变量的写操作 happens-before 后续的读操作。这确保了对 volatile 变量的修改对其他线程是可见的,保证了可见性。

  4. 传递性(Transitivity):如果操作 A happens-before 操作 B,且操作 B happens-before 操作 C,那么操作 A happens-before 操作 C。这表示 happens-before 具有传递性,可以构建操作之间的链条。

  5. start 规则:线程的启动操作 start happens-before 该线程的每个操作。这确保了线程的所有操作都在线程启动之后执行,只有执行第4行的start()方法后,第2行才能够执行

    Thread thread = new Thread(() -> {
        System.out.println("abc")
    });
    thread.start();
    
  6. join 规则:线程的结束操作 join happens-before 当前线程接收到线程结束的信号。这确保了线程的结束对其他线程是可见的。

  7. 线程中断规则(Thread Interruption Rule):对线程的中断操作 interrupt happens-before 通过检测中断状态或调用 isInterrupted() 方法等方式来查看线程的中断状态。这确保了中断的信息能够被其他线程正确感知。也就是先进行调用中断,再进行中断标志的判断

  8. 对象终结规则(Finalization Rule):一个对象的初始化操作 happens-before 它的 finalize 方法的执行。这确保了在对象被回收前,其初始化的操作已经完成。

这些 “happens-before” 规则提供了一种可预测的方式来控制多线程程序中操作之间的正确顺序,以确保程序的可见性和一致性。程序员可以根据这些规则编写正确的多线程代码,并避免出现竞态条件、脏读等并发问题。

volatile

  • volatile写之前的操作,禁止重排序到volatile之后
  • volatile读之后的操作,禁止重排序到volatile之前
  • volatile写之后的volatile读,禁止重排序

volatile 是 Java 中的一个关键字,用于修饰变量,主要用于确保变量的可见性和禁止指令重排序,当修改某个共享变量的值时会立即将共享变量的值刷新回主内存,当读某个共享变量的值时,重新回主内存读最新的共享变量

  • 写屏障(Write Barrier):确保写入操作将数据刷新到主内存,而不是仅保存在本地缓存中。这可以保证写操作对其他线程是可见的。
  • 读屏障(Read Barrier):确保读取操作从主内存获取最新的值,而不使用本地缓存中的旧值。这可以保证读操作能够看到其他线程写入的值。

四种屏障:写写、读读、读写、写读

volatile 变量具有以下特性和用途:

  1. 可见性(Visibility)volatile 变量保证了对变量的写操作对其他线程是可见的。这意味着当一个线程修改了 volatile 变量的值,其他线程能够立即看到这个变化,而不会使用本地缓存中的旧值。

  2. 禁止指令重排序(Prevents Reordering)(有序性)volatile 变量的读写操作前后会插入内存屏障(memory barrier),这阻止了编译器和处理器对操作进行重排序。这确保了操作的有序性,避免了意外的执行顺序。

  3. 不保证原子性(Not Atomic)volatile 只保证单个读/写操作的原子性,但不能保证复合操作的原子性。如果一个操作包含多个步骤,并且需要保证这些步骤一起执行或一起失败,那么需要使用更强的同步机制,如 synchronized 块或 java.util.concurrent 中的类。

  4. 适用场景volatile 通常用于标识状态标志或标识是否停止线程的变量,这些变量需要被多个线程共享,并且需要确保可见性。它也可以用于简单的双重检查锁定(Double-Checked Locking)单例模式。

  5. 性能开销较小:与其他同步机制(如 synchronized)相比,volatile 的性能开销较小,因为它不涉及线程的阻塞和唤醒。

使用 volatile 时需要注意以下几点:

  • 不要将 volatile 用于复合操作,因为它不能保证复合操作的原子性。对于复杂的同步需求,应该使用更强的同步机制。

  • volatile 适用于状态标志和简单的数据共享,但不能替代其他同步工具,如 synchronizedjava.util.concurrent 中的类。

  • 在多线程编程中,使用 volatile 变量时要小心确保可见性,但不要过度使用,因为它可能会导致代码复杂化。选择合适的同步机制取决于具体的需求。

public class Volatile {
    public static boolean flag = true;

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (flag) {
            }
            System.out.println("已停止");
        }, "线程1").start();

        Thread.sleep(2000);
        flag = false;
        System.out.println("flag = " + flag);
    }
}
  • 上例中,线程1无法感知到flag的变化,由于循环内没有其他可执行的代码,这个循环变成了一个空闲的忙等待(busy-waiting)循环,不会释放 CPU 资源。
  • 在任务管理器中,可以一直看到CPU资源在占用。
  • 主线程在休眠 2000 毫秒后将 flag 设置为 false,但由于新线程一直在忙等待,它无法检测到 flag 变为 false 的变化,因此不会停止。
public class Volatile {
    public static boolean flag = true;

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (flag) {
                System.out.println(666);
            }
            System.out.println("已停止");
        }, "线程2").start();

        Thread.sleep(2000);
        flag = false;
        System.out.println("flag = " + flag);
    }
}
  • 这个例子中,新线程也会不断地检查 flag 变量是否为 true,但与第一个示例不同,它在循环内输出了 “666”。这个输出操作会使得新线程不仅检查 flag 变量的状态,还会进行输出操作,这导致了线程的时间片分配。
  • 在这个示例中,新线程会周期性地让出 CPU 时间片,让其他线程有机会执行。这意味着在一段时间后,主线程有机会将 flag 设置为 false,新线程能够检测到这个变化并停止执行。因此,程序的输出会包括 “已停止”。

volatile保证可见性:

public class Volatile {
    public static volatile boolean flag = true;

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (flag) {
            }
            System.out.println("已停止");
        }).start();

        Thread.sleep(2000);
        flag = false;
        System.out.println("flag = " + flag);
    }
}

volatile不满足原子性:

以下输出结果不确定,并且idea会有以下警告

volatile 字段 ‘count’ 上的非原子操作
检查信息: 报告对 volatile 字段的非原子操作。
非原子操作的示例之一是使用增量运算符更新字段。 由于操作涉及读写,并且在此之间可能发生其他修改,因此可能损坏数据。 使用 synchronized 块进行环绕,或使用 java.util.concurrent.atomic 软件包中的一个类,可以使该项操作成为原子操作。

public class Volatile {
    public static volatile int count = 100;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 100; i++) {
            pool.submit(() -> count--);
        }
        pool.shutdown();
        Thread.sleep(2000);
        System.out.println("count = " + count);
    }
}

字节码的角度

从字节码的角度来看:

  public static volatile int count;
    descriptor: I
    flags: (0x0049) ACC_PUBLIC, ACC_STATIC, ACC_VOLATILE

使用volatile修饰的变量会有一个ACC_VOLATILE的标识

单例模式的双检锁方式

在单例模式的双重检查锁(Double-Checked Locking)中,将静态常量设置为 volatile 的目的是确保单例对象的初始化是可见的和有序的,以避免潜在的问题。

双重检查锁的主要目标是在需要创建单例对象时,确保只有一个线程创建该对象。但是,由于指令重排序的存在,有可能出现在对象尚未完全初始化之前,其他线程已经获得了对尚未完全初始化的对象的引用。这可能导致访问到一个不完整或不正确的对象状态,从而引发错误。

通过将单例对象的引用设置为 volatile,可以确保以下两个关键的原子性操作:

  1. 分配内存空间:在 JVM 中为对象分配内存空间是一个独立的操作,可能会在构造函数执行之前完成。如果不使用 volatile,其他线程可能会在对象完全初始化之前获得对该对象的引用。

  2. 初始化对象:构造函数的执行可能包含多个步骤,如果不使用 volatile,其他线程可能会在构造函数的部分代码执行之前获得对该对象的引用。

通过将静态常量设置为 volatile,可以禁止指令重排序,并确保单例对象的初始化在引用被返回之前完成。这样,其他线程在获取到对象引用时,可以确保对象已经完全初始化,避免了潜在的问题。

以下是一个使用双重检查锁的单例模式的示例,其中 instance 被声明为 volatile

public class Singleton {
    private volatile static Singleton instance;

    private Singleton() {}

    public static Singleton getInstance() {
        if (instance == null) {
            synchronized (Singleton.class) {
                if (instance == null) {
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}

CAS

CAS(Compare-and-Swap)是一种并发编程中常用的原子操作,它是一种基本的硬件原语,通常由处理器提供支持。CAS 操作用于实现多线程环境下的无锁算法,用于解决共享变量的原子性操作问题。

CAS 操作包括以下三个步骤:

  1. 比较(Compare):首先,CAS 会比较内存中的某个值与预期的值是否相等。如果相等,表示内存中的值没有被其他线程修改,可以进行下一步。

  2. 交换(Swap):如果比较成功,CAS 会尝试将内存中的值更新为新的值。这个更新操作是原子的,即只有一个线程可以成功执行更新操作,其他线程将失败。

  3. 返回结果(Return):CAS 操作会返回更新前的旧值,以供调用者检查和进一步处理。

CAS 的关键特性包括:

  • 原子性:CAS 操作是原子的,只有一个线程能够成功执行交换操作,其他线程的操作将失败。

  • 无锁:CAS 是一种无锁算法,它不需要像锁一样阻塞线程。当多个线程同时尝试执行 CAS 操作时,只有一个线程会成功,其他线程需要重试或执行其他逻辑。

  • 自旋等待:如果 CAS 操作失败,线程通常会自旋等待,即不断重复 CAS 操作直到成功或达到一定的重试次数。

  • ABA 问题:CAS 操作可能存在 ABA 问题,即在某个线程执行 CAS 操作时,另一个线程可能已经将共享变量的值从 A 变为 B,然后再变回 A,这样 CAS 操作可能会错误地认为没有变化。为了解决 ABA 问题,通常需要使用版本号或标记来辅助 CAS 操作。

CAS 在并发编程中广泛用于实现各种同步数据结构和算法,例如无锁队列、计数器、单例模式、非阻塞算法等。Java 中的 java.util.concurrent.atomic 包提供了一组原子类,如 AtomicIntegerAtomicLongAtomicReference 等,用于执行 CAS 操作,简化了在多线程环境下的原子性操作。

优缺点

CAS(Compare-and-Swap)是一种用于实现多线程并发操作的原子性操作,具有以下优点和缺点:

优点

  1. 原子性操作:CAS 操作是原子性的,可以保证多个线程同时进行 CAS 操作时只有一个线程成功,避免了竞态条件。

  2. 无锁操作:CAS 操作不需要使用锁来保护共享资源,因此避免了锁的开销和线程切换的成本。在低竞争情况下,性能较好。

  3. 避免死锁:由于 CAS 操作不使用锁,因此不会导致死锁问题,这使得代码更容易编写和维护。

  4. 减少上下文切换:CAS 操作可以在同一线程内多次尝试,减少了线程之间的上下文切换,有助于提高程序的性能。

  5. 支持非阻塞算法:CAS 操作支持非阻塞算法的实现,这些算法在高并发环境中能够更好地扩展。

缺点

  1. 自旋等待:CAS 操作失败时,线程会自旋等待,不断尝试操作,直到成功或达到一定的重试次数。在高竞争情况下,自旋等待可能会导致大量的 CPU 时间浪费。

  2. ABA 问题:CAS 操作只比较值,不考虑值的变化过程。这可能导致 ABA 问题,即一个值在某个时刻变为 B,然后又变回 A,CAS 操作可能会错误地认为没有变化。为了解决这个问题,通常需要使用版本号或标记等方法来辅助 CAS 操作。

  3. 限制:CAS 操作只能用于原子性更新一个变量的值,不能用于复合操作。如果需要实现多步骤的原子操作,可能需要额外的手段来确保原子性。

  4. 平台依赖性:CAS 操作依赖于底层硬件的支持,不同的硬件平台和 JVM 实现可能具有不同的行为和性能。

  5. 不支持阻塞操作:CAS 操作通常不适用于需要等待某个条件满足再继续的场景,因为它不支持线程的阻塞和唤醒。

总之,CAS 是一种有着明显优点的原子性操作,特别适用于低竞争的情况,但在高竞争和复杂的多步骤原子操作中可能会有一些限制和问题。在使用 CAS 操作时,开发者需要根据具体场景和需求来权衡其优点和缺点,并选择合适的并发控制策略。

底层实现

AtomicInteger类在底层依赖于Unsafe类,而Unsafe类中提供了一系列操作内存的方法,以下为CAS进行自旋的一些代码,在不加锁的情况下实现了的i++的功能

    public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            v = getIntVolatile(o, offset);
        } while (!weakCompareAndSetInt(o, offset, v, v + delta));
        return v;
    }
  1. do-while 循环:这段代码使用了一个 do-while 循环,表示会一直尝试执行操作,直到操作成功。这是典型的自旋等待机制,如果操作失败,会一直重试直到成功。

  2. getIntVolatile:这是一个调用 Unsafe 类的方法,用于获取指定对象 o 中偏移量 offset 处的整数值,而且是以volatile方式获取,确保从主内存中读取最新的值。v 变量用于存储这个获取到的整数值。

  3. weakCompareAndSetInt:这是另一个 Unsafe 方法,用于尝试以 CAS(比较-交换)操作来更新指定对象 o 中偏移量 offset 处的整数值。它的参数包括对象 o、偏移量 offset、预期的旧值 v 和新值 v + delta

    • 如果当前偏移量处的值等于预期的旧值 v,则将新值 v + delta 写入该位置,并返回 true 表示更新成功。

    • 如果当前值与预期的旧值 v 不等(说明其他线程已经修改了该值),则不执行更新,返回 false 表示更新失败。

整个操作的目标是在循环中,不断获取当前值 v,然后尝试以 CAS 操作更新为 v + delta如果更新失败(即有其他线程在这之间修改了该值),就继续循环尝试。只有在更新成功后才会退出循环

最终,这段代码返回了原来的值 v,而不是更新后的值 v + delta。这是因为它实现的是"获取-加"操作,返回的是操作前的原始值,而不是更新后的值。

这种原子的自增操作通常用于实现计数器、标记位、并发集合等场景,它确保了多线程环境下对整数值的操作是原子的,避免了竞态条件和数据不一致性的问题。

AtomicInteger类

AtomicInteger 是 Java 中 java.util.concurrent.atomic 包中的一个原子整数类,它提供了一种线程安全的方式来执行整数的原子操作。以下是 AtomicInteger 的主要用法:

  1. 创建 AtomicInteger 对象

    AtomicInteger atomicInt = new AtomicInteger();
    // 也可以初始化一个特定的值
    AtomicInteger atomicIntWithInitialValue = new AtomicInteger(10);
    
  2. 原子性操作AtomicInteger 提供了一系列原子性操作方法,可以执行常见的整数操作,如增加、减少、设置、获取等。

    • int get():获取当前值。
    • void set(int newValue):设置新的值。
    • int getAndSet(int newValue):设置新的值并返回旧值。
    • int getAndIncrement():先获取当前值,然后增加 1 并返回旧值。
    • int getAndDecrement():先获取当前值,然后减少 1 并返回旧值。
    • int getAndAdd(int delta):先获取当前值,然后增加指定的增量并返回旧值。
    • int incrementAndGet():增加 1 并返回新值。
    • int decrementAndGet():减少 1 并返回新值。
    • int addAndGet(int delta):增加指定的增量并返回新值。
  3. 原子性比较与设置AtomicInteger 也提供了一组方法,可以执行比较与设置操作,通常与 CAS(Compare-and-Swap)相关。

    • boolean compareAndSet(int expect, int update):如果当前值等于预期值 expect,则将当前值更新为 update。如果更新成功,返回 true,否则返回 false
    • boolean weakCompareAndSet(int expect, int update):类似于 compareAndSet,但对于不支持强 CAS 操作的平台,性能更好。
  4. 原子性更新操作AtomicInteger 还提供了一组方法,可以执行原子性更新操作,例如按位与、按位或、按位异或等。

    • int getAndUpdate(IntUnaryOperator operator):使用指定的操作函数更新当前值并返回旧值。
    • int updateAndGet(IntUnaryOperator operator):使用指定的操作函数更新当前值并返回新值。
  5. 其他方法:除了上述方法外,AtomicInteger 还提供了其他一些方法,如 getAndAccumulateaccumulateAndGet 等,用于执行更复杂的原子性操作。

使用 AtomicInteger 可以避免手动编写同步代码来保证整数的线程安全性,这对于高并发的多线程程序尤其有用。它通常用于计数器、标记位、并发集合等场景,其中需要对整数进行原子性操作。在使用 AtomicInteger 时,开发者不需要担心加锁和同步问题,因为这些操作已经内置在原子类中。

AtomicReference类型

AtomicReference 是 Java 中 java.util.concurrent.atomic 包中的一个原子引用类,它提供了一种线程安全的方式来操作引用类型(对象引用)的原子操作。AtomicReference 允许多个线程并发地更新和访问引用,并确保这些操作是原子性的,从而避免竞态条件和数据不一致性的问题。

以下是 AtomicReference 类的主要用法和方法:

  1. 创建 AtomicReference 对象

    AtomicReference<String> atomicRef = new AtomicReference<>();
    // 也可以初始化一个特定的引用值
    AtomicReference<String> atomicRefWithInitialValue = new AtomicReference<>("initialValue");
    
  2. 原子性引用操作

    • V get():获取当前引用的值。
    • void set(V newValue):设置新的引用值。
    • V getAndSet(V newValue):设置新的引用值并返回旧值。
    • boolean compareAndSet(V expect, V update):如果当前引用值等于预期值 expect,则将引用值更新为 update。如果更新成功,返回 true,否则返回 false比较的是引用地址
  3. 原子性更新操作

    • boolean weakCompareAndSet(V expect, V update):类似于 compareAndSet,但对于不支持强 CAS 操作的平台,性能更好。
    • V getAndUpdate(UnaryOperator<V> updateFunction):使用指定的函数更新当前引用值,并返回旧值。
    • V updateAndGet(UnaryOperator<V> updateFunction):使用指定的函数更新当前引用值,并返回新值。
  4. 其他方法

    • boolean weakCompareAndSetPlain(V expect, V update):类似于 weakCompareAndSet,但使用非 volatile 的语义。
    • boolean weakCompareAndSetRelease(V expect, V update):类似于 weakCompareAndSet,但使用释放模式的语义。
    • boolean weakCompareAndSetAcquire(V expect, V update):类似于 weakCompareAndSet,但使用获取模式的语义。

AtomicReference 主要用于解决多线程环境下的引用类型的原子性问题,它适用于各种场景,如实现线程安全的单例模式、并发数据结构、状态机等。通过 AtomicReference,可以避免手动编写同步代码和锁定,从而提高多线程程序的性能和可维护性。

需要注意的是,尽管 AtomicReference 提供了一种线程安全的引用操作方式,但仍需要根据具体场景和需求来选择合适的并发控制机制和数据结构。

AtomicStampedReference

AtomicStampedReference 是 Java 中 java.util.concurrent.atomic 包中的一个原子引用类,它扩展了 AtomicReference 类,允许开发者在引用的同时,为引用关联一个整数(戳记或标记),以实现更复杂的原子操作。这个整数可以用于标识引用对象的版本或状态,以解决 ABA 问题,其中一个线程修改了引用对象的状态,然后又改回原来的状态。

AtomicStampedReference 主要用于解决以下问题:

  1. ABA 问题的解决:当多个线程同时更新引用对象时,可能出现 ABA 问题,即一个线程修改引用对象的值,然后又改回原始值,而另一个线程在此期间无法区分这个变化。AtomicStampedReference 通过关联戳记值,可以检测到这种情况,因为每次更新都需要提供预期的戳记值。

  2. 解决引用对象版本问题:有时候需要在引用对象上维护版本信息或状态信息,以确保操作的正确性。AtomicStampedReference 可以用来维护这些信息,以便进行原子性的读取和更新。

以下是 AtomicStampedReference 的主要方法和用法:

  • boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp):这个方法用于比较引用对象和戳记是否与预期值匹配,如果匹配,则更新引用和戳记为新值,如果引用或戳记与预期值不匹配,则操作失败返回 false

  • V getReference():获取当前引用对象的值。

  • int getStamp():获取当前引用对象的戳记值。

  • V get(int[] stampHolder):获取当前引用对象的值,并将戳记值存储在 stampHolder 数组中。

  • boolean attemptStamp(V expectedReference, int newStamp):尝试将引用对象的戳记值更新为新值,如果引用与预期值匹配,则戳记值更新成功返回 true,否则返回 false

实现自旋锁

    public static AtomicReference<Thread> l = new AtomicReference<>();

    public static void lock() {
        Thread thread = Thread.currentThread();
        // 自旋
        while (!l.compareAndSet(null, thread)) {

        }
    }

    public static void unlock() {
        Thread thread = Thread.currentThread();
        l.compareAndSet(thread, null);
    }

    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = () -> {
            System.out.println(Thread.currentThread().getName() + " - 来了");
            lock();
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            unlock();
            System.out.println(Thread.currentThread().getName() + " - 走了");
        };

        new Thread(runnable, "线程1").start();
        new Thread(runnable, "线程2").start();
    }

原子类

Java 中的原子类(Atomic classes)主要用于实现多线程环境下的原子操作,它们提供了一种线程安全的方式来进行常见的原子性操作。这些原子类可以根据其用途和操作类型进行分类。以下是对16个主要原子类的分类:

整数原子类

  1. AtomicInteger:用于原子性操作整数值。
  2. AtomicLong:用于原子性操作长整数值。

引用类型原子类

  1. AtomicReference:用于原子性操作引用类型对象。
  2. AtomicReferenceArray:用于原子性操作引用类型的数组。
  3. AtomicStampedReference:扩展自 AtomicReference,允许在引用对象上关联戳记或标记。

布尔值原子类

  1. AtomicBoolean:用于原子性操作布尔值。

数字原子类

  1. AtomicIntegerArray:用于原子性操作整数数组中的元素。

    AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[10]);
    for (int i = 0; i < 10; i++) {
        System.*out*.println("atomicIntegerArray.incrementAndGet(i) = " + atomicIntegerArray.incrementAndGet(i));
    }
    System.*out*.println("atomicIntegerArray = " + atomicIntegerArray);
    atomicIntegerArray.compareAndSet(2, 1, 1000);
    System.*out*.println("atomicIntegerArray = " + atomicIntegerArray);
    atomicIntegerArray.addAndGet(8, 1000);
    System.*out*.println("atomicIntegerArray = " + atomicIntegerArray);
    
  2. AtomicLongArray:用于原子性操作长整数数组中的元素。

  3. AtomicIntegerFieldUpdater:用于原子性更新类的字段,只能更新 volatile int 类型的字段。

  4. AtomicLongFieldUpdater:用于原子性更新类的字段,只能更新 volatile long 类型的字段。

  5. AtomicReferenceFieldUpdater:用于原子性更新类的字段,只能更新 volatile 引用类型的字段。

累加器原子类

  1. LongAccumulator:用于实现累加操作,支持用户定义的累加函数。/əˈkjuːmjəleɪt/

        // 相加
    	public static void main(String[] args) throws InterruptedException {
            LongAccumulator longAccumulator = new LongAccumulator((a, b) -> a + b, 10);
            CountDownLatch countDownLatch = new CountDownLatch(100);
            ExecutorService pool = Executors.newFixedThreadPool(20);
            for (int i = 0; i < 100; i++) {
                pool.submit(() -> {
                    longAccumulator.accumulate(1);
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            System.out.println("longAccumulator = " + longAccumulator);
            pool.shutdown();
        }
    
    
        // 相乘
        public static void main(String[] args) throws InterruptedException {
            LongAccumulator longAccumulator = new LongAccumulator((a, b) -> a * b, 1);
            CountDownLatch countDownLatch = new CountDownLatch(10);
            ExecutorService pool = Executors.newFixedThreadPool(20);
            for (int i = 0; i < 10; i++) {
                pool.submit(() -> {
                    longAccumulator.accumulate(2);
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            System.out.println("longAccumulator = " + longAccumulator);
            pool.shutdown();
        }
    
  2. DoubleAccumulator:用于实现累加操作,支持用户定义的累加函数。

  3. LongAdder

        public static void main(String[] args) throws InterruptedException {
            LongAdder longAdder = new LongAdder();
            CountDownLatch countDownLatch = new CountDownLatch(100);
            ExecutorService pool = Executors.newFixedThreadPool(20);
            for (int i = 0; i < 100; i++) {
                pool.submit(() -> {
                    longAdder.increment();
                    // longAdder.decrement();
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            System.out.println("longAdder = " + longAdder);
            pool.shutdown();
        }
    
  4. DoubleAdder

变量引用原子类

  1. AtomicMarkableReference:扩展自 AtomicReference,允许在引用对象上关联标记。

原子字段更新器

  1. AtomicReferenceArray:用于原子性更新对象数组中的元素。
  2. AtomicMarkableReference:用于原子性更新对象引用和标记。

这些原子类可以帮助开发者在多线程环境中实现高效和线程安全的操作,从简单的整数操作到复杂的数组和对象操作,都有相应的原子类可供选择。根据具体需求和场景,开发者可以选择合适的原子类来确保数据的一致性和并发性。

LongAdder和LongAccumulator性能更好

LongAdderLongAccumulator 在高并发环境下之所以快速,主要归功于它们采用了分段的方式来减小竞争。

LongAdder

  1. 分段累加LongAdder 内部维护了一个分段的累加器数组,每个段(Cell)独立负责一部分累加操作。当多个线程同时进行累加操作时,它们会分散到不同的段上,从而减小了竞争。

  2. 无锁操作:由于采用分段方式,LongAdder 不需要使用锁来保护整个累加操作,这避免了锁的开销和竞争条件。

  3. 局部性原理:分段累加器的设计遵循了局部性原理,每个线程在自己的段上进行累加,减少了线程间的通信和竞争,提高了内存访问的局部性。

  4. 高并发性:因为分段累加器的设计,LongAdder 在高并发情况下具有很好的扩展性,多个线程可以同时进行累加操作,而不会互相干扰,从而提高了性能。

LongAccumulator

  1. 支持自定义累加函数LongAccumulator 允许用户自定义累加操作的函数,这意味着可以执行更复杂的累加操作,例如求和、求最大值、求最小值等。

  2. 分段累加:类似于 LongAdderLongAccumulator 也使用了分段的方式来减小竞争,不同线程可以独立地在不同的段上执行累加操作。

  3. 自定义函数的初始化值:用户可以在创建 LongAccumulator 时指定初始值,用于初始化累加器,这有助于更灵活地处理不同类型的累加操作。

  4. 条件操作支持:与 LongAdder 不同,LongAccumulator 支持条件操作,允许用户执行诸如 getAndSet()accumulateAndGet() 等操作。

总之,LongAdderLongAccumulator 的高性能主要得益于它们的分段设计,这种设计降低了线程之间的竞争,减少了锁的使用,提高了并发性。在高并发环境下,这两个类可以更高效地处理累加操作,而不会出现性能瓶颈。这使它们成为处理计数和累加等操作的优秀选择。

LongAdderLongAccumulator的性能要比AtomicLong好,AtomicLong又比synchronized

public class Adder {
    private static int count = 0;
    public static void main(String[] args) throws InterruptedException {
        LongAccumulator longAccumulator = new LongAccumulator((a, b) -> a + b, 0);
        CountDownLatch countDownLatch = new CountDownLatch(100);
        ExecutorService pool = Executors.newFixedThreadPool(20);

        long start = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            pool.submit(() -> {
                for (int i1 = 0; i1 < 10000000; i1++) {
                    longAccumulator.accumulate(1);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println("longAccumulator = " + longAccumulator);
        System.out.println("longAccumulator = " + (System.currentTimeMillis() - start));

        System.out.println("--------------------------------------------");

        CountDownLatch countDownLatch3 = new CountDownLatch(100);
        LongAdder longAdder = new LongAdder();
        start = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            pool.submit(() -> {
                for (int i1 = 0; i1 < 10000000; i1++) {
                    longAdder.increment();
                }
                countDownLatch3.countDown();
            });
        }
        countDownLatch3.await();
        System.out.println("longAdder = " + longAdder);
        System.out.println("longAdder = " + (System.currentTimeMillis() - start));


        System.out.println("--------------------------------------------");



        CountDownLatch countDownLatch2 = new CountDownLatch(100);

        AtomicLong atomicLong = new AtomicLong();

        start = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            pool.submit(() -> {
                for (int i1 = 0; i1 < 10000000; i1++) {
                    atomicLong.incrementAndGet();
                }
                countDownLatch2.countDown();
            });
        }
        countDownLatch2.await();
        System.out.println("atomicLong = " + atomicLong);
        System.out.println("AtomicLong = " + (System.currentTimeMillis() - start));



        System.out.println("--------------------------------------------");

        CountDownLatch countDownLatch4 = new CountDownLatch(100);


        start = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            pool.submit(() -> {
                for (int i1 = 0; i1 < 10000000; i1++) {
                    synchronized (Adder.class) {
                        count++;
                    }
                }
                countDownLatch4.countDown();
            });
        }
        countDownLatch4.await();
        System.out.println("synchronized = " + count);
        System.out.println("synchronized = " + (System.currentTimeMillis() - start));

        System.out.println("--------------------------------------------");

        pool.shutdown();
    }
}

可以看到时间对比:

longAccumulator = 1000000000
longAccumulator = 1152
--------------------------------------------
longAdder = 1000000000
longAdder = 1057
--------------------------------------------
atomicLong = 1000000000
AtomicLong = 18401
--------------------------------------------
synchronized = 1000000000
synchronized = 32326
--------------------------------------------

AtomicMarkableReference和AtomicStampedReference

AtomicMarkableReferenceAtomicStampedReference 都是 Java 中 java.util.concurrent.atomic 包中的原子引用类,它们允许在引用对象上关联额外的信息,以解决某些复杂的并发问题。但它们在如何关联额外信息和解决问题上存在一些差异。

以下是对比 AtomicMarkableReferenceAtomicStampedReference 的主要区别和用法:

1. 关联信息的类型:

  • AtomicMarkableReference:关联的额外信息是一个布尔标记(mark),用于表示引用对象的某种状态或条件。标记只有两种值:truefalse

  • AtomicStampedReference:关联的额外信息是一个整数戳记(stamp),用于表示引用对象的版本或状态信息。戳记可以是任意整数值,通常用于解决 ABA 问题。

2. 原子操作的粒度:

  • AtomicMarkableReference:原子操作针对引用对象和标记的同时更新或比较。这意味着在一次原子操作中,可以更新引用对象的同时设置或检查标记。

  • AtomicStampedReference:原子操作针对引用对象和戳记的同时更新或比较。这意味着在一次原子操作中,可以更新引用对象的同时设置或检查戳记。

3. 解决的问题:

  • AtomicMarkableReference:通常用于解决一种二进制状态的问题,例如对象是否被删除、是否可用等。标记只有两种状态。

  • AtomicStampedReference:通常用于解决需要维护多个状态或版本信息的问题,例如解决 ABA 问题,其中戳记可以是不同的整数值。

4. 方法和用法:

  • AtomicMarkableReference 提供了 compareAndSetgetgetReferenceisMarked 等方法,用于操作引用对象和标记。

  • AtomicStampedReference 提供了 compareAndSetgetgetReferencegetStamp 等方法,用于操作引用对象和戳记。

5. 示例用途:

  • AtomicMarkableReference 可以用于标记对象是否被删除、对象是否可用、对象是否被锁定等。

  • AtomicStampedReference 常用于解决 ABA 问题,其中戳记用于标识引用对象的不同版本。

选择使用哪个原子引用类取决于具体的需求和问题,如果只需要简单的二进制标记信息,可以使用 AtomicMarkableReference。如果需要维护更复杂的状态或版本信息,或者需要解决 ABA 问题,可以考虑使用 AtomicStampedReference。不过需要注意,使用这些原子引用类时,需要小心设置和处理标记或戳记,以确保线程安全和正确性。

AtomicIntegerUpdater

AtomicIntegerFieldUpdater 是 Java 中的一个原子字段更新器类,它用于原子性地更新类的字段,但仅限于 volatile int 类型的字段,不能用于private权限的zi’dua。这个类允许在不使用锁的情况下对特定字段进行原子更新操作,从而提高多线程环境下的性能。

以下是 AtomicIntegerFieldUpdater 的主要用法:

  1. 创建 AtomicIntegerFieldUpdater 对象

    首先,需要通过 newUpdater 方法创建一个 AtomicIntegerFieldUpdater 对象。这个方法接受三个参数:

    • tclass:表示包含字段的类的类型。
    • fieldName:表示要更新的字段的名称。
    • fieldName:表示字段所属类的类类型。

    示例代码:

    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    
    public class MyClass {
        public volatile int myField;
    
        public static void main(String[] args) {
            AtomicIntegerFieldUpdater<MyClass> updater = AtomicIntegerFieldUpdater.newUpdater(MyClass.class, "myField");
        }
    }
    
  2. 原子更新字段

    使用 AtomicIntegerFieldUpdater 对象,可以在多个线程之间原子性地更新字段的值。通常,会使用 compareAndSet 方法来进行原子更新。

    示例代码:

    AtomicIntegerFieldUpdater<MyClass> updater = AtomicIntegerFieldUpdater.newUpdater(MyClass.class, "myField");
    
    MyClass obj = new MyClass();
    updater.set(obj, 10); // 设置字段的初始值
    
    int newValue = 20;
    boolean result = updater.compareAndSet(obj, 10, newValue); // 原子性更新字段值
    

    在上面的示例中,compareAndSet 方法会比较字段的当前值是否等于预期值(这里是 10),如果等于,则将字段更新为新值(这里是 20)。如果更新成功,compareAndSet 返回 true,否则返回 false

  3. 其他原子更新方法

    AtomicIntegerFieldUpdater 还提供了其他一些原子更新方法,例如 getAndAddgetAndDecrementgetAndIncrement 等,可以用于对字段进行不同类型的原子操作。

需要注意以下几点:

  • AtomicIntegerFieldUpdater 只能用于实例变量,不能用于静态变量。
  • 要更新的字段必须是 volatile 类型的,以确保可见性。
  • 使用 AtomicIntegerFieldUpdater 可以提高性能,但需要小心使用,确保不会出现并发问题。

总之,AtomicIntegerFieldUpdater 是一个有用的工具类,用于在多线程环境下对特定字段进行原子更新操作,以提高性能和减少锁的使用。

例子:账户类中包含余额信息,初始余额1000,有50个线程,每次对余额减去20

public class Atomic {
    public static CountDownLatch countDownLatch = new CountDownLatch(50);

    public static void main(String[] args) throws InterruptedException {
        Account account = new Account("admin", "大人", 1000);
        ExecutorService pool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 50; i++) {
            pool.submit(() -> {
                account.setAmount(account.getAmount() - 20);
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println("account = " + account);
        pool.shutdown();
    }

}

@Data
@ToString
@AllArgsConstructor
class Account {
    private String username;
    private String name;
    public volatile int amount;
}

  • 此时结果肯定是不正确的,多个线程同时对amount进行修改操作,并且没有加锁

可以借助AtomicIntegerFieldUpdater完成在没有加锁的情况下确保多个线程能够正确的修改amount属性

public class Atomic {
    public static CountDownLatch countDownLatch = new CountDownLatch(50);

    public static void main(String[] args) throws InterruptedException {
        AtomicIntegerFieldUpdater<Account> updater = AtomicIntegerFieldUpdater.newUpdater(Account.class, "amount");
        Account account = new Account("admin", "大人", 1000);
        ExecutorService pool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 50; i++) {
            pool.submit(() -> {
                updater.addAndGet(account, -20);
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println("account = " + account);
        pool.shutdown();
    }

}

@Data
@ToString
@AllArgsConstructor
class Account {
    private String username;
    private String name;
    public volatile int amount;
}

ThreadLocal

ThreadLocal 是 Java 中的一个线程局部变量类,它可以在多线程环境中为每个线程存储独立的变量副本,每个线程只能访问自己的副本,从而避免了多线程之间的数据共享和竞争条件。

以下是 ThreadLocal 的主要特点和用法:

  1. 线程局部变量ThreadLocal 可以在每个线程中创建独立的变量,每个线程都可以独立访问和修改自己的变量副本,而不会影响其他线程的副本。

  2. 线程安全性:每个线程的变量副本都是线程本地的,因此不需要额外的同步操作,不会出现多线程竞争条件,从而提高了程序的并发性能。

  3. 使用方式:通常,可以通过 ThreadLocal 类的静态方法 withInitial() 或者直接使用 ThreadLocal 的子类来创建线程局部变量,并通过 get()set() 方法来访问和修改变量的值。

    // 创建线程局部变量
    ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
    
    // 在线程中设置和获取变量值
    threadLocal.set(42);
    int value = threadLocal.get(); // 获取线程局部变量的值
    
  4. 生命周期管理:线程局部变量的生命周期与线程的生命周期一致。当线程终止时,与该线程关联的线程局部变量也会被垃圾回收,因此不会导致内存泄漏。

  5. 常见用途ThreadLocal 在多线程环境中常用于保存线程特定的上下文信息,例如数据库连接、会话信息、用户身份验证状态等。它可以避免在每个方法中显式传递这些信息,使代码更简洁。

需要注意的是,虽然 ThreadLocal 提供了一种线程封闭的机制,但过度使用它可能会导致内存泄漏问题,因为线程局部变量的副本不会自动被垃圾回收。因此,使用 ThreadLocal 时应该小心谨慎,确保在不需要的时候及时清理线程局部变量。

常用方法

ThreadLocal 类提供了一些方法用于管理线程局部变量。以下是一些常用的 ThreadLocal 方法:

  1. get() 方法

    • public T get():获取当前线程的线程局部变量的值。如果当前线程没有设置该变量的值,则返回默认值(通常是 null)。
    ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
    int value = threadLocal.get(); // 获取线程局部变量的值
    
  2. set() 方法

    • public void set(T value):设置当前线程的线程局部变量的值为指定的值。
    ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
    threadLocal.set(42); // 设置线程局部变量的值为 42
    
  3. remove() 方法

    • public void remove():从当前线程的线程局部变量中移除值。这个方法通常用于清理线程局部变量,以避免内存泄漏。
    ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
    threadLocal.set(42);
    threadLocal.remove(); // 移除线程局部变量的值
    
  4. initialValue() 方法

    • protected T initialValue():该方法在首次访问线程局部变量时被调用,用于初始化线程局部变量的值。可以通过重写这个方法来提供自定义的初始化逻辑。
    ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            return 0; // 自定义初始化值
        }
    };
    
  5. withInitial() 方法(Java 8及以上):

    • public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier):创建一个线程局部变量并指定初始化值的供应函数。这个方法允许您更方便地创建线程局部变量。
    ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0); // 使用 withInitial 创建线程局部变量并初始化值
    
  6. getMap(Thread t) 方法(不常用):

    • protected Map<ThreadLocal<?>, Object> getMap(Thread t):获取指定线程的线程局部变量的映射。通常不需要直接使用此方法。

remove()

在线程池中的线程使用ThreadLocal时,一定要调用remove()

在使用线程池时,如果在线程中使用了 ThreadLocal 变量,并且没有在使用完后显式调用 remove() 方法来清理线程局部变量,可能会导致线程局部变量的值在线程重用时泄漏到下一个任务中,进而引发意外的问题。这是因为线程池中的线程是被复用的,下一个任务可能会在之前的任务中设置的线程局部变量的值的基础上执行,这可能会导致不正确的结果或内存泄漏。

以下是一些可能出现的问题:

  1. 内存泄漏:如果线程局部变量中的值没有及时清理,它们将继续存在于线程的局部变量表中,从而占用内存并可能导致内存泄漏。

  2. 不正确的结果:线程局部变量的值可能会影响任务的执行结果,如果不正确地复用了线程,可能会导致任务产生不正确的结果。

  3. 资源泄漏:如果线程局部变量中存储了资源句柄(例如数据库连接、文件句柄等),没有正确释放这些资源可能导致资源泄漏。

因此,在使用线程池时,如果使用了 ThreadLocal 变量,最佳实践是在任务执行完毕后手动调用 remove() 方法清理线程局部变量的值,以确保线程复用时不会受到之前任务的影响。可以使用 try-finally 块来确保在任务执行完毕时调用 remove(),即使任务发生异常也能够正确清理线程局部变量。

ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);

ExecutorService executorService = Executors.newFixedThreadPool(5);

for (int i = 0; i < 5; i++) {
    executorService.submit(() -> {
        try {
            // 在线程中使用ThreadLocal
            int value = threadLocal.get();
            // 进行任务处理
            // ...
        } finally {
            // 在任务执行完毕后清理ThreadLocal
            threadLocal.remove();
        }
    });
}

executorService.shutdown();

通过显式调用 remove() 方法,可以确保线程局部变量在任务执行完毕后被正确清理,从而避免潜在的问题。

源码分析

ThreadLocal 是 Java 标准库中的一个类,用于创建线程局部变量。下面是对 ThreadLocal 的简要源码解读:

1. 基本概述:

ThreadLocal 是一个泛型类,它有一个类型参数,用于指定线程局部变量的类型。例如,ThreadLocal<Integer> 表示创建一个整数类型的线程局部变量。

2. 内部字段:

ThreadLocal 类的源码中,有一个 ThreadLocalMap 类型的字段 threadLocals,用于存储线程局部变量的键值对。

ThreadLocal.ThreadLocalMap threadLocals = null;

3. set 方法:

set(T value) 方法用于设置当前线程的线程局部变量的值。

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

首先,它获取当前线程对象 Thread.currentThread()

然后,它尝试从当前线程的 threadLocals 字段获取 ThreadLocalMap 对象,该对象用于存储线程局部变量。

如果 threadLocals 不为空,说明当前线程已经有 ThreadLocalMap,则直接在该 map 中存储键值对。

否则,调用 createMap(t, value) 方法来创建一个新的 ThreadLocalMap 并存储键值对。

4. get 方法:

get() 方法用于获取当前线程的线程局部变量的值。

public T get() {
    ThreadLocalMap map = getMap(Thread.currentThread());
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

首先,它尝试从当前线程的 threadLocals 字段获取 ThreadLocalMap 对象。

然后,它通过 map.getEntry(this) 获取与当前 ThreadLocal 关联的 Entry 对象,Entry 对象包含了线程局部变量的值。

最后,如果找到了 Entry,则将其值返回;否则,调用 setInitialValue() 方法来获取并设置初始值。

5. remove 方法:

remove() 方法用于移除当前线程的线程局部变量。

public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}

首先,它尝试从当前线程的 threadLocals 字段获取 ThreadLocalMap 对象。

然后,调用 m.remove(this) 来移除与当前 ThreadLocal 关联的键值对。

6. ThreadLocalMap 类:

ThreadLocalMap 类是 ThreadLocal 内部用于存储线程局部变量的键值对的数据结构。它是 ThreadLocal 类的内部类。

7. 总结:

ThreadLocal 类的核心机制是使用 Thread 对象的 threadLocals 字段来存储线程局部变量,ThreadLocalMap 用于实际的存储和管理。

每个线程都有自己独立的 ThreadLocalMap,可以在其中存储多个线程局部变量,每个 ThreadLocal 对象对应一个线程局部变量。

synchronized锁升级

在 Java 中,synchronized 锁升级是指在锁的粒度上进行提升,以提高性能和减少开销。Java 中的 synchronized 关键字有三种状态,锁可以在这些状态之间进行升级:

  1. 无锁状态(无锁):初始状态,表示没有线程持有锁。线程可以直接进入临界区执行代码。

  2. 偏向锁状态(偏向锁):是Java中一种用于提高单线程执行性能的锁优化技术。它的核心思想是,当一个线程获得锁后,JVM会假设这个线程会继续获取锁,因此会将锁标记为偏向该线程,从而使这个线程后续获取锁的操作变得更加高效。多个线程同时访问一个互斥资源,可以发现每次都有一个线程执行的次数最多,那么这个线程相当于用的是偏向锁,因为每次获得或者解锁时,都需要从内核态和用户态进行交换

  3. 轻量级锁状态(轻量级锁):当多个线程尝试进入临界区时,会进行竞争。JVM 使用 CAS 操作(Compare-And-Swap)来尝试获取锁。如果竞争不激烈,锁会升级为轻量级锁,仍然使用 CAS 操作来控制锁。

  4. 重量级锁状态(重量级锁):当多个线程竞争激烈,无法获得轻量级锁时,锁会升级为重量级锁,这时会涉及到操作系统级别的互斥操作,例如操作系统的互斥量(Mutex)来保护临界区,性能相对较差。

锁升级是为了在不同竞争程度下提供不同的性能保障。在低竞争情况下,偏向锁和轻量级锁可以显著提高性能,因为它们不涉及操作系统层面的互斥。只有在高竞争情况下,锁才会升级为重量级锁,以保证线程的公平性和安全性。

下面是对偏向锁(Biased Lock)、轻量级锁(Lightweight Lock)、重量级锁(Heavyweight Lock)的比较,包括它们的优点、缺点以及适用场景:

特性 偏向锁(Biased Lock) 轻量级锁(Lightweight Lock) 重量级锁(Heavyweight Lock)
优点 - 针对单线程场景,性能开销极低,高吞吐量。
- 线程获取锁无竞争时,快速获得锁和释放锁。
- 解锁时不需要CAS操作,提高了性能。
- 针对短时间内多线程竞争同一锁的场景。
- 自旋等待避免了进入操作系统级别的线程阻塞,提高了性能。
- 性能相对于重量级锁较好。
- 适用于多线程长时间竞争同一锁的场景。
- 提供了强大的线程协作和互斥机制。
- 适用于高并发和公平性要求高的情况。
缺点 - 不适用于多线程竞争激烈的场景,偏向锁会频繁撤销。
- 高并发情况下,会降低性能,引入自旋等待。
- 偏向线程撤销时,可能导致全局安全点(Safepoint)的停顿。
- 在多线程竞争激烈时,性能较差,自旋消耗CPU。
- 长时间的自旋等待可能导致性能下降。
- 性能较差,涉及到操作系统级别的线程阻塞和唤醒。
- 线程竞争时,存在较多的互斥操作,开销较大。
- 公平性要求高的情况可能无法满足。
适用场景 - 单线程场景或极少线程竞争的场景。
- 对性能有较高要求的单线程场景。
- 针对对象被频繁访问但竞争不激烈的情况。
- 多线程竞争不激烈的场景,短时间内多线程竞争同一锁的情况。
- 适用于线程持有锁时间短的情况。
- 针对轻量级的同步需求。
- 多线程竞争激烈的场景,长时间内多线程竞争同一锁的情况。
- 对公平性要求高的场景,需要等待队列的互斥。
- 高并发情况下,需提供强大的线程协作和同步机制。

以上比较表格总结了偏向锁、轻量级锁和重量级锁的特点,以及它们适用的场景和性能表现。选择合适的锁类型取决于应用程序的需求和线程竞争的情况。通常情况下,JVM会根据竞争情况自动选择适当的锁类型以优化性能。

Java15开始逐步废弃偏向锁

锁的升级过程

锁升级是指在多线程竞争的情况下,锁的状态由低级别的锁升级为高级别的锁的过程。在Java中,主要涉及到的锁升级过程有以下两种:

  1. 轻量级锁升级为重量级锁: 这是一种常见的锁升级过程,发生在多个线程竞争轻量级锁时。以下是升级的典型过程:

    • 初始状态:锁对象处于无锁状态。
    • 竞争开始:多个线程尝试获取锁,JVM将锁升级为轻量级锁(Lightweight Lock)。
    • 自旋:如果多个线程继续竞争锁,它们会进行自旋尝试获取锁,这时仍然是轻量级锁。
    • 自旋次数超限:如果自旋一定次数后,仍然无法获得锁,JVM会将锁升级为重量级锁(Heavyweight Lock),此时涉及到操作系统级别的互斥操作,等待队列等。
    • 轻量锁会在退出同步代码块时释放锁
  2. 偏向锁升级为重量级锁: 这种情况发生在偏向锁(Biased Lock)被撤销的情况下,如果有多个线程竞争同一个锁对象。以下是升级的典型过程:

    • 初始状态:锁对象处于无锁状态。
    • 偏向锁:一个线程获得锁对象,JVM将锁升级为偏向锁。
    • 竞争开始:其他线程也尝试获取锁对象,JVM撤销偏向锁。
    • 自旋:如果多个线程继续竞争锁,它们会进行自旋尝试获取锁,这时仍然是轻量级锁。
    • 自旋次数超限:如果自旋一定次数后,仍然无法获得锁,JVM会将锁升级为重量级锁,此时涉及到操作系统级别的互斥操作,等待队列等。
    • 偏向锁仅在竞争发生时才释放锁

需要注意的是,锁升级是由JVM自动管理的,不需要开发者手动干预。JVM会根据实际的竞争情况动态选择合适的锁状态,以平衡性能和公平性。锁升级过程中,涉及到CAS操作、自旋、等待队列等机制,这些都是为了提高多线程并发性能和确保线程安全。

AQS

AQS(AbstractQueuedSynchronizer)(抽象的队列同步器)是Java中的一个抽象框架,用于实现同步器(synchronizer)和构建各种并发性的工具和组件,如锁、信号量、倒计时器等。AQS提供了一个灵活的模板,使开发者能够自定义并发控制策略,主要解决锁分配给谁的问题,以适应不同的并发问题。

以下是AQS的主要特点和用法:

  1. 框架设计: AQS是一个抽象框架,提供了一组方法和数据结构,允许开发者实现自定义的同步器。它采用了模板方法设计模式,定义了一组抽象方法,子类必须实现这些方法来定义自己的同步逻辑。
    • 提供了一个先进先出的队列,还有一个state字段表示有没有被占用
    • 队列采用双向链表
  2. 状态管理: AQS中的同步器通常具有一个状态变量,该变量用于表示同步资源的状态。不同的同步器可以使用这个状态变量来表示不同的含义,例如,对于ReentrantLock,状态变量表示锁的占用情况。
  3. FIFO队列: AQS内部维护了一个FIFO(先进先出)的等待队列,用于管理等待获取同步资源的线程。这个队列确保了公平性,先等待的线程将先获得资源。
  4. Condition对象: AQS支持Condition对象,允许线程按条件等待和唤醒。这对于实现高级的线程协作机制非常有用,比如在生产者-消费者问题中使用。
  5. 独占锁和共享锁: AQS可以用于实现独占锁(例如ReentrantLock)和共享锁(例如ReadWriteLock)。独占锁只允许一个线程同时获得资源,而共享锁允许多个线程同时获得资源。
  6. 自定义同步器: 开发者可以继承AQS并实现其中的抽象方法来创建自定义的同步器。这使得开发者能够构建各种并发工具,满足不同的应用需求。
  7. 常见同步器: Java标准库中包含了许多基于AQS的同步器,如ReentrantLock、Semaphore、CountDownLatch等,它们都是通过AQS框架来实现的。

应用

Java中的JUC(Java Util Concurrent)库中有许多类使用了AQS(AbstractQueuedSynchronizer)来实现线程同步和并发控制。以下是一些常见的JUC类,它们使用了AQS作为底层机制:

  1. ReentrantLock: 可重入锁的实现,允许线程获取锁的时候可以嵌套获取,使用AQS来管理锁的状态和等待队列。

  2. Semaphore: 信号量的实现,允许多个线程同时访问一定数量的资源,AQS用于管理许可的获取和释放。

  3. CountDownLatch: 倒计时器,用于等待一组线程完成特定操作,AQS用于管理等待线程和计数。

  4. CyclicBarrier: 循环屏障,用于等待一组线程到达某个共同点后再继续执行,AQS用于管理等待线程和栅栏状态。

  5. Phaser: 分阶段的屏障,允许线程在不同阶段进行同步,AQS用于管理不同阶段的等待线程。

  6. ReentrantReadWriteLock: 读写锁的实现,允许多个线程同时读取共享资源,但只允许一个线程写入共享资源,AQS用于管理读锁和写锁的状态。

  7. LinkedBlockingQueue: 链表阻塞队列的实现,AQS用于管理队列的入队和出队操作。

  8. ThreadPoolExecutor: 线程池的实现,使用AQS来管理线程池的状态和等待队列。

  9. FutureTask: 用于异步任务的实现,AQS用于等待任务的完成。

Node类的设计

AQS(AbstractQueuedSynchronizer)中的Node类是AQS框架的核心数据结构之一,用于表示等待队列中的节点。Node类的设计主要用于实现FIFO(先进先出)的等待队列,其中每个节点代表一个等待获取共享资源的线程。

以下是Node类的主要设计和字段解释:

  1. prev: 该字段用于指向前一个等待线程的Node,形成了一个双向链表结构,方便了在等待队列中的节点的快速删除。

  2. next: 该字段用于指向后一个等待线程的Node,同样用于维护双向链表结构。

  3. thread: 表示持有该节点的线程,通常是等待获取锁的线程。

  4. waitStatus: 用于表示节点的等待状态,具体状态包括以下几种:

    • CANCELLED(值为1):节点已取消,通常由于超时或中断引起。
    • SIGNAL(值为-1):后续节点等待被唤醒。
    • CONDITION(值为-2):节点在Condition等待队列中等待。
    • PROPAGATE(值为-3):表示释放共享锁时需要唤醒其他线程,相当于将同步状态传播下去。
    • 0:表示初始状态或正常等待状态。
  5. nextWaiter: 表示节点的等待类型,具体类型包括以下几种:

    • SHARED:表示线程等待获取共享资源。
    • EXCLUSIVE:表示线程等待获取独占资源。

Node类的设计是为了支持AQS的等待队列和节点状态管理。等待队列中的节点按照线程等待的顺序排列,这种FIFO的结构确保了公平性,先等待的线程将先被唤醒。等待状态和等待类型字段允许AQS对节点的状态进行细致的控制,例如,在释放共享锁时,可以选择唤醒哪些线程。

从ReentrantLock类的角度看AQS

  • 内部有一个Sync sync类型的成员常量

  • Sync extends AbstractQueuedSynchronizer

  •     public ReentrantLock() {
            sync = new NonfairSync();
        }
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    
  • ReentrantLocklock()方法调用的就是synclock()方法,而Sync类是一个抽象类,有两个实现类:NonfairSync非公平、FairSync公平

  •     final void lock() {
            if (!initialTryLock())
                acquire(1);
        }
    
  • 整体的思路是:

    • 非公平锁
      • 一上来就直接对当前的状态位进行比较并替换
      • 如果替换成功就代表抢占成功
      • 如果替换失败,调用getExclusiveOwnerThread判断一下当前是否是当前线程
      • 否则返回false,再调用acquire(1)
    • 公平锁
      • 先判断当前状态位是否为0,如果是,那么对当前的状态位进行比较并替换
      • 如果替换成功就代表抢占成功
      • 如果替换失败,调用getExclusiveOwnerThread判断一下当前是否是当前线程
      • 否则返回false,再调用acquire(1)
  •     // 非公平锁
            final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            if (compareAndSetState(0, 1)) { // first attempt is unguarded
                setExclusiveOwnerThread(current);
                return true;
            } else if (getExclusiveOwnerThread() == current) {
                int c = getState() + 1;
                if (c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            } else
                return false;
        }
    
  •     // 公平锁
      		final boolean initialTryLock() {
              Thread current = Thread.currentThread();
              int c = getState();
              if (c == 0) {
                  if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
                      setExclusiveOwnerThread(current);
                      return true;
                  }
              } else if (getExclusiveOwnerThread() == current) {
                  if (++c < 0) // overflow
                      throw new Error("Maximum lock count exceeded");
                  setState(c);
                  return true;
              }
              return false;
          }
    
  • 在代码中可以看出,lock()最终调用LockSupport.park(this);将线程进行阻塞的

  • unlock()最终调用LockSupport.unpark(s.waiter);将线程唤醒

读写锁

读写锁的降级是指在获取写锁的情况下,允许线程在不释放写锁的情况下获取读锁,从而实现写锁向读锁的降级。这个过程可以提供更高的灵活性和性能。

下面是读写锁的降级过程:

  1. 一个线程首先获取了写锁,这意味着它独占了共享资源,其他线程无法同时获取读锁或写锁。

  2. 在获取了写锁之后,该线程可以安全地进行写操作,因为它独占了资源。

  3. 但是,如果这个线程需要执行读操作,它可以在不释放写锁的情况下获取读锁。这样,它可以在获取写锁的同时执行读操作,从而避免了写锁的开销。

  4. 当线程完成读操作后,它可以继续执行其他读操作,或者最终释放读锁。

  5. 当线程完成所有读操作后,它可以释放写锁,允许其他线程获取写锁。

读写锁的降级允许写操作的线程在不释放写锁的情况下执行读操作,这对于某些应用场景非常有用,因为它可以减少锁争用和线程切换的开销。然而,降级必须谨慎使用,因为它可能引入复杂性和潜在的问题,例如死锁或数据不一致。

要成功实现读写锁的降级,需要满足以下条件:

  • 写锁必须允许降级为读锁。
  • 降级过程必须保证数据的一致性,不应该引入数据不一致的风险。
  • 线程必须小心释放锁的顺序,以避免死锁。

StampedLock

StampedLock是Java 8引入的一种新型的锁机制,它提供了三种访问模式:读模式、写模式和乐观读模式。StampedLock的设计目标是在某些特定情况下提供比ReentrantReadWriteLock更好的性能,尤其是在读操作非常频繁而写操作相对较少的情况下。

下面是StampedLock的三种访问模式和其特点:

  1. 读模式(Read Lock): 与传统的读写锁相似,多个线程可以同时获取读锁,但不能同时获取写锁。读锁被持有时,其他线程可以继续获取读锁,但不能获取写锁。读锁可以降级为乐观读模式。

  2. 写模式(Write Lock): 写锁是独占锁,同一时刻只能被一个线程持有。当一个线程持有写锁时,其他线程不能获取读锁或写锁。写锁不支持锁降级。

  3. 乐观读模式(Optimistic Read): 乐观读模式是一种特殊的读模式,它不会阻塞其他线程的写操作。在乐观读模式下,线程不会真正获取读锁,而是返回一个标记(stamp),表示当前的版本号。线程可以在后续操作中验证该标记是否仍然有效,如果有效,则认为读操作成功,否则需要进行其他处理。乐观读模式适用于读操作非常频繁而且写操作非常罕见的情况。

StampedLock的一些特点和注意事项:

  • StampedLock是不可重入的,同一个线程不能重复获取同一个锁。
  • 当使用乐观读模式时,需要小心处理标记(stamp)的验证,确保数据一致性。
  • 在写模式下,不支持锁降级,即不能从写锁降级为读锁。
  • StampedLock的性能在某些特定场景下可能会比ReentrantReadWriteLock更好,但在一般情况下,性能可能类似或略逊于后者。

总的来说,StampedLock提供了一种灵活的锁机制,适用于特定的读多写少场景,特别是在乐观读模式下,可以提供更好的性能。但需要谨慎使用,因为它的特性和使用方式与传统的读写锁有一些不同

异步回调

CompletableFuture相当于异步回调,任务一创建就开始执行,supply中文为供应

// 用于没有返回值的异步回调
CompletableFuture.runAsync(Runnable);
// 有返回值的异步回调
CompletableFuture.supplyAsync(xxx)

可以传入第二个参数,第二个参数代表线程池

无参数的:

CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
    System.out.println(Thread.currentThread().getName() + "无参数");
});
try {
    completableFuture.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
  • 不调用get()也能够执行,如果后台没有用户线程,那么将看不到执行结果
  • 也可以调用join()join()get()的区别是join()没有受检查的异常

有返回值的:

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + "有参数");
    return 512;
});
try {
    completableFuture.whenComplete((t, u) -> {
        System.out.println("t(方法返回值) = " + t);
        System.out.println("u(异常的信息) = " + u);
    }).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
// ==========================
        CompletableFuture<Integer> task4 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始一段非常耗时的计算...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("计算完成");
            return 10 + 2;
        }, executor);
        System.out.println("task4.get() = " + task4.get());
  • 如果没有异常,参数1为返回的值,参数2为null
  • 如果有异常,参数1为null,参数2为异常的信息
  • 调用get()方法,会使调用这个方法的线程阻塞
        CompletableFuture<String> three = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "6666";
        });
        Thread.sleep(1500);
        System.out.println("three.getNow(\"ssss\") = " + three.getNow("ssss"));
  • getNow(val)计算完成返回完成后的值,没有计算完成返回传入的值
        CompletableFuture<String> three = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "6666";
        });
        System.out.println("打断结果:" + three.complete("打断") + ",取出的值:" + three.join());
  • complete(val)如果方法没有执行完毕,那么就直接打断,并且将返回值设置为val

完成时的回调

  • whenComplete,完成后的回调,可以接收两个参数,返回结果和抛出的异常,相当于直接在之前的线程中继续
  • whenCompleteAsync,完成后回调,相当于异步执行,可以指定线程池
  • exceptionally,发生异常的回调,有一个返回值,如果在执行任务的过程中发生了异常,并且在另一个线程中调用了get()方法,此时另一个线程将会抛出异常,也就是这行语句之后的其他代码将不会执行。可以使用这个回调指定发生异常后返回的一个默认值,此时在get()方法取出的值就是这个回调返回的值。
  • handle,完成后的回调,无论成功或者失败都会调用,可以接收两个参数,返回结果和抛出的异常,需要有一个返回值,返回值为处理结果,返回的结果影响外部get()方法获取的结果
        CompletableFuture<Integer> task4 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始一段非常耗时的计算...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("计算完成,线程名称:" + Thread.currentThread().getName());
            return 10 / 0;
        }, executor).whenComplete((r, e) -> {
            System.out.println("这是计算完成后的一个回调,结果为:" + r + "异常为:" + e+ ",线程名称:" + Thread.currentThread().getName());
        }).exceptionally(e -> {
            System.out.println("这是计算完成后发生异常的一个回调,异常为:" + e + ",线程名称:" + Thread.currentThread().getName());
            return -1;
        })
        CompletableFuture<Integer> task4 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始一段非常耗时的计算...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("计算完成,线程名称:" + Thread.currentThread().getName());
            return 10 / 0;
        }, executor).handle((r, e) -> {
            System.out.println("当前计算后的结果为:" + r + ",发生的异常为:" + e);
            return r;
        });

线程串行化

应用场景:两个线程,第二个线程需要等到第一个线程执行完成后拿到结果才能继续

  • thenRun,接收一个runnable,相当于不要上一步的执行结果
  • thenAccept,接收一个Consumer函数式接口,相当于拿到上一步的结果
  • thenApply,接收一个Function函数式接口,相当于拿到上一步结果之后还能够再返回一个结果

以上三个都能够异步执行

        CompletableFuture.supplyAsync(() -> {
            System.out.println("正在进行一段非常耗时的计算....");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 2;
        }, executor).thenRun(() -> {
            System.out.println("运算结束后执行的子任务,这个子任务无法获取运算结果");
        });
        CompletableFuture.supplyAsync(() -> {
            System.out.println("正在进行一段非常耗时的计算....");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 2;
        }, executor).thenAccept(result -> {
            System.out.println("开始执行子任务,上一步的执行结果为:" + result);
        });
        CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("正在进行一段非常耗时的计算....");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 2;
        }, executor).thenApply(result -> {
            System.out.println("开始执行子任务,上一步的执行结果为:" + result);
            return result * 1000;
        });
        System.out.println("integerCompletableFuture.get() = " + integerCompletableFuture.get());

双任务组合

应用场景:有三个线程,线程3需要等线程1和线程2完成后再进行

  • runAfterBoth组合两个线程,需要传入runnable
  • thenAcceptBoth组合两个线程,可以接收两个返回值
  • thenCombine组合两个线程,可以接收两个返回值,并且还可以返回一个值
                CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第一个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 2;
        }, executor);
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第二个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 5;
        }, executor);
		task1.runAfterBoth(task2, () -> {
            System.out.println("前两个任务执行完了,当前所在线程为:" + Thread.currentThread().getName());
        });
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第一个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 2;
        }, executor);
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第二个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 5;
        }, executor);
        task1.thenAcceptBoth(task2, (r1, r2) -> {
            System.out.println("前两个任务执行完了,返回值r1 = " + r1 + ",返回值r2 = " + r2 + ",当前所在线程为:" + Thread.currentThread().getName());
        });
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第一个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 2;
        }, executor);
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第二个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 5;
        }, executor);
        CompletableFuture<Integer> task3 = task1.thenCombine(task2, (r1, r2) -> {
            System.out.println("前两个任务执行完了,返回值r1 = " + r1 + ",返回值r2 = " + r2 + ",当前所在线程为:" + Thread.currentThread().getName());
            return r1 + r2;
        });
        Integer sum = task3.get();
        System.out.println("sum = " + sum);

有时候也会遇到下面的场景:有三个线程,只要线程1和线程2其中一个线程完成了,线程3就执行

  • runAfterEither:没有参数、没有返回值
  • acceptEither:接收一个参数,没有返回值,这个参数前两个线程中已执行完返回值
  • applyToEither:接收一个参数,有返回值,这个参数前两个线程中已执行完返回值
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第一个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 2;
        }, executor);
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第二个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 5;
        }, executor);
        task1.runAfterEither(task2, () -> {
            System.out.println("前两个任务有一个执行完了,当前所在线程为:" + Thread.currentThread().getName());
        });
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第一个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 2;
        }, executor);
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第二个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 5;
        }, executor);
        task1.acceptEither(task2, r -> {
            System.out.println("前两个任务有一个执行完了,当前所在线程为:" + Thread.currentThread().getName() + "其中一个的返回值为:" + r);
        });
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第一个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 2;
        }, executor);
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第二个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 5;
        }, executor);
        CompletableFuture<Integer> task3 = task1.applyToEither(task2, r -> {
            System.out.println("前两个任务执行完了,返回值r = " + r + ",当前所在线程为:" + Thread.currentThread().getName());
            return r;
        });
        Integer sum = task3.get();
        System.out.println("sum = " + sum);
class Test {
    public static void main(String[] args) {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "a";
        });
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "b";
        });
        CompletableFuture<String> result = a.applyToEither(b, f -> f + "更快");
        System.out.println("result.join() = " + result.join());
    }
}

多任务组合

应用场景:有三个线程,线程3需要等线程1和线程2完成后再进行

  • allOf:等待所有的任务完成
  • anyOf:任意一个任务完成
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第一个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 2;
        }, executor);
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第二个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 5;
        }, executor);
        CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第三个个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 4;
        }, executor);

        CompletableFuture<Void> wait = CompletableFuture.allOf(task1, task2, task3);
        wait.get();
        System.out.println("全部执行完毕");
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第一个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 2;
        }, executor);
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第二个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 5;
        }, executor);
        CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始进行第三个个非常耗时的计算,所在线程为:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10 / 3;
        }, executor);

        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task1, task2, task3);
        System.out.println("有一个执行完了,其中的返回值为:" + anyOf.get());

Q.E.D.


念念不忘,必有回响。