找回密码
 立即注册
首页 业界区 业界 Disruptor—2.并发编程相关简介

Disruptor—2.并发编程相关简介

粒浊 2025-6-3 00:17:47
大纲
1.并发类容器
2.volatile关键字与内存分析
3.Atomic系列类与UnSafe类
4.JUC常用工具类
5.AQS各种锁与架构核心
6.线程池的最佳使用指南
 
1.并发类容器
(1)ConcurrentMap
(2)CopyOnWrite容器
(3)ArrayBlockingQueue
(4)LinkedBlockingQueue
(5)SynchronousQueue
(6)PriorityBlockingQueue
(7)DelayQueue
 
(1)ConcurrentMap
ConcurrentMap是Map的子接口,是高并发下线程安全的Map集合。ConcurrentMap有两个实现类:
  1. ConcurrentHashMap
  2. ConcurrentSkipListMap
复制代码
ConcurrentHashMap采取了分段锁的技术来细化锁的粒度,把整个Map划分为一系列Segment的组成单元,一个Segment相当于一个小的HashTable。
 
ConcurrentSkipListMap的底层是通过跳表来实现的,跳表是一个链表,其插入、读取数据的复杂度为O(logn)。
 
(2)CopyOnWrite容器
CopyOnWrite容器即写时复制的容器。当我们往一个容器中添加元素时,不直接往容器中添加,而是将当前容器进行copy,复制出来一个新的容器。然后向新容器中添加需要的元素,最后将原容器的引用指向新容器。这样做的好处是:可在并发场景下对容器进行读操作而无需加锁,从而实现读写分离。
 
Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器,分别是CopyOnWriteArrayList和CopyOnWriteArraySet。CopyOnWrite容器适用于读多写少 + 元素不会特别多的场景。
 
(3)ArrayBlockingQueue
ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用ReentrantLock实现线程安全。
 
(4)LinkedBlockingQueue
LinkedBlockingQueue是一个基于链表实现的阻塞队列。默认情况下,该阻塞队列的大小为Integer.MAX_VALUE。由于这个数值特别大,所以LinkedBlockingQueue也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长。但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,在使用LinkedBlockingQueue时建议手动传一个队列的大小。
 
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。并且采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁。也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。
 
(5)SynchronousQueue
SynchronousQueue是无缓冲阻塞队列,用来在两个线程之间移交元素。它并不是真正的队列,不维护存储空间,而维护一组线程,这些线程在等待放入或移出元素。
 
SynchronousQueue是一种极为特殊的阻塞队列,它没有实际的容量。任意线程都会等待获取到数据或者交付完数据才会返回,这里任意线程指的是生产者线程或者消费者线程。生产类型的操作比如put、offer,消费类型的操作比如poll、take。一个生产者线程的使命是将线程中的数据交付给另一个消费者线程,而一个消费者线程则是等待一个生产者线程中的数据。
 
(6)PriorityBlockingQueue
PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最好或最低的元素,内部是平衡二叉树的实现。
 
(7)DelayQueue
DelayQueue是一个无界阻塞队列,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长,注意不能将null元素放置到这种队列中。
 
2.volatile关键字与内存分析
(1)volatile关键字的作用
(2)volatile的内存分析
 
(1)volatile关键字的作用
作用一:多线程间的可见性
作用二:阻止指令重排序
 
那些支持热部署、支持动态更新的框架会大量使用volatile关键字。此外,ZooKeeper的Watcher机制也可以支持对配置的动态感知。
 
(2)volatile的内存分析
1.png
 
3.Atomic系列类与UnSafe类
(1)Atomic与UnSafe类的关系
(2)UnSafe类的四大作用
 
(1)Atomic与UnSafe类的关系
Atomic系列类提供了原子性操作,保证了多线程下的并发安全,Atomic底层是通过调用UnSafe类的CAS方法来实现原子性操作的。
 
由于UnSafe类可以直接访问操作系统底层硬件,而Java没办法直接访问操作系统底层,所以需借助UnSafe类来实现CAS。
 
(2)UnSafe类的四大作用
作用一:进行内存操作,比如UnSafe类的allocateMemory()和freeMemory()方法等。
 
作用二:字段的定位与修改,比如UnSafe类的getInt()和putInt()方法等。
 
