1. 多线程基础
1.1. 创建线程的方式
1.1.1. 继承Thread
public class ThreadA extends Thread {
@Override
public void run() {
}
public static void main(String[] args){
new ThreadA().start();
}
}
1.1.2. 实现Runnable接口, 创建Thread对象
public class ThreadB implements Runnable {
@Override
public void run() {
}
public static void main(String[] args){
new Thread(new ThreadB()).start();
}
}
1.1.3. 实现Callable接口, 创建FutureTask对象
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class ThreadC implements Callable<String> {
@Override
public String call() {
return "Callable";
}
public static void main(String[] args) throws Exception {
FutureTask<String> task = new FutureTask<>(new ThreadC());
task.run();
task.get();
}
}
1.2. 关闭线程的方式
1.2.1. 设置关闭标志位
维护一个成员变量标识当前线程是否处于运行状态, 如果当前状态为停止, 则结束任务.
public class ThreadStopDemo1 extends Thread {
private volatile boolean stopped = false;
@Override
public void run() {
while (!stopped){
// working here. (1)
}
}
public void stopWork() {
this.stopped = true;
}
}
1 | 如果内部一直busy或阻塞住, 那么就会无法响应外部的停止信号. |
1.2.2. 中断线程
public class ThreadStopDemo2 extends Thread {
@Override
public void run() {
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
// stop working. (1)
}
}
public static void main(String[] args) {
Thread thread = new ThreadStopDemo2();
thread.start();
thread.interrupt(); (2)
}
}
1 | 通过catch住异常来中断当前的TIME_WAITING状态. |
2 | 唤醒指定线程. |
只有方法签名里会抛出InterruptedException的方法才会抛出异常:
|
2. 线程方法
2.1. run
调用Runnable对象的run方法.
如果使用继承Thread的方式来创建线程对象, 则会重写run方法.
2.2. start
start方法会运行当前创建出来的线程, 线程状态从 NEW
变为 RUNNABLE
.
start方法不能重复调用.
2.3. sleep
当前线程从 RUNNABLE
变为 TIME_WAITING
状态.
如果sleep过程中调用方调用了这个线程的 interrupt()
方法, 则 sleep
方法会抛出 InterruptedException
.
2.4. yield
当前线程从 RUNNING
变为 RUNNABLE
状态, 具体由操作系统实现, Thread.State
中均为 RUNNABLE
枚举值.
2.6. interrupt
打断 sleep/wait/join
的线程, 设置 interrupted
标志位为true.
2.7. interrupted
返回当前线程的打断标志, 然后重置为false.
|
3. volatile
-
保障可见性: volatile修饰的变量值被修改后, 其他线程工作内存里缓存的该变量的值会标记为过期, 需要从主存重新取值.
-
保障有序性: volatile变量读写前后会加入内存屏障, 保障读写指令不会乱序执行.
-
修饰double/long时保证写入/读取的原子性, 但不保证代码块对字段操作的原子性.
3.1. 读写屏障来实现volatile语义
// StoreStoreFence
写操作
// StoreLoadFence
// LoadLoadFence
读操作
// LoadStoreFence
4. synchronized
4.1. 使用方式
-
修饰成员方法
-
修饰静态方法
-
代码块指定修饰对象
4.2. 实现原理
synchronized基于Monitor实现. 在代码块前后和异常表 to
之后分别插入 monitorenter/monitorexit
指令.
Monitor锁对象组成:
* Owner
持有该Monitor锁的对象.
* EntryList
保存竞争该Monitor锁的 Blocked
状态的对象.
* WaitSet
保存竞争该Monitor锁的 Waiting
状态的对象.
-
线程1竞争锁时发现Owner为空, 则设置Owner为线程1.
-
线程2竞争锁, 发现Owner不为空, 则进入EntryList等待唤醒.
-
线程1释放锁, Owner置为空, 唤醒EntryList里的一个线程, 设置为Owner.
4.3. 锁升级的过程
4.3.1. MarkWord
无锁: \$ubrace("unused")_(25位) ubrace("hash")_(31位) ubrace("unused")_(1位) ubrace("age")_(4位) ubrace("bias_lock")_(1位)^0 ubrace("lock")_(2位)^01\$
偏向锁: \$ubrace("thread")_(54位) ubrace("epoch")_(2位) ubrace("unused")_(1位) ubrace("age")_(4位) ubrace("bias_lock")_(1位)^1 ubrace("lock")_(2位)^01\$
轻量级锁: \$ubrace("ptr_lock_record")_(62位) ubrace("lock")_(2位)^00\$
重量级锁: \$ubrace("ptr_monitor")_(62位) ubrace("lock")_(2位)^10\$
无锁 → 偏向锁 → 轻量级锁 → 重量级锁
4.3.2. 轻量级锁
-
每次竞争锁时, 线程栈帧中都会生成一个新的
LockRecord
对象(LockRecord地址 00
)和指向持有该线程锁的对象引用地址, -
第一次有对象竞争这个线程的锁时, 把
LockRecord
地址和对象的MarkWord(hash age bias 01
)cas互换, 后面两位设置为00
, 将栈帧中的对象引用指向这个对象. -
如果cas失败:
-
锁竞争对象MarkWord中LockRecord地址指向当前线程, 则表示该次竞争属于锁重入, cas会设置自己的LockRecord地址为null, 将栈帧中的对象引用指向这个对象.
-
已经有其他线程持有了本线程的锁, 则进入 锁膨胀 的过程.
-
-
每次释放锁时cas检测是否栈顶的LockRecord对象记录的值是否为null:
-
如果为null, 表示有重入, 将该LockRecord对象出栈.
-
如果不为null, 用cas更新对象的MarkWord为LockRecord中的hash.
-
更新成功, 表示解锁成功, 出栈.
-
更新失败, 表示该LockRecord指向对象持有的是重量级锁.
-
-
4.3.3. 重量级锁
-
线程cas更新对象MarkWord里的hash值为自己的LockRecord地址值失败, 则表示已经有其他线程持有了这个对象的轻量级锁, 此时进入锁膨胀的过程.
-
申请一个Monitor对象
-
将Monitor的Owner地址指向此时MarkWord里的LockRecord地址.
-
将对象的MarkWord设置为Monitor对象的地址,
-
将当前线程放入Monitor的EntryList里.
-
-
将Monitor的Owner置为null.
-
从EntryList里唤醒一个线程让其持有这个Monitor锁, 设置为Owner.
4.3.4. 自旋锁
重量级锁加锁失败时, 会自旋尝试多次, 尝试失败后才会把自己加到EntryList里.
4.3.5. 偏向锁
对象初始化时, MarkWord最后三位设置为 \$101\$, 第一次竞争锁时线程id存储在MarkWord的前54位里, 下一次该线程竞争锁时可以直接进入代码同步块.
-
-XX:BiasedLockingStartupDelay=0
设置偏向锁不延迟打开.
-
JVM关闭偏向锁的功能.
-XX:-UseBiasedLocking
-
有其他线程竞争锁.
-
调用wait/notify, 因为此时需要依赖Monitor对象的WaitSet.
5. LockSupport
每个线程都关联一个Parker对象, 由 _counter, _cond, _mutex
三部分组成.
-
_counter
为0时, 进入阻塞状态. -
_counter
为1时, 不进入阻塞状态, 继续运行. -
重置
_counter
为0.
-
如果线程处于阻塞状态, 就唤醒线程继续运行.
-
如果线程处于运行状态, 则设置
_counter
为1, 线程继续运行.
6. CAS
6.1. 实现原理
基于CPU指令 cmpxchg
比较并交换, 如果提供的值与获取到的值相等则赋值成功, 否则赋值失败.
public final class Unsafe {
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset); (1)
} while (!weakCompareAndSetInt(o, offset, v, v + delta)); (2)
return v;
}
}
1 | 获取变量最新的值. |
2 | CAS更新为最新的值. |
6.2. ABA问题
如果另外一个线程把值从A改为B再改为A, 那么比较的时候会认为该值没有被修改过, 这种情况称之为ABA问题.
-
AtomicStampedReference: 用一个int变量标识当前值的版本号, 每次cas还需要提供新旧的版本号.
-
AtomicMarkableReference: 用boolean变量作为当前值的标志, 每次cas需要提供新旧的标志.
6.3. LongAddr
LongAddr内部维护一个base变量加多个单元, 并发高的情况下可以将CAS并发竞争的操作分摊到各个单元里.
最后取值的时候, 再对base+这些单元求和.
LongAddr求和的时候没有对单元加锁, 所以取值操作只满足最终一致性.
public class LongAddr extends Striped64 implements Serializable {
public void add(long x) {
Cell[] cs; long b, v; int m; Cell c;
if ((cs = cells) != null || !casBase(b = base, b + x)) { (1)
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[getProbe() & m]) == null ||
!(uncontended = c.cas(v = c.value, v + x))) (2)
longAccumulate(x, null, uncontended); (3)
}
}
}
1 | 尝试cas修改base的值. |
2 | 尝试cas修改当前线程所属单元的变量值. |
3 | 操作cell数组, 设置值. |
abstract class Striped64 extends Number {
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
done: for (;;) {
Cell[] cs; Cell c; int n; long v;
if ((cs = cells) != null && (n = cs.length) > 0) {
if ((c = cs[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) { (3)
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
break done;
}
} finally {
cellsBusy = 0;
}
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (c.cas(v = c.value,
(fn == null) ? v + x : fn.applyAsLong(v, x))) (4)
break;
else if (n >= NCPU || cells != cs)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == cs) // Expand table unless stale
cells = Arrays.copyOf(cs, n << 1); (5)
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h); (6)
}
else if (cellsBusy == 0 && cells == cs && casCellsBusy()) { (1)
try { // Initialize table
if (cells == cs) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
break done;
}
} finally {
cellsBusy = 0;
}
}
// Fall back on using base
else if (casBase(v = base,
(fn == null) ? v + x : fn.applyAsLong(v, x))) (2)
break done;
}
}
}
1 | cells默认为null, cellsBusy默认为0, 所以LongAddr里如果cas修改base失败, 就会走到这里初始化cells数组, 并将x加入到cell中. |
2 | 有其他线程在初始化cells, 所以当前线程无法在cells里面赋值, 只能尝试cas修改一次base. |
3 | 当前线程所属的cell为空, 初始化一个cell放到cells数组里. |
4 | 当前线程所属的cell不为空, 尝试cas修改该cell里的值. |
5 | 尝试扩容cells. |
6 | 修改当前线程存储的随机数, 使其归属到别的cell中, 避免竞争. |
7. JUC锁
7.1. ReentrantLock
-
ReentrantLock和synchronized都支持可重入.
-
ReentrantLock和synchronized都属于阻塞式同步.
-
synchronized使用C++实现的, ReentrantLock是JDK类库实现的.
-
ReentrantLock可中断, synchronized需要手动判断
interrupted
标志位. -
ReentrantLock尝试加锁时可以设置超时时间.
-
ReentrantLock可以设置为公平锁(默认非公平锁).
-
ReentrantLock支持多个条件变量(Condition).
ReentrantLock
加锁释放锁都是通过内部继承了 AbstractQueuedSynchronizer
的 Sync
类来实现的, Sync又有公平锁和非公平锁区分, 默认为非公平锁.
public class ReentrantLock implements Lock, java.io.Serializable {
public void lock() {
sync.lock();
}
public boolean tryLock() {
return sync.tryLock();
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
}
7.1.1. lock方法实现
abstract static class Sync extends AbstractQueuedSynchronizer {
final void lock() {
if (!initialTryLock()) // 首先尝试先获得锁, 如果获取失败则与其他线程竞争来获取锁
acquire(1);
}
}
static final class NonfairSync extends Sync {
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) { // 非公平锁首先尝试修改state, 竞争锁
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) { // 如果是锁重入则直接增加state, lock成功
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}
}
static final class FairSync extends Sync {
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedThreads() && compareAndSetState(0, 1)) { // 公平锁只有当等待队列为空时才去尝试修改state竞争锁
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) { // 如果是锁重入则直接增加state, lock成功
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
}
public abstract class AbstractQueuedSynchronizer {
public final void acquire(int arg) {
if (!tryAcquire(arg)) // tryAcquire与initialTryLock类似, 会尝试去获取锁
acquire(null, arg, false, false, false, 0L); // 竞争锁
}
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) {
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg); // 尝试去竞争锁, 如果竞争成功则返回true
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node); // 如果当前竞争锁成功, 并且当前节点是头节点, 那么通知调用LockSupport.unpark唤醒当前节点的next节点
if (interrupted)
current.interrupt();
}
return 1;
}
}
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode(); // 初始化Node
} else if (pred == null) { // 尝试将当前节点插入到等待队列末端, 并将当前tail的next指向成当前节点
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead(); // 如果head为null, 那么初始化head和tail为一个哑元节点
else if (!casTail(t, node)) // 将当前节点设置成tail
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this); // 当前节点已经进入队列, 调用LockSupport.park暂停当前线程
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
}
7.1.2. unlock方法实现
public abstract class AbstractQueuedSynchronizer {
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING);
LockSupport.unpark(s.waiter); (1)
}
}
}
1 | 唤醒队列下一个节点. |
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null); (1)
setState(c); (2)
return free;
}
}
1 | 清空持有当前锁的线程记录. |
2 | 修改state值. |
总的来说, AQS实现了四个功能: 加锁、释放锁、等待、唤醒.
7.2. Condition
public class ConditionObject implements Condition, java.io.Serializable {
private int enableWait(ConditionNode node) {
// 使用condition通信前必须加锁, 所以这里正常会返回true.
if (isHeldExclusively()) {
node.waiter = Thread.currentThread();
node.setStatusRelaxed(COND | WAITING);
// 将当前线程加入到队列中, 并更新尾结点.
ConditionNode last = lastWaiter;
if (last == null)
firstWaiter = node;
else
last.nextWaiter = node;
lastWaiter = node;
int savedState = getState();
// await释放锁
if (release(savedState))
return savedState;
}
node.status = CANCELLED;
throw new IllegalMonitorStateException();
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// ConditionNode是一个双向链表
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);
LockSupport.setCurrentBlocker(this);
boolean interrupted = false, cancelled = false;
// 判断是否在AQS的等待队列里(有其他线程notify时会将该node加入到等待队列里)
while (!canReacquire(node)) {
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
} else if ((node.status & COND) != 0) {
try {
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait();
}
LockSupport.setCurrentBlocker(null);
node.clearStatus();
// 阻塞结束, 重新竞争锁.
acquire(node, savedState, false, false, false, 0L);
// 如果是内部中断导致的唤醒, 则继续抛出中断异常.
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
public final void signal() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, false);
}
public final void signalAll() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, true);
}
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
if ((firstWaiter = next) == null)
// 队列为空, 清空尾结点
lastWaiter = null;
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
// 将当前第一个node加入到等待队列里, 从而唤醒一个await线程.
enqueue(first);
// 如果不是signalAll, 则只需要唤醒第一个node
if (!all)
break;
}
first = next;
}
}
}
7.3. ReentrantReadWriteLock
ReentrantReadWriteLock
实现了读写/写写互斥, 但是读读不互斥.
ReentrantReadWriteLock
内部将state高16位维护持有共享锁线程的重入次数, 低16位维护持有排他锁线程的重入次数, 这是为了CAS无法同时对两个变量操作.
读写锁的加锁解锁分别依赖sync的方法来实现的, sync又分为公平锁和非公平锁, 所以一共有4种加锁解锁实现.
public static class ReadLock implements Lock, java.io.Serializable {
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}
}
public static class WriteLock implements Lock, java.io.Serializable {
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// state右移16位得到高16位的值.
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 将高16位置0, 得到低16位的值.
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();
// 竞争排他锁
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) { // c不等于0, 代表当前有线程正在持有锁.
// w为0代表有写线程持有锁, 如果持有锁的线程不是自己, 直接返回false.
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 当前线程之前已经持有排他锁, 此时为重入锁, 增加state即可.
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
// 尝试竞争
!compareAndSetState(c, c + acquires))
return false;
// CAS修改state成功, 设置自己为ExclusiveOwnerThread.
setExclusiveOwnerThread(current);
return true;
}
// 释放排他锁
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 排他锁持有锁时, 不会有其他线程修改state, 且因为state是低16位保存了排他锁重入次数, 所以这里直接减state即可.
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 重入锁释放锁结束.
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
// 竞争共享锁
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
// 如果当前存在排他锁且不是当前线程持有的排他锁则竞争失败.
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // 共享锁重入次数保存在state高16位, 每次需要增加1<<16.
// 统计每个线程竞争共享锁次数.
if (r == 0) {
// 当前是竞争共享锁的第一个线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 不停重试尝试CAS修改state.
return fullTryAcquireShared(current);
}
// 释放共享锁
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) { // 因为会存在多个线程持有共享锁, 所以这里需要通过CAS修改state.
int c = getState();
int nextc = c - SHARED_UNIT;
// state减去1<<16. 如果state剩0, 则可以唤醒等待队列下一个线程.
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
// 非公平锁会尝试竞争一次.
return false;
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
static final class FairSync extends Sync {
// 公平锁判断如果等待队列中有其他线程, 则放弃竞争.
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
7.4. StampedLock
StampedLock
实现了写写互斥, 但是读读/读写不互斥.
7.4.1. StampedLock例子
package me.jy.lang.thread.juc;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.IntStream;
/**
* @author jy
*/
public class StampedLockDemo {
private final StampedLock lock = new StampedLock();
private double x;
private double y;
public static void main(String[] args) {
StampedLockDemo stampedLockDemo = new StampedLockDemo();
IntStream.rangeClosed(1, 100)
.parallel()
.forEach(i -> stampedLockDemo.move(i, i + 1));
// 7212
System.out.println(stampedLockDemo.computeDistance());
}
public void move(double deltaX, double deltaY) {
// 获取写锁
long stamp = lock.writeLock();
x += deltaX;
y += deltaY;
// 释放写锁
lock.unlockWrite(stamp);
}
public double computeDistance() {
// 尝试获取读锁(乐观锁)
long stamp = lock.tryOptimisticRead();
// 操作数据
double currentX = x;
double currentY = y;
// 校验是否有线程获取过写锁
if (!lock.validate(stamp)) {
// 重新获取读锁(悲观锁)
stamp = lock.readLock();
currentX = x;
currentY = y;
// 释放锁
lock.unlockRead(stamp);
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
7.4.2. 实现
StampedLock
的state变量维护了锁的版本号, 低7位标识了读锁状态, 第8位标识了写锁状态, 因为写锁标志只有1位, 所以写锁不能重入.
-
state
: 默认为 \$1"<<"8\$ . -
WBIT
: 值为 \$1"<<"7\$ , 第8位为写锁标志. -
RBITS
: 值为 \$1"<<"7-1\$ , 低7位为读锁标志, RBITS减1即为读锁的数量. -
ABITS
: 值为 \$"WBIT"|"RBITS"\$ (即8个1). -
SBITS
: 值为 \$~"RBITS"\$
public class StampedLock implements java.io.Serializable {
public long writeLock() {
// 重置低8位, 只有当低8位都为0的时候可以直接获得锁.
long s = U.getLongOpaque(this, STATE) & ~ABITS, nextState;
// 设置第8位为1.
if (casState(s, nextState = s | WBIT)) {
U.storeStoreFence();
return nextState;
}
return acquireWrite(false, false, 0L);
}
private long acquireWrite(boolean interruptible, boolean timed, long time) {
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
WriterNode node = null;
Node pred = null;
for (long s, nextState;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if ((first || pred == null) && ((s = state) & ABITS) == 0L &&
casState(s, nextState = s | WBIT)) {
U.storeStoreFence();
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (interrupted)
Thread.currentThread().interrupt();
}
return nextState;
} else if (node == null) { // retry before enqueuing
node = new WriterNode();
} else if (pred == null) { // try to enqueue
Node t = tail;
node.setPrevRelaxed(t);
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) { // reduce unfairness
--spins;
Thread.onSpinWait();
} else if (node.status == 0) { // enable signal
if (node.waiter == null)
node.waiter = Thread.currentThread();
node.status = WAITING;
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted);
}
private long cancelAcquire(Node node, boolean interrupted) {
if (node != null) {
node.waiter = null;
node.status = CANCELLED;
cleanQueue();
if (node instanceof ReaderNode)
signalCowaiters((ReaderNode)node);
}
return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L;
}
private static void signalCowaiters(ReaderNode node) {
if (node != null) {
// 只要有一个ReadLock被唤醒, 就会唤醒所有的ReadLock.
for (ReaderNode c; (c = node.cowaiters) != null; ) {
if (node.casCowaiters(c, c.cowaiters))
LockSupport.unpark(c.waiter);
}
}
}
private long releaseWrite(long s) {
long nextState = state = unlockWriteState(s);
// 唤醒等待队列第一个节点.
signalNext(head);
return nextState;
}
private static long unlockWriteState(long s) {
// 将state加上1<<7, 这样第8位就置0了.
return ((s += WBIT) == 0L) ? ORIGIN : s;
}
// 获取乐观锁
public long tryOptimisticRead() {
long s;
// state & WBIT 不为0时, 方法直接返回0, validate(0) 会返回false.
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
public boolean validate(long stamp) {
U.loadFence();
return (stamp & SBITS) == (state & SBITS);
}
}
8. JUC工具类
8.1. Semaphore
8.1.1. 使用场景
Semaphore
提供了资源的并发访问限制, 超过了并发量的线程将会阻塞一直到持有锁的线程释放锁.
内部也是自己继承了AQS来实现竞争/释放锁的功能, 有公平锁和非公平锁之分.
8.1.2. Semaphore例子
package me.jy.lang.thread.juc;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
/**
* @author jy
*/
@Slf4j
public class SemaphoreDemo {
private static final Semaphore SEMAPHORE = new Semaphore(5);
public static void main(String[] args) {
IntStream
.rangeClosed(1, 100)
.parallel()
.forEach(i -> {
try {
SEMAPHORE.acquire();
log.info("I am in.");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
} finally {
SEMAPHORE.release();
}
});
}
}
8.1.3. 实现
public abstract class AbstractQueuedSynchronizer {
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
(tryAcquireShared(arg) < 0 &&
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}
}
public class Semaphore implements java.io.Serializable {
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void release() {
sync.releaseShared(1);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
// CAS修改state, remaining小于0时代表剩余的state不够用了, 线程会去竞争锁.
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current)
throw new Error("Maximum permit count exceeded");
// 因为有多个线程持有该锁, 所以释放锁时需要不停CAS修改state.
if (compareAndSetState(current, next))
return true;
}
}
}
}
8.2. CountDownLatch
8.2.1. 使用场景
-
某个线程需要等待其他工作线程执行完毕, 再去执行下面的代码.
8.2.2. CountDownLatch例子
package me.jy.lang.thread.juc;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
/**
* @author jy
*/
@Slf4j
public class CountDownLatchDemo {
private static final int WORKER_COUNT = 10;
private static final ExecutorService executorService = Executors.newFixedThreadPool(4);
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(WORKER_COUNT);
Runnable work = () -> {
try {
log.info("working");
TimeUnit.SECONDS.sleep(1);
latch.countDown();
} catch (InterruptedException ignored) {
}
};
IntStream.rangeClosed(1, WORKER_COUNT).forEach(i -> executorService.execute(work));
latch.await();
log.info("workers done!");
executorService.shutdownNow();
}
}
8.2.3. 实现
public class CountDownLatch {
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
// 一直等到state为0时才能竞争到锁.
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// 不停地CAS将state减1
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
// state减到0, 可以唤醒等待线程.
return nextc == 0;
}
}
}
}
8.3. CyclicBarrier
8.3.1. 使用场景
-
一批线程互相等待统一就绪后, 再一起执行下面的代码.
8.3.2. CyclicBarrier例子
package me.jy.lang.thread.juc;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.IntStream;
/**
* @author jy
*/
@Slf4j
public class CyclicBarrierDemo {
private static final int THREAD_COUNT = 5;
private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(THREAD_COUNT);
private static final List<Player> PLAYERS = new CopyOnWriteArrayList<>();
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_COUNT, () -> {
log.info("====== Result ======");
PLAYERS.stream()
.sorted(Comparator.comparingLong(a -> a.finishedAt))
.forEach(System.out::println);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("====== End ======");
THREAD_POOL.shutdownNow();
});
IntStream.rangeClosed(1, THREAD_COUNT)
.forEach(i -> THREAD_POOL.execute(() -> {
try {
log.info(" is running!");
TimeUnit.SECONDS.sleep(i);
PLAYERS.add(new Player().setName(Thread.currentThread().getName()).setFinishedAt(Instant.now().getEpochSecond()));
cyclicBarrier.await();
log.info(" over!"); // 执行完barrierAction后才释放锁.
} catch (Exception ignored) {
}
}));
}
@Data
private static class Player {
private String name;
private long finishedAt;
}
}
8.3.3. 实现
public class CyclicBarrier {
private static class Generation {
Generation() {}
// 标识当前线程是否被打断.
boolean broken;
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
// 捕获外部中断信号
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
// 减到0, 则表示最后一个线程到达执行点
if (index == 0) { // tripped
Runnable command = barrierCommand;
if (command != null) {
try {
// 最后一个线程执行回调方法
command.run();
} catch (Throwable ex) {
breakBarrier();
throw ex;
}
}
nextGeneration();
return 0;
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
// 当前线程阻塞, 等待最后一个线程唤醒
// await会释放锁, 让其他线程也在这里阻塞
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
// 被唤醒后, generation被重新初始化为新的对象, 此处不相等, await方法就会结束.
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// 唤醒其他阻塞线程
trip.signalAll();
// 重置count计数, 下轮可以继续使用
count = parties;
// 重置标志位
generation = new Generation();
}
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
}
8.4. Exchanger
8.4.1. 使用场景
-
两个线程之间交换数据.
8.4.2. Exchanger例子
package me.jy.lang.thread.juc;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
/**
* @author jy
*/
@Slf4j
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
// 阻塞等待线程B发送数据
String data = exchanger.exchange("A");
log.info(data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "A").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
// 与线程A交换数据
String data = exchanger.exchange("B");
log.info(data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "B").start();
}
}
8.4.3. 实现
public class Exchanger<V> {
private volatile Node[] arena;
private volatile Node slot;
private final Participant participant;
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}
public Exchanger() {
participant = new Participant();
}
@jdk.internal.vm.annotation.Contended static final class Node {
int index; // Arena index
int bound; // Last recorded value of Exchanger.bound
int collides; // Number of CAS failures at current bound
int hash; // Pseudo-random for spins
Object item; // This thread's current item
volatile Object match; // Item provided by releasing thread
volatile Thread parked; // Set to this thread when parked, else null
}
public V exchange(V x) throws InterruptedException {
Object v;
Node[] a;
Object item = (x == null) ? NULL_ITEM : x;
// 先用slot交换数据, 如果slot被占用了, 就会用arena交换数据.
if (((a = arena) != null ||
(v = slotExchange(item, false, 0L)) == null) &&
(Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
private final Object slotExchange(Object item, boolean timed, long ns) {
Node p = participant.get();
Thread t = Thread.currentThread();
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;
for (Node q;;) {
// slot不为null表示有别的线程在等待交换数据
if ((q = slot) != null) {
// 重置SLOT, 拿到对方线程想要交换的数据
if (SLOT.compareAndSet(this, q, null)) {
Object v = q.item;
// 将自己的数据设置到对方线程的match属性上.
q.match = item;
Thread w = q.parked;
if (w != null)
// 唤醒该线程
LockSupport.unpark(w);
return v;
}
// 有其他线程修改了SLOT, CAS失败, 创建arena数组.
// create arena on contention, but continue until slot null
if (NCPU > 1 && bound == 0 &&
BOUND.compareAndSet(this, 0, SEQ)) // bound设置为256
arena = new Node[(FULL + 2) << ASHIFT]; // (CPU数/2+2)*32
}
// arena不为空则转为使用arena数组交换数据
else if (arena != null)
return null; // caller must reroute to arenaExchange
else {
p.item = item;
// 将自己线程数据设置到item属性里, 然后尝试放入到SLOT中.
if (SLOT.compareAndSet(this, null, p))
break;
p.item = null;
}
}
// 当前线程数据放入到slot中, 此处自旋, 直到对方线程取出.
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;
// 自旋1024次
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
// 如果p.match为null, 说明还没有别的线程过来交换数据
while ((v = p.match) == null) {
if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
else if (slot != p)
spins = SPINS;
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
p.parked = t;
// 如果slot还没有被拿走, 阻塞自己.
if (slot == p) {
if (ns == 0L)
LockSupport.park(this);
else
LockSupport.parkNanos(this, ns);
}
p.parked = null;
}
// ns小于0或者被中断, 取消交换直接返回.
else if (SLOT.compareAndSet(this, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
// 清空双方交换的数据
MATCH.setRelease(p, null);
p.item = null;
// 保留随机数, 供下次使用
p.hash = h;
return v;
}
}
8.5. Phaser
8.5.1. 使用场景
-
可以当CountDownLatch或CyclicBarrier用.
-
与CyclicBarrier不同的是, Phaser可以在使用时动态修改需要同步的线程个数.
-
支持Phaser嵌套, 可以设置Phaser的父Phaser.
8.5.2. Phaser例子
package me.jy.lang.thread.juc;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
/**
* @author jy
*/
@Slf4j
public class PhaserDemo {
private static final int PARTIES = 10;
private static final Phaser PHASER = new Phaser(PARTIES);
public static void main(String[] args) {
PhaserDemo demo = new PhaserDemo();
IntStream.rangeClosed(1, PARTIES)
.parallel()
.forEach(i -> new Thread(demo::countDownLatch).start());
// 模拟CountDownLatch
// 等待parties减到0
PHASER.awaitAdvance(0);
log.info("awaitAdvance passed");
// 模拟CyclicBarrier
IntStream.rangeClosed(1, PARTIES)
.parallel()
.forEach(i -> new Thread(demo::cyclicBarrier).start());
}
@SneakyThrows
public void countDownLatch() {
TimeUnit.SECONDS.sleep(200000);
PHASER.arrive();
log.info("arrived");
}
@SneakyThrows
public void cyclicBarrier() {
log.info("phase0");
TimeUnit.SECONDS.sleep(2);
// 等待其他线程执行完毕
PHASER.arriveAndAwaitAdvance();
log.info("phase1");
TimeUnit.SECONDS.sleep(3);
// 等待其他线程执行完毕
PHASER.arriveAndAwaitAdvance();
log.info("phase2");
}
}
8.5.3. 实现
public class Phaser {
// 第1位标识是否同步完成, 第2~32为phase数, 第33~48位为总线程数, 第49~64位为未到达线程数.
private volatile long state;
private final Phaser parent;
private final Phaser root;
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;
public Phaser(Phaser parent, int parties) {
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
int phase = 0;
this.parent = parent;
if (parent != null) {
final Phaser root = parent.root;
this.root = root;
this.evenQ = root.evenQ;
this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
else {
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
// 设置2~32位, 33~48位, 49~64位 为parties
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}
public int awaitAdvance(int phase) {
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
// 右移32位得到state里的phase数
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase)
return root.internalAwaitAdvance(phase, null);
return p;
}
private int internalAwaitAdvance(int phase, QNode node) {
// 唤醒阻塞在前面phase阶段的线程
releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
if (node == null) { // spinning in noninterruptible mode
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted();
// 如果被打断, 或者自旋结束, 则创建一个QNode
if (interrupted || --spins < 0) { // need node to record intr
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
else
// 首先空自旋
Thread.onSpinWait();
}
else if (node.isReleasable()) // done or aborted
break;
else if (!queued) { // push onto queue
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
// 加入到队列头部, phase为奇数时加入到evenQ, phase为偶数时加入到oddQ.
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
queued = head.compareAndSet(q, node);
}
else {
try {
// 将当前node阻塞.
ForkJoinPool.managedBlock(node);
} catch (InterruptedException cantHappen) {
node.wasInterrupted = true;
}
}
}
if (node != null) {
if (node.thread != null)
node.thread = null; // avoid need for unpark()
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
return abortWait(phase); // possibly clean up on abort
}
// 唤醒阻塞在当前phase阶段的线程
releaseWaiters(phase);
return p;
}
private void releaseWaiters(int phase) {
QNode q; // first element of queue
Thread t; // its thread
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
while ((q = head.get()) != null &&
q.phase != (int)(root.state >>> PHASE_SHIFT)) {
if (head.compareAndSet(q, q.next) &&
(t = q.thread) != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
}
public int arrive() {
// arrive后将未到达线程数减1.
return doArrive(1);
}
public int arriveAndDeregister() {
// arrive后将未到达线程数和总线程数都减1
return doArrive(65537);
}
private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
// 将state减去adjust
if (STATE.compareAndSet(this, s, s-=adjust)) {
// 减到最后一个线程
if (unarrived == 1) {
long n = s & PARTIES_MASK;
// 将总线程数赋值到未到达的线程数, 从而实现Phaser复用
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
// 先将phase+1, 再修改当前state
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
STATE.compareAndSet(this, s, n);
// 释放当前phase的线程
releaseWaiters(phase);
}
// 当前Phaser总线程数为0, 说明当前线程已经全部取消注册, 所以通知父级Phaser减65537(反注册一个总线程同时将未到达线程数减一)
else if (nextUnarrived == 0) { // propagate deregistration
phase = parent.doArrive(ONE_DEREGISTER);
STATE.compareAndSet(this, s, s | EMPTY);
}
else
// 通知父级Phaser减1.
phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
}
}
}
8.7. ThreadLocal
ThreadLocal
在某个线程里存取数据, 使用裴波那契散列法+开放地址法存储, 使用裴波那契数列是为了尽量减少哈希碰撞, 让数据分布更加均匀.
每个 Thread
对象里有一个 ThreadLocalMap
成员变量, 用来存放 ThreadLocal
和对应的Value, ThreadLocal
对象使用 WeakReference
包装, 是为了防止回收 ThreadLocal
对象时, 因为Thread对象没有被回收且有强引用关联, 导致 ThreadLocal
对象回收不了.
public class ThreadLocal<T> {
private final int threadLocalHashCode = nextHashCode();
// 全局的hashCode
private static AtomicInteger nextHashCode = new AtomicInteger();
// 2^32*0.618 => int数据范围的黄金分割数
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
// 从0开始, 每次递增黄金分割数
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
// 为当前的Thread初始化一个ThreadLocalMap, 将当前的ThreadLocal对象和值存在当前的线程对象里.
createMap(t, value);
}
}
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
// 根据ThreadLocal的hashCode找到槽位上的Entry
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 如果找不到Entry, 则获取初始Value, 并放入到哈希表中
return setInitialValue();
}
// 每次ThreadLocal设置value后一定要手动remove掉, 因为线程池会复用线程, 下一次方法调用的时候会get到上一次ThreadLocal中存放的值.
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null) {
m.remove(this);
}
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
if (this instanceof TerminatingThreadLocal) {
TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
}
return value;
}
}
static class ThreadLocalMap {
// 哈希表初始化长度为16
private static final int INITIAL_CAPACITY = 16;
private Entry[] table;
private int size = 0;
private int threshold;
// 哈希表里每个元素都是弱引用
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 初始化哈希表
table = new Entry[INITIAL_CAPACITY];
// 计算ThreadLocal对象的槽位
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
// 槽位
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
// 槽位放的就是该ThreadLocal对象
if (k == key) {
e.value = value;
return;
}
// e不为null, 但是get出来是null, 代表当前元素已被回收
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
// tab[i] 为空, 则直接插入Entry到数组里.
tab[i] = new Entry(key, value);
int sz = ++size;
// 清理部分过期元素
if (!cleanSomeSlots(i, sz) && sz >= threshold)
// 如果哈希表中元素数量大于阈值就执行rehash
rehash();
}
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
// 如果遇到过期Entry, 将n再次赋值为len, 以再次扫描log(n)次
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0); // 控制扫描次数
return removed;
}
private void rehash() {
expungeStaleEntries();
// 清理了过期元素后, 元素数量还超过阈值的3/4, 则扩容一倍
if (size >= threshold - threshold / 4)
resize();
}
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// 清除当前槽位上的Entry
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
Entry e;
int i;
// 清除当前槽位的同时, 遍历下面不为空的槽位
for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 清除过期Entry
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
// 重新计算槽位
int h = k.threadLocalHashCode & (len - 1);
// Entry所属的槽位和当前槽位不同, 移动该Entry到h或者h后面第一个空的槽位
if (h != i) {
tab[i] = null;
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
// 每次扩容两倍
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
for (Entry e : oldTab) {
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null; // Help the GC
} else {
// 重新计算槽位
int h = k.threadLocalHashCode & (newLen - 1);
// 如果当前槽位有元素了就瞬移一位.
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
setThreshold(newLen);
size = count;
table = newTab;
}
private Entry getEntry(ThreadLocal<?> key) {
// 计算槽位
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
// 命中entry
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
// 向后遍历, 寻找Entry
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
// 清除过期key
if (k == null)
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
// 开放地址法
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
// 从i向后找一直找到key
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
// 清除当前元素
e.clear();
expungeStaleEntry(i);
return;
}
}
}
}
总之, ThreadLocalMap
在get/set/remove都会尝试清除哈希表中的过期元素.
9. 并发容器
9.1. BlockingQueue
9.1.1. api
方法 | 抛出异常 | 返回特殊值 | 阻塞 | 超时时间控制 |
---|---|---|---|---|
插入 |
boolean add(e) 队列满了抛出 |
boolean offer(e) 队列满了返回false, 否则插入成功返回true. |
void put(e) 队列满了会一直阻塞, 一直到可以插入. |
boolean offer(e, time, unit) 队列满了会等待指定时间直到能插入. |
删除 |
E remove() 队列如果为空抛出 |
E poll() 队列如果为空返回null, 否则直接删除队列头部元素. |
E take() 队列为空会一直阻塞, 一直到队列存在元素. |
poll(time, unit) 队列为空会等待指定时间直到能删除. |
查看头部元素 |
E element() 队列如果为空抛出 |
E peek() 队列如果为空返回null, 否则返回队列头部元素. |
- |
- |
9.1.2. ArrayBlockingQueue
ArrayBlockingQueue
是一个用数组实现的环形队列, 初始化时会指定数组的长度.
插入和删除使用 ReentrantLock
实现加锁和释放锁了来实现线程安全.
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public boolean add(E e) {
if (offer(e))
return true;
else
// 队列满了抛出异常
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
private void enqueue(E e) {
final Object[] items = this.items;
// 将元素插入到items数组里
items[putIndex] = e;
// putIndex即为队列尾部索引
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果数组满了则阻塞住
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E remove() {
E x = poll();
if (x != null)
return x;
// 如果数组满了则抛出异常
else
throw new NoSuchElementException();
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E e = (E) items[takeIndex];
// 删除数组元素
items[takeIndex] = null;
// takeIndex即为队列头部索引
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒阻塞在notFull的线程
notFull.signal();
return e;
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 数组为空时阻塞
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
public E element() {
E x = peek();
if (x != null)
return x;
// 数组为空时抛出异常
else
throw new NoSuchElementException();
}
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 直接返回takeIndex所在元素(队列头部)
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
}
9.1.3. LinkedBlockingQueue
LinkedBlockingQueue
是基于单向链表实现的阻塞队列, 默认最大容量为 Integer.MAX_VALUE
.
内部分别为put和take持有锁, 所以put和put互斥, take和take互斥, put和take不互斥.
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
// 单向链表节点
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final Condition notFull = putLock.newCondition();
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
final int c;
final Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 判断队列是否满了
if (count.get() == capacity)
return false;
// 加入到队尾
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final int c;
final Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 队列满了阻塞
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
// LinkedBlockingQueue的put也会唤醒其他的put操作
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
final E x;
final int c;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 队列为空返回null
if (count.get() == 0)
return null;
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
// LinkedBlockingQueue的poll也会唤醒其他的poll操作
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
// 唤醒阻塞在notFull的线程
signalNotFull();
return x;
}
private void enqueue(Node<E> node) {
last = last.next = node;
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
// head为标识队列头的哑元节点, 此处将head.next变为head, 并将item清除
E x = first.item;
first.item = null;
return x;
}
}
9.1.4. PriorityBlockingQueue
PriorityBlockingQueue
会将元素排好序存放.
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
private transient Object[] queue;
private transient Comparator<? super E> comparator;
private final ReentrantLock lock = new ReentrantLock();
// PriorityBlockingQueue因为会无限增长, 所以只会有notEmpty的等待条件
private final Condition notEmpty = lock.newCondition();
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.comparator = comparator;
this.queue = new Object[Math.max(1, initialCapacity)];
}
// 不存在插入不了的情况, 所以add/put都直接调用的offer
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] es;
// 如果queue数组满了, 则扩容
while ((n = size) >= (cap = (es = queue).length))
tryGrow(es, cap);
try {
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
siftUpComparable(n, e, es);
else
siftUpUsingComparator(n, e, es, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
try {
// 扩容两倍或直接+2
int growth = oldCap < 64 ? oldCap + 2 : oldCap >> 1;
int newCap = ArraysSupport.newLength(oldCap, 1, growth);
if (queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
// 如果队列没有元素, 则阻塞
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
// 二叉堆删除元素
private E dequeue() {
// assert lock.isHeldByCurrentThread();
final Object[] es;
final E result;
if ((result = (E) ((es = queue)[0])) != null) {
final int n;
final E x = (E) es[(n = --size)];
es[n] = null;
if (n > 0) {
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
siftDownComparable(0, x, es, n);
else
siftDownUsingComparator(0, x, es, n, cmp);
}
}
return result;
}
}
9.1.5. DelayQueue
DelayQueue
是一个按照延迟时间从小到大出队的优先队列.
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private Thread leader;
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final transient ReentrantLock lock = new ReentrantLock();
// queue非空的条件变量
private final Condition available = lock.newCondition();
// 与PriorityBlokingQueue一样, 没有容量的限制, 所以add/put都直接调用的offer
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
// 只有当堆顶的元素为新插入的元素才需要唤醒阻塞在take的线程
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
// 当第一个元素的超时时间大于0的时候, 直接返回null, 否则返回第一个元素
return (first == null || first.getDelay(NANOSECONDS) > 0)
? null
: q.poll();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 取出延迟时间最小的元素
E first = q.peek();
// 如果为空则阻塞
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
// 如果有其他线程在等待元素, 则阻塞住.
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待队列头部元素的delay时间
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
// 取出队列头部的元素后, 唤醒其他阻塞在take的线程
available.signal();
lock.unlock();
}
}
}
9.1.6. SynchronousQueue
SynchronousQueue
本身不存储元素, 插入元素后会阻塞, 一直到有其他线程取出元素.
存在非公平和公平两种模式(默认非公平), 公平下第一次take的元素为第一个put进去的元素, 非公平下第一次take的元素为最后一个put进去的元素.
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
// 默认为非公平
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 插入元素时第一个参数传这个元素
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
public E poll() {
// 取出元素时第一个参数传null
return transferer.transfer(null, true, 0);
}
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
}
9.2. BlockingDeque
BlockingDeque
继承了 BlockingQueue
, 增加了对队列头部和尾部元素操作的api.
BlockingDeque
的实现类只有 LinkedBlockingDeque
.
9.2.1. LinkedBlockingDeque
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable {
// 双向链表节点
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {
item = x;
}
}
// 头节点
transient Node<E> first;
// 尾节点
transient Node<E> last;
// 节点总数
private transient int count;
// 链表容量
private final int capacity;
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列满了会阻塞
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
private boolean linkFirst(Node<E> node) {
if (count >= capacity)
return false;
Node<E> f = first;
// node的next为当前的头节点
node.next = f;
// 将头节点改为node
first = node;
if (last == null)
last = node;
else
f.prev = node;
++count;
// 唤醒阻塞在notEmpty条件变量的线程
notEmpty.signal();
return true;
}
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
// 链表空时会阻塞
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
private E unlinkLast() {
Node<E> l = last;
if (l == null)
return null;
Node<E> p = l.prev;
E item = l.item;
l.item = null;
// 改掉last.prev指针, 防止强引用无法回收last节点
l.prev = l;
// last = last.prev
last = p;
if (p == null)
first = null;
else
p.next = null;
--count;
// 唤醒阻塞在notFull条件变量的线程
notFull.signal();
return item;
}
}
9.3. CopyOnWrite
CopyOnWrite
指的是在写数据的时候将源数据拷贝出来一份作修改, 然后将改后的数据作为源数据的引用.
9.3.1. CopyOnWriteArrayList
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
// 写操作的锁对象(synchronized锁)
final transient Object lock = new Object();
private transient volatile Object[] array;
public boolean add(E e) {
synchronized (lock) {
Object[] es = getArray();
int len = es.length;
// 先拷贝出来一份数组
es = Arrays.copyOf(es, len + 1);
// 写入到拷贝出来的数组
es[len] = e;
// 重新赋值数组
setArray(es);
return true;
}
}
public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
// 先不加锁判断数组是否包含该元素, 然后加锁再判断一次, 如果不存在则插入元素
return indexOfRange(e, snapshot, 0, snapshot.length) < 0
&& addIfAbsent(e, snapshot);
}
public E remove(int index) {
synchronized (lock) {
Object[] es = getArray();
int len = es.length;
E oldValue = elementAt(es, index);
int numMoved = len - index - 1;
Object[] newElements;
if (numMoved == 0)
// 如果删除的是最后一个元素, 则直接复制数组的0~len-1位
newElements = Arrays.copyOf(es, len - 1);
else {
// 将删除位置的左右两侧复制到新的数组里
newElements = new Object[len - 1];
System.arraycopy(es, 0, newElements, 0, index);
System.arraycopy(es, index + 1, newElements, index,
numMoved);
}
setArray(newElements);
return oldValue;
}
}
}
9.3.2. CopyOnWriteArraySet
CopyOnWriteArraySet
内部还是使用的 CopyOnWriteArrayList
, 只是添加元素的时候判断不存在才添加.
public class CopyOnWriteArraySet<E> extends AbstractSet<E> implements java.io.Serializable {
private final CopyOnWriteArrayList<E> al;
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();
}
// 判断不存在则插入元素
public boolean add(E e) {
return al.addIfAbsent(e);
}
}
9.4. ConcurrentLinkedQueue
ConcurrentLinkedQueue
通过对head和tail节点的next指针进行CAS操作实现入队和出队, 而非使用ReentrantLock悲观锁直接操作队列的head和tail节点.
个人猜测是因为CAS只能操作一个变量, 没法同时更新tail和tail.next, 所以插入和删除时优先更新tail.next指针和head.item, 然后才尝试cas修改head和tail节点.
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable {
static final class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
ITEM.set(this, item);
}
Node() {}
void appendRelaxed(Node<E> next) {
NEXT.set(this, next);
}
boolean casItem(E cmp, E val) {
return ITEM.compareAndSet(this, cmp, val);
}
}
public boolean offer(E e) {
final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// 1. tail.next=newNode
if (NEXT.compareAndSet(p, null, newNode)) {
if (p != t)
// 3. 第二步操作后, p就不等于tail, 这时候更新tail=tail.next(newNode)
TAIL.weakCompareAndSet(this, t, newNode);
return true;
}
}
// 如果p==p.next, 说明有其他线程更新了head节点, 当前遍历的属于清空数据的脏节点(详见updateHead方法), 更新下p和t变量
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
// 2. 第一次没有去更新tail, 只是设置了tail的next指针. 所以q!=null, 这时更新p=p.next
p = (p != t && t != (t = tail)) ? t : q;
}
}
public E poll() {
restartFromHead: for (;;) {
for (Node<E> h = head, p = h, q;; p = q) {
final E item;
// 第一次删除节点时, 只是将head.item设置为null
// 第二次删除节点时, 先将p=p.next, 然后将更新后p的next清空
if ((item = p.item) != null && p.casItem(item, null)) {
// 第二次删除元素后, p=p.next, 所以此时p!=h
if (p != h)
// 如果p.next不为null, 则将head更新为p.next(存储数据的节点), 否则更新为p(哑元节点)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 如果p和p.next都为null, 代表整个队列都没节点, 可以直接返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 如果p==p.next, 说明有其他线程更新了head节点, 当前遍历的属于清空数据的脏节点(详见updateHead方法), 重新遍历来取到最新的head
else if (p == q)
continue restartFromHead;
}
}
}
public boolean isEmpty() {
return first() == null;
}
Node<E> first() {
restartFromHead: for (;;) {
for (Node<E> h = head, p = h, q;; p = q) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
// 如果p==p.next, 说明有其他线程更新了head节点, 当前遍历的属于清空数据的脏节点(详见updateHead方法), 重新遍历来取到最新的head
else if (p == q)
continue restartFromHead;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
// 更新head为p
if (h != p && HEAD.compareAndSet(this, h, p))
// 将旧head节点的next指针指向自己, 防止旧head节点因为next指针指向新的head(强引用)导致不被GC
NEXT.setRelease(h, h);
}
}
9.5. ConcurrentLinkedDeque
ConcurrentLinkedDeque
的实现与 ConcurrentLinkedQueue
类似, 区别是 ConcurrentLinkedDeque
使用双向链表存储节点.
10. 线程池
10.2. 使用
10.2.1. ThreadPoolExecutor构造函数参数
-
int corePoolSize
: 核心线程数个数 -
int maximumPoolSize
: 最大线程数个数 -
long keepAliveTime
: 核心线程之外的线程存活时间 -
TimeUnit unit
: KeepAliveTime时间单位 -
BlockingQueue<Runnable> workQueue
: 线程池所用的阻塞队列类型 -
ThreadFactory threadFactory
: 线程创建的工厂类 -
RejectedExecutionHandler handler
: 最大线程满载后的线程提交后拒绝策略
10.2.2. 线程池分类
-
FixedThreadPool: 核心线程数等于最大线程数, 使用无界队列存储多余任务, 适用于任务量已知且耗时的场景.
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
-
CachedThreadPool: 没有核心线程数, 适用于任务执行时间短且密集的场景.
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
-
SingleThreadPool: 核心线程数和最大线程数都为1, 适用于希望任务排队串行执行的场景.
new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()))
10.3. 线程池状态
状态名 | 标志 | 是否接收新任务 | 是否处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING |
111 |
√ |
√ |
|
SHUTDOWN |
000 |
× |
√ |
不会接收新的任务, 只会处理阻塞队列中剩余的任务. |
STOP |
001 |
× |
× |
中断正在执行的任务, 抛弃阻塞队列中的任务. |
TIDYING |
010 |
× |
× |
任务全部执行完毕, 活动线程为0, 即将进入终结状态. |
TERMINATED |
011 |
× |
× |
终结状态 |
10.4. 线程调度流程
-
如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
-
如果当前线程池中的线程数目大于等于corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
-
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
-
如果线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
10.5. 线程池关闭
10.5.1. shutdown
线程池状态更新为 SHUTDOWN
, 只执行所有已提交的任务(包括阻塞队列里的任务), 不再接受新的任务.
private static void shutdown() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
SleepUtil.sleep(1000);
log.info("sleep 1s");
});
executorService.execute(() -> {
SleepUtil.sleep(2000);
log.info("sleep 2s");
});
executorService.execute(() -> {
SleepUtil.sleep(3000);
log.info("sleep 3s");
});
executorService.shutdown(); (1)
log.info("shutdown called");
executorService.execute(() -> System.out.println("no more")); (2)
(3)
}
1 | 线程池状态变为SHUTDOWN, 只会执行已经提交的任务. |
2 | reject任务. |
3 | 等待任务1/2/3执行完 |
10.5.2. shutdownNow
调用所有核心线程的interrupt()方法, 并直接返回阻塞队列里的任务.
private static void shutdownNow() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
SleepUtil.sleep(10000);
log.info("sleep 10s");
});
executorService.execute(() -> {
SleepUtil.sleep(20000);
log.info("sleep 20s");
});
executorService.execute(() -> {
SleepUtil.sleep(30000);
log.info("sleep 30s");
});
List<Runnable> runnables = executorService.shutdownNow(); (1)
log.info("shutdownNow: {}", runnables); (2)
log.info("shutdown called");
executorService.execute(() -> System.out.println("no more")); (3)
(4)
}
1 | 调用所有核心线程的interrupt()方法, 并直接返回阻塞队列里的任务. |
2 | 返回任务3. |
3 | reject任务. |
4 | 等待任务1/2执行完 |
10.6. 实现
10.6.1. 线程池的关闭
-
外部调用线程池的
shutdown
或者shutdownNow
方法. -
外部循环调用线程池的
awaitTermination
方法. -
如果调用的是
shutdown
, 线程池会打断所有的空闲线程, 否则直接打断所有的线程, 并将阻塞队列的线程对象返回出来. -
线程池状态升为
TIDYING
. -
执行线程池
terminated
模板方法. -
线程池状态升为
TERMINATED
. -
唤醒阻塞在
awaitTermination
方法的外部调用线程.
public class ThreadPoolExecutor extends AbstractExecutorService {
// 前3位标识线程池的状态, 后29位标识线程数
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 线程池的5种状态常量
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 线程池的当前状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 打断空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 设置线程池状态为STOP
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 能获得锁说明该线程是空闲的, 此处打断空闲线程
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
private void interruptWorkers() {
for (Worker w : workers)
// 不管能不能获得锁, 都直接打断该线程
w.interruptIfStarted();
}
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<>();
// 将阻塞队列里的线程对象放入到taskList里
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
// 如果是DelayQueue, drainTo方法可能会没有把队列元素全放入taskList里, 此处手动删除再加入到taskList里
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 阻塞队列为空并且没有工作线程时, 设置线程池状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 执行terminated回调
terminated();
} finally {
// 设置线程池状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒阻塞在awaitTermination方法的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
// 等待线程池状态变为TERMINATED
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 如果当前线程池的状态小于TERMINATED(3), 则阻塞
while (runStateLessThan(ctl.get(), TERMINATED)) {
if (nanos <= 0L)
return false;
nanos = termination.awaitNanos(nanos);
}
return true;
} finally {
mainLock.unlock();
}
}
}
10.6.2. 线程池任务的提交
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 如果工作线程小于核心线程数, 则直接执行
if (addWorker(command, true))
return;
c = ctl.get();
}
// 将任务线程添加到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池状态不是RUNNING, 则拒绝执行新提交的任务
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果阻塞队列放不下, 则启动新线程, 直到超过最大线程数
else if (!addWorker(command, false))
// 超过最大线程数, 拒绝执行
reject(command);
}
// 新开一个线程直接执行
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
// 大于核心线程数or最大线程数
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 工作线程计数加1
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 将线程对象添加到workers集合中
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 执行该工作线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
}
10.6.3. 线程池任务的执行
public class ThreadPoolExecutor extends AbstractExecutorService {
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
}
10.7. 线程池的4种拒绝策略
-
AbortPolicy
: 新的任务过来时, 直接抛出异常(默认). -
CallerRunsPolicy
: 让调用方线程去执行新的任务. -
DiscardPolicy
: 忽略掉新的任务. -
DiscardOldestPolicy
: 将队列最老的任务删除掉, 然后去执行新的任务.