发布时间:2024-05-06 08:01
并发可以理解为多线程同时工作,一般情况下是要比单线程处理速度更快,但是并发也不是在任何情况下都更优,
使用多线程并发技术编写的代码在运行时可能会
这些问题都会导致并发过程处理速度变慢,jdk的众多并发工具类和框架就是为了去尽量避免这些情况而诞生的
CPU Cache与缓存行 - 钟齐峰 - 博客园 (cnblogs.com)
见下面博文中解决可见性部分
(5条消息) 【JUC】volatile的原理与应用_hu_xiang_1995的博客-CSDN博客
(5条消息) 【JUC】synchronized原理_hu_xiang_1995的博客-CSDN博客
什么是内存模型?
内存模型定义了共享内存系统中多线程程序读写操作的行为规范,用来保证共享内存的操作的正确性(包括可见性,原子性,有序性)
为什么要有内存模型?
Java内存模型是什么,为什么要有Java内存模型,Java内存模型解决了什么问题? - 知乎 (zhihu.com)
CPU从单核到多核,CPU与主存间存在多级缓存导致的缓存一致性(可见性)问题;
各种编译器对指令重排序导致的顺序一致性问题;
并发编程模型
为了使处理器内部的运算单元能够被充分利用,处理器可能会对输入代码进行乱序执行处理;
只要不影响程序单线程、顺序执行的结果,就可以对两个指令重排序;
这里重排序分为编译器级别的重排序和处理器级别的重排序
编译器级别的重排序
Java在执行时会先翻译成字节码指令,字节码指令种 a = 100; 和 a = a + 10都可以简单理解为
如果按照上面的顺序执行,会发现Load a 和 Store a各重复执行了一次,如果改成如下的顺序对结果不会有影响,但指令条数变少
这种顺序上的改动明显使得指令条数变少,加快了执行效率,这种改变顺序的行为就是编译器默认会做的,这种行为就是编译器级别的指令重排序
处理器级别的重排序
没有重排序的一条指令执行的过程大致如下:
- 指令获取
- 如果输入的运算对象是可以获取的(比如已经存在于寄存器中),这条指令会被发送到合适的功能单元。如果一个或者更多的运算对象在当前的时钟周期中是不可获取的(通常需要从主内存获取),处理器会开始等待直到它们是可以获取的
- 指令在合适的功能单元中被执行
- 功能单元将运算结果写回寄存器
进行重排序优化后:
- 指令获取
- 指令被发送到一个指令序列(也称
执行缓冲区
或者保留站
)中- 指令将在序列中等待,直到它的数据运算对象是可以获取的。然后,指令被允许在先进入的、旧的指令之前离开序列缓冲区。(此处表现为乱序)
- 指令被分配给一个合适的功能单元并由之执行
- 结果被放到一个序列中
- 仅当所有在该指令之前的指令都将他们的结果写入寄存器后,这条指令的结果才会被写入寄存器中(重整乱序结果)
具体的示例同编译器重排序,这里就不做示例
重排序的规则在JSR-133模型中用as-if-serial来阐述,即无论怎么重排序,单线程程序执行结果不可变
见(6条消息) 【JUC】volatile的原理与应用_hu_xiang_1995的博客-CSDN博客
编译器级别:volatile
处理器级别:内存屏障
也可以这么说,解决重排序问题的CPU级别的实现就是内存屏障,编译器级别的实现就是为volatile修饰的变量定制了一系列规则,见《Java并发编程艺术》P42
为了实现as-if-serial语义,编译器和处理器不会对存在数据依赖关系的操作做重排序
数据依赖:两个操作访问同一个变量,其中有一个为写操作,这两个操作就称之为存在数据依赖性
JMM规定线程修改共享变量是在本地内存中进行修改,那就存在一个问题,线程A修改了共享变量X后,线程B访问X时,获取到的X值是不是最新的值?
在高并发的情况下如果没有其它保障手段,其实是无法保证线程B获取到的X值是最新值。
因为实际执行代码时上文所述正常步骤中3有可能发生在步骤4之后。
如果线程A无论何时修改了变量X,线程B在其修改后获取X的值都是A修改之后的值,那么就称线程A对X变量的操作对线程B可见
可见性描述的是不同线程之间的并发问题
在JSR-133模型中用happens-before来阐述,即:如果A操作执行结果需要对B操作可见,那么A一定要在B之前执行
原子操作:不会被中断的一个或一系列操作
Java实现原子操作是基于处理器原子操作基础之上
为什么使用多线程技术
线程的基本操作
连接池
public class MyConnectionPool {
private static final sun.misc.Unsafe U;
int size;
static {
U = sun.misc.Unsafe.getUnsafe();
}
private volatile LinkedList<MyConnection> pool;
public MyConnectionPool(int initialSize) {
this.size = initialSize;
}
// 等待超时模式获取连接
public MyConnection fetchConnection(long mills) throws InterruptedException {
// 延迟加载
if(pool == null) {
synchronized (MyConnectionPool.class) {
if(pool == null) {
pool = new LinkedList<MyConnection>();
for(int i = 0; i < size; i++) {
pool.add(new MyConnection());
}
}
}
}
// 获取连接
synchronized (pool) {
if(mills <= 0) {
while(pool.isEmpty()) {
pool.wait();
}
return pool.removeFirst();
} else {
long future = mills + System.currentTimeMillis();
long remaining = mills;
while(remaining > 0 && pool.isEmpty()) { // 无可用连接同时remain还有剩余
pool.wait(mills); // 释放锁(预防死锁,破坏占用并请求条件)
remaining = future - System.currentTimeMillis();
}
// 离开上面while --- pool有可用连接 or remain到期
MyConnection result = null;
if(!pool.isEmpty()) { // remaining = 0退出的while循环时 pool == null
result = pool.getLast();
}
return result;
}
}
}
// 释放连接
public void releaseConnection(MyConnection connection) {
synchronized (pool) {
if(pool != null) {
pool.addLast(connection);
pool.notifyAll(); // 此时并不释放锁
}
}
}
}
class MyConnection {
}
线程池
/**
*
* @param : 由服务器端定义的执行任务的类
*/
public class MyThreadPoolImpl<Job extends Runnable> implements MyThreadPool<Job> {
private static final int MAX_WORKER_NUMS = 10;
private static final int MAX_JOB_NUMS = 20;
private static final int MIN_WORKER_NUMS = 1;
private static final long DEFAULT_WAITING_TIME = 0;
private final LinkedList<Job> jobs = new LinkedList<Job>();
private final List<Worker> workers = new ArrayList<Worker>();
// 线程池对客户端的接口(生产者生产任务)
public void execute(Job job) {
if(job != null) {
synchronized (jobs) {
jobs.addLast(job); // 假设任务队列任务个数无限制
jobs.notify();
}
}
}
// 生产者 等待-超时模型
public void execute(Job job, long mills) throws InterruptedException {
synchronized (jobs) {
if(mills < 0) {
while(jobs.size() >= MAX_JOB_NUMS) {
jobs.wait();
}
} else {
long future = mills + System.currentTimeMillis();
long remaining = mills;
while(jobs.size() >= MAX_JOB_NUMS) {
jobs.wait(remaining);
remaining = future - System.currentTimeMillis();
}
jobs.addLast(job);
workers.notify(); // 唤醒消费者
}
}
}
// 添加线程池中工作者线程
public void addWorkers(int num) {
synchronized (workers) {
int newNum = num + workers.size();
while(workers.size() < newNum && workers.size() < MAX_WORKER_NUMS) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker);
thread.start(); // 直接启动(消费者一直消费)
}
}
}
// 删除线程池中的工作线程
public void removeWorkers(int num) {
synchronized (workers) {
num = Math.min(workers.size(), num);
for(int i = 0; i < num; i++) {
Worker worker = workers.remove(i);
worker.shutDown(); // work.run()方法结束线程会进入dead状态
}
}
}
class Worker implements Runnable {
private volatile boolean running = true;
public void run() {
while(running) {
Job job = null;
synchronized (jobs) {
while(jobs.isEmpty()) {
try {
jobs.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
job = jobs.removeLast();
}
if(job != null) {
job.run();
}
}
}
public void shutDown() {
this.running = false;
}
}
}
Unsafe获取(反射获取)
public static Unsafe reflectGetUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null); // 返回这个字段的值(即一个Unsafe对象地址,或者说一个Unsafe对象)
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
常规的获取方式不可以,例如如下的代码是错误的
private static final Unsafe unsafe = Unsafe.getUnsafe();
因为
@CallerSensitive
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}
获取Unsafe时会看加载器是不是System ClassLoader,而用户编写的程序使用的加载器都不是,所以用户程序无法通过getUnsafe()方法获取,只可以通过反射获取
Unsafe使用
package juc;
import sun.misc.Unsafe;
import sun.nio.cs.US_ASCII;
import java.lang.reflect.Field;
public class TestUnSafe {
private static final long stateOffset;
private static final Unsafe unsafe;
static {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
stateOffset = unsafe.objectFieldOffset(TestUnSafe.class.getDeclaredField("state"));
} catch (Exception e) {
throw new Error(e);
}
}
private volatile int state = 0;
public static void main(String[] args) {
TestUnSafe test = new TestUnSafe ();
System.out.println(stateOffset); // 12
}
}
原子类将数值放入类成员变量value中,内部通过调用Unsafe类的方法对value进行原子处理
以AtomicLong为例
public class AtomicLong extends Number implements java.io.Serializable {
private static final long serialVersionUID = 1927816293512124184L;
// setup to use Unsafe.compareAndSwapLong for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset; // value值在AtomicLong对象中的偏移量
...
private volatile long value;
public AtomicLong(long initialValue) {
value = initialValue;
}
...
public final long getAndSet(long newValue) {
return unsafe.getAndSetLong(this, valueOffset, newValue);
}
...
}
以AtomicLong 的getAndSet()方法为例:
最终通过调用Unsafe类的getAndSetLong(Object var1, long var2, long var4)方法实现原子操作
// java.util.concurrent.atomic.AtomicLong#getAndSet
public final long getAndSet(long newValue) {
return unsafe.getAndSetLong(this, valueOffset, newValue);
}
// sun.misc.Unsafe#getAndSetLong
public final long getAndSetLong(Object var1, long var2, long var4) {
long var6;
do {
var6 = this.getLongVolatile(var1, var2); // 获取volatile语义下的值
} while(!this.compareAndSwapLong(var1, var2, var6, var4));
return var6; // 返回更改前的值
}
为什么有LongAdder类?
AtomicLong类可以保证原子性,但是其保障原子性的手段是多线程冲突时一直进行CAS自旋,这种方式会使得自旋状态的线程占用CPU不释放资源,这会降低并发性能,为了改善这种情况就有了LongAdder类;
LongAdder会将线程竞争同一个共享变量的情况分散,设置一个数组,让多个线程竞争时找到自己对应的位置,对该位置的value进行操作,最后将所有数组中元素累加作为所有线程修改后的值
LongAdder重要属性和方法
LongAdder继承自Striped64类,主要的属性在Striped64类中
abstract class Striped64 extends Number {
// Cell内部类可以看作是 AtomicLong的一种优化
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/** Number of CPUS, to place bound on table size */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
*/
// 每个线程操纵一个cells中的元素,最终值为base + sum of cells
transient volatile Cell[] cells;
transient volatile long base;
transient volatile int cellsBusy; // 锁变量,保证cells在扩容或新建时只有一个线程在操作
public class LongAdder extends Striped64 implements Serializable {
...
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
整个累加过程大致示意:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TpbHf1s7-1645181212525)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220128174556670.png)]
源码解读
读懂add( )方法即可
public void add(long x) {
// b: base值
// v: cells[i]中期望值
// m: cells长度 - 1 用来计算hash后下标
Cell[] as; long b, v; int m; Cell a;
// 条件一: true -> cells数组已经初始化,当前线程应该去找寻对应的Cell操作,跳过之后的条件判断,直接执行if内语句
// false -> cells数组未初始化,线程去CAS修改base值(即执行条件二判断)
// 条件二:true -> cas替换失败,发生竞争,需要扩容或者重新尝试
// false -> cas替换base成功,结束if语句
if ((as = cells) != null || !casBase(b = base, b + x)) {
// if内部是针对多线程的情况下,进入说明条件一,二至少一个为true,则说明
// 1. 已经发生过竞争(cells != null)
// 2. CAS替换base失败,此时存在竞争
// 那么此时就需要对cells数组新建或者对某个元素进行操作
boolean uncontended = true; // 指示是否发生竞争
// 本行条件:是否是第一次发生冲突(写base冲突)(需要new cells了)
if (as == null || (m = as.length - 1) < 0 ||
// 本行条件:本次线程对应的cells[i]是否为空
(a = as[getProbe() & m]) == null ||
// 本行条件:是否是cas写cells[i]冲突(需要扩容 或者 重新尝试)
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended); // 扩容和新建在此完成
}
}
longAccumulate方法:
主要针对三种情况处理:
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) { // wasUncontended只有在cells已创建并且同一个位置处发生多线程冲突时才会为false
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe(); // 类似HashMap中hash值
wasUncontended = true;
}
// collide == false 则一定不会扩容,collide == true则可能扩容
boolean collide = false; // True if last slot nonempty
for (;;) { // 自旋
Cell[] as; Cell a; int n; long v;
// 1.if内实现cells已经存在,则对cells[i]进行操作,i为当前线程对应的下标
// 该分支针对CAS(cells[i])失败 or cells[i] == null的情况
if ((as = cells) != null && (n = as.length) > 0) {
// cells[i] == null ?
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { c // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false; // cellsBusy == 1才会到这里
}
// cells[i]存在元素
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// CAS(cells[i])失败最多进行两次自旋,第二次(rehash之后)一定会进到此分支
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) // 修改成功
break;
// 扩容相关
// 能到这里一定是:CAS(cells[i])失败第二次的情况,此时因为发生同位置冲突尝试扩容以减少Hash碰撞
else if (n >= NCPU || cells != as) // NCPU表示CPU核心数,cells[]的长度无论怎么扩容不会超过这个值
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 扩容
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1]; // length * 2
for (int i = 0; i < n; ++i)
rs[i] = as[i]; // 直接搬运数据
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h); // rehash
}
// 2. else-if分支实现初始化cells并赋值
// 该分支是针对CAS(base + x)失败的情况下,会直接新建cells
// cellsBusy指示是否有其它线程在使用, cellsBusy == 0表示没有其它线程在使用(比如在初始化)
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // casCellsBusy()将cellsBusy置1
boolean init = false; // cells初始化成功标记
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2]; // 初始为2
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// 到这里说明,需要创建cells但是锁在其它线程(cellsBusy == 1),此时当前线程直接修改base, 修改失败则继续自旋
// 之所以这样是因为如果线程A无法初始化cells,说明其它线程在初始化,此时不让线程A自旋浪费CPU资源,而是直接CAS(base)
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
JUC中只有CopyOnWriteArrayList唯一一个List容器,其基本的原理是用写时复制的策略;
写的时候会copy一份原数组的拷贝,修改的时候对写的代码段上锁,而不是对容器内部的元素数组上锁;
写的同时其它线程可以读取原数组,而写操作的线程对拷贝数组修改,这种修改对读操作线程不可见,写完后其它线程才可以看到修改内容,形成了弱一致性的线程安全容器
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock(); // 保证并发安全的可重入的锁
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array; // 元素数组
final Object[] getArray() {
return array;
}
/**
* Sets the array.
*/
final void setArray(Object[] a) {
array = a;
}
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
}
主要方法add(E e):
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); // 上锁
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); // 原数组拷贝副本
newElements[len] = e; // 新元素插入到新数组中
setArray(newElements); // 设置array指向新数组
return true;
} finally {
lock.unlock();
}
}
AbstractQueuedSynchronizer(AQS)队列同步器,是用来构建其它同步组件的基础框架,同步组件也是构建各种Lock工具(例如:ReentrantLock)的必备组件。
各种Lock类面向的是锁的使用者,AQS面向的是锁的实现者,屏蔽了底层同步状态变量的管理,线程状态的管理等细节操作(这些细节操作通过LockSupport和CAS去实现),简化Lock类的实现。
AQS基本源码解读见(4条消息) 【JUC】AQS 队列同步器_hu_xiang_1995的博客-CSDN博客
这里做下总结:
AQS仅仅实现了公平的同步器(也是公平锁的实现基础),包括共享式和非共享式的公平同步器,各种同步器的实现关键点:
公平锁/非公平锁
公平锁:同步队列中只有 node.prev == Head的的node可以执行tryAcquire(int arg)去获取锁
非公平锁:一个线程释放锁之后,队列中第一个结点线程和此时外来的尝试获取锁的线程共同竞争
共享式锁/非共享式锁
关键在于state共享状态量的初始值的设定
非共享锁:获取锁的过程中,state变量初始值为1,每当线程获取锁时 -1,为0不以获取锁
共享锁:获取锁的过程中,state变量初始值为> 1,每当线程获取锁时 -1,<=0不以获取锁
可重入锁
上面各种同步器相组合就形成了JUC中众多Lock类
Lock相对于synchronized优秀的地方,不仅仅在于可以组合出多种锁,还基于AQS的内部类ConditionObject提供了Condition的实现(类似管程中条件等待队列)
基本的使用如下:
先获取锁再获取Condition变量然后调用await()阻塞线程
@Test
public void test() throws InterruptedException {
ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
final Condition condition = reentrantLock.newCondition();
System.out.println("线程工作ing....");
condition.await(); // 线程阻塞
reentrantLock.unlock(); // 解锁
}
比较重要的方法是await()和signal()
await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 加入条件队列中
int savedState = fullyRelease(node);// 释放锁(能够调用await()的线程一定持有锁)
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
一个比较好的,通过组合各种同步组件非公平的方式实现的(排他 & 共享)锁的示例就是JUC中的ReentrantReadWriteLock读写锁;
其锁内部同时维护一个写锁和读锁,写锁是可重入的排他锁,读锁是可重入的共享锁;
ReentrantReadWriteLock中为了同时维护读写状态变量,使用同一个int型变量维护读写状态,高16位表示读状态,低16位表示写状态
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock; // 读锁
private final ReentrantReadWriteLock.WriteLock writerLock; // 写锁
final Sync sync; // 同步组件(根据new 时传入参数决定是公平还是非公平同步组件)
// 自定义同步组件,完成基本的可重入 和读写锁的逻辑,是否是公平留给其它自定义同步组件完成
abstract static class Sync extends AbstractQueuedSynchronizer {
...
...
...
}
// 支持实现非公平锁的同步组件
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
return false; // writers can always barge
}
// 判断获取锁失败后是否需要自我阻塞
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
// 读锁内部类
public static class ReadLock implements Lock, java.io.Serializable {
...
}
// 写锁内部类
public static class WriteLock implements Lock, java.io.Serializable {
...
}
}
写锁的获取
// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#lock
public void lock() {
sync.acquire(1); // 会调用AQS中模板方法
}
// AQS模板方法直接使用
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) && // tryAcquire(arg)尝试获取锁,成功则结束,失败进入下面语句
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 加入阻塞队列
selfInterrupt();
}
//acquireQueued(addWaiter(Node.EXCLUSIVE), arg))和AQS实现一样,只是其中的tryAcquire(arg)是
// ReentrantReadWriteLock.Sync中自定义的
// 重写的tryAcquire()方法
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquire
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); // 获取高16位的值,代表读锁状态变量
// 1.有锁
if (c != 0) {
// w == 0表示没有读锁(那一定是写锁),||后面判断当前需要获取写锁的线程是不是重入,不是则获取锁失败(可重入锁,写锁排他性在这里体现)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 到这里一定是没有读锁,有写锁,且是重入
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
// 2.无锁
if (writerShouldBlock() || // writerShouldBlock()是决定公平/非公平的关键,非公平组件直接返回false,直接抢占式执行下面获取锁的代码
!compareAndSetState(c, c + acquires)) // 无锁则尝试获取,成功下一步,不成功返回false
return false;
setExclusiveOwnerThread(current); // 设置拥有写锁的线程
return true;
}
如果获取写锁失败执行AQS中 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,将线程加入阻塞队列尾部,并自旋
写锁的释放
// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#unlock
public void unlock() {
sync.release(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#release
// AQS中模板方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒后续结点
return true;
}
return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryRelease
// 自定义tryRelease方法
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases; // 更新状态
boolean free = exclusiveCount(nextc) == 0; // 写状态为0
if (free)
setExclusiveOwnerThread(null); // 释放锁
setState(nextc);
return free;
}
读锁的获取
读锁是共享锁,使用AQS中acquireShared的逻辑;
和普通共享锁不同的是,其它线程持有写锁时,当前线程不可以获取读锁;
当前线程持有写锁时,可以同时获取读锁;
其它线程持有写锁时,写锁和正常共享锁一样可以获取读锁
// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#lock
public void lock() {
sync.acquireShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared
// 自定义获取读锁(共享锁)的方法
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 有写锁,且持有写锁线程不是当前线程,获取锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c); // 获取读状态
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // CAS保证每次竞争只有1个线程会获取读锁(但是可以有多个线程持有锁,本次没有获得则下次获得)
if (r == 0) { // 读锁未被获取,当前线程是一个获取锁的
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 到此说明有写锁或者读锁本次获取失败,自旋获取读锁
return fullTryAcquireShared(current);
}
CountDownLatch可以解决使用join()不方便的情况,比如这样一个场景:多线程去读取某个文件夹下的图片,读取完成主线程提示解析完成,如果使用join():
public class Solution {
@Test
public void test() throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
public void run() {
// 读取图片
}
});
Thread t2 = new Thread(new Runnable() {
public void run() {
// 读取图片
}
});
...
// 开启多个读取图片的线程
t1.start()
...
// 调用多个线程的join()
t1.join();
t2.join();
...
// 返回解析完成
}
}
使用方式非常不灵活,主线程代码冗余不优雅,如下的代码则更为优雅,主线程仅需一行代码countDownLatch.await()即可完成等待其它线程结束再进行的过程
package juc;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest {
CountDownLatch countDownLatch = new CountDownLatch(2);
@Test
public void test() throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
public void run() {
// 读取图片
});
Thread t2 = new Thread(new Runnable() {
public void run() {
// 读取图片
});
t1.start();
t2.start();
countDownLatch.await();
// 解析完成
}
}
CountDownLatch可以理解为共享锁,原理参照AQS中共享状态的获取与释放,用AQS中状态变量来记录已经结束的线程个数,调用countDown会使得计数器减一(底层调用了AQS的releaseShared方法),当计数器减少至0则调用await()的线程开始继续运行(await()方法底层调用AQS的acquireShared方法)
CountDownLatch源码:
// java.util.concurrent.CountDownLatch#await()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg); // AQS中方法,加入阻塞队列
}
// CountDownLatch重写了模板中的tryAcquireShared()方法
// java.util.concurrent.CountDownLatch.Sync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// java.util.concurrent.CountDownLatch#countDown
public void countDown() {
sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// java.util.concurrent.CountDownLatch.Sync#tryReleaseShared
// CountDownLatch重写了模板中的tryReleaseShared()方法
// CAS+自旋去减少状态变量的值,类似AQS共享同步状态的释放
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
是JUC中实现线程同步的工具类之一
CountDownLatch在join()基础上进行了优化,但是仍旧有一个问题,其对计数器的设置是一次性的(构造函数中),无法重置,计数器归0之后再继续调用await()和countDown()方法会直接返回,无法实现线程同步,为了满足重置计数的要求诞生了CyclicBarrier;
基本工作方式是线程调用 await 方法后就会被阻塞,这个阻塞点就称为 屏障点,等所有线程都调用了 await 方法后,线程就会冲破屏障,继续下运行。
使用示例:
public class Solution {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2); // 构造器参数:多少个线程到达屏障点后一起继续运行
@Test
public void test() throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "进入屏障点");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "离开屏障点");
} catch (Exception e) {
e.printStackTrace();
}
}
});
Thread t2 = new Thread(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "进入屏障点");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "离开屏障点");
} catch (Exception e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
}
}
结果:
Thread-0进入屏障点
Thread-1进入屏障点
Thread-1离开屏障点
Thread-0离开屏障点
可以看到Thread-0先调用await()停止在屏障点处,thread-1随后也到达屏障点,此时有两个线程到达屏障点,达到设定的数值,两个线程同时启动,如果使用之前CountDownLatch的例子,伪代码如下:
public class Solution {
@Test
public void test() throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
public void run() {
// 读取图片
cyclicBarrier.await();
}
});
Thread t2 = new Thread(new Runnable() {
public void run() {
// 读取图片
cyclicBarrier.await();
}
});
// 返回解析完成
}
}
主线程不需要要任何控制同步的代码,所有子线程会在读取图片之后进入屏障点,设定好数值之后,等到所有线程都达到屏障点(读取完图片),所有线程一起返回