坪钗 发表于 2025-5-31 23:44:43

并发编程--上篇

Java并发探索--上篇

1.基本概念


[*]线程与进程:线程是程序执行的最小单位,而进程是系统进行资源分配和调度的基本单位。例如,一个 Java 程序可以包含多个线程,它们共享进程的资源。
[*]并发与并行:并发是指多个任务在同一时间段内执行,而并行是指多个任务在同一时刻执行。在多核 CPU 系统中,可以实现真正的并行。
[*]同步与异步:同步是指程序按照顺序依次执行,而异步是指程序在执行某个任务时,不需要等待该任务完成,可以继续执行其他任务。
“Java并发探索--下篇” --- 在下面找
【博客园】
https://www.cnblogs.com/jackjavacpp
【CSDN】
https://blog.csdn.net/okok__TXF
2.探索线程的创建

①线程的状态

从Thread源码里面看出
public enum State {
        // 尚未启动的线程的线程状态。
    NEW,
        // 就绪
    RUNNABLE,
        // 等待监视器锁的线程的线程状态
    BLOCKED,
        /*
        等待线程的线程状态,线程由于调用以下方法之一而处于等待状态:
        Object.wait() 没有超时
        Thread.join() 没有超时
        LockSupport.park()
        */
    WAITING,
        /*
        指定等待时间的等待线程的线程状态
        线程处于定时等待状态,因为调用了以下方法之一,并指定等待时间:
        Thread.sleep
    Object.wait with timeout
    Thread.join with timeout
    LockSupport.parkNanos
    LockSupport.parkUntil
        */
    TIMED_WAITING,
        //终止线程的线程状态。线程已完成执行。
    TERMINATED;
}下面看一张图,很清楚的解释了各状态之间的关系:【节选自https://blog.csdn.net/agonie201218/article/details/128712507】
在Java中,一个Thread有大致六个状态。
线程创建之后(new Thread)它将处于 NEW(新建) 状态,调用 start() 方法后开始运行,线程这时候处于 RUNNABLE(就绪) 状态。可运行状态的线程获得了 CPU 时间片后就处于 RUNNING(运行) 状态。
明白了线程的运行状态,接下来让我们来看一下在爪哇里面如何创建并且启动线程。
②线程创建

1)两种基本方式


[*]继承Thread类,重写run方法
public class MyThread1 extends Thread {
    @Override
    public void run() {
      System.out.println(Thread.currentThread().getName() + ": hello world");
    }
}
public class JUCMain {
    public static void main(String[] args) {
      new MyThread1().start();
    }
}

[*]实现Runnable接口,传入Thread
public class Runnable1 implements Runnable{
    @Override
    public void run() {
      System.out.println("hello world, Runnable");
    }
}
public class JUCMain {
    public static void main(String[] args) {
      new Thread(new Runnable1()).start();
    }
}网上还传有其他创建线程的方式,比如: Callable接口,重写call,结合FutureTask;线程池;lambda表达式等等。。。诚然,这也确实是创建线程启动的方式不错。但是本文毕竟是探索性质的文章,我们要探索其本质。
首先从start()方法看起(这个方式属于Thread类的)。调用start()后,JVM会创建一个新线程并执行该线程的run()方法。注意:直接调用run()不会启动新线程,而是在当前线程中执行。
// 启动线程并触发 JVM 创建原生线程
// synchronized后面解释【见 探索“锁”】
public synchronized void start() {
    // 零状态值对应于状态 “NEW”
    // 线程想要start,必须是为0的状态
    if (threadStatus != 0)
      throw new IllegalThreadStateException();
    /*
    group 是线程所属的线程组。这行代码将当前线程实例添加到线程组中,
    同时线程组的未启动线程计数会减1。
    */
    group.add(this);
    boolean started = false;
    try {
      start0(); //关键!调用本地方法(native)
      started = true;
    } finally {
      try {
            if (!started) { //启动失败时回滚
                //如果 started 为 false,说明线程启动失败,
                //调用 group.threadStartFailed(this) 方法通知线程组该线程启动失败。
                group.threadStartFailed(this);
            }
      } catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
      }
    }
}
//========== native
private native void start0();那么执行的是run()方法,run方法里面是啥呢
private Runnable target; // target是Runnable类型

@Override
public void run() {
    if (target != null) {
      target.run();
    }
}如果继承Thread类后,重写run()方法,那么run方法就会覆盖上面的方法。
如果是实现的Runnable接口,new Thread(new Runnable1())的时候,就会把target赋值,然后调用run()方法的时候,就执行的是target的run方法了。
2) 其他创建方式

.lambda


[*]lambda表达式创建:这个仅仅是写法不同而已。因为Runnable是个函数式接口
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}.callable


[*]Callable创建的方式
public class MyCall implements Callable<String> {
    @Override
    public String call() throws Exception {
      Thread.sleep(2000);
      return "Hello Callable";
    }
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
    FutureTask<String> task = new FutureTask<>(new MyCall());
    new Thread(task).start();
    System.out.println(task.get());
}new Thread(Runnable runnable)要求传的类型是Runnable,但是现在传的是FutureTask。所以先来看一看FutureTask和Runnable之间有什么联系.
从上面可以看到,FutureTask实现了RunnableFuture接口,然后RunnableFuture接口继承了Future和Runnable两个接口。
Future
Future 接口是 Java 并发编程中的一个重要接口,位于 java.util.concurrent 包下,它代表了一个异步计算的结果。异步计算意味着在调用方法后,程序不会立即等待结果返回,而是可以继续执行其他任务,当结果准备好时,再通过 Future 对象获取。
// 这里使用了泛型 <V>,表示该 Future 对象所代表的异步计算结果的类型。
public interface Future<V> {

    //尝试取消异步任务的执行。
    /*
    如果任务已经完成、已经被取消或者由于其他原因无法取消,则返回 false;
    如果任务成功取消,则返回 true。
    */
    boolean cancel(boolean mayInterruptIfRunning);

    //如果任务在完成之前被取消,则返回 true;否则返回 false。
    boolean isCancelled();

    //如果任务已经完成,则返回 true;否则返回 false。
    boolean isDone();

    //获取异步任务的计算结果。如果任务还未完成,调用该方法的线程会被阻塞,直到任务完成。
    V get() throws InterruptedException, ExecutionException;

    //获取异步任务的计算结果,并且可以指定一个超时时间。
    //如果在指定的时间内任务还未完成,调用该方法的线程会被阻塞,直到任务完成或者超时。
    V get(long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException;
}RunnableFuture
public interface RunnableFuture<V> extends Runnable, Future<V> {
    // 很简单嘛,这个是来自Runnable的
    void run();
}这个接口就相当于组合了Runnable和Future,能够获取到返回值了。
FutureTask 既然要把它当做参数传进Thread的构造函数,那么想必它肯定是实现了run方法的。
public class FutureTask<V> implements RunnableFuture<V> {
    // 基本属性
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL= 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED= 6;
    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** 结果 */
    private Object outcome;
    /** The thread running the callable; CAS ed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
   
    // 看它的构造函数1
    public FutureTask(Callable<V> callable) {
      if (callable == null)
            throw new NullPointerException();
      this.callable = callable; // 赋值callable========
      this.state = NEW; // ensure visibility of callable
    }
    // 构造函数2 ==== 本质还是把Runnable加了一层,给封装成Callable了
    public FutureTask(Runnable runnable, V result) {
      this.callable = Executors.callable(runnable, result);
      this.state = NEW;       // ensure visibility of callable
    }
    /*
    Executors::callable(xx, xx)方法==========
    public static <T> Callable<T> callable(Runnable task, T result) {
      if (task == null)
            throw new NullPointerException();
      return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {
      final Runnable task;
      final T result;
      RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
      }
      public T call() {
            task.run(); // 调用Runnable的run()
            return result;
      }
    }
    */
   
    // run()方法 ---------------
    // new Thread(new FutureTask<>(new MyCall()))
    public void run() {
      if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                       null, Thread.currentThread()))
            return;
      try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                  //====调用callable.call()
                  result = c.call();
                  ran = true;
                } catch (Throwable ex) {
                   .........
                }
                // 如果运行OK了,设置结果!
                if (ran) set(result);
            }
      } finally {
            .............
      }
    }
   
    // 设置结果outcome
    protected void set(V v) {
      // https://www.cnblogs.com/jackjavacpp/p/18787832
      // 使用CAS --- 【见上一篇文章 java map & CAS & AQS】
      if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v; // 这里
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
      }
    }
   
    // 比较核心的get方法================start
    public V get() throws InterruptedException, ExecutionException {
      int s = state;
      if (s <= COMPLETING) // 如果状态不是完成
            s = awaitDone(false, 0L); // 等待完成
      return report(s); // 返回结果
    }
    private int awaitDone(boolean timed, long nanos)
      throws InterruptedException {
                // 1.计算超时截止时间
      final long deadline = timed ? System.nanoTime() + nanos : 0L;
      WaitNode q = null;
      boolean queued = false;
      for (;;) { // 2.自旋循环等待任务完成
            // 2.1如果该线程中断了
            if (Thread.interrupted()) {
                removeWaiter(q);// 从等待队列中移除当前节点
                throw new InterruptedException();
            }
            // 2.2检查状态
            int s = state;
            // 任务已终态(NORMAL, EXCEPTIONAL, CANCELLED)
            if (s > COMPLETING) {
                if (q != null)
                  q.thread = null;
                return s;// 返回最终状态
            }
            // 2.3若任务状态等于 COMPLETING,表明任务正在完成,
            // 此时调用 Thread.yield() 方法让当前线程让出 CPU 时间片,等待任务完成。
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued) //将节点加入等待队列
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                   q.next = waiters, q);
            else if (timed) { // 2.4如果是有时限的get()
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                  removeWaiter(q);
                  return state; // 返回状态
                }
                LockSupport.parkNanos(this, nanos);
            }
            else //若没有设置超时时间,就调用 LockSupport.park 方法让当前线程无限期阻塞,直到被唤醒。
                LockSupport.park(this);
      }
    }
    private V report(int s) throws ExecutionException {
      Object x = outcome;
      if (s == NORMAL)
            return (V)x; // 返回outcome
       ......
    }
    //==================================end
}从上面的例子可以看出,大致有ExecutorService,Executors,newFixedThreadPool()方法本质是 new ThreadPoolExecutor(),故还有一个ThreadPoolExecutor类。
接下来梳理一下这些类背后的关系。【通过idea得到下面的关系图】此外,Executors只是一个工具类。
Executor是顶级接口
public class PoolMain {
    public static void main(String[] args) {
      // 创建一个线程池
      ExecutorService pool = Executors.newFixedThreadPool(1);
      long start = System.currentTimeMillis();
      // execute=============
      pool.execute(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("execute pool创建启动线程!");
      });
      // submit==============
      Future<Integer> future = pool.submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("submit pool创建启动线程!");
            return 100;
      });
      try {
            System.out.println(future.get());
      } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
      }
      System.out.println("main线程执行时间:" + (System.currentTimeMillis() - start));
      pool.shutdown();
    }
}ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法
public interface ExecutorService extends Executor {    void shutdown();    List shutdownNow();   Future submit(Callable task);   Future submit(Runnable task, T result);    Future submit(Runnable task);    //....   List invokeAll(Collection
页: [1]
查看完整版本: 并发编程--上篇