登录
/
注册
首页
论坛
其它
首页
科技
业界
安全
程序
广播
Follow
关于
每日签到
每天签到奖励2圆-6圆
发帖说明
VIP申请
登录
/
注册
账号
自动登录
找回密码
密码
登录
立即注册
搜索
搜索
关闭
CSDN热搜
程序园
精品问答
技术交流
资源下载
本版
帖子
用户
软件
问答
教程
代码
写记录
写博客
VIP申请
VIP网盘
网盘
联系我们
每日签到
道具
勋章
任务
设置
我的收藏
退出
腾讯QQ
微信登录
返回列表
首页
›
业界区
›
业界
›
Netty源码—3.Reactor线程模型二
Netty源码—3.Reactor线程模型二
[ 复制链接 ]
但婆
2025-6-3 14:53:31
大纲
5.NioEventLoop的执行总体框架
6.Reactor线程执行一次事件轮询
7.Reactor线程处理产生IO事件的Channel
8.Reactor线程处理任务队列之添加任务
9.Reactor线程处理任务队列之执行任务
10.NioEventLoop总结
5.NioEventLoop的执行总体框架
(1)Reactor线程所做的三件事情
(2)处理多久IO事件就执行多久任务
(3)NioEventLoop.run()方法的执行流程
(1)Reactor线程所做的三件事情
NioEventLoop的run()方法里有个无限for循环,for循环里便是Reactor线程所要做的3件事情。
一.首先是调用select()方法进行一次事件轮询
由于一个NioEventLoop对应一个Selector,所以该select()方法便是轮询注册到这个Reactor线程对应的Selector上的所有Channel的IO事件。注意,select()方法里也有一个无限for循环,但是这个无限for循环可能会被某些条件中断。
二.然后调用processSelectedKeys()方法处理轮询出来的IO事件
三.最后调用runAllTasks()方法来处理外部线程放入TaskQueue的任务
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
private volatile int ioRatio = 50;
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
//2.处理产生IO事件的Channel
processSelectedKeys();
...
//3.执行外部线程放入TaskQueue的任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
private void select(boolean oldWakenUp) throws IOException {
for(;;) {
//1.定时任务截止时间快到了,中断本次轮询
//2.轮询过程中发现有任务加入,中断本次轮询
//3.阻塞式select操作: selector.select(timeoutMills)
//4.避免JDK空轮询Bug
}
}
...
}
复制代码
(2)处理多久IO事件就执行多久任务
在NioEventLoop的run()方法中,有个ioRatio默认是50,代表处理IO事件的时间和执行任务的时间是1:1。也就是执行了多久的processSelectedKeys()方法后,紧接着就执行多久的runAllTasks()方法。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
private volatile int ioRatio = 50;
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
...
}
}
...
}
复制代码
(3)NioEventLoop.run()方法的执行流程
NioEventLoop.run() -> for(;;)
select() //执行一次事件轮询检查是否有IO事件
processSelectedKeys() //处理产生IO事件的Channel
runAllTasks() //处理异步任务队列
//这3步放在一个线程处理应该是为了节约线程,因为不是总会有IO事件和异步任务的
复制代码
6.Reactor线程执行一次事件轮询
(1)执行select操作前设置wakeUp变量
(2)定时任务快开始了则中断本次轮询
(3)轮询中发现有任务加入则中断本次轮询
(4)执行阻塞式select操作
(5)避免JDK的空轮询Bug
(6)执行一次事件轮询的总结
(1)执行select操作前设置wakeUp变量
NioEventLoop有个wakenUp成员变量表示是否应该唤醒正在阻塞的select操作。NioEventLoop的run()方法准备执行select()方法进行一次新的循环逻辑之前,都会将wakenUp设置成false,标志新一轮循环的开始。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
//Boolean that controls determines if a blocked Selector.select should break out of its selection process.
//In our case we use a timeout for the select method and the select method will block for that time unless waken up.
private final AtomicBoolean wakenUp = new AtomicBoolean();
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
}
}
...
}
复制代码
如下是NioEventLoop的select()方法的执行逻辑,也就是Netty关于事件循环的4段逻辑。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
...
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
for(;;) {
//1.定时任务截止时间快到了,中断本次轮询
//2.轮询过程中发现有任务加入,中断本次轮询
//3.阻塞式select操作: selector.select(timeoutMills)
//4.避免JDK空轮询Bug
}
}
...
}
复制代码
(2)定时任务快开始了则中断本次轮询
NioEventLoop中的Reactor线程的select操作也是一个for循环。
在for循环第一步,如果发现当前定时任务队列中某个任务的开始时间快到了(小于0.5ms),那么就跳出循环。在跳出循环之前,如果发现目前为止还没有进行过select操作,就调用一次selectNow()方法执行非阻塞式select操作。
Netty里的定时任务队列是按照延迟时间从小到大进行排序的,所以delayNanos()方法返回的第一个定时任务的延迟时间便是最早截止的时间。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
...
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();//当前时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);//当前时间 + 定时任务的最早截止时间
for(;;) {
//1.定时任务截止时间快到了,中断本次轮询
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();//非阻塞执行select操作
selectCnt = 1;
}
break;
}
...
}
}
...
}
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.delayNanos(currentTimeNanos);
}
...
}
//Abstract base class for EventExecutors that want to support scheduling.
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;//定时任务队列
...
final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (scheduledTaskQueue == null) {
return null;
}
return scheduledTaskQueue.peek();
}
...
}
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
...
public long delayNanos(long currentTimeNanos) {
return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
}
public long deadlineNanos() {
return deadlineNanos;
}
...
}
复制代码
9.Reactor线程处理任务队列之执行任务
(1)runAllTasks()方法需要传入超时时间
(2)Reactor线程执行任务的步骤
(3)Netty性能优化之批量策略
(4)NioEventLoop.run()方法执行任务总结
(1)runAllTasks()方法需要传入超时时间
SingleThreadEventExecutor的runAllTasks()方法需要传入参数timeoutNanos,表示尽量在timeoutNanos时间内将所有的任务都取出来执行一遍。因为如果Reactor线程在执行任务时停留的时间过长,那么将会累积许多IO事件无法及时处理,从而导致大量客户端请求阻塞。因此Netty会精细控制内部任务队列的执行时间。
(2)Reactor线程执行任务的步骤
一.任务聚合
转移定时任务到MPSC队列,这里只是将快到期的定时任务转移到MPSC队列里。
二.时间计算
计算本轮任务执行的截止时间,此时所有截止时间已到达的定时任务均被填充到普通的任务队列(MPSC队列)里了。
三.任务执行
首先不抛异常地同步执行任务,然后累加当前已执行的任务数,接着每隔64次计算一下当前时间是否已超截止时间,最后判断本轮任务是否已经执行完毕。
[code]//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { //每一个NioEventLoop会有一个MPSC队列 private final Queue taskQueue; ... //Poll all tasks from the task queue and run them via Runnable#run() method. //This method stops running the tasks in the task queue and returns if it ran longer than timeoutNanos. protected boolean runAllTasks(long timeoutNanos) { //1.转移定时任务到MPSC队列,也就是任务聚合 fetchFromScheduledTaskQueue(); //从普通的任务队列(MPSC队列)里获取任务 Runnable task = pollTask(); if (task == null) { afterRunningAllTasks(); return false; } //2.计算本轮任务执行的截止时间 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //3.执行任务,通过for循环逐个执行pollTask()取出的任务 for (;;) { //3.1 不抛异常地执行任务(同步阻塞),确保任务可以安全执行 safeExecute(task); //3.2 累加当前已执行的任务数 runTasks ++; //3.3 每隔64次计算一下当前时间是否已经超过截止时间,因为ScheduledFutureTask.nanoTime()也挺耗时的 if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } //3.4 判断本轮任务是否已经执行完毕 task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; } private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { scheduledTaskQueue().add((ScheduledFutureTask) scheduledTask); return false; } scheduledTask = pollScheduledTask(nanoTime); } return true; } protected Runnable pollTask() { assert inEventLoop(); return pollTaskFrom(taskQueue); } protected final Runnable pollTaskFrom(Queue taskQueue) { for (;;) { Runnable task = taskQueue.poll(); if (task == WAKEUP_TASK) { continue; } return task; } } ...}//Abstract base class for EventExecutors that want to support scheduling.public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { Queue> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } if (scheduledTask.deadlineNanos()
Netty
源码
Reactor
线程
模型
相关帖子
面试官:如何确保动态线程池任务都执行完?
技术面:Java并发(线程池、ForkJoinPool)
百度多线程批量推送
社交app源码开发平台基础知识,软件二维码的生成
技术面:Java并发(线程同步、死锁、多线程编排)
推荐一款线程or进程间数据同步解决方案
Tekla坐标定位插件源码
文生图模型Stable Diffusion使用详解
Redis容量评估模型
源码调试-带你了解下车牌识别的深度学习模型-LPRNet
vip免费申请,1年只需15美金$
回复
使用道具
举报
提升卡
置顶卡
沉默卡
喧嚣卡
变色卡
千斤顶
照妖镜
相关推荐
业界
面试官:如何确保动态线程池任务都执行完?
0
760
水苯
2025-09-04
业界
技术面:Java并发(线程池、ForkJoinPool)
0
147
揉幽递
2025-09-05
软件
百度多线程批量推送
0
15
新程序
2025-09-09
安全
社交app源码开发平台基础知识,软件二维码的生成
0
590
慷规扣
2025-09-09
业界
技术面:Java并发(线程同步、死锁、多线程编排)
0
974
丧血槌
2025-09-10
安全
推荐一款线程or进程间数据同步解决方案
0
609
米榜饴
2025-09-10
业界
Tekla坐标定位插件源码
0
256
慎气
2025-09-10
业界
文生图模型Stable Diffusion使用详解
0
266
县挫伪
2025-09-11
安全
Redis容量评估模型
0
367
姬宜欣
2025-09-12
业界
源码调试-带你了解下车牌识别的深度学习模型-LPRNet
0
829
予捻
2025-09-12
高级模式
B
Color
Image
Link
Quote
Code
Smilies
您需要登录后才可以回帖
登录
|
立即注册
回复
本版积分规则
回帖并转播
回帖后跳转到最后一页
签约作者
程序园优秀签约作者
发帖
但婆
2025-6-3 14:53:31
关注
0
粉丝关注
11
主题发布
板块介绍填写区域,请于后台编辑
财富榜{圆}
敖可
9984
杭环
9988
凶契帽
9988
4
氛疵
9988
5
黎瑞芝
9988
6
猷咎
9986
7
里豳朝
9986
8
肿圬后
9986
9
蝓俟佐
9984
10
虽裘侪
9984
查看更多