并发编程–上篇

Java并发探索–上篇

1.基本概念

  • 线程与进程:线程是程序执行的最小单位,而进程是系统进行资源分配和调度的基本单位。例如,一个 Java 程序可以包含多个线程,它们共享进程的资源。
  • 并发与并行:并发是指多个任务在同一时间段内执行,而并行是指多个任务在同一时刻执行。在多核 CPU 系统中,可以实现真正的并行。
  • 同步与异步:同步是指程序按照顺序依次执行,而异步是指程序在执行某个任务时,不需要等待该任务完成,可以继续执行其他任务。

“Java并发探索–下篇” — 在下面找

【博客园】

https://www.cnblogs.com/jackjavacpp

【CSDN】

https://blog.csdn.net/okok__TXF

2.探索线程的创建

①线程的状态

Thread源码里面看出

public enum State {
 	// 尚未启动的线程的线程状态。
    NEW,
	// 就绪
    RUNNABLE,
 	// 等待监视器锁的线程的线程状态
    BLOCKED,
	/*
	等待线程的线程状态,线程由于调用以下方法之一而处于等待状态:
	Object.wait() 没有超时
	Thread.join() 没有超时
	LockSupport.park()
	*/
    WAITING,
	/*
	指定等待时间的等待线程的线程状态
	线程处于定时等待状态,因为调用了以下方法之一,并指定等待时间:
	Thread.sleep
    Object.wait with timeout
    Thread.join with timeout
    LockSupport.parkNanos
    LockSupport.parkUntil
	*/
    TIMED_WAITING,
	//终止线程的线程状态。线程已完成执行。
    TERMINATED;
}

下面看一张图,很清楚的解释了各状态之间的关系:【节选自https://blog.csdn.net/agonie201218/article/details/128712507】

在Java中,一个Thread有大致六个状态。

线程创建之后(new Thread)它将处于 NEW(新建) 状态,调用 start() 方法后开始运行,线程这时候处于 RUNNABLE(就绪) 状态。可运行状态的线程获得了 CPU 时间片后就处于 RUNNING(运行) 状态。

明白了线程的运行状态,接下来让我们来看一下在爪哇里面如何创建并且启动线程。

②线程创建

1)两种基本方式

  • 继承Thread类,重写run方法
public class MyThread1 extends Thread {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + ": hello world");
    }
}
public class JUCMain {
    public static void main(String[] args) {
        new MyThread1().start();
    }
}
  • 实现Runnable接口,传入Thread
public class Runnable1 implements Runnable{
    @Override
    public void run() {
        System.out.println("hello world, Runnable");
    }
}
public class JUCMain {
    public static void main(String[] args) {
        new Thread(new Runnable1()).start();
    }
}

网上还传有其他创建线程的方式,比如: Callable接口,重写call,结合FutureTask;线程池;lambda表达式等等。。。诚然,这也确实是创建线程启动的方式不错。但是本文毕竟是探索性质的文章,我们要探索其本质。

首先从start()方法看起(这个方式属于Thread类的)。调用start()后,JVM会创建一个新线程并执行该线程的run()方法。注意:直接调用run()不会启动新线程,而是在当前线程中执行。

// 启动线程并触发 JVM 创建原生线程
// synchronized后面解释【见 探索“锁”】
public synchronized void start() {
    // 零状态值对应于状态 “NEW”
    // 线程想要start,必须是为0的状态
    if (threadStatus != 0)
        throw new IllegalThreadStateException();
    /*
    group 是线程所属的线程组。这行代码将当前线程实例添加到线程组中,
    同时线程组的未启动线程计数会减1。
    */
    group.add(this);
    boolean started = false;
    try {
        start0(); //关键!调用本地方法(native)
        started = true;
    } finally {
        try {
            if (!started) { //启动失败时回滚
                //如果 started 为 false,说明线程启动失败,
                //调用 group.threadStartFailed(this) 方法通知线程组该线程启动失败。
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
        }
    }
}
//========== native
private native void start0();

那么执行的是run()方法,run方法里面是啥呢

private Runnable target; // target是Runnable类型

@Override
public void run() {
    if (target != null) {
        target.run();
    }
}

如果继承Thread类后,重写run()方法,那么run方法就会覆盖上面的方法。

如果是实现的Runnable接口,new Thread(new Runnable1())的时候,就会把target赋值,然后调用run()方法的时候,就执行的是target的run方法了。

2) 其他创建方式

.lambda
  • lambda表达式创建:这个仅仅是写法不同而已。因为Runnable是个函数式接口
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
.callable
  • Callable创建的方式
public class MyCall implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(2000);
        return "Hello Callable";
    }
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
    FutureTask<String> task = new FutureTask<>(new MyCall());
    new Thread(task).start();
    System.out.println(task.get());
}

new Thread(Runnable runnable)要求传的类型是Runnable,但是现在传的是FutureTask。所以先来看一看FutureTask和Runnable之间有什么联系.

从上面可以看到,FutureTask实现了RunnableFuture接口,然后RunnableFuture接口继承了Future和Runnable两个接口。

Future<V>

Future 接口是 Java 并发编程中的一个重要接口,位于 java.util.concurrent 包下,它代表了一个异步计算的结果。异步计算意味着在调用方法后,程序不会立即等待结果返回,而是可以继续执行其他任务,当结果准备好时,再通过 Future 对象获取。

// 这里使用了泛型 <V>,表示该 Future 对象所代表的异步计算结果的类型。
public interface Future<V> {

    //尝试取消异步任务的执行。
    /*
    如果任务已经完成、已经被取消或者由于其他原因无法取消,则返回 false;
    如果任务成功取消,则返回 true。
    */
    boolean cancel(boolean mayInterruptIfRunning);

    //如果任务在完成之前被取消,则返回 true;否则返回 false。
    boolean isCancelled();

    //如果任务已经完成,则返回 true;否则返回 false。
    boolean isDone();

    //获取异步任务的计算结果。如果任务还未完成,调用该方法的线程会被阻塞,直到任务完成。
    V get() throws InterruptedException, ExecutionException;

