1 概述:循环队列
循环队列
- 循环队列: 一种先进先出(FIFO)的数据结构——它通过将【顺序队列】的末尾连接到开头,形成一个【环状结构】,从而解决了【顺序队列】的【虚假满状态问题】。
- 【队列】:一种先进先出(First In First Out)的线性表,简称FIFO。允许插入的一端称为【队尾(head)】,允许删除的一端称为【队头(tail)】。
- Hadoop MapReduce 在 Shuffle 过程中的环形缓冲区
别名:循环缓冲队列,Circular Buffer Queue
在 Shuffle 过程中,【环形缓冲区】主要用于存储来自 Mapper 节点传输的数据。
- 实时计算中,缓存单个设备的最近N秒的状态数据
- ...
- 环形缓冲区(Ring Buffer,也称为循环缓冲区或 Circular Buffer Queue)是【无锁队列】实现中一种【高效的数据结构】,特别适合【高性能并发场景】,如线程池任务调度、实时系统、服务器请求处理等。
- 环形缓冲区是一个逻辑上通过【头指针】(head)和【尾指针】(tail)形成循环队列的数据结构。
换言之,它由一个【固定大小】的【列表】和两个【指针】组成。一个指针 head 用于指向队列的头部,另一个指针 tail 则用于指向队列的尾部。
当指针到达数组末尾时,自动“绕回”到开头(通过模运算)。它常用于【无锁队列】,因为:
- 固定内存:避免动态分配,减少内存碎片。
- 缓存友好:连续内存布局提高缓存命中率。
- 高效操作:头尾指针通过原子操作更新,支持并发访问。
核心组件
- 缓冲区:固定大小的数组或链表。
- 头指针(head):指向下一个读取位置(消费者使用)。
- 尾指针(tail):指向下一个写入位置(生产者使用)。
- 大小计数器(size):跟踪队列中元素数量(可选,视场景)。
- 原子操作:使用 std::atomic(C/C++) / AtomicXxx(Java) 等锁机制,确保 head 和 tail 的线程安全更新。
不同策略下的循环队列
- 覆盖策略(满时入队覆盖队首)
- 阻塞策略(满/空时阻塞等待)
- 非阻塞策略(满/空时返回 null / 抛异常)
基于 ConcurrentLinkedDeque 实现线程安全的循环队列的思路
- ConcurrentLinkedDeque 是 Java 并发包中提供的非阻塞、线程安全的【双端队列】,基于【无锁】(CAS)机制实现。
java.util.concurrent.ConcurrentLinkedDeque
- 要基于 ConcurrentLinkedDeque 实现线程安全的循环队列。
核心思路是:利用 ConcurrentLinkedDeque 的并发安全特性封装队列操作,通过固定容量限制实现“【循环】”(队列满时入队会覆盖/阻塞/抛异常,队空时出队会阻塞/抛异常)。
- 固定容量:循环队列的核心是容量固定,满后入队需遵循循环规则(覆盖旧元素/阻塞/拒绝)。
- 线程安全:复用 ConcurrentLinkedDeque 的并发安全特性,避免手动加锁。
- 循环逻辑:入队时若队列满,根据策略处理(如覆盖队首、阻塞等待、抛异常);出队时若空,同理。
循环队列的特点
- 无动态分配,性能高。
- 内存布局连续,缓存命中率高。
- 适合固定容量的高性能场景。
- 固定容量,可能溢出或不足。
- MPMC 场景下【头尾指针竞争】,可能导致 CAS 重试。
- 需要仔细处理【内存序】和【数据一致性】。
2 覆盖式循环队列
此版本,项目亲测。
2.1 实现思路
- 基于 Java JDK 的 并发双端队列(java.util.concurrent.ConcurrentLinkedDeque)实现覆盖式循环队列。
容量固定,满时入队覆盖队首元素 (即: 新元素从队尾进入,满时覆盖队首元素(第n个元素向前覆盖第n-1个元素,n>=2))
2.2 源码实现(Java)
CoverStrategyCircularQueue
CircularQueueTest
- public class CircularQueueTest {
- private final static Logger log = LoggerFactory.getLogger(CircularQueueTest.class);
- @Test
- public void CoverStrategyCircularQueueTest(){
- int length = 5;
- CoverStrategyCircularQueue<String> queue = new CoverStrategyCircularQueue<>(length);
- //入队
- for (int i = 0; i < length + 3; i++) {
- queue.offer( (new Integer(i+1)).toString() );
- }
- log.info("queue:{}", queue);//[4, 5, 6, 7, 8]
- //出队
- queue.poll();
- queue.poll();
- queue.poll();
- log.info("queue:{}", queue);//[7, 8]
- }
- }
复制代码 3 阻塞式循环队列(等待策略)
- 满时入队阻塞,空时出队阻塞,适合生产-消费模型,需结合 Lock 和 Condition 实现。
- ReentrantLock : 可重入的互斥锁,又被称为“独占锁”
- Condition : Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此,通常来说比较推荐使用Condition。
源码实现(Java)
关键注意事项
- ConcurrentLinkedDeque.size() 是O(n)操作,高并发下性能差,因此基础版用 AtomicInteger 维护计数,阻塞版用锁保护计数。
- null元素禁止: ConcurrentLinkedDeque 不允许null元素,因此入队时需校验。
- 循环策略选择:
- 覆盖策略:适合日志缓存、临时数据存储(允许旧数据被覆盖)。
- 阻塞策略:适合生产-消费模型(如任务队列,需严格控制容量)。
- 非阻塞策略:适合快速响应场景(满/空时直接返回,不阻塞)。
使用示例
- public class CircularQueueTest {
- public static void main(String[] args) {
- // 基础版(覆盖策略)
- CircularQueue<String> queue = new CircularQueue<>(3);
- queue.offer("A");
- queue.offer("B");
- queue.offer("C");
- queue.offer("D"); // 满,覆盖队首"A"
- System.out.println(queue.poll()); // 输出B
- System.out.println(queue.size()); // 输出3(B/C/D)
- // 阻塞版(生产-消费)
- BlockingCircularQueue<Integer> blockingQueue = new BlockingCircularQueue<>(2);
- // 生产者线程
- new Thread(() -> {
- try {
- blockingQueue.put(1);
- blockingQueue.put(2);
- blockingQueue.put(3); // 满,阻塞
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }).start();
- // 消费者线程
- new Thread(() -> {
- try {
- Thread.sleep(1000);
- System.out.println(blockingQueue.take()); // 输出1,唤醒生产者
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }).start();
- }
- }
复制代码 4 非阻塞循环队列(满/空时返回 null / 抛异常)
实现思路
基于 ConcurrentLinkedDeque 实现非阻塞循环队列
- ConcurrentLinkedDeque 是 Java 并发包中提供的非阻塞、线程安全的双端队列,基于无锁(CAS)机制实现。
- 要基于它封装非阻塞式的循环队列,核心思路是:
- 限制队列容量,模拟循环队列的“固定长度”特性;
- 利用双端队列的首尾操作(offer/poll)模拟循环入队/出队;
- 基于 CAS 保证并发安全,避免阻塞(非阻塞核心);
- 处理队列满/空时的非阻塞策略(如返回 false/空值,而非等待)。
核心实现要点
- 容量限制:通过原子变量记录当前元素数,入队前检查是否已满,出队后更新计数;
- 循环逻辑:无需手动维护头尾指针(ConcurrentLinkedDeque 已封装链表节点的 CAS 操作),仅需在容量满时拒绝入队,空时拒绝出队;
- 非阻塞特性:所有操作均为非阻塞,失败时立即返回结果,不使用锁或条件变量等待。
源码实现(Java)
NonBlockingCircularQueue
关键细节说明
- 原子计数器(AtomicInteger):
- 必须用原子变量记录元素数量,避免并发下size()与实际队列元素数不一致(ConcurrentLinkedDeque 的size()是遍历计数,性能差且非原子);
- 入队/出队时通过compareAndSet(CAS)自旋更新计数,保证计数与实际操作的原子性。
- 非阻塞特性:
- 入队时若队列满,直接返回false,不阻塞等待;
- 出队时若队列空,直接返回null,不阻塞等待;
- 所有操作基于 CAS 自旋,无锁、无阻塞,适合高并发低延迟场景。
- 循环逻辑:
- 无需手动维护循环队列的头尾指针(如数组实现的循环队列),ConcurrentLinkedDeque 已通过链表节点的 CAS 操作实现高效的首尾操作;
- “循环”体现在“固定容量+满则拒绝入队”,出队后释放容量可再次入队,模拟循环复用空间。
- 线程安全:
- 底层 ConcurrentLinkedDeque 保证了队列操作的线程安全;
- 原子计数器保证了容量判断的准确性;
- CAS 自旋解决了“检查-更新”的竞态条件(如先判断容量,再入队的间隙被其他线程修改容量)。
适用场景与局限性
适用场景
- 高并发、低延迟的生产-消费场景;
- 不需要阻塞等待(如生产者可丢弃数据,消费者可跳过空队列);
- 容量固定,需循环复用队列空间。
局限性
- 不支持阻塞式操作(如需阻塞等待,应使用ArrayBlockingQueue/LinkedBlockingDeque);
- 基于链表实现,内存开销略高于数组实现的循环队列;
- CAS 自旋在高并发下可能导致 CPU 占用升高(可通过限制自旋次数优化)。
优化方向
- 自旋次数限制:避免无限自旋,可设置最大自旋次数,超过后返回失败;
- 批量操作:提供批量入队/出队方法,减少 CAS 次数;
- 公平性:可选公平策略(如按线程顺序自旋),避免线程饥饿;
- 元素过期:支持过期元素自动清理,适配缓存场景。
Z 最佳实践
循环队列的性能优化建议
- 高并发场景下,优先选择覆盖策略,避免【锁竞争】。
- 若需阻塞策略,可考虑直接使用 ArrayBlockingQueue(JDK 原生,性能更优)
- 避免频繁调用 size() 方法,高并发下改用 isEmpty() / isFull() 替代(O(1) 操作)。
基于循环队列实现滑动窗口
- [Python/循环队列] 数据结构之滑动窗口 - 博客园/千千寰宇
方案2:基于【循环队列】实现【滑动窗口】
Hadoop MapReduce 在 Shuffle 过程中环形缓冲区的应用
- Hadoop MapReduce 在 Shuffle 过程中的环形缓冲区
别名:循环缓冲队列,Circular Buffer Queue
在 Shuffle 过程中,【环形缓冲区】主要用于存储来自 Mapper 节点传输的数据。
【环形缓冲区】的工作原理是【基于生产者-消费者模型】的。
- 在 Shuffle 过程中,Mapper 节点充当生产者的角色,将数据写入【环形缓冲区】;而 Reducer 节点则充当【消费者】的角色,从【环形缓冲区】中读取数据并进行后续的处理。
- 当 Mapper 节点将数据写入【环形缓冲区】时,tail 指针会递增。
如果 tail 指针追上了 head 指针,表示缓冲区已满,此时 Mapper 节点会等待一段时间,直到 Reducer 节点读取并释放了一些空间,再将数据写入【环形缓冲区】。
- 当 Reducer 节点从【环形缓冲区】中读取数据时,head 指针会递增。
如果 head 指针追上了 tail 指针,表示缓冲区已空,此时 Reducer 节点会等待一段时间,直到 Mapper 节点写入了更多的数据,再继续读取。
- 环形缓冲区在 Shuffle 过程中起到了至关重要的作用。
- 它将 Mapper 节点产生的数据进行【临时存储】,以便 Reducer 节点能够按照预定的顺序和方式进行读取和处理。
- 另外,由于 Hadoop 在 Shuffle 过程中使用了磁盘进行大规模的数据传输,而磁盘读写较慢。
因此,【环形缓冲区】通过在【内存】中存储数据,加速了数据的传输和处理过程,提高了整个 Shuffle 过程的效率和性能。
- Hadoop:Shuffle 过程中的环形缓冲区 - 极简博客
Y 推荐文献
X 参考文献
- 环形缓冲区(Ring Buffer,也称为循环缓冲区或 Circular Buffer)是无锁队列实现中一种高效的数据结构 - CSDN
本文作者: 千千寰宇
本文链接: https://www.cnblogs.com/johnnyzen
关于博文:评论和私信会在第一时间回复,或直接私信我。
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
日常交流:大数据与软件开发-QQ交流群: 774386015 【入群二维码】参见左下角。您的支持、鼓励是博主技术写作的重要动力!
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |