JUC

是指java.util.concurrent包下的工具类,JDK1.5之后加入的

concurrent中文为同时发生

线程的状态:

  • new,新建
  • runnable,准备就绪
  • blocked,阻塞
  • waiting,等待,直到有反应
  • timed waiting,也是等待,但过时不候
  • terminated,终止

sleep和wait区别:

  • sleep不会释放锁,是Thread类下的静态方法
  • wait会释放锁,并且只能在synchronized中使用,是Object类下的静态方法
  • 两者都可以被Thread.interrupted();进行中断
  • 在哪里睡就在哪里醒

串行:

  • 任务按照一定的顺序执行
  • 一次只能执行一个任务

并行:

  • 多个任务同时执行

并发:

  • 同一时刻,多个线程访问同一个资源

管程:monitor

  • 中文为监视器
  • 就是平时所说的锁
  • 是一种同步机制,保证同一时间只有一个线程访问被保护的代码

用户线程:

  • 自定义的线程都是属于用户线程

守护线程:

  • 后台中特殊的线程,例如垃圾回收线程

  • 查看当前线程是否是守护线程

    System.out.println(Thread.currentThread().isDaemon());
    
  • 设置为守护线程

    Thread.currentThread().setDaemon(true)
    

JVM会将在所有的用户线程执行完之后自动退出,如果所有的线程都是守护线程JVM也会自动关闭

public static void main(String[] args) {
    Thread thread = new Thread(() -> System.out.println("一个线程" + Thread.currentThread().getName()), "aa");
    // 设置为守护线程
    thread.setDaemon(true);
    thread.start();
    System.out.println("主线程结束");
}

以上代码只会执行主线程,只会输出主线程结束

设置一个线程是否是守护线程只能在线程启动之前,即调用这个线程的start()方法之前

import lombok.SneakyThrows;

import java.util.concurrent.*;

public class Main {
    @SneakyThrows
    public static void main(String[] args) {
        Thread thread = new Thread(() -> System.out.println("一个线程" + Thread.currentThread().getName()), "aa");
        Thread thread2 = new Thread(() -> System.out.println("一个线程" + Thread.currentThread().getName()), "22");
        thread.setDaemon(true);
        thread.start();
        thread2.start();
        System.out.println("主线程结束");
    }
}

以上代码两个线程都会执行,因为有一个线程不是守护线程

Lock接口

java.util.concurrent.locks包提供,可以实现手动的加锁和解锁

ReentrantLockReentrant读音为ˌriˈɛntrənt,中文为可重入的,表示这个锁可以重复的被使用

Lock接口和synchronized关键字区别:

  • Lock是接口,后者是关键字 是Java内置的实现
  • 如果发生异常,Lock如果不使用unLock()释放锁,可能会造成死锁,synchronized在发生;异常时会自动释放锁
  • Lock可以获取有没有成功的获取到锁,synchronized不可以
  • Lock可以让等待的线程中断
  • Lock可以实现提高多个线程进行操作的效率,如果资源竞争激烈,Lock效率更高,如果竞争不激烈 两者差别不大

线程之间的通信

可以使用锁配合wait()notify()让多个线程交替的执行

wait方法尽量只在循环中使用,如果在判断语句中使用会有虚假唤醒问题

使用Lock接口实现线程之间的通信

即使用这个接口实现wait()notify()方法

condition中文为条件,读音为kənˈdɪʃn

可以通过Lock实例的newCondition()方法获取一个Condition类型的实例,通过获取到的实例,调用:

  • condition.signal()随机唤醒一个线程,signal读音为ˈsɪɡnəl中文为信号
  • condition.signalAll()唤醒所有线程
  • condition.await()可让当前线程等待

下例为两个线程交替

import lombok.SneakyThrows;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {

    public static Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        // 获取一个Condition实例
        Condition condition = lock.newCondition();
        Runnable runnable = () -> {
            for (int i = 0; i < 100; i++) {
                // 加锁
                lock.lock();
                try {
                    // 唤醒其他线程
                    condition.signalAll();
                    System.out.println(Thread.currentThread().getName());
                } finally {
                    try {
                        // 让当前线程等待
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    lock.unlock();
                }
            }
        };
        Thread thread1 = new Thread(runnable, "线程1");
        Thread thread2 = new Thread(runnable, "线程2");
        thread1.start();
        thread2.start();
    }
}

ReentrantLock(false/true)

  • false是非公平锁,缺点是容易造成线程饿死,优点是效率高
  • true是公平锁,缺点是效率低,使用的是队列用来维护顺序

可重入锁(递归锁)

特点是如果有多层锁,那么这个锁可以进入内层,也就是说用的同一把锁

递归例子:

public synchronized void fun() {
    fun()
}

