io.netty.util.concurrent.AbstractScheduledEventExecutor
来源:4-10 -reactor线程任务的执行

南国漂泊
2018-07-17
这节课中讲到了`io.netty.util.concurrent.AbstractScheduledEventExecutor`的`schedule
()`方法
Queue<ScheduledFutureTask<?>> scheduledTaskQueue; Queue<ScheduledFutureTask<?>> scheduledTaskQueue() { if (scheduledTaskQueue == null) { scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>(); } return scheduledTaskQueue; } <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; }
scheduledTaskQueue() -> 我理解为: 延迟创建 成员变量 scheduledTaskQueue
inEventLoop() -> 如果是在同一Thread,就直接向成员变量中添加数据;否则,在一个新的Thread中添加, 这不会造成问题吗?
在什么条件下会出现 - 不是同一个线程的情况?
在 else 下, scheduledTaskQueue 重新初始化了一次吗? 否则怎么保证安全啊?
有点方,没转过弯来,请各位道友帮帮忙……
****
感谢 @鋒Nic 的回答,不过答案太长了也没分段看得我头疼。
我看到一句非常有价值的话是: 「外部线程发起的schedule的定时任务不应该去阻塞io事件的处理」 那是否是说进入到 else 的情形就是 那些会阻塞io事件的用户事件(我描述的不好,不知怎么描述)。
我一直觉得 成员变量 scheduledTaskQueue 会随着这个类一起被重新创建多次 (如果进入到else里的话)
我感觉我还得仔细看看源码好好琢磨琢磨……
1回答
-
鋒Nic
2018-07-17
scheduledTaskQueue() 方法返回的是定时任务队列ScheduledTaskQueue,这边没延迟创建的意思。定时任务队列ScheduledTaskQueue在调用NioEventLoop的schedule()方法将Callable任务封装成ScheduledFutureTask,判断是否为当前NioEventLoop发起的schedule还是外部线程发起的schedule,当前NioEventLoop发起的schedule直接添加定时任务,外部线程发起的schedule为了保证线程安全(ScheduledTaskQueue是PriorityQueue非线程安全)添加定时任务操作当做普通任务Task保证对于定时任务队列操作都在NioEventLoop实现,外部线程发起的schedule的定时任务不应该去阻塞io事件的处理所以丢到定时任务队列ScheduledTaskQueue,在reactor线程第三步会从scheduledTaskQueue转移定时任务到taskQueue(mpsc queue)。
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
}
return scheduledTaskQueue;
}
在Handler里调用schedule()方法,例如
ctx.channel().eventLoop().schedule(new Runnable() { @Override
public void run() {
}
}, 60, TimeUnit.SECONDS);042018-07-17