    //获取异步任务的计算结果,并且可以指定一个超时时间。
    //如果在指定的时间内任务还未完成,调用该方法的线程会被阻塞,直到任务完成或者超时。
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

RunnableFuture

public interface RunnableFuture<V> extends Runnable, Future<V> {
    // 很简单嘛,这个是来自Runnable的
    void run();
}

这个接口就相当于组合了Runnable和Future,能够获取到返回值了。

FutureTask<V> 既然要把它当做参数传进Thread的构造函数,那么想必它肯定是实现了run方法的。

public class FutureTask<V> implements RunnableFuture<V> {
    // 基本属性
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** 结果 */
    private Object outcome; 
    /** The thread running the callable; CAS ed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
    
    // 看它的构造函数1
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable; // 赋值callable========
        this.state = NEW; // ensure visibility of callable
    }
    // 构造函数2 ==== 本质还是把Runnable加了一层,给封装成Callable了
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    /*
    Executors::callable(xx, xx)方法==========
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run(); // 调用Runnable的run()
            return result;
        }
    }
    */
    
    // run()方法 --------------- 
    // new Thread(new FutureTask<>(new MyCall()))
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //====调用callable.call()
                    result = c.call(); 
                    ran = true;
                } catch (Throwable ex) {
                   .........
                }
                // 如果运行OK了,设置结果!
                if (ran) set(result);
            }
        } finally {
            .............
        }
    }
    
    // 设置结果outcome
    protected void set(V v) {
        // https://www.cnblogs.com/jackjavacpp/p/18787832
        // 使用CAS --- 【见上一篇文章 java map & CAS & AQS】
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v; // 这里
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    
    // 比较核心的get方法================start
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING) // 如果状态不是完成
            s = awaitDone(false, 0L); // 等待完成
        return report(s); // 返回结果
    }
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
  		// 1.计算超时截止时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) { // 2.自旋循环等待任务完成
            // 2.1如果该线程中断了
            if (Thread.interrupted()) {
                removeWaiter(q);// 从等待队列中移除当前节点
                throw new InterruptedException();
            }
            // 2.2检查状态
            int s = state;
            // 任务已终态(NORMAL, EXCEPTIONAL, CANCELLED)
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;// 返回最终状态
            }
            // 2.3若任务状态等于 COMPLETING,表明任务正在完成,
            // 此时调用 Thread.yield() 方法让当前线程让出 CPU 时间片,等待任务完成。
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued) //将节点加入等待队列
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) { // 2.4如果是有时限的get()
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) { 
                    removeWaiter(q);
                    return state; // 返回状态
                }
                LockSupport.parkNanos(this, nanos);
            }
            else //若没有设置超时时间,就调用 LockSupport.park 方法让当前线程无限期阻塞,直到被唤醒。
                LockSupport.park(this);
        }
    }
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x; // 返回outcome
       ......
    }
    //==================================end
}

awaitDone 方法的核心功能是让当前线程等待异步任务完成,它会持续检查任务的状态,根据不同的状态采取相应的处理措施,同时支持设置超时时间。在等待过程中,若线程被中断,会抛出 InterruptedException 异常。

通过上面的分析,Callable这种方式实际上本质还是Runnable嚯。使用FutureTask将Future和Runnable结合起来,功能更加丰富。

.线程池ThreadPoolExecutor
  • 线程池创建线程

如下使用方式。

public class PoolMain {
    public static void main(String[] args) {
        // 创建一个线程池
        ExecutorService pool = Executors.newFixedThreadPool(1);
        long start = System.currentTimeMillis();
        // execute=============
        pool.execute(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("execute pool创建启动线程!");
        });
        // submit==============
        Future<Integer> future = pool.submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("submit pool创建启动线程!");
            return 100;
        });
        try {
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
        System.out.println("main线程执行时间:" + (System.currentTimeMillis() - start));
        pool.shutdown();
    }
}

从上面的例子可以看出,大致有ExecutorServiceExecutors, newFixedThreadPool()方法本质是 new ThreadPoolExecutor(),故还有一个ThreadPoolExecutor类。

接下来梳理一下这些类背后的关系。【通过idea得到下面的关系图】此外,Executors只是一个工具类。

Executor是顶级接口

public interface Executor {
	// 只定义了一个方法
    void execute(Runnable command);
}

ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    //....
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
}

AbstractExecutorServiceExecutorService执行方法的默认实现,发现下面的submit()底层实际执行的是execute(ftask)方法【Executor接口的execute()方法,在这个抽象类里面没有具体实现,到具体子类ThreadPoolExecutor在可以看到】

public abstract class AbstractExecutorService implements ExecutorService {
    // 这里重点只看一下submit方法的默认实现
    // 优点1: 可以有Future返回值
	public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    // 优点2: 支持Callable参数
    public <T> Future<T> submit(Callable<T> task) { 
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
}

ThreadPoolExecutor:线程池,可以通过调用Executors静态工厂方法来创建线程池并返回一个ExecutorService对象

public class ThreadPoolExecutor extends AbstractExecutorService {
     /**
     * 七大参数!!!!======
     */
    public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
                              int maximumPoolSize,//线程池的最大线程数
                              long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                              TimeUnit unit,//时间单位
                              BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
                              ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
                              RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,定制策略来处理任务
                               ) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
}

回到这一小节最开始的时候,例子中的线程池有两种提交并运行线程的方式executesubmit两个方法。现在来看一下ThreadPoolExecutor中的execute()方法是什么样子的。submit()我们已经知道了是在AbstractExecutorService中有默认实现的。

// ThreadPoolExecutor::execute(Runnable command)
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1.若当前工作线程数小于核心线程数(corePoolSize),尝试创建新的核心线程
    // 这里是用的位运算的,我没有深究
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) // 
            return;
        // 创建新线程失败,重新获取ctl
        c = ctl.get();
    }
    // 2.任务入队
    // 线程池处于运行状态(isRunning(c))
    // 且任务成功加入阻塞队列(workQueue.offer(command))
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 2.2 双重检查
        /*
        2.2.1再次检查线程池状态(可能在此期间线程池被关闭)。
        2.2.2若线程池已关闭,尝试从队列中移除任务,移除成功则拒绝任务(reject(command))。
        2.2.3若线程池仍运行但无活跃线程(workerCountOf(recheck) == 0),
        添加一个非核心线程(addWorker(null, false)),该线程会从队列中拉取任务执行。
        */
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3.若任务无法入队(队列已满),尝试创建非核心线程(addWorker(command, false))
    else if (!addWorker(command, false))
        //若创建失败(线程数已达maximumPoolSize或线程池已关闭),
        //执行拒绝策略(reject(command))
        reject(command);

}
/*
execute(command)
│
├─ 检查command非空
│
├─ 当前线程数 < corePoolSize?
│   ├─ 是 → 创建核心线程 → 成功则返回
│   └─ 否 → 进入下一步
│
├─ 线程池是否RUNNING且任务入队成功?
│   ├─ 是 → 双重检查状态
│   │   ├─ 线程池已关闭 → 移除任务并拒绝
│   │   └─ 线程池仍运行且无活跃线程 → 创建非核心线程
│   │
│   └─ 否 → 尝试创建非核心线程
│       ├─ 成功 → 处理任务
│       └─ 失败 → 拒绝任务
*/

