Java并发探索–上篇
1.基本概念
- 线程与进程:线程是程序执行的最小单位,而进程是系统进行资源分配和调度的基本单位。例如,一个 Java 程序可以包含多个线程,它们共享进程的资源。
- 并发与并行:并发是指多个任务在同一时间段内执行,而并行是指多个任务在同一时刻执行。在多核 CPU 系统中,可以实现真正的并行。
- 同步与异步:同步是指程序按照顺序依次执行,而异步是指程序在执行某个任务时,不需要等待该任务完成,可以继续执行其他任务。
“Java并发探索–下篇” — 在下面找
【博客园】
https://www.cnblogs.com/jackjavacpp
【CSDN】
https://blog.csdn.net/okok__TXF
2.探索线程的创建
①线程的状态
从Thread
源码里面看出
public enum State {
// 尚未启动的线程的线程状态。
NEW,
// 就绪
RUNNABLE,
// 等待监视器锁的线程的线程状态
BLOCKED,
/*
等待线程的线程状态,线程由于调用以下方法之一而处于等待状态:
Object.wait() 没有超时
Thread.join() 没有超时
LockSupport.park()
*/
WAITING,
/*
指定等待时间的等待线程的线程状态
线程处于定时等待状态,因为调用了以下方法之一,并指定等待时间:
Thread.sleep
Object.wait with timeout
Thread.join with timeout
LockSupport.parkNanos
LockSupport.parkUntil
*/
TIMED_WAITING,
//终止线程的线程状态。线程已完成执行。
TERMINATED;
}
下面看一张图,很清楚的解释了各状态之间的关系:【节选自https://blog.csdn.net/agonie201218/article/details/128712507】
在Java中,一个Thread有大致六个状态。
线程创建之后(new Thread)它将处于 NEW(新建) 状态,调用 start()
方法后开始运行,线程这时候处于 RUNNABLE(就绪) 状态。可运行状态的线程获得了 CPU 时间片后就处于 RUNNING(运行) 状态。
明白了线程的运行状态,接下来让我们来看一下在爪哇里面如何创建并且启动线程。
②线程创建
1)两种基本方式
- 继承Thread类,重写run方法
public class MyThread1 extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": hello world");
}
}
public class JUCMain {
public static void main(String[] args) {
new MyThread1().start();
}
}
- 实现Runnable接口,传入Thread
public class Runnable1 implements Runnable{
@Override
public void run() {
System.out.println("hello world, Runnable");
}
}
public class JUCMain {
public static void main(String[] args) {
new Thread(new Runnable1()).start();
}
}
网上还传有其他创建线程的方式,比如: Callable接口,重写call,结合FutureTask;线程池;lambda表达式等等。。。诚然,这也确实是创建线程启动的方式不错。但是本文毕竟是探索性质的文章,我们要探索其本质。
首先从start()
方法看起(这个方式属于Thread类的)。调用start()
后,JVM会创建一个新线程并执行该线程的run()
方法。注意:直接调用run()
不会启动新线程,而是在当前线程中执行。
// 启动线程并触发 JVM 创建原生线程
// synchronized后面解释【见 探索“锁”】
public synchronized void start() {
// 零状态值对应于状态 “NEW”
// 线程想要start,必须是为0的状态
if (threadStatus != 0)
throw new IllegalThreadStateException();
/*
group 是线程所属的线程组。这行代码将当前线程实例添加到线程组中,
同时线程组的未启动线程计数会减1。
*/
group.add(this);
boolean started = false;
try {
start0(); //关键!调用本地方法(native)
started = true;
} finally {
try {
if (!started) { //启动失败时回滚
//如果 started 为 false,说明线程启动失败,
//调用 group.threadStartFailed(this) 方法通知线程组该线程启动失败。
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
//========== native
private native void start0();
那么执行的是run()方法,run方法里面是啥呢
private Runnable target; // target是Runnable类型
@Override
public void run() {
if (target != null) {
target.run();
}
}
如果继承Thread类后,重写run()方法,那么run方法就会覆盖上面的方法。
如果是实现的Runnable接口,new Thread(new Runnable1())
的时候,就会把target赋值,然后调用run()方法的时候,就执行的是target的run方法了。
2) 其他创建方式
.lambda
- lambda表达式创建:这个仅仅是写法不同而已。因为Runnable是个函数式接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
.callable
- Callable创建的方式
public class MyCall implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "Hello Callable";
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> task = new FutureTask<>(new MyCall());
new Thread(task).start();
System.out.println(task.get());
}
new Thread(Runnable runnable)
要求传的类型是Runnable,但是现在传的是FutureTask。所以先来看一看FutureTask和Runnable之间有什么联系.
从上面可以看到,FutureTask实现了RunnableFuture接口,然后RunnableFuture接口继承了Future和Runnable两个接口。
Future<V>
Future
接口是 Java 并发编程中的一个重要接口,位于 java.util.concurrent
包下,它代表了一个异步计算的结果。异步计算意味着在调用方法后,程序不会立即等待结果返回,而是可以继续执行其他任务,当结果准备好时,再通过 Future
对象获取。
// 这里使用了泛型 <V>,表示该 Future 对象所代表的异步计算结果的类型。
public interface Future<V> {
//尝试取消异步任务的执行。
/*
如果任务已经完成、已经被取消或者由于其他原因无法取消,则返回 false;
如果任务成功取消,则返回 true。
*/
boolean cancel(boolean mayInterruptIfRunning);
//如果任务在完成之前被取消,则返回 true;否则返回 false。
boolean isCancelled();
//如果任务已经完成,则返回 true;否则返回 false。
boolean isDone();
//获取异步任务的计算结果。如果任务还未完成,调用该方法的线程会被阻塞,直到任务完成。
V get() throws InterruptedException, ExecutionException;
//获取异步任务的计算结果,并且可以指定一个超时时间。
//如果在指定的时间内任务还未完成,调用该方法的线程会被阻塞,直到任务完成或者超时。
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
RunnableFuture
public interface RunnableFuture<V> extends Runnable, Future<V> {
// 很简单嘛,这个是来自Runnable的
void run();
}
这个接口就相当于组合了Runnable和Future,能够获取到返回值了。
FutureTask<V>
既然要把它当做参数传进Thread的构造函数,那么想必它肯定是实现了run方法的。
public class FutureTask<V> implements RunnableFuture<V> {
// 基本属性
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** 结果 */
private Object outcome;
/** The thread running the callable; CAS ed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
// 看它的构造函数1
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable; // 赋值callable========
this.state = NEW; // ensure visibility of callable
}
// 构造函数2 ==== 本质还是把Runnable加了一层,给封装成Callable了
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
/*
Executors::callable(xx, xx)方法==========
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run(); // 调用Runnable的run()
return result;
}
}
*/
// run()方法 ---------------
// new Thread(new FutureTask<>(new MyCall()))
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//====调用callable.call()
result = c.call();
ran = true;
} catch (Throwable ex) {
.........
}
// 如果运行OK了,设置结果!
if (ran) set(result);
}
} finally {
.............
}
}
// 设置结果outcome
protected void set(V v) {
// https://www.cnblogs.com/jackjavacpp/p/18787832
// 使用CAS --- 【见上一篇文章 java map & CAS & AQS】
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; // 这里
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
// 比较核心的get方法================start
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) // 如果状态不是完成
s = awaitDone(false, 0L); // 等待完成
return report(s); // 返回结果
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 1.计算超时截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) { // 2.自旋循环等待任务完成
// 2.1如果该线程中断了
if (Thread.interrupted()) {
removeWaiter(q);// 从等待队列中移除当前节点
throw new InterruptedException();
}
// 2.2检查状态
int s = state;
// 任务已终态(NORMAL, EXCEPTIONAL, CANCELLED)
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;// 返回最终状态
}
// 2.3若任务状态等于 COMPLETING,表明任务正在完成,
// 此时调用 Thread.yield() 方法让当前线程让出 CPU 时间片,等待任务完成。
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued) //将节点加入等待队列
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) { // 2.4如果是有时限的get()
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state; // 返回状态
}
LockSupport.parkNanos(this, nanos);
}
else //若没有设置超时时间,就调用 LockSupport.park 方法让当前线程无限期阻塞,直到被唤醒。
LockSupport.park(this);
}
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x; // 返回outcome
......
}
//==================================end
}
awaitDone
方法的核心功能是让当前线程等待异步任务完成,它会持续检查任务的状态,根据不同的状态采取相应的处理措施,同时支持设置超时时间。在等待过程中,若线程被中断,会抛出 InterruptedException
异常。
通过上面的分析,Callable这种方式实际上本质还是Runnable嚯。使用FutureTask将Future和Runnable结合起来,功能更加丰富。
.线程池ThreadPoolExecutor
- 线程池创建线程
如下使用方式。
public class PoolMain {
public static void main(String[] args) {
// 创建一个线程池
ExecutorService pool = Executors.newFixedThreadPool(1);
long start = System.currentTimeMillis();
// execute=============
pool.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("execute pool创建启动线程!");
});
// submit==============
Future<Integer> future = pool.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("submit pool创建启动线程!");
return 100;
});
try {
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
System.out.println("main线程执行时间:" + (System.currentTimeMillis() - start));
pool.shutdown();
}
}
从上面的例子可以看出,大致有ExecutorService
,Executors
, newFixedThreadPool()方法本质是 new ThreadPoolExecutor(),故还有一个ThreadPoolExecutor
类。
接下来梳理一下这些类背后的关系。【通过idea得到下面的关系图】此外,Executors
只是一个工具类。
Executor
是顶级接口
public interface Executor {
// 只定义了一个方法
void execute(Runnable command);
}
ExecutorService
:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
//....
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
}
AbstractExecutorService
ExecutorService执行方法的默认实现,发现下面的submit()底层实际执行的是execute(ftask)方法【Executor接口的execute()方法,在这个抽象类里面没有具体实现,到具体子类ThreadPoolExecutor在可以看到】。
public abstract class AbstractExecutorService implements ExecutorService {
// 这里重点只看一下submit方法的默认实现
// 优点1: 可以有Future返回值
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// 优点2: 支持Callable参数
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
ThreadPoolExecutor
:线程池,可以通过调用Executors静态工厂方法来创建线程池并返回一个ExecutorService对象
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 七大参数!!!!======
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,定制策略来处理任务
) {
if (corePoolSize < 0 || maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
回到这一小节最开始的时候,例子中的线程池有两种提交并运行线程的方式execute
和submit
两个方法。现在来看一下ThreadPoolExecutor
中的execute()
方法是什么样子的。submit()我们已经知道了是在AbstractExecutorService中有默认实现的。
// ThreadPoolExecutor::execute(Runnable command)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1.若当前工作线程数小于核心线程数(corePoolSize),尝试创建新的核心线程
// 这里是用的位运算的,我没有深究
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) //
return;
// 创建新线程失败,重新获取ctl
c = ctl.get();
}
// 2.任务入队
// 线程池处于运行状态(isRunning(c))
// 且任务成功加入阻塞队列(workQueue.offer(command))
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.2 双重检查
/*
2.2.1再次检查线程池状态(可能在此期间线程池被关闭)。
2.2.2若线程池已关闭,尝试从队列中移除任务,移除成功则拒绝任务(reject(command))。
2.2.3若线程池仍运行但无活跃线程(workerCountOf(recheck) == 0),
添加一个非核心线程(addWorker(null, false)),该线程会从队列中拉取任务执行。
*/
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.若任务无法入队(队列已满),尝试创建非核心线程(addWorker(command, false))
else if (!addWorker(command, false))
//若创建失败(线程数已达maximumPoolSize或线程池已关闭),
//执行拒绝策略(reject(command))
reject(command);
}
/*
execute(command)
│
├─ 检查command非空
│
├─ 当前线程数 < corePoolSize?
│ ├─ 是 → 创建核心线程 → 成功则返回
│ └─ 否 → 进入下一步
│
├─ 线程池是否RUNNING且任务入队成功?
│ ├─ 是 → 双重检查状态
│ │ ├─ 线程池已关闭 → 移除任务并拒绝
│ │ └─ 线程池仍运行且无活跃线程 → 创建非核心线程
│ │
│ └─ 否 → 尝试创建非核心线程
│ ├─ 成功 → 处理任务
│ └─ 失败 → 拒绝任务
*/
为什么需要二次检查线程池状态?
- 任务入队后,其他线程可能关闭了线程池(如调用shutdown())。
- 处理:
- 若线程池已关闭,需移除已入队任务并拒绝。
- 若线程池仍运行但无活跃线程(如核心线程数为0且任务在队列中),需创建新线程处理队列任务。
场景1:核心线程数未满
- 线程池处于
RUNNING
,当前线程数 2(corePoolSize=5
)。 addWorker(task, true)
成功创建核心线程并执行任务。
场景2:队列已满,创建临时线程
- 核心线程满载,队列已满,线程数未达
maximumPoolSize
。 addWorker(task, false)
创建临时线程处理任务。
场景3:SHUTDOWN 状态处理剩余任务
- 线程池调用
shutdown()
,状态变为SHUTDOWN
。 - 已有任务在队列中,
addWorker(null, true)
创建线程处理队列任务。
ThreadPoolExecutor
设计思想:
- 核心线程优先:优先使用核心线程处理任务。
- 队列缓冲:核心线程满载后,任务入队等待。
- 非核心线程应急:队列满后,创建临时线程处理任务(不超过
maximumPoolSize
)
addWorker 创建工作线程
addWorker
方法通过精细的状态检查和并发控制,确保线程池在动态扩缩容时保持线程安全。【方便理解,可以把这个方法看作是创建线程】其核心在于:
- 双重循环:外层处理状态变化,内层处理线程数修改。
- 锁与原子操作结合:保证
workers
集合和workerCount
的一致性。 - 异常回滚机制:确保资源不会泄漏(如线程数虚增或 Worker 未清理)。
firstTask为我们最开始写的Runnable。【记一个代号叫做 “我的任务” 】
// ======== firstTask pool.execute(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("execute pool创建启动线程!"); });
// addWorker(runnable, core)
//标记是否以核心线程数(corePoolSize)为上限创建线程。
//若为 false,则使用最大线程数(maximumPoolSize)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
........
int rs = runStateOf(c);// 获取线程池状态
// 检查是否允许添加Worker
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (;;) {
.........
//CAS 增加线程数
if (compareAndIncrementWorkerCount(c))
break retry;// 成功增加,跳出循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
...........
try {
// 把“我的任务”传进去了
w = new Worker(firstTask); // 创建的一个Worker
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁保护 workers 集合
mainLock.lock();
try {
// 再次检查状态(防止在加锁前状态变化)
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
//======抛出异常....
workers.add(w); // 将 Worker 加入集合
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 【Worker类里面的thread】
// 启动线程=========重点【见下面的分析】
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker是ThreadPoolExecutor的内部类,可以看出是一个Runnable,那么肯定重写了run()方法
private final class Worker
extends AbstractQueuedSynchronizer implements Runnable
{
final Thread thread;
Runnable firstTask; //"我的任务"到这里来了
// 构造函数
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 利用线程工厂创建了一个线程
/*
如果final Thread t = w.thread;
t.start();启动的话,执行的是这个内部类的run()
*/
this.thread = getThreadFactory().newThread(this);
}
// run就这一行
public void run() {runWorker(this);}
//到这里了
final void runWorker(Worker w) {
..
//"我的任务"
Runnable task = w.firstTask;
...
try {
//getTask()从等待队列里面取出Runnable
while (task != null || (task = getTask()) != null) {
....
task.run();//==========
}
}......
..
//// 无任务时回收线程
processWorkerExit(w, completedAbruptly);
}
}
ThreadPoolExecutor的静态内部类
:: jdk自带的四种拒绝策略。
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
// 1.直接抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {
.........
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
// 2.直接丢弃
public static class DiscardPolicy implements RejectedExecutionHandler {
.....
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
// 3.丢弃等待队列的第一个
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
........
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//如果线程池未关闭,就弹出队列头部的元素,然后尝试执行
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
// 4.调用者运行,直接执行run()方法里面的逻辑。
// 只要线程池没有关闭,就由提交任务的当前线程处理
public static class CallerRunsPolicy implements RejectedExecutionHandler {
......
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
总结一下,在了解到了ThreadPoolExecutor的一些类间关系、以及一些基本流程、属性之后。接下来我们来梳理一遍,线程池的运行方式。
-
创建线程池(七大参数、四大拒绝策略)
-
任务提交
//2.1 executor.execute(() -> { // 任务逻辑 });
2.2任务处理决策链:::
2.2.1尝试创建核心线程:当前工作线程数 <
corePoolSize
,直接创建新核心线程执行任务if (workerCount < corePoolSize) { addWorker(command, true); // true表示检查corePoolSize return; }
2.2.2任务入队: 若核心线程已满,任务尝试加入工作队列。
if (workQueue.offer(command)) { // 双重检查线程池状态 if (线程池已关闭) 移除任务并拒绝; else if (当前无活跃线程) 创建非核心线程处理队列任务; }
2.2.3尝试创建非核心线程: 若队列已满且线程数 <
maximumPoolSize
,创建非核心线程。else if (!addWorker(command, false)) { // false表示检查maximumPoolSize reject(command); // 触发拒绝策略 }
2.2.4拒绝任务
- 工作线程执行任务
3.1Worker初始化:每个
Worker
绑定一个线程和初始任务(firstTask
)。Worker w = new Worker(firstTask); final Thread t = w.thread; t.start(); // 启动线程执行runWorker()
3.2任务执行循环(
runWorker
方法):while (task != null || (task = getTask()) != null) { try { task.run(); // 执行任务 } finally { task = null; // 清理任务引用 } }
3.3从队列获取任务(
getTask
方法):
- 阻塞模式:若为核心线程或允许核心线程超时,调用
workQueue.take()
永久阻塞。- 超时模式:若非核心线程,调用
workQueue.poll(keepAliveTime)
超时等待。Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
- 线程回收与资源释放
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 异常终止时补偿workerCount decrementWorkerCount(); mainLock.lock(); try { workers.remove(w); // 从集合中移除Worker if (线程池仍在运行 && 队列非空) addWorker(null, false); // 补充线程处理队列任务 } finally { mainLock.unlock(); } }
线程池本质也是Runnable!
一张好图:【来自:【线程池工作原理】https://blog.csdn.net/fighting_yu/article/details/89473175】
3.探索”锁“
上面探索了线程以及线程池的创建,发现其源码中存在这种代码;
//1.Thread的start方法
public synchronized void start()
//2.线程池部分源码addWorker()方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
//3.
LockSupport.park(this);
这些是什么呢?这就是锁。。
确保线程安全最常见的做法是利用锁机制(Lock
、sychronized
)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的,线程安全的。
① synchronized
synchronized
是 Java 中最基本的同步机制,它可以修饰方法或代码块,确保同一时刻只有一个线程可以执行被修饰的代码。
public class SynchronizedTest {
public static void main(String[] args) {
SynchronizedTest lock1 = new SynchronizedTest();
new Thread(lock1::test).start();
new Thread(lock1::test2).start();
new Thread(lock1::testStatic).start();
new Thread(lock1::testFs).start();
}
public void testStatic() {
// 锁的是Class对象
synchronized (SynchronizedTest.class){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("testStatic()");
}
}
// 锁的是一个实例对象
public void test(){
synchronized (this){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("test()");
}
}
public synchronized void test2(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("test2()");
}
public void testFs(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("testFs()");
}
}
上面只有test() 和 test2() 是互斥的。也就是1秒过后,testStatic()、testFs()、和 【test()、test1() 其中之一】一起输出打印。
修饰代码块、修饰方法:锁的是该对象;
修饰静态成员:锁的是该类的Class对象;这种方式可以确保对静态变量的访问是线程安全的
还可以锁任意对象。
其实主要弄清楚各自锁的是什么对象就行了,看是否需要的是一个锁,就可以判断是否互斥了;
//锁的是Class对象
public static synchronized void testStatic1() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("testStaticMethod()");
}
public void testStatic() {
synchronized (SynchronizedTest.class){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("testStatic()");
}
}
public synchronized void test2(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("test2()");
}
如上述代码示例,testStatic1和testStatic需要持有的对象是同一个,故这二者会产生互斥,test2需要持有的是该类的一个实例对象,故不会与这二者产生互斥。
需要注意的是: synchronized 并不属于方法定义的一部分,故synchronized 关键字不能被继承。如果在父类中的某个方 法使用了 synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这 个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上 synchronized 关键字才可以。当然,还可以在子类方法中调用父类中相应的方 法,这样虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此, 子类的方法也就相当于同步了。
来看看如下示例:
public class Father {
public synchronized void method1(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Father method1");
}
}
//
public class Son extends Father{
public void syncSon() {
super.method1(); // 调用的父类的同步方法
}
public void syncSon1() {
super.method1();
}
public void method1() { // 重写了,但是synchronized并不会继承过来
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Son method1");
}
public void sonHaha() { method1(); }
public void sonHehe() { method1(); }
public static void main(String[] args) {
Son son = new Son();
// new Thread(son::syncSon).start();
// new Thread(son::syncSon1).start(); // 会互斥
new Thread(son::sonHaha).start();
new Thread(son::sonHehe).start(); // 不会
}
}
synchronized【隐式锁】的底层原理涉及到 Java 对象头(Object Header)和 Monitor(监视器)两个关键概念。
每个 Java 对象在内存中分为三部分:
- 对象头(Header)
- Mark Word(标记字段):存储哈希码、GC 分代年龄、锁状态等。
- Klass Pointer(类型指针):指向类的元数据。
- 实例数据(Instance Data)
- 对齐填充(Padding)
Java 对象头:在 Java 虚拟机中,每个对象都有一个对象头,用于存储对象的元数据信息,包括对象的哈希码、GC 相关信息、锁状态等。对象头通常包含一个标记字段(Mark Word),用于标识对象的锁状态。
Monitor(监视器):Monitor 是一种同步机制,负责管理对象的锁。每个对象都与一个 Monitor 相关联。当一个线程尝试进入一个被synchronized修饰的代码块或方法时,它会尝试获取对象的 Monitor。如果 Monitor 处于无锁状态,则当前线程会尝试将其锁定;如果 Monitor 已经被其他线程锁定,则当前线程会进入阻塞状态,直到持有锁的线程释放锁。
// C++ 实现(HotSpot 源码)
class ObjectMonitor {
void* _header; // Mark Word
void* _owner; // 持有锁的线程
volatile intptr_t _count; // 重入次数
ObjectWaiter* _WaitSet; // 等待队列(调用 wait() 的线程)
ObjectWaiter* _EntryList; // 阻塞队列(竞争锁失败的线程)
// ...
};
查看本小节开头示例的test()方法的字节码:
synchronized
同步语句块的实现使用的是 monitorenter
和 monitorexit
指令,当执行 monitorenter
指令时,线程试图获取锁也就是获取 对象监视器 monitor
的持有权。第一个monitorexit正常退出同步块, 第二个是异常退出同步块。
synchronized优化:
锁升级(JDK 6+)
3.0 无锁
- 无锁:当第一个线程第一次访问一个对象的同步块时,JVM会在对象头中设置该线程的ID,并将对象头的状态位设置为“偏向锁”。这个过程称为“偏向”,表示对象当前偏向于第一个访问它的线程。
3.1 偏向锁(Biased Locking)
- 原理:第一个获取锁的线程将线程 ID 写入 Mark Word,后续无需 CAS。这样如果该线程再来的时候,由于是已经设置了锁偏向该线程,故只需比对一下对象头里面的Mark Word就行了。
- 触发条件:JVM 启用了偏向锁(默认开启),且对象未处于锁定状态。
- 撤销:当其他线程尝试竞争时,撤销偏向锁并升级为轻量级锁。
3.2 轻量级锁(Lightweight Locking)
- 加锁流程:
- 在当前线程栈帧中创建 Lock Record。
- 通过 CAS 将 Mark Word 复制到 Lock Record,并尝试将 Mark Word 指向 Lock Record。
轻量级锁在以下场景会升级为重量级锁:
- 自旋失败:竞争线程自旋一定次数后仍未获取锁。
- 竞争加剧:等待锁的线程数超过 JVM 自适应的阈值。
3.3 重量级锁(Heavyweight Locking)
- 实现:通过操作系统提供的互斥量(Mutex)和条件变量实现,线程竞争失败后进入阻塞状态。
- 性能问题:涉及用户态到内核态的切换,开销较大。
synchronized优化:
锁会升级,从低到高升级,反着降级不可以:无锁状态 -> 偏向锁状态 -> 轻量级锁状态 -> 重量级锁状态
锁类型 | 实现机制 | 适用场景 | 性能开销 |
---|---|---|---|
偏向锁 | 通过 Mark Word 记录线程 ID | 单线程重复访问同步块 | 最低 |
轻量级锁 | CAS + 自旋(适应性自旋) | 低竞争、短时间同步 | 中等 |
重量级锁 | 操作系统互斥量(Mutex) + Monitor | 高竞争、长时间同步 | 最高 |
【节选自 锁升级】 :https://blog.csdn.net/weixin_45433817/article/details/132216383
问:synchronized是公平锁吗?
首先要知道公平锁和非公平锁的概念:
- 公平锁:公平锁指的是多个线程按照申请锁的顺序来获取锁,先到先得。当一个线程请求锁时,如果该锁当前处于可用状态,并且在该线程之前已经有其他线程在等待该锁,那么这个线程会被放入等待队列的尾部,等待前面的线程依次获取并释放锁后,它才能获取锁。
- 非公平锁:非公平锁则不保证线程获取锁的顺序与申请锁的顺序一致。当一个线程请求锁时,即使有其他线程在等待该锁,它也会先尝试直接获取锁,如果获取成功就可以直接执行,而不用排队等待。
那么,synchronized
基于对象头的 Mark Word 和监视器(Monitor)实现。当一个线程进入同步块时,它会尝试获取对象的监视器。如果监视器处于空闲状态,该线程会直接获取监视器,而不会考虑是否有其他线程已经在等待这个监视器。例如,当一个线程释放了 synchronized
修饰的同步块的锁后,新到来的线程有很大机会直接获取到锁,而不是等待那些在等待队列中的线程,这就体现了其非公平性。
class SynchronizedNonFairExample {
private static final Object lock = new Object();
private static int counter = 0;
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(() -> {
while (true) {
synchronized (lock) {
counter++;
System.out.println(Thread.currentThread().getName() + " 获取到锁,计数: " + counter);
try {
// 模拟执行任务
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "Thread-" + i).start();
}
}
}
在上述代码中,多个线程竞争 lock
对象的锁。运行代码时,你会发现线程获取锁的顺序并不是按照线程启动的顺序,这就说明了 synchronized
是非公平锁。
② Lock
上一节的synchronized
是jdk内置的关键字,属于隐式锁、也叫内置锁。现在这一节来探索一下显式锁,其提供更细粒度的控制(如可中断、超时、公平性),核心实现为 ReentrantLock
。
public interface Lock {
//获取锁。如果锁不可用,则当前线程将出于线程调度目的而被禁用,并在获取锁之前处于休眠状态。
void lock();
void lockInterruptibly() throws InterruptedException;
//如果锁可用,则获取锁,并立即返回值为 true。如果锁不可用,则此方法将立即返回值 false。
boolean tryLock();
/*
如果在给定的等待时间内有空闲,并且当前线程尚未中断,则获取该锁。
如果锁可用,则此方法立即返回值 true。如果锁不可用,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,
直到发生以下三种情况之一:锁由当前线程获取;或者其他线程中断当前线程,支持中断锁获取;或指定的等待时间已用
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//释放锁
void unlock();
//返回绑定到此 Lock 实例的新 Condition 实例。在等待条件之前,锁必须由当前线程持有
Condition newCondition();
}
从上图中,我们可以得知juc包下的几个主要的实现类,绿色圈圈连接的是里面的内部类。
1) ReentrantLock
接下来从我们最熟知的ReentrantLock
开始看起吧,他的简单使用:
public class LockTest {
Lock lock = new ReentrantLock();
int count = 0;
public static void main(String[] args) throws InterruptedException {
LockTest test = new LockTest();
for (int i = 1; i <= 2; i++) {
new Thread(test::add).start();
}
Thread.sleep(2000);
/*
如果不加锁的话,结果就不一定是两万了
*/
System.out.println(test.count);
}
public void add() {
// 标准写法 try加锁 finally释放锁
try {
lock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
} finally {
lock.unlock();
}
}
}
我们分析一下,首先是调用了其无参构造函数创建了一个对象,里面是new了一个看名字是非公平同步标记的对象,那他肯定会有公平的同步标记。
// 下面都是在ReentrantLock.java里面
private final Sync sync;
// 无参构造
public ReentrantLock() {
sync = new NonfairSync();
}
// 有参构造
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// 然后调用lock.lock()实际是调用的sync.lock();
public void lock() {
sync.lock();
}
// 然后调用lock.unlock()实际是调用的sync.release(1);;
public void unlock() {
sync.release(1);
}
// 是ReentrantLock的静态内部类
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 是ReentrantLock的静态内部类
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
// Sync继承了AbstractQueuedSynchronizer:【AQS】
abstract static class Sync extends AbstractQueuedSynchronizer {
.....
}
从上面额关系我们可以看出一切源头就是AbstractQueuedSynchronizer
,也就是我们熟悉的AQS。在这篇文章的“补充知识点”环节中,对AQS做了一个简单的介绍及分析。【AQS】:https://blog.csdn.net/okok__TXF/article/details/146455487
【博客园】:https://blog.csdn.net/okok__TXF/article/details/146455487
它是是 Java 并发包 java.util.concurrent.locks
下的一个核心类,是构建锁和其他同步工具(如 ReentrantLock
、Semaphore
、CountDownLatch
等)的基础框架。
里面定义了两种资源共享模式:
- 独占模式(Exclusive):同一时刻只有一个线程能获取资源,如
ReentrantLock
。
在独占模式的时候,tryAcquire(int)
:尝试获取资源,成功返回 true
,失败返回 false
;tryRelease(int)
:尝试释放资源,成功返回 true
,失败返回 false
。
- 共享模式(Share):多个线程可同时获取资源,如
Semaphore
(信号量)、CountDownLatch
(倒计时 latch)。
在共享模式的时候,tryAcquireShared(int)
:尝试获取共享资源,负数表示失败;0 表示成功但无剩余资源;正数表示成功且有剩余资源。tryReleaseShared(int)
:尝试释放共享资源,释放后若允许唤醒后续等待节点,返回 true
,否则 false
。
– 非公平锁
回到ReentrantLock中,我们以lock()方法举例子:【lock是无参构造的】非公平锁
//ReentrantLock.java
public void lock() {
sync.lock(); // ===========1
}
//内部的抽象类Sync
abstract void lock();
final boolean nonfairTryAcquire(int acquires) { // ===========7
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//具体实现类NonfairSync
final void lock() { // ===========2
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // ===========3
}
protected final boolean tryAcquire(int acquires) { // ===========6
return nonfairTryAcquire(acquires); // 这个是抽象类Sync的
}
//AbstractQueuedSynchronizer.java --- acquire(1)
public final void acquire(int arg) { // ===========4
// 这里的tryAcquire是NonfairSync.java里面的
if (!tryAcquire(arg) && // ===========5
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
已经按顺序将1234567标注在了上面,模板方法的设计模式有时候真的会让人晕头转向。。
一句简简单单的lock.lock()方法做了什么呢?
首先,会直接尝试CAS获取锁【compareAndSetState(0, 1)
会通过 CAS 操作尝试将 AQS 中的 state
状态从 0 改为 1】,如果成功的话成功则设置当前线程为锁持有者,否则进入AQS的获取流程;【在这里,当线程调用 lock()
时,会先通过 CAS
操作尝试将 AQS
的 state
从 0
改为 1
。此时不会检查等待队列中是否有其他线程在排队,只要 CAS
成功,就直接获取锁,体现了 “插队” 的特性。】
其次,进入aqs的acquire流程
,
1.tryAcquire(arg) 2.addWaiter(Node.EXCLUSIVE) 3.acquireQueued(xxx)
第一个方法tryAcquire再次尝试获取锁(非公平锁的 tryAcquire
即 nonfairTryAcquire(acquires)
),在Sync :: nonfairTryAcquire(int acquires)方法里面,得到aqs里面的state,如果是0,再次尝试 CAS 抢占(体现非公平性,不检查队列);如果不是0,说明被抢占了,判断持有锁的线程是不是当前线程,如果是(体现可重入性),更新state,如果不是返回false。
然后, 若tryAcquire
失败【返回false】,调用addWaiter(Node.EXCLUSIVE)
会将当前线程包装成一个独占模式的 Node
节点加入 AQS 队列。接着 acquireQueued
会使线程在队列中自旋等待,不断尝试获取锁或被唤醒后尝试获取,直到成功。
接下来分析一下lock.unlock()
方法,这个就在代码里面注释解释了
//ReentrantLock.java
public void unlock() {
//调用其内部同步器 sync 的 release 方法:
sync.release(1); // ===========1
}
// 内部抽象类Sync
protected final boolean tryRelease(int releases) { // ===========3
int c = getState() - releases; // 减少同步状态值(释放一次锁,`state` 减 1)
// 检查当前线程是否为锁的持有者,不是则抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {// 当 `state` 减为 0 时,完全释放锁
free = true;
setExclusiveOwnerThread(null);// 清除独占锁的线程引用
}
setState(c);// 更新 `state` 值
return free;// 返回是否完全释放锁
}
//AbstractQueuedSynchronizer.java --- acquire(1)
public final boolean release(int arg) {
//tryRelease 尝试释放锁,由子类实现具体逻辑
if (tryRelease(arg)) { // ===========2
Node h = head;// 获取等待队列头节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);// 唤醒后继节点 ===========4
return true;
}
return false;
}
//唤醒后继节点
private void unparkSuccessor(Node node) { ===========5
int ws = node.waitStatus;
if (ws < 0) // 将头节点的 `waitStatus` 设为 0(取消之前的状态)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; // 找到头节点的后继节点
if (s == null || s.waitStatus > 0) { // 若后继节点为空或已取消,从队尾向前找第一个非取消节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 唤醒找到的节点对应的线程
LockSupport.unpark(s.thread);
}
在内部抽象类Sync中的tryRelease中:
- 减少
state
值:ReentrantLock
支持重入,state
记录锁的重入次数。每次调用unlock()
,state
减 1。 - 检查线程所有权:确保只有锁的持有者才能释放锁,否则抛出
IllegalMonitorStateException
。 - 完全释放锁:当
state
减为 0 时,将setExclusiveOwnerThread(null)
,表示锁已完全释放,返回true
。
ReentrantLock
支持重入:【其实从名字就可以看出来了 — Reentrant(再进去的、就是可重入嘛)】
ReentrantLock lock = new ReentrantLock();
lock.lock(); // state=1,线程持有锁
lock.lock(); // state=2(重入)
lock.unlock(); // state=1(未完全释放)
lock.unlock(); // state=0,释放锁并唤醒等待线程
– 公平锁
那么ReentrantLock的公平锁是什么样子的呢?其实大致步骤都差不多,主要是在FairSync.java
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//公平锁会先检查等待队列是否有前驱节点,若有则不能抢锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
公平锁在 state == 0
时,会先通过 hasQueuedPredecessors
检查等待队列。若有其他线程在排队,则当前线程不能抢占,必须入队等待,保证 “先来先得”,体现了公平性。而非公平锁跳过这一步,直接抢锁,这就是非公平性的核心体现。
2) 读写锁
//ReadWriteLock 维护一对关联的锁,一个用于只读作,一个用于写入。只要没有写入器,多个读取器线程就可以同时持有读取锁。
//写锁是独占的。读写锁允许在访问共享数据时实现比互斥锁允许的更高级别的并发。它利用了这样一个事实,
//即虽然一次只有一个线程(写入线程)可以修改共享数据,但在许多情况下,任意数量的线程都可以同时读取数据(因此是读取线程)。
//从理论上讲,与使用互斥锁相比,使用读写锁允许的并发性增加将导致性能改进。
public interface ReadWriteLock {
//返回用于读取的锁。
Lock readLock();
//返回用于写入的锁。
Lock writeLock();
}
读写锁是否会比使用互斥锁提高性能 取决于读取数据的频率与修改数据的频率、读取和写入作的持续时间以及数据的争用 – 即尝试同时读取或写入数据的线程数。例如,最初填充数据,此后不经常修改的集合;经常搜索(例如某种目录)是使用读写锁的理想候选者。但是,如果更新变得频繁,则数据将花费大部分时间进行独占锁定,并发性几乎没有增加。只有分析和测量才能确定使用读写锁是否适合您的应用程序。
读写锁允许多个线程同时读(没有写入时,多个线程允许同时读(提高性能)),但只要有一个线程在写,其他线程就必须等待(只允许一个线程写入(其他线程既不能写入也不能读取))。也就是读读不冲突、读写就要冲突了。
– ReadWriteLock
下面给出一个简单示例:ReadWriteLock
public class ReadWriteLockTest2 {
private static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static final Lock readLock = readWriteLock.readLock();
private static final Lock writeLock = readWriteLock.writeLock();
private static int[] a = new int[10];
public static void main(String[] args) throws InterruptedException {
// 1个线程写
new Thread(ReadWriteLockTest2::write).start();
for (int i = 0; i < 9; i++) // 10个线程读
new Thread(()-> System.out.println(get())).start();
Thread.sleep(2000);
}
public static Object get() {
readLock.lock();
try {
Thread.sleep(100);
return a[1];
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
readLock.unlock();
}
}
public static void write() {
writeLock.lock();
try {
a[1]++;
System.out.println("写进行~~~");
Thread.sleep(1000);
System.out.println("写ok~~~");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
writeLock.unlock();
}
}
}
写操作在执行的时候,读线程是会阻塞的。但是10个读线程之间并没有阻塞
– StampedLock
StampedLock
对比 ReentrantReadWriteLock
有所增强,在原先读写锁的基础之上新增了乐观读的模式。该模式并不会加锁,所以不会阻塞线程,会有更高的吞吐量和更高的性能。(乐观锁和悲观锁)
StampedLock具有三种控制读/写访问的模式:
1、写入(Writing):方法writeLock可能阻塞等待独占访问,并返回一个戳,该戳可在方法unlockWrite中使用以释放锁。还提供了tryWriteLock的非定时和定时版本。当锁保持在写模式时,不能获得读锁,并且所有乐观读验证都将失败。
2、读取(Reading):方法readLock可能会阻塞等待非独占访问,并返回一个戳,该戳可在方法unlockRead中使用以释放锁。还提供了tryReadLock的非定时和定时版本。
3、乐观读取(Optimistic Reading):tryOptimisticRead方法返回一个非0的stamp,只有当前同步状态没有被写模式所占有是才能获取到。他是在获取stamp值后对数据进行读取操作,最后验证该stamp值是否发生变化,如果发生变化则读取无效,代表有数据写入。这种方式能够降低竞争和提高吞吐量。
简单示例:
public class StampedLockTest {
private static final StampedLock stampedLock = new StampedLock();
private static double x = 1.0;
private static double y = 1.0;
public static void main(String[] args) {
// 1. 一个线程写
new Thread(() -> addXY(1, 1)).start();
// 2. 10个线程读
for (int i = 0; i < 10; i++) {
new Thread(() -> System.out.println(getSArea())).start();
}
}
private static double getSArea() {
// 乐观读
long stamp = stampedLock.tryOptimisticRead();
double s1 = x * y;
// 验证一下
if (!stampedLock.validate(stamp)) { // 验证失败
stamp = stampedLock.readLock(); // 升级为读锁
try {
s1 = x * y;
} finally {
stampedLock.unlockRead(stamp);
}
}
return s1;
}
private static void addXY(double a, double b) {
long stamp = stampedLock.writeLock();
try {
System.out.println("写进行~~");
x += a;
y += b;
Thread.sleep(1000);
System.out.println("写ok~~");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
stampedLock.unlockWrite(stamp);
}
}
}
写操作:writeLock()
返回一个 stamp
(时间戳),表示获取写锁成功。写锁是独占的,获取时会阻塞所有读锁和其他写锁(除了乐观读)。通过 unlockWrite(stamp)
释放写锁,stamp
必须与获取时的一致,否则抛出异常。
乐观读: tryOptimisticRead()
:获取一个 乐观读时间戳,不实际加锁,直接读取数据(成本极低),然后validate(stamp)
:检查该时间戳对应的读操作期间是否有写操作发生。若 stamp
有效(无写操作),则数据一致;否则需要升级为读锁。锁升级:若验证失败,说明数据可能被修改,通过 readLock()
获取读锁(阻塞直到写锁释放),确保后续读取的数据是最新的。
– 可重入性探讨
JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁。
本小节看一下上面两种读写锁的可重入性,首先是ReadWriteLock,从他的实现类来看就是可重入的了【ReentrantReadWriteLock】
在ReentrantReadWriteLock
中也有Sync
的抽象内部类,当调用写锁的lock时,实际是会经过里面重写的tryAcquire
,从下面的代码可以知道同一线程可多次获取写锁:当线程获取写锁后,再次调用 writeLock()
会直接成功(无需等待),因为内部维护了一个 重入计数器(类似 ReentrantLock
)。每次获取写锁时计数器加 1,释放时减 1,计数器为 0 时才真正释放锁。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
....
// Reentrant acquire
setState(c + acquires);
return true;
}
....
return true;
}
ReentrantReadWriteLock
基于 AQS(AbstractQueuedSynchronizer) 实现,通过 state
变量的高 16 位和低 16 位分别记录 读锁的共享次数 和 写锁的重入次数:
- 写锁(独占模式):使用低 16 位记录当前线程的重入次数(和
ReentrantLock
类似)。 - 读锁(共享模式):使用高 16 位记录所有线程的读锁获取次数,但会通过线程本地变量(
ThreadLocal
)记录当前线程的读锁重入次数,避免不同线程的计数干扰。
这种设计使得同一线程多次获取写锁或在读锁 / 写锁之间按规则重入时,不会出现死锁,符合可重入锁的定义。
public static void write() {
writeLock.lock();
try {
writeLock.lock();
a[1]++;
System.out.println("写进行~~~");
Thread.sleep(1000);
System.out.println("写ok~~~");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
writeLock.unlock(); // 可重入
writeLock.unlock();
}
}
StampedLock是不可重入的,为什么呢?
-
没有锁计数机制:
StampedLock
并没有像ReentrantLock
那样维护一个锁的重入计数。在ReentrantLock
中,state
变量用于记录锁的重入次数,每次获取锁时state
加 1,释放锁时state
减 1。而StampedLock
中的state
变量主要用于表示锁的状态和版本信息,并非用于记录重入次数。 -
如果一个线程已经持有了
StampedLock
的写锁或读锁,再次尝试获取相同类型的锁时,会出现以下情况:- 写锁情况:如果线程已经持有写锁,再次调用
writeLock()
方法,由于写锁是独占的,该线程会被阻塞,因为它会等待自己释放写锁后才能再次获取,这显然会导致死锁。 - 读锁情况:如果线程已经持有读锁,再次调用
readLock()
方法,虽然读锁是共享的,但StampedLock
并不会像可重入锁那样允许线程多次获取而不产生问题。而且如果在持有读锁的情况下尝试获取写锁,会导致死锁,因为写锁需要独占资源,而当前线程已经持有了读锁。
- 写锁情况:如果线程已经持有写锁,再次调用
private static void addXY(double a, double b) {
long stamp = stampedLock.writeLock();
try {
long lock = stampedLock.writeLock(); // 不可重入
System.out.println("写进行~~");
x += a;
y += b;
Thread.sleep(1000);
System.out.println("写ok~~");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
stampedLock.unlockWrite(stamp);
}
}
③ 锁案例
上面只掌握了一丢丢的理论,没有实践怎么行呢?
1) 交替打印
第一个,我们来实现一下三个线程交替打印A B C试试,第一个线程打印A,第二个B,第三个C
// synchronized实现
public class PrintABCSynchronized {
private int now = 1;
public static void main(String[] args) {
PrintABCSynchronized obj = new PrintABCSynchronized();
new Thread(obj::printA).start();
new Thread(obj::printB).start();
new Thread(obj::printC).start();
}
public void printA() {
for (int i = 0; i < 10; i++) {
synchronized (this) {
while ( now != 1 ) { // 为什么用while,不用if?留给读者思考
try {this.wait();}
catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("A"); now = 2;
this.notifyAll();
}
}
}
public void printB() {
for (int i = 0; i < 10; i++) {
synchronized (this) {
while ( now != 2 ) {
try {this.wait();}
catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("B"); now = 3;
this.notifyAll();
}
}
}
public void printC() {
for (int i = 0; i < 10; i++) {
synchronized (this) {
while ( now != 3 ) {
try {this.wait();}
catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("C"); now = 1;
this.notifyAll();
}
}
}
}
wait – notify 【这个只能用在synchronized同步代码块中,是属于Object的方法】上面的缺陷很严重,那就是一下子就唤醒了所有挂起的线程,其实有的线程根本就不用唤醒,有没有一种办法,就是我想唤醒谁就唤醒谁呢?
public class PrintABCLock {
private Lock lock = new ReentrantLock();
private Condition a = lock.newCondition();
private Condition b = lock.newCondition();
private Condition c = lock.newCondition();
private int flag = 1;
public static void main(String[] args) {
PrintABCLock obj = new PrintABCLock();
new Thread(obj::printA).start();
new Thread(obj::printB).start();
new Thread(obj::printC).start();
}
public void printA() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
while ( flag != 1 ) a.await();
System.out.println('A'); flag = 2;
b.signal();
}
} catch ( InterruptedException e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
while ( flag != 2 ) b.await();
System.out.println('B'); flag = 3;
c.signal();
}
} catch ( InterruptedException e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
while ( flag != 3 ) c.await();
System.out.println('C'); flag = 1;
a.signal();
}
} catch ( InterruptedException e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
}
}
2) 阻塞队列
下面模仿jdk中ArrayBlockingQueue的源码,给了一个简洁的阻塞队列
class TBlockedQueue<T> {
private final Lock lock;
private final Condition notEmpty;
private final Condition notFull;
private final int capacity;
private final LinkedList<T> list;
public TBlockedQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException("Capacity 不能小于1");
this.capacity = capacity;
list = new LinkedList<>();
lock = new ReentrantLock();
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void add( T t ) {
list.addLast(t);
}
// 1. 入队 -- 当队列已满时,向队列中添加元素的操作会被阻塞,直到队列有空间可用。
public void put( T t ) {
if (t == null) throw new NullPointerException();
lock.lock();
try {
while (list.size() == capacity) notFull.await();
list.addLast(t);
notEmpty.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
// 2. 出队 -- 当队列为空时,从队列中获取元素的操作会被阻塞,直到队列中有新元素加入
public T take() {
lock.lock();
try {
while (list.isEmpty()) notEmpty.await();
T t = list.removeFirst();
notFull.signal();
return t;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
这里自定义的 TBlockedQueue
是一个典型的 有界阻塞队列,其核心思路是通过 锁(Lock)和条件变量(Condition) 实现线程间的同步与协调,确保在多线程环境下对队列的操作是安全的。通过 lock.lock()
和 lock.unlock()
包裹对共享资源 list
的操作,确保同一时刻只有一个线程修改队列。同时,使用 while
循环检查条件(如 list.size() == capacity
),防止 虚假唤醒导致条件不满足时错误地继续执行。
notEmpty
:当队列为空时,take
操作会等待此条件;当有元素入队时,通过signal()
唤醒等待的消费者线程。notFull
:当队列已满时,put
操作会等待此条件;当有元素出队时,通过signal()
唤醒等待的生产者线程。
3) AQS自定义锁
前面分析可以知道ReentrantLock是以AQS为基础框架来实现的,那么,此节我们自定义来实现一个锁。
见 “Java并发探索–下篇”
4.探索并发工具
5.虚拟线程
见 “Java并发探索–下篇” — 在下面找
【博客园】
https://www.cnblogs.com/jackjavacpp
【CSDN】
https://blog.csdn.net/okok__TXF
end.参考
- https://blog.csdn.net/agonie201218/article/details/128712507
- https://blog.csdn.net/xu_yong_lin/article/details/117521773
- https://www.cnblogs.com/java-bible/p/13930006.html
- https://blog.csdn.net/fighting_yu/article/details/89473175
- https://tech.meituan.com/2018/11/15/java-lock.html
- https://blog.csdn.net/weixin_44772566/article/details/137398521
- https://blog.csdn.net/m0_73978383/article/details/146442443 【synchronized详解】
- https://liaoxuefeng.com/books/java/threading 【廖雪峰的官方网站— 神中神】
来源链接:https://www.cnblogs.com/jackjavacpp/p/18852416
如有侵犯您的版权,请及时联系3500663466#qq.com(#换@),我们将第一时间删除本站数据。
暂无评论内容