作用三:挂起与恢复线程,比如JDK的LockSupport类,会通过UnSafe类的park()和unpark()方法实现挂起和恢复线程。
 
作用四:CAS操作(乐观锁),比如UnSafe类的compareAndSwapObject()系列方法就实现了CAS操作。
 
4.JUC常用工具类
(1)CountDownLatch和CyclieBarrier
(2)Future模式与Callable接口
(3)Exchanger线程数据交换器
(4)ForkJoin并行计算
(5)Semaphore信号量
 
(1)CountDownLatch和CyclieBarrier
CountDownLatch主要是阻塞一个线程,即阻塞当前的线程,然后调用countdown()方法减到0时就会唤醒阻塞的当前线程。
 
CountDownLatch和CyclicBarrier的区别:CountDownLatch是一次的阻塞、一个线程的阻塞。比如new一个CountDownLatch时设置为3,当前线程调用await()方法被阻塞,那么必须有线程总共调用3次countdown()方法,当前线程才能继续往下执行。
 
CyclieBarrier也可以设置一个阈值,比如这个阈值设置为5。有5个线程都调用await()方法,当这5个线程全部准备就绪后才一起往下。
 
(2)Future模式与Callable接口
Future模式用于让当前线程异步去提交工作,然后进行等待,等到负责异步回调的线程真正执行完之后,当前线程通过调用get()方法就能获取到数据。
 
(3)Exchanger线程数据交换器
Exchanger可以实现两个线程间的数据的交换,Exchanger通常用于一些对账场景。比如线程A和线程B同时接收同样的数据,进行磁盘IO读写等操作。线程A和线程B执行完都有一个结果,可通过Exchanger对比结果是否一致。
 
(4)ForkJoin并行计算
ForkJoin的核心就是通过递归拆分将一个大任务拆分成若干的小任务,然后对这些拆分的小任务进行并行计算,接着通过join()整合计算结果,最后计算出统计的结果。
 
(5)Semaphore信号量
Semaphore可以控制能够同时进行并发访问的线程数量,比如设置了Semaphore最多允许5个线程可以并发进行访问。如果出现20个线程,那么会有15个线程进入Semaphore的AQS对列。
 
5.AQS各种锁与架构核心
(1)ReentrantLock重入锁
(2)ReentrantReadWriteLock读写锁
(3)Condition条件判断
(4)LookSupport基于线程的锁
(5)AQS架构简介
(6)ReentrantLock底层原理简介
(7)CountDownLatch底层原理简介
 
(1)ReentrantLock重入锁
ReentrantLock是基于AQS的,AQS的两大核心:一是共享变量state,二是CLH等待队列。重入指的是同一个线程可以反复获得这个ReentrantLock。
 
Object锁要配合Synchronized + wait() + notify()来实现线程间的阻塞唤醒,而且需要注意的是:wait()方法会释放锁,notify()方法不会释放锁。
 
ReentrantLock可以通过Condition来实现线程间的阻塞和唤醒,比如使用Condition的await()方法和signal()方法,而且需要注意的是:await()方法会释放锁,signal()方法不会释放锁。
 
(2)ReentrantReadWriteLock读写锁
ReentrantReadWriteLock也是基于AQS的,可以实现读写分离。
 
(3)Condition条件判断
Condition条件判断是配合ReentrantLock或者ReentrantReadWriteLock使用的,可以使用类似于Synchronized锁下的wait()方法和notify()方法进行线程阻塞与唤醒。
 
(4)LookSupport基于线程的锁
LookSupport有两个关键的方法:park()和unpark()。LookSupport的这两个方法会调用UnSafe类中的park()和unpark()方法,分别用来挂起指定线程和唤醒指定线程。
 
注意:LookSupport唤醒和挂起线程的顺序并没有先后关系,也就是对于同一个线程,先执行unpark()再执行park()也不会影响。
 
(5)AQS架构简介
一.AQS维护了一个state和一个线程等待队列
其中volatile int state代表着共享资源,多线程争用资源被阻塞时会进入一个FIFO线程等待队列。
 
二.AQS定义了独占和共享两种资源处理方式
比如ReentrantLock使用的是Exclusive独占的方式,Semaphore使用的是Share共享的方式。
 
三.AQS的核心方法
isHeldExclusively()方法:
判断线程是否正在独占资源。
 