为什么需要二次检查线程池状态?

- 任务入队后,其他线程可能关闭了线程池(如调用shutdown())。
- 处理:
  - 若线程池已关闭,需移除已入队任务并拒绝。
  - 若线程池仍运行但无活跃线程(如核心线程数为0且任务在队列中),需创建新线程处理队列任务。

场景1:核心线程数未满

  • 线程池处于 RUNNING,当前线程数 2(corePoolSize=5)。
  • addWorker(task, true) 成功创建核心线程并执行任务。

场景2:队列已满,创建临时线程

  • 核心线程满载,队列已满,线程数未达 maximumPoolSize
  • addWorker(task, false) 创建临时线程处理任务。

场景3:SHUTDOWN 状态处理剩余任务

  • 线程池调用 shutdown(),状态变为 SHUTDOWN
  • 已有任务在队列中,addWorker(null, true) 创建线程处理队列任务。

ThreadPoolExecutor设计思想:

  • 核心线程优先:优先使用核心线程处理任务。
  • 队列缓冲:核心线程满载后,任务入队等待。
  • 非核心线程应急:队列满后,创建临时线程处理任务(不超过maximumPoolSize

addWorker 创建工作线程

addWorker 方法通过精细的状态检查和并发控制,确保线程池在动态扩缩容时保持线程安全。【方便理解,可以把这个方法看作是创建线程】其核心在于:

  • 双重循环:外层处理状态变化,内层处理线程数修改。
  • 锁与原子操作结合:保证 workers 集合和 workerCount 的一致性。
  • 异常回滚机制:确保资源不会泄漏(如线程数虚增或 Worker 未清理)。

firstTask为我们最开始写的Runnable。【记一个代号叫做 “我的任务” 】

// ======== firstTask
pool.execute(() -> {
   try {
       Thread.sleep(1000);
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   }
   System.out.println("execute pool创建启动线程!");
});
// addWorker(runnable, core)
//标记是否以核心线程数(corePoolSize)为上限创建线程。
//若为 false,则使用最大线程数(maximumPoolSize)
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        ........
        int rs = runStateOf(c);// 获取线程池状态
        // 检查是否允许添加Worker
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
               firstTask == null &&
               !workQueue.isEmpty()))
            return false;

        for (;;) {
            .........
            //CAS 增加线程数
            if (compareAndIncrementWorkerCount(c))
                break retry;// 成功增加,跳出循环
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    ...........
    try {
        // 把“我的任务”传进去了
        w = new Worker(firstTask); // 创建的一个Worker
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 加锁保护 workers 集合
            mainLock.lock();
            try {
                // 再次检查状态(防止在加锁前状态变化)
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                       //======抛出异常....
                    workers.add(w); // 将 Worker 加入集合
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 【Worker类里面的thread】
                // 启动线程=========重点【见下面的分析】
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker是ThreadPoolExecutor的内部类,可以看出是一个Runnable,那么肯定重写了run()方法

private final class Worker
        extends AbstractQueuedSynchronizer implements Runnable
{
    final Thread thread;
    Runnable firstTask; //"我的任务"到这里来了
    // 构造函数
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 利用线程工厂创建了一个线程
        /*
        如果final Thread t = w.thread;
        t.start();启动的话,执行的是这个内部类的run()
        */
        this.thread = getThreadFactory().newThread(this);
    }
    // run就这一行
    public void run() {runWorker(this);}
    //到这里了
    final void runWorker(Worker w) {
        ..
        //"我的任务"
        Runnable task = w.firstTask;
        ...
        try {
          //getTask()从等待队列里面取出Runnable
          while (task != null || (task = getTask()) != null) {
              ....
              task.run();//==========
          }
        }......
        ..
        //// 无任务时回收线程
         processWorkerExit(w, completedAbruptly);
        
    }
}

ThreadPoolExecutor的静态内部类 :: jdk自带的四种拒绝策略

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
// 1.直接抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {
    .........
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
// 2.直接丢弃
public static class DiscardPolicy implements RejectedExecutionHandler {
    .....
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
// 3.丢弃等待队列的第一个
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    ........
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
		//如果线程池未关闭,就弹出队列头部的元素,然后尝试执行
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
// 4.调用者运行,直接执行run()方法里面的逻辑。
// 只要线程池没有关闭,就由提交任务的当前线程处理
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    ......
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

总结一下,在了解到了ThreadPoolExecutor的一些类间关系、以及一些基本流程、属性之后。接下来我们来梳理一遍,线程池的运行方式。

  1. 创建线程池(七大参数、四大拒绝策略)

  2. 任务提交

//2.1
executor.execute(() -> {
    // 任务逻辑
});

2.2任务处理决策链:::

2.2.1尝试创建核心线程:当前工作线程数 < corePoolSize,直接创建新核心线程执行任务

if (workerCount < corePoolSize) {
    addWorker(command, true); // true表示检查corePoolSize
    return;
}

2.2.2任务入队: 若核心线程已满,任务尝试加入工作队列。

if (workQueue.offer(command)) {
    // 双重检查线程池状态
    if (线程池已关闭) 移除任务并拒绝;
    else if (当前无活跃线程) 创建非核心线程处理队列任务;
}

2.2.3尝试创建非核心线程: 若队列已满且线程数 < maximumPoolSize,创建非核心线程。

else if (!addWorker(command, false)) { // false表示检查maximumPoolSize
    reject(command); // 触发拒绝策略
}

2.2.4拒绝任务

  1. 工作线程执行任务

3.1Worker初始化:每个Worker绑定一个线程和初始任务(firstTask)。

Worker w = new Worker(firstTask);
final Thread t = w.thread;
t.start(); // 启动线程执行runWorker()

3.2任务执行循环(runWorker方法)

while (task != null || (task = getTask()) != null) {
    try {
        task.run(); // 执行任务
    } finally {
        task = null; // 清理任务引用
    }
}

3.3从队列获取任务(getTask方法)

  • 阻塞模式:若为核心线程或允许核心线程超时,调用workQueue.take()永久阻塞。
  • 超时模式:若非核心线程,调用workQueue.poll(keepAliveTime)超时等待。
Runnable r = timed ? 
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();
  1. 线程回收与资源释放
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // 异常终止时补偿workerCount
        decrementWorkerCount();
    mainLock.lock();
    try {
        workers.remove(w); // 从集合中移除Worker
        if (线程池仍在运行 && 队列非空) 
            addWorker(null, false); // 补充线程处理队列任务
    } finally {
        mainLock.unlock();
    }
}

线程池本质也是Runnable!

一张好图:【来自:【线程池工作原理】https://blog.csdn.net/fighting_yu/article/details/89473175】

3.探索”锁“

上面探索了线程以及线程池的创建,发现其源码中存在这种代码;

//1.Thread的start方法
public synchronized void start()

//2.线程池部分源码addWorker()方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();

//3.
LockSupport.park(this);

这些是什么呢?这就是锁。。

确保线程安全最常见的做法是利用锁机制(Locksychronized)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的,线程安全的。

① synchronized

synchronized 是 Java 中最基本的同步机制,它可以修饰方法或代码块,确保同一时刻只有一个线程可以执行被修饰的代码。

public class SynchronizedTest {
    public static void main(String[] args) {
        SynchronizedTest lock1 = new SynchronizedTest();
        new Thread(lock1::test).start();
        new Thread(lock1::test2).start();
        new Thread(lock1::testStatic).start();
        new Thread(lock1::testFs).start();
    }
    public void testStatic() {
        // 锁的是Class对象
        synchronized (SynchronizedTest.class){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("testStatic()");
        }
    }

    // 锁的是一个实例对象
    public void test(){
        synchronized (this){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("test()");
        }
    }

    public synchronized void test2(){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("test2()");
    }

    public void testFs(){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("testFs()");
    }
}

上面只有test() 和 test2() 是互斥的。也就是1秒过后,testStatic()、testFs()、和 【test()、test1() 其中之一】一起输出打印。

修饰代码块、修饰方法:锁的是该对象;

修饰静态成员:锁的是该类的Class对象;这种方式可以确保对静态变量的访问是线程安全的

还可以锁任意对象。

其实主要弄清楚各自锁的是什么对象就行了,看是否需要的是一个锁,就可以判断是否互斥了;

//锁的是Class对象
public static synchronized void testStatic1() {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("testStaticMethod()");
}
public void testStatic() {
    synchronized (SynchronizedTest.class){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("testStatic()");
    }
}
public synchronized void test2(){
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("test2()");
}

如上述代码示例,testStatic1和testStatic需要持有的对象是同一个,故这二者会产生互斥,test2需要持有的是该类的一个实例对象,故不会与这二者产生互斥。

需要注意的是: synchronized 并不属于方法定义的一部分,故synchronized 关键字不能被继承。如果在父类中的某个方 法使用了 synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这 个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上 synchronized 关键字才可以。当然,还可以在子类方法中调用父类中相应的方 法,这样虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此, 子类的方法也就相当于同步了。
来看看如下示例:

public class Father {
    public synchronized void method1(){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("Father method1");
    }
}
//
public class Son extends Father{
    public void syncSon() {
        super.method1(); // 调用的父类的同步方法
    }
    public void syncSon1() {
        super.method1();
    }
    public void method1() { // 重写了,但是synchronized并不会继承过来
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("Son method1");
    }

    public void sonHaha() { method1(); }
    public void sonHehe() { method1(); }

    public static void main(String[] args) {
        Son son = new Son();
        // new Thread(son::syncSon).start();
        // new Thread(son::syncSon1).start(); // 会互斥
        new Thread(son::sonHaha).start();
        new Thread(son::sonHehe).start(); // 不会
    }
}

synchronized【隐式锁】的底层原理涉及到 Java 对象头(Object Header)和 Monitor(监视器)两个关键概念。

每个 Java 对象在内存中分为三部分:

  1. 对象头(Header)
    • Mark Word(标记字段):存储哈希码、GC 分代年龄、锁状态等。
    • Klass Pointer(类型指针):指向类的元数据。
  2. 实例数据(Instance Data)
  3. 对齐填充(Padding)

Java 对象头:在 Java 虚拟机中,每个对象都有一个对象头,用于存储对象的元数据信息,包括对象的哈希码、GC 相关信息、锁状态等。对象头通常包含一个标记字段(Mark Word),用于标识对象的锁状态

Monitor(监视器):Monitor 是一种同步机制,负责管理对象的锁。每个对象都与一个 Monitor 相关联。当一个线程尝试进入一个被synchronized修饰的代码块或方法时,它会尝试获取对象的 Monitor。如果 Monitor 处于无锁状态,则当前线程会尝试将其锁定;如果 Monitor 已经被其他线程锁定,则当前线程会进入阻塞状态,直到持有锁的线程释放锁。

// C++ 实现(HotSpot 源码)
class ObjectMonitor {
    void*   _header;       // Mark Word
    void*   _owner;        // 持有锁的线程
    volatile intptr_t  _count;     // 重入次数
    ObjectWaiter* _WaitSet;        // 等待队列(调用 wait() 的线程)
    ObjectWaiter* _EntryList;      // 阻塞队列(竞争锁失败的线程)
    // ...
};

查看本小节开头示例的test()方法的字节码:

synchronized 同步语句块的实现使用的是 monitorentermonitorexit 指令,当执行 monitorenter 指令时,线程试图获取锁也就是获取 对象监视器 monitor 的持有权。第一个monitorexit正常退出同步块, 第二个是异常退出同步块。

synchronized优化:

锁升级(JDK 6+)

3.0 无锁

  • 无锁:当第一个线程第一次访问一个对象的同步块时,JVM会在对象头中设置该线程的ID,并将对象头的状态位设置为“偏向锁”。这个过程称为“偏向”,表示对象当前偏向于第一个访问它的线程。

3.1 偏向锁(Biased Locking)

  • 原理:第一个获取锁的线程将线程 ID 写入 Mark Word,后续无需 CAS。这样如果该线程再来的时候,由于是已经设置了锁偏向该线程,故只需比对一下对象头里面的Mark Word就行了。
  • 触发条件:JVM 启用了偏向锁(默认开启),且对象未处于锁定状态。
  • 撤销:当其他线程尝试竞争时,撤销偏向锁并升级为轻量级锁。

3.2 轻量级锁(Lightweight Locking)

  • 加锁流程
    1. 在当前线程栈帧中创建 Lock Record
    2. 通过 CAS 将 Mark Word 复制到 Lock Record,并尝试将 Mark Word 指向 Lock Record。

轻量级锁在以下场景会升级为重量级锁:

  1. 自旋失败:竞争线程自旋一定次数后仍未获取锁。
  2. 竞争加剧:等待锁的线程数超过 JVM 自适应的阈值

3.3 重量级锁(Heavyweight Locking)

  • 实现:通过操作系统提供的互斥量(Mutex)和条件变量实现,线程竞争失败后进入阻塞状态。
  • 性能问题:涉及用户态到内核态的切换,开销较大。

synchronized优化:

锁会升级,从低到高升级,反着降级不可以:无锁状态 -> 偏向锁状态 -> 轻量级锁状态 -> 重量级锁状态

锁类型 实现机制 适用场景 性能开销
偏向锁 通过 Mark Word 记录线程 ID 单线程重复访问同步块 最低
轻量级锁 CAS + 自旋(适应性自旋) 低竞争、短时间同步 中等
重量级锁 操作系统互斥量(Mutex) + Monitor 高竞争、长时间同步 最高

【节选自 锁升级】 :https://blog.csdn.net/weixin_45433817/article/details/132216383

问:synchronized是公平锁吗?

首先要知道公平锁和非公平锁的概念:

  • 公平锁:公平锁指的是多个线程按照申请锁的顺序来获取锁,先到先得。当一个线程请求锁时,如果该锁当前处于可用状态,并且在该线程之前已经有其他线程在等待该锁,那么这个线程会被放入等待队列的尾部,等待前面的线程依次获取并释放锁后,它才能获取锁。
  • 非公平锁:非公平锁则不保证线程获取锁的顺序与申请锁的顺序一致。当一个线程请求锁时,即使有其他线程在等待该锁,它也会先尝试直接获取锁,如果获取成功就可以直接执行,而不用排队等待。

那么,synchronized 基于对象头的 Mark Word 和监视器(Monitor)实现。当一个线程进入同步块时,它会尝试获取对象的监视器。如果监视器处于空闲状态,该线程会直接获取监视器,而不会考虑是否有其他线程已经在等待这个监视器。例如,当一个线程释放了 synchronized 修饰的同步块的锁后,新到来的线程有很大机会直接获取到锁,而不是等待那些在等待队列中的线程,这就体现了其非公平性。

class SynchronizedNonFairExample {
    private static final Object lock = new Object();
    private static int counter = 0;
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                while (true) {
                    synchronized (lock) {
                        counter++;
                        System.out.println(Thread.currentThread().getName() + " 获取到锁,计数: " + counter);
                        try {
                            // 模拟执行任务
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "Thread-" + i).start();
        }
    }
}

在上述代码中,多个线程竞争 lock 对象的锁。运行代码时,你会发现线程获取锁的顺序并不是按照线程启动的顺序,这就说明了 synchronized 是非公平锁。

② Lock

上一节的synchronized是jdk内置的关键字,属于隐式锁、也叫内置锁。现在这一节来探索一下显式锁,其提供更细粒度的控制(如可中断、超时、公平性),核心实现为 ReentrantLock

public interface Lock {
    //获取锁。如果锁不可用,则当前线程将出于线程调度目的而被禁用,并在获取锁之前处于休眠状态。
    void lock();
    void lockInterruptibly() throws InterruptedException;
    //如果锁可用,则获取锁,并立即返回值为 true。如果锁不可用,则此方法将立即返回值 false。
	boolean tryLock();
    /*
    如果在给定的等待时间内有空闲,并且当前线程尚未中断,则获取该锁。
    如果锁可用,则此方法立即返回值 true。如果锁不可用,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,
    直到发生以下三种情况之一:锁由当前线程获取;或者其他线程中断当前线程,支持中断锁获取;或指定的等待时间已用
    */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    //释放锁
	void unlock();
    //返回绑定到此 Lock 实例的新 Condition 实例。在等待条件之前,锁必须由当前线程持有
	Condition newCondition();
}

从上图中,我们可以得知juc包下的几个主要的实现类,绿色圈圈连接的是里面的内部类。

1) ReentrantLock

接下来从我们最熟知的ReentrantLock开始看起吧,他的简单使用:

public class LockTest {
    Lock lock = new ReentrantLock();
    int count = 0;
    public static void main(String[] args) throws InterruptedException {
        LockTest test = new LockTest();
        for (int i = 1; i <= 2; i++) {
            new Thread(test::add).start();
        }
        Thread.sleep(2000);
        /*
        如果不加锁的话,结果就不一定是两万了
        */
        System.out.println(test.count);
    }

    public void add() {
        // 标准写法 try加锁 finally释放锁
        try {
            lock.lock();
            for (int i = 0; i < 10000; i++) {
                count++;
            }
        } finally {
            lock.unlock();
        }
    }
}

我们分析一下,首先是调用了其无参构造函数创建了一个对象,里面是new了一个看名字是非公平同步标记的对象,那他肯定会有公平的同步标记。

// 下面都是在ReentrantLock.java里面
private final Sync sync;
// 无参构造
public ReentrantLock() {
    sync = new NonfairSync();
}
// 有参构造
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
// 然后调用lock.lock()实际是调用的sync.lock();
public void lock() {
    sync.lock();
}
// 然后调用lock.unlock()实际是调用的sync.release(1);;
public void unlock() {
    sync.release(1);
}
// 是ReentrantLock的静态内部类
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
// 是ReentrantLock的静态内部类
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
// Sync继承了AbstractQueuedSynchronizer:【AQS】
abstract static class Sync extends AbstractQueuedSynchronizer {
    .....
}

从上面额关系我们可以看出一切源头就是AbstractQueuedSynchronizer,也就是我们熟悉的AQS。在这篇文章的“补充知识点”环节中,对AQS做了一个简单的介绍及分析。【AQS】:https://blog.csdn.net/okok__TXF/article/details/146455487

【博客园】:https://blog.csdn.net/okok__TXF/article/details/146455487

它是是 Java 并发包 java.util.concurrent.locks 下的一个核心类,是构建锁和其他同步工具(如 ReentrantLockSemaphoreCountDownLatch 等)的基础框架。

里面定义了两种资源共享模式:

  • 独占模式(Exclusive):同一时刻只有一个线程能获取资源,如 ReentrantLock

在独占模式的时候,tryAcquire(int):尝试获取资源,成功返回 true,失败返回 falsetryRelease(int):尝试释放资源,成功返回 true,失败返回 false

  • 共享模式(Share):多个线程可同时获取资源,如 Semaphore(信号量)、CountDownLatch(倒计时 latch)。

在共享模式的时候,tryAcquireShared(int):尝试获取共享资源,负数表示失败;0 表示成功但无剩余资源;正数表示成功且有剩余资源。tryReleaseShared(int):尝试释放共享资源,释放后若允许唤醒后续等待节点,返回 true,否则 false

– 非公平锁

回到ReentrantLock中,我们以lock()方法举例子:【lock是无参构造的】非公平锁

//ReentrantLock.java
public void lock() {
    sync.lock(); // ===========1
}
//内部的抽象类Sync
abstract void lock();
final boolean nonfairTryAcquire(int acquires) { // ===========7
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

//具体实现类NonfairSync
final void lock() { // ===========2
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1); // ===========3
}
protected final boolean tryAcquire(int acquires) { // ===========6
    return nonfairTryAcquire(acquires); // 这个是抽象类Sync的
}

//AbstractQueuedSynchronizer.java --- acquire(1)
public final void acquire(int arg) { // ===========4
    // 这里的tryAcquire是NonfairSync.java里面的
    if (!tryAcquire(arg) && // ===========5
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

已经按顺序将1234567标注在了上面,模板方法的设计模式有时候真的会让人晕头转向。。

一句简简单单的lock.lock()方法做了什么呢?

首先,会直接尝试CAS获取锁【compareAndSetState(0, 1) 会通过 CAS 操作尝试将 AQS 中的 state 状态从 0 改为 1】,如果成功的话成功则设置当前线程为锁持有者,否则进入AQS的获取流程;【在这里,当线程调用 lock() 时,会先通过 CAS 操作尝试将 AQSstate0 改为 1此时不会检查等待队列中是否有其他线程在排队,只要 CAS 成功,就直接获取锁,体现了 “插队” 的特性。】

其次,进入aqs的acquire流程

1.tryAcquire(arg)   2.addWaiter(Node.EXCLUSIVE)   3.acquireQueued(xxx)

第一个方法tryAcquire再次尝试获取锁(非公平锁的 tryAcquirenonfairTryAcquire(acquires)),在Sync :: nonfairTryAcquire(int acquires)方法里面,得到aqs里面的state,如果是0,再次尝试 CAS 抢占(体现非公平性,不检查队列);如果不是0,说明被抢占了,判断持有锁的线程是不是当前线程,如果是(体现可重入性),更新state,如果不是返回false。

然后,tryAcquire失败【返回false】,调用addWaiter(Node.EXCLUSIVE) 会将当前线程包装成一个独占模式的 Node 节点加入 AQS 队列。接着 acquireQueued 会使线程在队列中自旋等待,不断尝试获取锁或被唤醒后尝试获取,直到成功。

接下来分析一下lock.unlock()方法,这个就在代码里面注释解释了

//ReentrantLock.java
public void unlock() {
    //调用其内部同步器 sync 的 release 方法:
    sync.release(1); // ===========1
}
// 内部抽象类Sync
protected final boolean tryRelease(int releases) { // ===========3
    int c = getState() - releases; // 减少同步状态值(释放一次锁,`state` 减 1)
    // 检查当前线程是否为锁的持有者,不是则抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {// 当 `state` 减为 0 时,完全释放锁
        free = true;
        setExclusiveOwnerThread(null);// 清除独占锁的线程引用
    }
    setState(c);// 更新 `state` 值
    return free;// 返回是否完全释放锁
}

//AbstractQueuedSynchronizer.java --- acquire(1)
public final boolean release(int arg) {
    //tryRelease 尝试释放锁,由子类实现具体逻辑
    if (tryRelease(arg)) { // ===========2
        Node h = head;// 获取等待队列头节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);// 唤醒后继节点 ===========4
        return true;
    }
    return false;
}
//唤醒后继节点
private void unparkSuccessor(Node node) {  ===========5
    int ws = node.waitStatus;
    if (ws < 0) // 将头节点的 `waitStatus` 设为 0(取消之前的状态)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next; // 找到头节点的后继节点
    if (s == null || s.waitStatus > 0) { // 若后继节点为空或已取消,从队尾向前找第一个非取消节点
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null) // 唤醒找到的节点对应的线程
        LockSupport.unpark(s.thread);
}

在内部抽象类Sync中的tryRelease中:

  1. 减少 stateReentrantLock 支持重入,state 记录锁的重入次数。每次调用 unlock()state 减 1。
  2. 检查线程所有权:确保只有锁的持有者才能释放锁,否则抛出 IllegalMonitorStateException
  3. 完全释放锁:当 state 减为 0 时,将 setExclusiveOwnerThread(null),表示锁已完全释放,返回 true

ReentrantLock支持重入:【其实从名字就可以看出来了 — Reentrant(再进去的、就是可重入嘛)】

ReentrantLock lock = new ReentrantLock();
lock.lock();  // state=1,线程持有锁
lock.lock();  // state=2(重入)
lock.unlock(); // state=1(未完全释放)
lock.unlock(); // state=0,释放锁并唤醒等待线程
– 公平锁

那么ReentrantLock的公平锁是什么样子的呢?其实大致步骤都差不多,主要是在FairSync.java

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //公平锁会先检查等待队列是否有前驱节点,若有则不能抢锁
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false; 
}
public final boolean hasQueuedPredecessors() {
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

公平锁在 state == 0 时,会先通过 hasQueuedPredecessors 检查等待队列。若有其他线程在排队,则当前线程不能抢占,必须入队等待,保证 “先来先得”,体现了公平性。而非公平锁跳过这一步,直接抢锁,这就是非公平性的核心体现。

2) 读写锁

//ReadWriteLock 维护一对关联的锁,一个用于只读作,一个用于写入。只要没有写入器,多个读取器线程就可以同时持有读取锁。
//写锁是独占的。读写锁允许在访问共享数据时实现比互斥锁允许的更高级别的并发。它利用了这样一个事实,
//即虽然一次只有一个线程(写入线程)可以修改共享数据,但在许多情况下,任意数量的线程都可以同时读取数据(因此是读取线程)。
//从理论上讲,与使用互斥锁相比,使用读写锁允许的并发性增加将导致性能改进。
public interface ReadWriteLock {
    //返回用于读取的锁。
    Lock readLock();
	//返回用于写入的锁。
    Lock writeLock();
}

读写锁是否会比使用互斥锁提高性能 取决于读取数据的频率与修改数据的频率、读取和写入作的持续时间以及数据的争用 – 即尝试同时读取或写入数据的线程数。例如,最初填充数据,此后不经常修改的集合;经常搜索(例如某种目录)是使用读写锁的理想候选者。但是,如果更新变得频繁,则数据将花费大部分时间进行独占锁定,并发性几乎没有增加。只有分析和测量才能确定使用读写锁是否适合您的应用程序。

读写锁允许多个线程同时读(没有写入时,多个线程允许同时读(提高性能)),但只要有一个线程在写,其他线程就必须等待(只允许一个线程写入(其他线程既不能写入也不能读取))。也就是读读不冲突、读写就要冲突了。

– ReadWriteLock

下面给出一个简单示例:ReadWriteLock

public class ReadWriteLockTest2 {
    private static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static final Lock readLock = readWriteLock.readLock();
    private static final Lock writeLock = readWriteLock.writeLock();
    private static int[] a = new int[10];
    public static void main(String[] args) throws InterruptedException {
        // 1个线程写
        new Thread(ReadWriteLockTest2::write).start();
        for (int i = 0; i < 9; i++)  // 10个线程读
            new Thread(()-> System.out.println(get())).start();
        Thread.sleep(2000);
    }
    public static Object get() {
       readLock.lock();
        try {
            Thread.sleep(100);
            return a[1];
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            readLock.unlock();
        }
    }
    public static void write() {
        writeLock.lock();
        try {
            a[1]++;
            System.out.println("写进行~~~");
            Thread.sleep(1000);
            System.out.println("写ok~~~");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            writeLock.unlock();
        }
    }
}

写操作在执行的时候,读线程是会阻塞的。但是10个读线程之间并没有阻塞

– StampedLock

StampedLock对比 ReentrantReadWriteLock 有所增强,在原先读写锁的基础之上新增了乐观读的模式。该模式并不会加锁,所以不会阻塞线程,会有更高的吞吐量和更高的性能。(乐观锁和悲观锁)

StampedLock具有三种控制读/写访问的模式:

1、写入(Writing):方法writeLock可能阻塞等待独占访问,并返回一个戳,该戳可在方法unlockWrite中使用以释放锁。还提供了tryWriteLock的非定时和定时版本。当锁保持在写模式时,不能获得读锁,并且所有乐观读验证都将失败。

2、读取(Reading):方法readLock可能会阻塞等待非独占访问,并返回一个戳,该戳可在方法unlockRead中使用以释放锁。还提供了tryReadLock的非定时和定时版本。

3、乐观读取(Optimistic Reading):tryOptimisticRead方法返回一个非0的stamp,只有当前同步状态没有被写模式所占有是才能获取到。他是在获取stamp值后对数据进行读取操作,最后验证该stamp值是否发生变化,如果发生变化则读取无效,代表有数据写入。这种方式能够降低竞争和提高吞吐量。

简单示例:

public class StampedLockTest {
    private static final StampedLock stampedLock = new StampedLock();
    private static double x = 1.0;
    private static double y = 1.0;
    public static void main(String[] args) {
        // 1. 一个线程写
        new Thread(() -> addXY(1, 1)).start();
        // 2. 10个线程读
        for (int i = 0; i < 10; i++) {
            new Thread(() -> System.out.println(getSArea())).start();
        }
    }
    private static double getSArea() {
        // 乐观读
        long stamp = stampedLock.tryOptimisticRead();
        double s1 = x * y;
        // 验证一下
        if (!stampedLock.validate(stamp)) { // 验证失败
            stamp = stampedLock.readLock(); // 升级为读锁
            try {
                s1 = x * y;
            } finally {
                stampedLock.unlockRead(stamp);
            }
        }
        return s1;
    }
    private static void addXY(double a, double b) {
        long stamp = stampedLock.writeLock();
        try {
            System.out.println("写进行~~");
            x += a;
            y += b;
            Thread.sleep(1000);
            System.out.println("写ok~~");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            stampedLock.unlockWrite(stamp);
        }
    }
}

写操作:writeLock() 返回一个 stamp(时间戳),表示获取写锁成功。写锁是独占的,获取时会阻塞所有读锁和其他写锁(除了乐观读)。通过 unlockWrite(stamp) 释放写锁,stamp 必须与获取时的一致,否则抛出异常。

乐观读: tryOptimisticRead():获取一个 乐观读时间戳,不实际加锁,直接读取数据(成本极低),然后validate(stamp):检查该时间戳对应的读操作期间是否有写操作发生。若 stamp 有效(无写操作),则数据一致;否则需要升级为读锁。锁升级:若验证失败,说明数据可能被修改,通过 readLock() 获取读锁(阻塞直到写锁释放),确保后续读取的数据是最新的。

– 可重入性探讨

JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁

本小节看一下上面两种读写锁的可重入性,首先是ReadWriteLock,从他的实现类来看就是可重入的了【ReentrantReadWriteLock】

ReentrantReadWriteLock中也有Sync的抽象内部类,当调用写锁的lock时,实际是会经过里面重写的tryAcquire,从下面的代码可以知道同一线程可多次获取写锁:当线程获取写锁后,再次调用 writeLock() 会直接成功(无需等待),因为内部维护了一个 重入计数器(类似 ReentrantLock)。每次获取写锁时计数器加 1,释放时减 1,计数器为 0 时才真正释放锁。

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
           ....
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    ....
    return true;
}

