并发编程--上篇
Java并发探索--上篇1.基本概念
[*]线程与进程:线程是程序执行的最小单位,而进程是系统进行资源分配和调度的基本单位。例如,一个 Java 程序可以包含多个线程,它们共享进程的资源。
[*]并发与并行:并发是指多个任务在同一时间段内执行,而并行是指多个任务在同一时刻执行。在多核 CPU 系统中,可以实现真正的并行。
[*]同步与异步:同步是指程序按照顺序依次执行,而异步是指程序在执行某个任务时,不需要等待该任务完成,可以继续执行其他任务。
“Java并发探索--下篇” --- 在下面找
【博客园】
https://www.cnblogs.com/jackjavacpp
【CSDN】
https://blog.csdn.net/okok__TXF
2.探索线程的创建
①线程的状态
从Thread源码里面看出
public enum State {
// 尚未启动的线程的线程状态。
NEW,
// 就绪
RUNNABLE,
// 等待监视器锁的线程的线程状态
BLOCKED,
/*
等待线程的线程状态,线程由于调用以下方法之一而处于等待状态:
Object.wait() 没有超时
Thread.join() 没有超时
LockSupport.park()
*/
WAITING,
/*
指定等待时间的等待线程的线程状态
线程处于定时等待状态,因为调用了以下方法之一,并指定等待时间:
Thread.sleep
Object.wait with timeout
Thread.join with timeout
LockSupport.parkNanos
LockSupport.parkUntil
*/
TIMED_WAITING,
//终止线程的线程状态。线程已完成执行。
TERMINATED;
}下面看一张图,很清楚的解释了各状态之间的关系:【节选自https://blog.csdn.net/agonie201218/article/details/128712507】
在Java中,一个Thread有大致六个状态。
线程创建之后(new Thread)它将处于 NEW(新建) 状态,调用 start() 方法后开始运行,线程这时候处于 RUNNABLE(就绪) 状态。可运行状态的线程获得了 CPU 时间片后就处于 RUNNING(运行) 状态。
明白了线程的运行状态,接下来让我们来看一下在爪哇里面如何创建并且启动线程。
②线程创建
1)两种基本方式
[*]继承Thread类,重写run方法
public class MyThread1 extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": hello world");
}
}
public class JUCMain {
public static void main(String[] args) {
new MyThread1().start();
}
}
[*]实现Runnable接口,传入Thread
public class Runnable1 implements Runnable{
@Override
public void run() {
System.out.println("hello world, Runnable");
}
}
public class JUCMain {
public static void main(String[] args) {
new Thread(new Runnable1()).start();
}
}网上还传有其他创建线程的方式,比如: Callable接口,重写call,结合FutureTask;线程池;lambda表达式等等。。。诚然,这也确实是创建线程启动的方式不错。但是本文毕竟是探索性质的文章,我们要探索其本质。
首先从start()方法看起(这个方式属于Thread类的)。调用start()后,JVM会创建一个新线程并执行该线程的run()方法。注意:直接调用run()不会启动新线程,而是在当前线程中执行。
// 启动线程并触发 JVM 创建原生线程
// synchronized后面解释【见 探索“锁”】
public synchronized void start() {
// 零状态值对应于状态 “NEW”
// 线程想要start,必须是为0的状态
if (threadStatus != 0)
throw new IllegalThreadStateException();
/*
group 是线程所属的线程组。这行代码将当前线程实例添加到线程组中,
同时线程组的未启动线程计数会减1。
*/
group.add(this);
boolean started = false;
try {
start0(); //关键!调用本地方法(native)
started = true;
} finally {
try {
if (!started) { //启动失败时回滚
//如果 started 为 false,说明线程启动失败,
//调用 group.threadStartFailed(this) 方法通知线程组该线程启动失败。
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
//========== native
private native void start0();那么执行的是run()方法,run方法里面是啥呢
private Runnable target; // target是Runnable类型
@Override
public void run() {
if (target != null) {
target.run();
}
}如果继承Thread类后,重写run()方法,那么run方法就会覆盖上面的方法。
如果是实现的Runnable接口,new Thread(new Runnable1())的时候,就会把target赋值,然后调用run()方法的时候,就执行的是target的run方法了。
2) 其他创建方式
.lambda
[*]lambda表达式创建:这个仅仅是写法不同而已。因为Runnable是个函数式接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}.callable
[*]Callable创建的方式
public class MyCall implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "Hello Callable";
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> task = new FutureTask<>(new MyCall());
new Thread(task).start();
System.out.println(task.get());
}new Thread(Runnable runnable)要求传的类型是Runnable,但是现在传的是FutureTask。所以先来看一看FutureTask和Runnable之间有什么联系.
从上面可以看到,FutureTask实现了RunnableFuture接口,然后RunnableFuture接口继承了Future和Runnable两个接口。
Future
Future 接口是 Java 并发编程中的一个重要接口,位于 java.util.concurrent 包下,它代表了一个异步计算的结果。异步计算意味着在调用方法后,程序不会立即等待结果返回,而是可以继续执行其他任务,当结果准备好时,再通过 Future 对象获取。
// 这里使用了泛型 <V>,表示该 Future 对象所代表的异步计算结果的类型。
public interface Future<V> {
//尝试取消异步任务的执行。
/*
如果任务已经完成、已经被取消或者由于其他原因无法取消,则返回 false;
如果任务成功取消,则返回 true。
*/
boolean cancel(boolean mayInterruptIfRunning);
//如果任务在完成之前被取消,则返回 true;否则返回 false。
boolean isCancelled();
//如果任务已经完成,则返回 true;否则返回 false。
boolean isDone();
//获取异步任务的计算结果。如果任务还未完成,调用该方法的线程会被阻塞,直到任务完成。
V get() throws InterruptedException, ExecutionException;
//获取异步任务的计算结果,并且可以指定一个超时时间。
//如果在指定的时间内任务还未完成,调用该方法的线程会被阻塞,直到任务完成或者超时。
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}RunnableFuture
public interface RunnableFuture<V> extends Runnable, Future<V> {
// 很简单嘛,这个是来自Runnable的
void run();
}这个接口就相当于组合了Runnable和Future,能够获取到返回值了。
FutureTask 既然要把它当做参数传进Thread的构造函数,那么想必它肯定是实现了run方法的。
public class FutureTask<V> implements RunnableFuture<V> {
// 基本属性
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL= 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED= 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** 结果 */
private Object outcome;
/** The thread running the callable; CAS ed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
// 看它的构造函数1
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable; // 赋值callable========
this.state = NEW; // ensure visibility of callable
}
// 构造函数2 ==== 本质还是把Runnable加了一层,给封装成Callable了
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
/*
Executors::callable(xx, xx)方法==========
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run(); // 调用Runnable的run()
return result;
}
}
*/
// run()方法 ---------------
// new Thread(new FutureTask<>(new MyCall()))
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//====调用callable.call()
result = c.call();
ran = true;
} catch (Throwable ex) {
.........
}
// 如果运行OK了,设置结果!
if (ran) set(result);
}
} finally {
.............
}
}
// 设置结果outcome
protected void set(V v) {
// https://www.cnblogs.com/jackjavacpp/p/18787832
// 使用CAS --- 【见上一篇文章 java map & CAS & AQS】
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; // 这里
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
// 比较核心的get方法================start
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) // 如果状态不是完成
s = awaitDone(false, 0L); // 等待完成
return report(s); // 返回结果
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 1.计算超时截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) { // 2.自旋循环等待任务完成
// 2.1如果该线程中断了
if (Thread.interrupted()) {
removeWaiter(q);// 从等待队列中移除当前节点
throw new InterruptedException();
}
// 2.2检查状态
int s = state;
// 任务已终态(NORMAL, EXCEPTIONAL, CANCELLED)
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;// 返回最终状态
}
// 2.3若任务状态等于 COMPLETING,表明任务正在完成,
// 此时调用 Thread.yield() 方法让当前线程让出 CPU 时间片,等待任务完成。
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued) //将节点加入等待队列
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) { // 2.4如果是有时限的get()
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state; // 返回状态
}
LockSupport.parkNanos(this, nanos);
}
else //若没有设置超时时间,就调用 LockSupport.park 方法让当前线程无限期阻塞,直到被唤醒。
LockSupport.park(this);
}
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x; // 返回outcome
......
}
//==================================end
}从上面的例子可以看出,大致有ExecutorService,Executors,newFixedThreadPool()方法本质是 new ThreadPoolExecutor(),故还有一个ThreadPoolExecutor类。
接下来梳理一下这些类背后的关系。【通过idea得到下面的关系图】此外,Executors只是一个工具类。
Executor是顶级接口
public class PoolMain {
public static void main(String[] args) {
// 创建一个线程池
ExecutorService pool = Executors.newFixedThreadPool(1);
long start = System.currentTimeMillis();
// execute=============
pool.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("execute pool创建启动线程!");
});
// submit==============
Future<Integer> future = pool.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("submit pool创建启动线程!");
return 100;
});
try {
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
System.out.println("main线程执行时间:" + (System.currentTimeMillis() - start));
pool.shutdown();
}
}ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法
public interface ExecutorService extends Executor { void shutdown(); List shutdownNow(); Future submit(Callable task); Future submit(Runnable task, T result); Future submit(Runnable task); //.... List invokeAll(Collection
页:
[1]