根据同步方法的概念得知,如果没有释放锁,那么这个方法不能够被调用,但上述代码却可以在没有释放锁的情况下自己调用自己,并最终抛出栈溢出的异常,由此得名:递归锁

image-20220225093032285
  • synchronized是隐式的可重入锁

    • 即能够自动解锁
  • Lock是显式的可重入锁

    • public static void main(String[] args) {
          new Thread(() -> {
              lock.lock();
              try {
                  System.out.println("外层");
                  lock.lock();
                  try {
                      System.out.println("中层");
                      lock.lock();
                      try {
                          System.out.println("内层");
                      } finally {
                          lock.unlock();
                      }
                  } finally {
                      lock.unlock();
                  }
      
              } finally {
                  lock.unlock();
              }
          }).start();
      }
      
  • 也可以在内层只上锁,不解锁,如果将以上代码改以下形式也能正常执行,不过也存在隐患

    • public static void main(String[] args) {
          new Thread(() -> {
              lock.lock();
              try {
                  System.out.println("外层");
                  lock.lock();
                  try {
                      System.out.println("中层");
                      lock.lock();
                      try {
                          System.out.println("内层");
                      } finally {
                      }
                  } finally {
                  }
      
              } finally {
                  lock.unlock();
              }
          }).start();
      }
      
  • 隐患:如果再有一个线程使用这把锁,会造成死锁,这个线程会一直处于等待状态而不往下执行

    •     public static Lock lock = new ReentrantLock(true);
      
      
          public static void main(String[] args) {
              new Thread(() -> {
                  lock.lock();
                  try {
                      System.out.println("外层");
                      lock.lock();
                      try {
                          System.out.println("中层");
                          lock.lock();
                          try {
                              System.out.println("内层");
                          } finally {
      //                        lock.unlock();
                          }
                      } finally {
      //                    lock.unlock();
                      }
      
                  } finally {
                      lock.unlock();
                  }
              }).start();
      
              new Thread(() -> {
                  lock.lock();
                  try {
                      System.out.println("第二个");
                  } finally {
                      lock.unlock();
                  }
              }).start();
          }
      
    • 以上代码会在第二个线程处于等待状态,原因是无法获得锁,因为未解锁

线程之间定制化通信

例如为每个线程指定最多的执行次数,设置每个线程先后执行的顺序

image-20220222171404530

集合的线程安全

ArrayList是线程不安全的

import lombok.SneakyThrows;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {

    public static Lock lock = new ReentrantLock(true);

    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0, 4));
                System.out.println(Thread.currentThread().getName() + ", list.size() = " + list.size() + ", list = " + list);
            }, "线程" + i).start();
        }
    }
}

以上代码如果多执行几次会抛出并发修改异常,以下为异常的内容

Exception in thread "线程8353" java.util.ConcurrentModificationException
	at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1013)
	at java.base/java.util.ArrayList$Itr.next(ArrayList.java:967)
	at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:456)
	at java.base/java.lang.StringConcatHelper.stringOf(StringConcatHelper.java:453)
	at Main.lambda$main$0(Main.java:19)
	at java.base/java.lang.Thread.run(Thread.java:833)

针对以上情况有3种解决方案:

  • 方案1 使用Vector,这个类的所有方法都是线程安全的

  • 方案2 使用Collections工具类,这个工具类有一个静态方法Collections.synchronizedList(List实例)可以使其返回一个线程安全的List实例

    • List<String> list = Collections.synchronizedList(new ArrayList<>());
      
    • 上两种方案都是古老的解决方法

  • 方案3 使用CopyOnWriteArrayList

    • 这个类使用了写时复制
    • 读的时候支持并发读
    • 写支持独立写
    • 过程:
      • 写的时候先复制一遍之前的集合
      • 写的时候在新复制的集合中写
      • 写完之后再合并到之前的集合中

HashSetHashMap都是线程不安全的

  • 针对于HashSet的解决方案为:使用CopyOnWriteArraySet
  • 针对于HashMap的解决方案为:使用ConcurrentHashMap

不同的锁

如果在方法上添加synchronized关键字(没有static),此时锁的是当前对象this

如果有两个不同的对象,那么会有两个不同的锁分别锁两个对象

静态方法添加synchronized,此时锁的是当前类的Class

如果是一个同步的静态方法和一个同步的普通方法,此时两者不是相同的锁

死锁

两个或者两个以上的线程在争夺资源造成相互等待现象,如果没有外力干涉,将无法继续执行

image-20220225164925879

线程1持有锁A,线程2持有锁B,线程1想要获取锁B,线程2想要获取锁A