ReentrantReadWriteLock 基于 AQS(AbstractQueuedSynchronizer) 实现,通过 state 变量的高 16 位和低 16 位分别记录 读锁的共享次数写锁的重入次数

  • 写锁(独占模式):使用低 16 位记录当前线程的重入次数(和 ReentrantLock 类似)。
  • 读锁(共享模式):使用高 16 位记录所有线程的读锁获取次数,但会通过线程本地变量(ThreadLocal)记录当前线程的读锁重入次数,避免不同线程的计数干扰。

这种设计使得同一线程多次获取写锁或在读锁 / 写锁之间按规则重入时,不会出现死锁,符合可重入锁的定义。

public static void write() {
    writeLock.lock();
    try {
        writeLock.lock();
        a[1]++;
        System.out.println("写进行~~~");
        Thread.sleep(1000);
        System.out.println("写ok~~~");
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        writeLock.unlock(); // 可重入
        writeLock.unlock();
    }
}

StampedLock是不可重入的,为什么呢?

  1. 没有锁计数机制:StampedLock 并没有像 ReentrantLock 那样维护一个锁的重入计数。在 ReentrantLock 中,state 变量用于记录锁的重入次数,每次获取锁时 state 加 1,释放锁时 state 减 1。而 StampedLock 中的 state 变量主要用于表示锁的状态和版本信息,并非用于记录重入次数。

  2. 如果一个线程已经持有了 StampedLock 的写锁或读锁,再次尝试获取相同类型的锁时,会出现以下情况:

    • 写锁情况:如果线程已经持有写锁,再次调用 writeLock() 方法,由于写锁是独占的,该线程会被阻塞,因为它会等待自己释放写锁后才能再次获取,这显然会导致死锁。
    • 读锁情况:如果线程已经持有读锁,再次调用 readLock() 方法,虽然读锁是共享的,但 StampedLock 并不会像可重入锁那样允许线程多次获取而不产生问题。而且如果在持有读锁的情况下尝试获取写锁,会导致死锁,因为写锁需要独占资源,而当前线程已经持有了读锁。