tryAcquire()和tryRelease()方法:
表示以独占的方式尝试获取和释放资源。
 
tryAcquireShared()和tryReleaseShared()方法:
表示以共享的方式尝试获取和释放资源。
 
(6)ReentrantLock底层原理简介
首先state会初始化为0,表示未锁定状态。当线程A调用ReentrantLock的lock()方法时,会触发调用tryAcquire()方法以独占方式获取锁并将state + 1。此后其他线程调用tryAcquire()方法时就会失败,直到线程A调用ReentrantLock的unlock()方法释放锁(将state减为0)为止。线程A在释放锁之前,可以重复获取锁,重复获取锁时,state会累加,这就是可重入的原理。但获取锁多少次(重入锁多少次/state累加了多少次),就要释放锁多少次,这样才能保证state能回到初始值为0的时候。
 
(7)CountDownLatch底层原理简介
如果任务分为N个子线程去执行,那么state会初始化为N。这N个子线程是并行执行的,每个子线程执行完就会countdown()一次。每countdown()一次,state就会通过CAS减1。等所有子线程都执行完后(即state=0),会调用unpark()方法唤醒线程,然后主线程从await()方法中返回,继续后面的处理。
  1. public class TestSynchronized {
  2.     public static void main(String[] args) throws Exception {
  3.         Object lock = new Object();
  4.         Thread threadA = new Thread(new Runnable() {
  5.             @Override
  6.             public void run() {
  7.                 System.out.println("线程A开始计算");
  8.                 int sum = 0;
  9.                 for (int i = 0; i < 10; i++) {
  10.                     sum += i;
  11.                 }
  12.                 synchronized (lock) {
  13.                     try {
  14.                         System.out.println("线程A先进行阻塞,等待被唤醒");
  15.                         //wait()会阻塞线程A,并且释放synchronized锁
  16.                         lock.wait();
  17.                     } catch (InterruptedException e) {
  18.                         e.printStackTrace();
  19.                     }
  20.                 }
  21.                 System.out.println("线程A计算出的结果sum: " + sum);
  22.             }
  23.         });
  24.         System.out.println("主线程启动线程A");
  25.         threadA.start();
  26.         Thread.sleep(2000);
  27.   
  28.         synchronized (lock) {
  29.             System.out.println("主线程唤醒线程A");
  30.             //唤醒阻塞的线程A
  31.             lock.notify();
  32.         }
  33.     }
  34. }
  35. //执行程序输出的结果如下:
  36. //主线程启动线程A
  37. //线程A开始计算
  38. //线程A先进行阻塞,等待被唤醒
  39. //主线程唤醒线程A
  40. //线程A计算出的结果sum: 45
  41. public class ConditionTest {
  42.     public static void main(String[] args) throws Exception {
  43.         ReentrantLock lock = new ReentrantLock();
  44.         Condition condition = lock.newCondition();
  45.         new Thread() {
  46.             @Override
  47.             public void run() {
  48.                 lock.lock();
  49.                 System.out.println("第一个线程加锁");
  50.                 int sum = 0;
  51.                 for (int i = 0; i < 10; i++) {
  52.                     sum += i;
  53.                 }
  54.                 try {
  55.                     System.out.println("第一个线程释放锁以及阻塞等待");
  56.                     condition.await();
  57.                     System.out.println("第一个线程被唤醒重新获取锁");
  58.                 } catch (Exception e) {
  59.                     e.printStackTrace();
  60.                 }
  61.                 System.out.println("第一个线程计算出的结果sum: " + sum);
  62.                 lock.unlock();
  63.                 System.out.println("第一个线程释放锁");
  64.             }
  65.         }.start();
  66.         Thread.sleep(3000);
  67.         new Thread() {
  68.             public void run() {
  69.                 lock.lock();
  70.                 System.out.println("第二个线程加锁");
  71.                 System.out.println("第二个线程唤醒第一个线程");
  72.                 condition.signal();
  73.                 lock.unlock();
  74.                 System.out.println("第二个线程释放锁");
  75.             }
  76.         }.start();
  77.     }
  78. }
  79. //执行程序输出的结果如下:
  80. //第一个线程加锁
  81. //第一个线程释放锁以及阻塞等待
  82. //第二个线程加锁
  83. //第二个线程唤醒第一个线程
  84. //第二个线程释放锁
  85. //第一个线程被唤醒重新获取锁
  86. //第一个线程计算出的结果sum: 45
  87. //第一个线程释放锁
  88. public class LockSupportTest {
  89.     public static void main(String[] args) throws Exception {
  90.         Thread threadA = new Thread(new Runnable() {
  91.             @Override
  92.             public void run() {
  93.                 System.out.println("线程A开始运行");
  94.                 int sum = 0;
  95.                 for (int i = 0; i < 10; i++) {
  96.                     sum += i;
  97.                 }
  98.                 System.out.println("挂起线程A");
  99.                 LockSupport.park();
  100.                 System.out.println("线程A被唤醒,输出计算出结果sum: " + sum);
  101.             }
  102.         });
  103.         threadA.start();
  104.         Thread.sleep(2000);
  105.         System.out.println("唤醒线程A");
  106.         LockSupport.unpark(threadA);
  107.     }
  108. }
  109. //执行程序输出的结果如下:
  110. //线程A开始运行
  111. //挂起线程A
  112. //唤醒线程A
  113. //线程A被唤醒,输出计算出结果sum: 45
  114. public class Review {
  115.     public static void main(String[] args) throws Exception {
  116.         Thread threadA = new Thread(new Runnable() {
  117.             @Override
  118.             public void run() {
  119.                 System.out.println("线程A开始运行");
  120.                 int sum = 0;
  121.                 for (int i = 0; i < 10; i++) {
  122.                     sum += i;
  123.                 }
  124.                 try {
  125.                     Thread.sleep(4000);
  126.                 } catch (InterruptedException e) {
  127.                     e.printStackTrace();
  128.                 }
  129.                 System.out.println("线程A然后再被挂起");
  130.                 LockSupport.park();
  131.                 System.out.println("线程A还是会被唤醒,输出计算出结果sum: " + sum);
  132.             }
  133.         });
  134.         threadA.start();
  135.         Thread.sleep(2000);
  136.         System.out.println("主线程先唤醒线程A");
  137.         LockSupport.unpark(threadA);
  138.     }
  139. }
  140. //执行程序输出的结果如下:
  141. //线程A开始运行
  142. //主线程唤醒线程A
  143. //挂起线程A
  144. //线程A被唤醒,输出计算出结果sum: 45