死锁代码演示

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {

    public static Lock lock = new ReentrantLock(true);


    public static void main(String[] args) {
        Runnable runnable1 = () -> {
            synchronized (Main.class) {
                System.out.println("持有Main.class,试图获取String.class");
                synchronized (String.class) {
                    System.out.println("持有Main.class和String.class");
                }
            }
        };
        Runnable runnable2 = () -> {
            synchronized (String.class) {
                System.out.println("持有String.class,试图获取Main.class");
                synchronized (Main.class) {
                    System.out.println("持有String.class和Main.class");
                }
            }
        };
        new Thread(runnable1).start();
        new Thread(runnable2).start();
    }
}

此时输出结果为:

持有Main.class,试图获取String.class
持有String.class,试图获取Main.class

程序不会停止

验证是否死锁

可以在终端使用JVM自带的命令jps -l列出现在的正在运行的类

例如:

PS C:\Users\singx\OneDrive\Java\IDEA\AJAX> jps -l
19648 jdk.jcmd/sun.tools.jps.Jps
3872 org.jetbrains.jps.cmdline.Launcher
10996 Main
10684
14380 org.jetbrains.idea.maven.server.RemoteMavenServer36

在使用JVM的堆栈追踪工具jstack id

PS C:\Users\singx\OneDrive\Java\IDEA\AJAX> jstack 10996
2022-02-25 17:02:08
Full thread dump Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39 mixed mode, sh
aring):

Threads class SMR info:
_java_thread_list=0x00000270aac1b530, length=15, elements={
0x00000270a9fb9590, 0x00000270a9fba310, 0x00000270a9fd9c10, 0x00000270a9fda4d0,
0x00000270a9fdb3a0, 0x00000270a9fdbc60, 0x00000270a9fe8630, 0x00000270a9fe8fe0,
0x00000270a9ff40c0, 0x00000270a9fa25e0, 0x00000270aac31f80, 0x00000270aac32450,
0x00000270aab88bf0, 0x00000270aab890c0, 0x0000027086f56c20
}

"Reference Handler" #2 daemon prio=10 os_prio=2 cpu=0.00ms elapsed=451.47s tid=0x00
000270a9fb9590 nid=0x1b24 waiting on condition  [0x000000c32afff000]
   java.lang.Thread.State: RUNNABLE
        at java.lang.ref.Reference.waitForReferencePendingList(java.base@17.0.1/Nat
ive Method)
        at java.lang.ref.Reference.processPendingReferences(java.base@17.0.1/Refere
nce.java:253)
        at java.lang.ref.Reference$ReferenceHandler.run(java.base@17.0.1/Reference.
java:215)