private static void addXY(double a, double b) {
    long stamp = stampedLock.writeLock();
    try {
        long lock = stampedLock.writeLock(); // 不可重入
        System.out.println("写进行~~");
        x += a;
        y += b;
        Thread.sleep(1000);
        System.out.println("写ok~~");
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        stampedLock.unlockWrite(stamp);
    }
}

③ 锁案例

上面只掌握了一丢丢的理论,没有实践怎么行呢?

1) 交替打印

第一个,我们来实现一下三个线程交替打印A B C试试,第一个线程打印A,第二个B,第三个C

// synchronized实现
public class PrintABCSynchronized {
    private int now = 1;
    public static void main(String[] args) {
        PrintABCSynchronized obj = new PrintABCSynchronized();
        new Thread(obj::printA).start();
        new Thread(obj::printB).start();
        new Thread(obj::printC).start();
    }
    public void printA() {
        for (int i = 0; i < 10; i++) {
            synchronized (this) {
                while ( now != 1 ) { // 为什么用while,不用if?留给读者思考
                    try {this.wait();}
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("A"); now = 2;
                this.notifyAll();
            }
        }
    }
    public void printB() {
        for (int i = 0; i < 10; i++) {
            synchronized (this) {
                while ( now != 2 ) { 
                    try {this.wait();}
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("B"); now = 3;
                this.notifyAll();
            }
        }
    }
    public void printC() {
        for (int i = 0; i < 10; i++) {
            synchronized (this) {
                while ( now != 3 ) {
                    try {this.wait();}
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("C"); now = 1;
                this.notifyAll();
            }
        }
    }
}

wait – notify 【这个只能用在synchronized同步代码块中,是属于Object的方法】上面的缺陷很严重,那就是一下子就唤醒了所有挂起的线程,其实有的线程根本就不用唤醒,有没有一种办法,就是我想唤醒谁就唤醒谁呢?

public class PrintABCLock {
    private Lock lock = new ReentrantLock();
    private Condition a = lock.newCondition();
    private Condition b = lock.newCondition();
    private Condition c = lock.newCondition();
    private int flag = 1;
    public static void main(String[] args) {
        PrintABCLock obj = new PrintABCLock();
        new Thread(obj::printA).start();
        new Thread(obj::printB).start();
        new Thread(obj::printC).start();
    }
    public void printA()  {
        lock.lock();
        try {
            for (int i = 0; i < 10; i++) {
                while ( flag != 1 ) a.await();
                System.out.println('A'); flag = 2;
                b.signal();
            }
        } catch ( InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            lock.unlock();
        }
    }
    public void printB() {
        lock.lock();
        try {
            for (int i = 0; i < 10; i++) {
                while ( flag != 2 ) b.await();
                System.out.println('B'); flag = 3;
                c.signal();
            }
        } catch ( InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            lock.unlock();
        }
    }
    public void printC() {
        lock.lock();
        try {
            for (int i = 0; i < 10; i++) {
                while ( flag != 3 ) c.await();
                System.out.println('C'); flag = 1;
                a.signal();
            }
        } catch ( InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            lock.unlock();
        }
    }
}

2) 阻塞队列

下面模仿jdk中ArrayBlockingQueue的源码,给了一个简洁的阻塞队列

class TBlockedQueue<T> {
    private final Lock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    private final int capacity;
    private final LinkedList<T> list;
    public TBlockedQueue(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException("Capacity 不能小于1");
        this.capacity = capacity;
        list = new LinkedList<>();
        lock = new ReentrantLock();
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }
    public void add( T t ) {
        list.addLast(t);
    }

    // 1. 入队 -- 当队列已满时,向队列中添加元素的操作会被阻塞,直到队列有空间可用。
    public void put( T t ) {
        if (t == null) throw new NullPointerException();
        lock.lock();
        try {
            while (list.size() == capacity) notFull.await();
            list.addLast(t);
            notEmpty.signal();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }
    // 2. 出队 -- 当队列为空时,从队列中获取元素的操作会被阻塞,直到队列中有新元素加入
    public T take() {
        lock.lock();
        try {
            while (list.isEmpty()) notEmpty.await();
            T t = list.removeFirst();
            notFull.signal();
            return t;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }
}

这里自定义的 TBlockedQueue 是一个典型的 有界阻塞队列,其核心思路是通过 锁(Lock)和条件变量(Condition) 实现线程间的同步与协调,确保在多线程环境下对队列的操作是安全的。通过 lock.lock()lock.unlock() 包裹对共享资源 list 的操作,确保同一时刻只有一个线程修改队列。同时,使用 while 循环检查条件(如 list.size() == capacity),防止 虚假唤醒导致条件不满足时错误地继续执行。

  • notEmpty:当队列为空时,take 操作会等待此条件;当有元素入队时,通过 signal() 唤醒等待的消费者线程。
  • notFull:当队列已满时,put 操作会等待此条件;当有元素出队时,通过 signal() 唤醒等待的生产者线程。

3) AQS自定义锁

前面分析可以知道ReentrantLock是以AQS为基础框架来实现的,那么,此节我们自定义来实现一个锁。

见 “Java并发探索–下篇”

4.探索并发工具

5.虚拟线程

见 “Java并发探索–下篇” — 在下面找

【博客园】

https://www.cnblogs.com/jackjavacpp

【CSDN】

https://blog.csdn.net/okok__TXF

end.参考

  1. https://blog.csdn.net/agonie201218/article/details/128712507
  2. https://blog.csdn.net/xu_yong_lin/article/details/117521773
  3. https://www.cnblogs.com/java-bible/p/13930006.html
  4. https://blog.csdn.net/fighting_yu/article/details/89473175
  5. https://tech.meituan.com/2018/11/15/java-lock.html
  6. https://blog.csdn.net/weixin_44772566/article/details/137398521
  7. https://blog.csdn.net/m0_73978383/article/details/146442443 【synchronized详解】
  8. https://liaoxuefeng.com/books/java/threading 【廖雪峰的官方网站— 神中神】

来源链接:https://www.cnblogs.com/jackjavacpp/p/18852416

© 版权声明
THE END
支持一下吧
点赞10 分享
评论 抢沙发
头像
请文明发言!
提交
头像

昵称

取消
昵称表情代码快捷回复

    暂无评论内容