找回密码
 立即注册
首页 业界区 安全 BlockingQueue:阻塞操作与条件队列的高效结合 ...

BlockingQueue:阻塞操作与条件队列的高效结合

精滂软 3 天前
BlockingQueue和BlockingDeque

BlockingQueue

BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。下图是对这个原理的阐述:
1.png

一个线程往里边放,另外一个线程从里边取的一个 BlockingQueue。
一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。 负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。
BlockingQueue 的方法

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
抛异常特定值阻塞超时插入add(o)offer(o)put(o)offer(o, timeout, timeunit)移除remove()poll()take()poll(timeout, timeunit)检查element()peek()四组不同的行为方式解释:

  • 抛异常:如果试图的操作无法立即执行,抛一个异常。
  • 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  • 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  • 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。 可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高,因此你尽量不要用这一类的方法,除非你确实不得不那么做。
BlockingDeque

java.util.concurrent 包里的 BlockingDeque 接口表示一个线程安放入和提取实例的双端队列。
BlockingDeque 类是一个双端队列,在不能够插入元素时,它将阻塞住试图插入元素的线程;在不能够抽取元素时,它将阻塞住试图抽取的线程。 deque(双端队列) 是 "Double Ended Queue" 的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。
在线程既是一个队列的生产者又是这个队列的消费者的时候可以使用到 BlockingDeque。如果生产者线程需要在队列的两端都可以插入数据,消费者线程需要在队列的两端都可以移除数据,这个时候也可以使用 BlockingDeque。BlockingDeque 图解:
2.png

BlockingDeque 的方法

一个 BlockingDeque - 线程在双端队列的两端都可以插入和提取元素。 一个线程生产元素,并把它们插入到队列的任意一端。如果双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移出了一个元素。如果双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。
BlockingDeque 具有 4 组不同的方法用于插入、移除以及对双端队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
抛异常特定值阻塞超时插入addFirst(o)offerFirst(o)putFirst(o)offerFirst(o, timeout, timeunit)移除removeFirst(o)pollFirst(o)takeFirst(o)pollFirst(timeout, timeunit)检查getFirst(o)peekFirst(o)抛异常特定值阻塞超时插入addLast(o)offerLast(o)putLast(o)offerLast(o, timeout, timeunit)移除removeLast(o)pollLast(o)takeLast(o)pollLast(timeout, timeunit)检查getLast(o)peekLast(o)四组不同的行为方式解释:

  • 抛异常: 如果试图的操作无法立即执行,抛一个异常。
  • 特定值: 如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  • 阻塞: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  • 超时: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
BlockingDeque 与BlockingQueue关系

BlockingDeque 接口继承自 BlockingQueue 接口。这就意味着你可以像使用一个 BlockingQueue 那样使用 BlockingDeque。如果你这么干的话,各种插入方法将会把新元素添加到双端队列的尾端,而移除方法将会把双端队列的首端的元素移除。正如 BlockingQueue 接口的插入和移除方法一样。
以下是 BlockingDeque 对 BlockingQueue 接口的方法的具体内部实现:
BlockingQueueBlockingDequeadd()addLast()offer() x 2offerLast() x 2put()putLast()remove()removeFirst()poll() x 2pollFirst()take()takeFirst()element()getFirst()peek()peekFirst()BlockingQueue 的例子

这里是一个 Java 中使用 BlockingQueue 的示例。本示例使用的是 BlockingQueue 接口的 ArrayBlockingQueue 实现。 首先,BlockingQueueExample 类分别在两个独立的线程中启动了一个 Producer 和 一个 Consumer。Producer 向一个共享的 BlockingQueue 中注入字符串,而 Consumer 则会从中把它们拿出来。
  1. public class BlockingQueueExample {
  2.     public static void main(String[] args) throws Exception {
  3.         BlockingQueue queue = new ArrayBlockingQueue(1024);
  4.         
  5.         Producer producer = new Producer(queue);
  6.         Consumer consumer = new Consumer(queue);
  7.         new Thread(producer).start();
  8.         new Thread(consumer).start();
  9.         Thread.sleep(4000);
  10.     }
  11. }
复制代码
以下是 Producer 类。注意它在每次 put() 调用时是如何休眠一秒钟的。这将导致 Consumer 在等待队列中对象的时候发生阻塞。
  1. public class Producer implements Runnable{
  2.     protected BlockingQueue queue = null;
  3.     public Producer(BlockingQueue queue) {
  4.         this.queue = queue;
  5.     }
  6.     public void run() {
  7.         try {
  8.             queue.put("1");
  9.             Thread.sleep(1000);
  10.             queue.put("2");
  11.             Thread.sleep(1000);
  12.             queue.put("3");
  13.         } catch (InterruptedException e) {
  14.             e.printStackTrace();
  15.         }
  16.     }
  17. }
复制代码
以下是 Consumer 类。它只是把对象从队列中抽取出来,然后将它们打印到 System.out。
  1. public class Consumer implements Runnable{
  2.     protected BlockingQueue queue = null;
  3.     public Consumer(BlockingQueue queue) {
  4.         this.queue = queue;
  5.     }
  6.     public void run() {
  7.         try {
  8.             System.out.println(queue.take());
  9.             System.out.println(queue.take());
  10.             System.out.println(queue.take());
  11.         } catch (InterruptedException e) {
  12.             e.printStackTrace();
  13.         }
  14.     }
  15. }
复制代码
数组阻塞队列 ArrayBlockingQueue

ArrayBlockingQueue 类实现了 BlockingQueue 接口。
ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注: 因为它是基于数组实现的,也就具有数组的特性: 一旦初始化,大小就无法修改)。 ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。 以下是在使用 ArrayBlockingQueue 的时候对其初始化的一个示例:
  1. BlockingQueue queue = new ArrayBlockingQueue(1024);
  2. queue.put("1");
  3. Object object = queue.take();
复制代码
以下是使用了 Java 泛型的一个 BlockingQueue 示例。注意其中是如何对 String 元素放入和提取的:
  1. BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
  2. queue.put("1");
  3. String string = queue.take();
复制代码
延迟队列 DelayQueue

DelayQueue 实现了 BlockingQueue 接口。
DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口,该接口定义:
[code]public interface Delayed extends Comparable

相关推荐

您需要登录后才可以回帖 登录 | 立即注册