"Finalizer" #3 daemon prio=8 os_prio=1 cpu=0.00ms elapsed=451.47s tid=0x00000270a9f
ba310 nid=0x1ec0 in Object.wait()  [0x000000c32b0fe000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(java.base@17.0.1/Native Method)
        - waiting on <0x000000071240d5a8> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(java.base@17.0.1/ReferenceQueue.java
:155)
        - locked <0x000000071240d5a8> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(java.base@17.0.1/ReferenceQueue.java
:176)
        at java.lang.ref.Finalizer$FinalizerThread.run(java.base@17.0.1/Finalizer.j
ava:172)

"Signal Dispatcher" #4 daemon prio=9 os_prio=2 cpu=0.00ms elapsed=451.46s tid=0x000
00270a9fd9c10 nid=0x4380 waiting on condition  [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Attach Listener" #5 daemon prio=5 os_prio=2 cpu=15.62ms elapsed=451.46s tid=0x0000
0270a9fda4d0 nid=0x279c waiting on condition  [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Service Thread" #6 daemon prio=9 os_prio=0 cpu=0.00ms elapsed=451.46s tid=0x000002
70a9fdb3a0 nid=0x4d90 runnable  [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Monitor Deflation Thread" #7 daemon prio=9 os_prio=0 cpu=0.00ms elapsed=451.46s ti
d=0x00000270a9fdbc60 nid=0x4f30 runnable  [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #8 daemon prio=9 os_prio=2 cpu=46.88ms elapsed=451.46s tid=0x0
0000270a9fe8630 nid=0x49bc waiting on condition  [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE
   No compile task

"C1 CompilerThread0" #11 daemon prio=9 os_prio=2 cpu=46.88ms elapsed=451.46s tid=0x
00000270a9fe8fe0 nid=0x4e58 waiting on condition  [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE
   No compile task

"Sweeper thread" #12 daemon prio=9 os_prio=2 cpu=0.00ms elapsed=451.46s tid=0x00000
270a9ff40c0 nid=0x48e4 runnable  [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Common-Cleaner" #13 daemon prio=8 os_prio=1 cpu=15.62ms elapsed=451.42s tid=0x0000
0270a9fa25e0 nid=0x25cc in Object.wait()  [0x000000c32b8ff000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(java.base@17.0.1/Native Method)
        - waiting on <0x00000007125212c0> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(java.base@17.0.1/ReferenceQueue.java
:155)
        - locked <0x00000007125212c0> (a java.lang.ref.ReferenceQueue$Lock)
        at jdk.internal.ref.CleanerImpl.run(java.base@17.0.1/CleanerImpl.java:140)
        at java.lang.Thread.run(java.base@17.0.1/Thread.java:833)
        at jdk.internal.misc.InnocuousThread.run(java.base@17.0.1/InnocuousThread.j
ava:162)

"Monitor Ctrl-Break" #14 daemon prio=5 os_prio=0 cpu=15.62ms elapsed=451.35s tid=0x
00000270aac31f80 nid=0x4fd8 runnable  [0x000000c32bafe000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.SocketDispatcher.read0(java.base@17.0.1/Native Method)
        at sun.nio.ch.SocketDispatcher.read(java.base@17.0.1/SocketDispatcher.java:
46)
        at sun.nio.ch.NioSocketImpl.tryRead(java.base@17.0.1/NioSocketImpl.java:261
)
        at sun.nio.ch.NioSocketImpl.implRead(java.base@17.0.1/NioSocketImpl.java:31
2)
        at sun.nio.ch.NioSocketImpl.read(java.base@17.0.1/NioSocketImpl.java:350)
        at sun.nio.ch.NioSocketImpl$1.read(java.base@17.0.1/NioSocketImpl.java:803)

        at java.net.Socket$SocketInputStream.read(java.base@17.0.1/Socket.java:966)

        at sun.nio.cs.StreamDecoder.readBytes(java.base@17.0.1/StreamDecoder.java:2
70)
        at sun.nio.cs.StreamDecoder.implRead(java.base@17.0.1/StreamDecoder.java:31
3)
        at sun.nio.cs.StreamDecoder.read(java.base@17.0.1/StreamDecoder.java:188)
        - locked <0x00000007121dfe70> (a java.io.InputStreamReader)
        at java.io.InputStreamReader.read(java.base@17.0.1/InputStreamReader.java:1
77)
        at java.io.BufferedReader.fill(java.base@17.0.1/BufferedReader.java:162)
        at java.io.BufferedReader.readLine(java.base@17.0.1/BufferedReader.java:329
)
        - locked <0x00000007121dfe70> (a java.io.InputStreamReader)
        at java.io.BufferedReader.readLine(java.base@17.0.1/BufferedReader.java:396
)
        at com.intellij.rt.execution.application.AppMainV2$1.run(AppMainV2.java:49)


"Notification Thread" #15 daemon prio=9 os_prio=0 cpu=0.00ms elapsed=451.35s tid=0x
00000270aac32450 nid=0x12a8 runnable  [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Thread-0" #16 prio=5 os_prio=0 cpu=0.00ms elapsed=451.34s tid=0x00000270aab88bf0 n
id=0x45a0 waiting for monitor entry  [0x000000c32bdff000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at Main.lambda$main$0(Main.java:14)
        - waiting to lock <0x0000000712400c38> (a java.lang.Class for java.lang.Str
ing)
        - locked <0x0000000712161ef8> (a java.lang.Class for Main)
        at Main$$Lambda$14/0x0000000800c01200.run(Unknown Source)
        at java.lang.Thread.run(java.base@17.0.1/Thread.java:833)

"Thread-1" #17 prio=5 os_prio=0 cpu=0.00ms elapsed=451.34s tid=0x00000270aab890c0 n
id=0x120c waiting for monitor entry  [0x000000c32befe000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at Main.lambda$main$1(Main.java:22)
        - waiting to lock <0x0000000712161ef8> (a java.lang.Class for Main)
        - locked <0x0000000712400c38> (a java.lang.Class for java.lang.String)
        at Main$$Lambda$15/0x0000000800c01418.run(Unknown Source)
        at java.lang.Thread.run(java.base@17.0.1/Thread.java:833)

"DestroyJavaVM" #18 prio=5 os_prio=0 cpu=125.00ms elapsed=451.34s tid=0x0000027086f
56c20 nid=0x1228 waiting on condition  [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"VM Thread" os_prio=2 cpu=0.00ms elapsed=451.47s tid=0x00000270a9fb5a90 nid=0x16cc
runnable

"GC Thread#0" os_prio=2 cpu=0.00ms elapsed=451.48s tid=0x0000027086fa7670 nid=0x43c
4 runnable

"G1 Main Marker" os_prio=2 cpu=0.00ms elapsed=451.48s tid=0x0000027086fb8240 nid=0x
4dac runnable

"G1 Conc#0" os_prio=2 cpu=0.00ms elapsed=451.48s tid=0x0000027086fb8b60 nid=0x1a94
runnable

"G1 Refine#0" os_prio=2 cpu=0.00ms elapsed=451.47s tid=0x00000270a9e844f0 nid=0x10e
4 runnable

"G1 Service" os_prio=2 cpu=0.00ms elapsed=451.47s tid=0x000002708701a0f0 nid=0x83c
runnable

"VM Periodic Task Thread" os_prio=2 cpu=0.00ms elapsed=451.35s tid=0x0000027086fc07
90 nid=0x4168 waiting on condition

JNI global refs: 15, weak refs: 0


Found one Java-level deadlock:
=============================
"Thread-0":
  waiting to lock monitor 0x00000270aaa370b0 (object 0x0000000712400c38, a java.lan
g.Class),
  which is held by "Thread-1"

"Thread-1":
  waiting to lock monitor 0x00000270aaa359f0 (object 0x0000000712161ef8, a java.lan
g.Class),
  which is held by "Thread-0"

Java stack information for the threads listed above:
===================================================
"Thread-0":
        at Main.lambda$main$0(Main.java:14)
        - waiting to lock <0x0000000712400c38> (a java.lang.Class for java.lang.Str
ing)
        - locked <0x0000000712161ef8> (a java.lang.Class for Main)
        at Main$$Lambda$14/0x0000000800c01200.run(Unknown Source)
        at java.lang.Thread.run(java.base@17.0.1/Thread.java:833)
"Thread-1":
        at Main.lambda$main$1(Main.java:22)
        - waiting to lock <0x0000000712161ef8> (a java.lang.Class for Main)
        - locked <0x0000000712400c38> (a java.lang.Class for java.lang.String)
        at Main$$Lambda$15/0x0000000800c01418.run(Unknown Source)
        at java.lang.Thread.run(java.base@17.0.1/Thread.java:833)

Found 1 deadlock.

此时看最后一行,给出了Found 1 deadlock.的提示,中文为发现一个死锁

Callable接口

重写的是call()方法,如果无法计算将抛出异常,是有返回值的

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {

    public static Lock lock = new ReentrantLock(true);


    public static void main(String[] args) {
        Callable<String> callable = () -> {
            Thread.sleep(1000);
            System.out.println("123");
            return "good";
        };
        // 作为中间类,这个类实现了Runnable接口
        FutureTask<String> futureTask = new FutureTask<>(callable);
        Thread thread = new Thread(futureTask);
        thread.start();
        try {
            // 只有这个任务完成后才能够取出值,否则将会一直处于阻塞状态
            // 可以使用futureTask.isDone()进行判断是否完成
            System.out.println("返回值为:" + futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

FutureTask原理:

  • 在不影响主线程的进行的情况下,再开一个线程,并使这个线程做一些事,在需要时可以在另一个线程取出需要的值
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {

    public static Lock lock = new ReentrantLock(true);


    public static void main(String[] args) {
        Callable<String> callable = () -> {
            System.out.println("123");
            return "good";
        };
        // 作为中间类,这个类实现了Runnable接口
        FutureTask<String> futureTask = new FutureTask<>(callable);
        Thread thread = new Thread(futureTask);
        thread.start();
        try {
            // 只有这个任务完成后才能够取出值,否则将会一直处于阻塞状态
            // 可以使用futureTask.isDone()进行判断是否完成
            while (!futureTask.isDone()) {
                System.out.println("等待中");
            }
            System.out.println("返回值为:" + futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

输出结果:

等待中
等待中
等待中
等待中
等待中
等待中
等待中
等待中
等待中
等待中
等待中
等待中
等待中
等待中
等待中
等待中
等待中
等待中
123
等待中
等待中
等待中
等待中
等待中
等待中
等待中
返回值为:good

只会执行一次,第二次如果再获取结果的时候将会直接返回结果,不会再执行

JUC辅助类

有3个:

  • 减少计数CountDownLatch
  • 循环栅栏
  • 信号灯

减少计数 CountDownLatch

实例化

CountDownLatch count = new CountDownLatch(10);

可以调用countDown()方法将值减1,调用getCount()方法获取值

如果值大于0,调用await()方法会阻塞,直到值为0之后,才调用这个之后的语句

System.out.println("count.getCount() = " + count.getCount());
count.await();
// 只有值为0时才能继续往下调用
System.out.println(111);
System.out.println(111);
System.out.println(111);
System.out.println(111);
System.out.println(111);

没有await()方法时:

public static void main(String[] args) {
    CountDownLatch count = new CountDownLatch(10);
    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            count.countDown();
            System.out.println(Thread.currentThread().getName() +":" + count.getCount());
        }, "线程" + i).start();
    }
    System.out.println("主线程:" + count.getCount());
}

此时主线程输出的值不固定

有了await()之后,主线程输出的值一直是0

public static void main(String[] args) {
    CountDownLatch count = new CountDownLatch(10);
    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            count.countDown();
            System.out.println(Thread.currentThread().getName() +":" + count.getCount());
        }, "线程" + i).start();
    }
    try {
        count.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("主线程:" + count.getCount());
}

循环栅栏 CuclicBarrier

构造方法:

CyclicBarrier cyclicBarrier = new CyclicBarrier(数量: int, Runnable);

第二个参数可以看作是回调函数

在需要的地方调用await()方法,每调用一次,数量-1,这个方法之后的语句将会在值变为0之后进行,runnable也是会在值变为0之后执行

import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {

    public static Lock lock = new ReentrantLock(true);

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("回调到");
        });
        for (int i = 0; i < 7; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "await之前");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "await之后");
            }, "线程" + i).start();
        }
    }
}

结果:

线程2await之前
线程4await之前
线程5await之前
线程1await之前
线程0await之前
线程3await之前
线程6await之前
回调到
线程4await之后
线程2await之后
线程0await之后
线程6await之后
线程5await之后
线程3await之后
线程1await之后

信号灯 Semaphore

构造方法

Semaphore semaphore = new Semaphore(许可的数量:int);

获取一个许可acquire()方法

  • 如果许可还有剩余的,那么此时授予许可,如果没有,将这个线程处于阻塞状态,直到获取到许可

释放一个许可release()方法

  • 将许可释放掉,此时其他线程可以抢占许可
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {

    public static Lock lock = new ReentrantLock(true);

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(10);
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "抢到了许可");
                    Thread.sleep(100);
                    System.out.println(Thread.currentThread().getName() + "释放了许可");
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "线程" + i).start();
        }
    }
}

输出结果:

线程5抢到了许可
线程2抢到了许可
线程1抢到了许可
线程5释放了许可
线程2释放了许可
线程0抢到了许可
线程4抢到了许可
线程1释放了许可
线程3抢到了许可
线程0释放了许可
线程4释放了许可
线程3释放了许可

读写锁

悲观锁:

  • 当如果遇到同步问题时,会不断的上锁、解锁
  • 缺点是这时候变成单线程了,不支持并发操作,效率低

乐观锁:

  • 支持并发,类似于事务
  • 每次操作时有一个版本号,如果版本号和数据库中相同,就不提交

表锁:

  • 对数据库操作一张表时,需要对这整张表上锁

行锁:

  • 对数据库操作一张表时,操作哪一条记录,就对哪一条记录进行加锁
  • 可能会发生死锁现象,例如第一行需要第二行,第二行需要第一行

读写锁都可能发生死锁现象

读锁:

  • 共享锁
  • image-20220312215000508

写锁:

  • 独占锁
  • image-20220312215132296

读写锁:一个资源可以被多个读的线程进行访问,或者被一个写的线程访问,但不能同时存在读写的线程,即读写互斥、读读共享

synchronizedReentrantLock是独占锁

  • 读读只能一个线程
  • 写写只能一个线程
  • 读写只能一个线程

读写锁缺点:

  • 造成锁的饥饿,即可能会出现一直读或者一直写的操作,例如上例就造成了这个问题

锁的降级:

  • 写的锁可以降级为读锁的操作
  • 读锁不能升级为写锁

例子:下例为模拟的没有锁的情况下对Map进行读写操作:

package com.moyok.ajax.baidu;

import com.moyok.ajax.baidu.com.baidu.translate.demo.TransApi;

import java.util.HashMap;
import java.util.Map;


public class Main {

    public static void main(String[] args) {
        ReadWrite readWrite = new ReadWrite();
        for (int i = 0; i < 5; i++) {
            final String key = "key" + i;
            final String val = "val" + i;
            new Thread(() -> {
                readWrite.put(key, val);
            }, "线程" + i).start();
        }

        for (int i = 0; i < 5; i++) {
            final String key = "key" + i;
            new Thread(() -> {
                readWrite.get(key);
            }, "线程" + i).start();
        }
    }

}

class ReadWrite {
    Map<Object, Object> map = new HashMap<>();

    public void put(Object key, Object value) {
        System.out.println(Thread.currentThread().getName() + "-正在准备写操作");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        map.put(key, value);
        System.out.println(Thread.currentThread().getName() + "-已完成写操作");
    }

    public Object get(String key) {
        System.out.println(Thread.currentThread().getName() + "-正在准备读操作");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Object result = map.get(key);
        System.out.println(Thread.currentThread().getName() + "读到的内容为:" + result);
        return result;
    }
}

会造成还没有写进去就已经都出来了:

线程0-正在准备写操作
线程4-正在准备写操作
线程2-正在准备写操作
线程1-正在准备写操作
线程3-正在准备写操作
线程0-正在准备读操作
线程1-正在准备读操作
线程2-正在准备读操作
线程3-正在准备读操作
线程4-正在准备读操作
线程2-已完成写操作
线程0-已完成写操作
线程1-已完成写操作
线程4-已完成写操作
线程3-已完成写操作
线程1读到的内容为:null
线程3读到的内容为:null
线程4读到的内容为:val4
线程2读到的内容为:null
线程0读到的内容为:null

可以使用读写锁,写的时候使用写锁,读的时候使用读锁

读写锁可以使用private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

readWriteLock.readLock()可以获取一个读锁,可以通过lock()unLock()加锁解锁

readWriteLock.writeLock()可以获取一个写锁,也可以通过lock()unLock()加锁解锁

上例使用读写锁后就变正常了

package com.moyok.ajax.baidu;

import com.moyok.ajax.baidu.com.baidu.translate.demo.TransApi;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;


public class Main {

    public static void main(String[] args) {
        ReadWrite readWrite = new ReadWrite();
        for (int i = 0; i < 5; i++) {
            final String key = "key" + i;
            final String val = "val" + i;
            new Thread(() -> {
                readWrite.put(key, val);
            }, "线程" + i).start();
        }

        for (int i = 0; i < 5; i++) {
            final String key = "key" + i;
            new Thread(() -> {
                readWrite.get(key);
            }, "线程" + i).start();
        }
    }

}

class ReadWrite {
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    Map<Object, Object> map = new HashMap<>();

    public void put(Object key, Object value) {
        readWriteLock.writeLock().lock();
        System.out.println(Thread.currentThread().getName() + "-正在准备写操作");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        map.put(key, value);
        System.out.println(Thread.currentThread().getName() + "-已完成写操作");
        readWriteLock.writeLock().unlock();
    }

    public Object get(String key) {
        readWriteLock.readLock().lock();
        System.out.println(Thread.currentThread().getName() + "-正在准备读操作");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Object result = map.get(key);
        System.out.println(Thread.currentThread().getName() + "读到的内容为:" + result);
        readWriteLock.readLock().unlock();
        return result;
    }
}

输出结果:

线程0-正在准备写操作
线程0-已完成写操作
线程1-正在准备写操作
线程1-已完成写操作
线程2-正在准备写操作
线程2-已完成写操作
线程4-正在准备写操作
线程4-已完成写操作
线程3-正在准备写操作
线程3-已完成写操作
线程0-正在准备读操作
线程1-正在准备读操作
线程2-正在准备读操作
线程4-正在准备读操作
线程3-正在准备读操作
线程3读到的内容为:val3
线程0读到的内容为:val0
线程4读到的内容为:val4
线程2读到的内容为:val2
线程1读到的内容为:val1

因为读锁是共享的,所以可以看到11-15行都在读

阻塞队列

是一个共享队列,可以被多个线程入队和出队,队列是有大小限制的

  • 如果队列满了,在队列中添加元素会被阻塞,直到可以插入元素
  • 如果队空,在队列中获取元素会被阻塞,直到队列不为空

应用情况:在某些条件下阻塞一些线程,在某些条件下唤醒线程,无需手动操作,都是由阻塞队列完成的

BlockingQueue是一个接口,有多个实现类,例如数组、链表、优先队列等实现的阻塞队列

实现类

ArrayBlockingQueue:由数组实现的阻塞队列,包含定长数组

LinkedBlockingQueue:由链表实现的队列,默认大小为Integer.MAX_VALUE

DelayQueue:delay中文为延迟,只有指定的延迟时间到了才能够在队列中获取元素,大小没有限制,即插入数据的操作不会阻塞,获取数据的操作会阻塞,使用优先队列实现的

PriorityBlockingQueue:基于优先级的阻塞队列

image-20220313095824797
public static void main(String[] args) {
    // 创建时指定长度
    BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
    System.out.println("blockingQueue.add(\"a\") = " + blockingQueue.add("a"));
    System.out.println("blockingQueue.add(\"b\") = " + blockingQueue.add("b"));
    System.out.println("blockingQueue.add(\"c\") = " + blockingQueue.add("c"));
    // 因为长度是3,此时添加这个元素会抛出异常
    System.out.println("blockingQueue.add(\"d\") = " + blockingQueue.add("d"));
}
blockingQueue.add("a") = true
blockingQueue.add("b") = true
blockingQueue.add("c") = true
Exception in thread "main" java.lang.IllegalStateException: Queue full
	at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
	at java.base/java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:329)
	at com.moyok.ajax.baidu.Main.main(Main.java:20)
// 用来取出元素
System.out.println("blockingQueue.remove() = " + blockingQueue.remove());
// 也用来插入元素,参数去1为元素,参数2为超时时间,参数3为时间单位,也可以不设置时间,如果放不进去,结果为false
System.out.println("blockingQueue.offer(\"s\") = " + blockingQueue.offer("s", 4000L, TimeUnit.MILLISECONDS));
// 用来取元素,参数1为超时时间,参数2为时间单位,也可以不设置时间,如果没有元素,结果为null
System.out.println("blockingQueue.poll() = " + blockingQueue.poll(3000L, TimeUnit.MILLISECONDS));
// 向队列中插入元素,如果无法插入,将会一直阻塞,直到能够插入为止
blockingQueue.put("aaa");
// 在队列中取出元素,如果队列一直为空,将会一直阻塞,直到能够取出为止
System.out.println("blockingQueue.take() = " + blockingQueue.take());

线程池 ThreadPool

线程过多会造成调度的开销,进而影响韩村的局部性和整体性能,线程池用来维护多个线程,将任务放入队列,在线程创建后启动这些任务,如果线程超出最大数量就要排队等候,直到其他线程执行完成,再从队列取出任务执行

固定数量的线程池:

  • 获取一定数量的线程池:

    • ExecutorService executorService = Executors.newFixedThreadPool(5);
      
  • 线程池一定要手动关闭:

    • executorService.shutdown();
      
public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 10; i++) {
        executorService.execute(() -> {
            System.out.println(Thread.currentThread().getName());
        });
    }
    executorService.shutdown();
}

每个线程用完会被释放,释放之后可以被后续的使用

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 10; i++) {
        executorService.execute(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(5555);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    executorService.shutdown();
}

上述代码会出现前5个线程一起使用,时间到了之后,后几个循环再使用

一个线程池一个线程:

  • 特点是这个线程池只有一个线程,仍然需要手动关闭

  • ExecutorService executorService = Executors.newSingleThreadExecutor();
    
  • public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int ii = i;
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + ", i = " + ii);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
    

可以扩容的线程池:

  • ExecutorService executorService = Executors.newCachedThreadPool();
    
  • 如果线程池里有空闲的线程就直接使用,如果没有就会自动扩容

  •     public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i = 0; i < 100; i++) {
                final int ii = i;
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + ", i = " + ii);
                });
            }
            executorService.shutdown();
        }
    

