AbstractScheduledEventExecutor在AbstractEventExecutor的基础上提供了对定时任务的支持, 在内部有一个queue用于保存定时任务.
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
}
return scheduledTaskQueue;
}
}
在queue中存放的是ScheduledFutureTask的实例. 在深入研究AbstractScheduledEventExecutor的代码前, 我们先看一下ScheduledFutureTask.
ScheduledFutureTask
ScheduledFutureTask继承自PromiseTask, 并实现了ScheduledFuture接口. 而ScheduledFuture接口是从java.util.concurrent.ScheduledFuture继承, 再往上是Delayed接口, 这里有对于定时任务而言最重要的一个方法getDelay().
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
}
}
public interface ScheduledFuture<V> extends Future<V>, java.util.concurrent.ScheduledFuture<V> {
}
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
getDelay()方法用来返回当前这个对象离执行的延迟值, 即还要过多长时间就要执行了(所谓定时任务).
ScheduledFutureTask.getDelay()方法的实现
ScheduledFutureTask的getDelay()方法中是通过调用delayNanos()方法得到时间结果,然后转换为指定的时间单位. 而delayNanos()方法是通过计算deadlineNanos()和nanoTime()的差值来计算delay时间的.
- deadlineNanos()表示任务计划要执行的时间点
- nanoTime()是取当前时间点
两个时间的差值自然是所谓的delay时间.
public long delayNanos() {
return Math.max(0, deadlineNanos() - nanoTime());
}
deadlineNanos()方法
deadlineNanos()方法返回的deadlineNanos属性在构造函数中被指定, 即在任务构造的时候指定任务计划执行的时间点, 这对于定时任务自然是合理的:
private long deadlineNanos; ScheduledFutureTask(EventExecutor executor, Callable<V> callable, long nanoTime, long period) { ...... deadlineNanos = nanoTime; } public long deadlineNanos() { return deadlineNanos; }
nanoTime()方法
nanoTime()方法并没有直接调用System.nanoTime(), 而是先通过常量START_TIME中保存着类装载时的时间值, 然后nanoTime()方法中用每次取当前时间减去这个START_TIME, 返回其差值.
private static final long START_TIME = System.nanoTime(); static long nanoTime() { return System.nanoTime() - START_TIME; }
ScheduledFutureTask.run()方法的实现
再看ScheduledFutureTask的run()方法, 去掉其他细节处理之后的简化代码如下:
public void run() {
......
if (setUncancellableInternal()) {
V result = task.call();
setSuccessInternal(result);
}
......
}
调用task.call()方法来执行task, 在执行前注明不能cancel, 执行完成后设置执行成功.
看完ScheduledFutureTask之后, 我们回到类AbstractScheduledEventExecutor.
添加定时任务
前面看到AbstractScheduledEventExecutor中有一个scheduledTaskQueue用来保存定时任务ScheduledFutureTask, 我们来看这些定时任务时如何添加的.
先看AbstractScheduledEventExecutor的schedule()方法, 在AbstractEventExecutor中这几个schedule()方法都是简单的抛出UnsupportedOperationException, 现在都被实现:
@Override
public ScheduledFuture< ? > schedule(Runnable command, long delay, TimeUnit unit) {
......
return schedule(new ScheduledFutureTask<Void>(
this, toCallable(command), ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
这几个schedule()方法都是类似的, 利用输入的参数先生成ScheduledFutureTask对象的实例, 然后调用下面这个schedule(ScheduledFutureTask)方法:
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new OneTimeTask() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
先通过inEventLoop()判断, 如果当前调用的线程是外部线程, 则生成一个OneTimeTask,然后调用execute()方法, 之后event loop的工作线程会执行这个OneTimeTask将task加入到scheduledTaskQueue; 如果调用的线程就是event loop的工作线程, 则直接enqueue.
注: scheduleAtFixedRate()方法我们先掉过.
获取定时任务
pollScheduledTask()方法用来获取到期的定时任务:
protected final Runnable pollScheduledTask() {
return pollScheduledTask(nanoTime());
}
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue < ScheduledFutureTask < ? > > scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask< ? > scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
// 任务队列中没有任务
return null;
}
if (scheduledTask.deadlineNanos() < = nanoTime) {
// 取到的任务满足deadline条件, 可以返回
// 在返回前将任务从queue中删掉, 注意前面取任务用的时peek()方法
scheduledTaskQueue.remove();
return scheduledTask;
}
// 取到的任务不满足deadline条件,不能返回,只能返回null表示没有满足条件的任务
return null;
}
hasScheduledTasks()用法用来判断是否有满足条件的定时任务:
protected final boolean hasScheduledTasks() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
}
另外当没有定时任务时, 还有一个nextScheduledTaskNano()方法用来查询下一个可以满足条件的任务的定时时间:
protected final long nextScheduledTaskNano() {
Queue<ScheduledFutureTask< ? >> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask< ? > scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
// 没有任务返回-1
return -1;
}
return Math.max(0, scheduledTask.deadlineNanos() - nanoTime());
}
这几个方法明显是留着给基类调用的了.