复制代码
 
6.线程池的最佳使用指南
(1)Excutors工厂类
(2)ThreadPoolExecutor自定义线程池
(3)如何确定线程池的线程数量
(4)如何正确使用线程池
 
(1)Excutors工厂类
Excutors提供了很多方法,比如newFixedThreadPool()等。但是不建议使用Excutors工厂类里的创建线程池方法,因为这些创建线程池的方法里很多都没有界限限制的,存在安全隐患。比如使用newFixedThreadPool()方法创建线程池时,没有限制阻塞队列长度。比如使用newCachedThreadPool()方法创建线程池时,没有限制线程数量。
 
(2)ThreadPoolExecutor自定义线程池
[code]public class ThreadPoolExecutor extends AbstractExecutorService {    ...    //@param corePoolSize:    //the number of threads to keep in the pool, even if they are idle,     //unless allowCoreThreadTimeOut is set    //@param maximumPoolSize:    //the maximum number of threads to allow in the pool    //@param keepAliveTime:    //when the number of threads is greater than the core,     //this is the maximum time that excess idle threads will wait for new tasks before terminating.    //@param unit:    //the time unit for the keepAliveTime argument    //@param workQueue:    //the queue to use for holding tasks before they are executed.    //This queue will hold only the Runnable tasks submitted by the execute method.    //@param threadFactory:    //the factory to use when the executor creates a new thread    //@param handler    //the handler to use when execution is blocked because the thread bounds and queue capacities are reached    public ThreadPoolExecutor(            //核心线程数            int corePoolSize,            //最大线程数            int maximumPoolSize,            //空闲线程的回收时间            long keepAliveTime,            TimeUnit unit,            //存放任务的队列            BlockingQueue workQueue,            //线程工厂            ThreadFactory threadFactory,            RejectedExecutionHandler handler) {        if (corePoolSize < 0 || maximumPoolSize

相关推荐

您需要登录后才可以回帖 登录 | 立即注册