底层都是使用ThreadPoolExecutor实现的,这个类的构造方法参数有:

int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
  • 参数1,常驻线程数量
  • 参数2,最大的线程数量
  • 参数3,空闲线程的存活时间,即线程空闲时间达到限制后将自动关闭
  • 参数4,为时间单位
  • 参数5,为阻塞队列,达到常驻线程数量的上限时会阻塞,然后开启最大线程
  • 参数6,线程工厂
  • 参数7,拒绝策略

分支合并策略 Fork/Join

将一个大任务拆分成多个子任务并行处理,最后将子任务合并成最后的计算结果并输出

  • Fork,将复杂任务拆分
  • Join,将拆分的任务结果合并

Recursive中文为递归

例如斐波那契数列可以使用RecursiveTask

class MyRecursiveTask extends RecursiveTask<Integer> {

    public MyRecursiveTask(int n) {
        this.n = n;
    }
    final int n;
    /**
     * The main computation performed by this task.
     *
     * @return the result of the computation
     */
    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }
        MyRecursiveTask f1 = new MyRecursiveTask(n - 1);
        MyRecursiveTask f2 = new MyRecursiveTask(n - 2);
        // 拆分任务
        f1.fork();
        // 合并任务
        return f2.compute() + f1.join();
    }
}
    public static void main(String[] args) {
        MyRecursiveTask myRecursiveTask = new MyRecursiveTask(6);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> result = forkJoinPool.submit(myRecursiveTask);
        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        forkJoinPool.shutdown();
    }

异步回调

CompletableFuture类用于异步回调

// 用于没有返回值的异步回调
CompletableFuture.runAsync(Runnable);
// 有返回值的异步回调
CompletableFuture.supplyAsync(xxx)

无参数的:

CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
    System.out.println(Thread.currentThread().getName() + "无参数");
});
try {
    completableFuture.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

有返回值的:

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + "有参数");
    return 512;
});
try {
    completableFuture.whenComplete((t, u) -> {
        System.out.println("t(方法返回值) = " + t);
        System.out.println("u(异常的信息) = " + u);
    }).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
  • 如果没有异常,参数1为返回的值,参数2为null
  • 如果有异常,参数1为null,参数2为异常的信息

Q.E.D.


念念不忘,必有回响。