Kafka中时间轮分析与Java实现_kafka 时序

yumo6663周前 (08-28)技术文章23

在Kafka中应用了大量的延迟操作但在Kafka中 并没用使用JDK自带的Timer或是DelayQueue用于延迟操作,而是使用自己开发的DelayedOperationPurgatory组件用于管理延迟操作,Kafka这类分布式框架有大量延迟操作并且对性能要求及其高,而java.util.Timer与
java.util.concurrent.DelayQueue的插入和删除时间复杂度都为对数阶O(log n)并不能满足Kafka性能要求,所以Kafka实现了
基于时间轮的定时任务组件,该时间轮定时任务实现的插入与删除(开始定时器与暂停定时器)的时间复杂度都为常数阶O(1)
  
时间轮的应用并不少见,在Netty、akka、Quartz、Zookeeper等高性能组件中都存在时间轮定时器的踪影;

时间轮数据结构

时间轮名词解释:

  时间格:环形结构中用于存放延迟任务的区块;
  
指针(CurrentTime):指向当前操作的时间格,代表当前时间
  
格数(ticksPerWheel):为时间轮中时间格的个数
  
间隔(tickDuration):每个时间格之间的间隔
  
总间隔(interval):当前时间轮总间隔,也就是等于ticksPerWheel*tickDuration

  TimingWheel并非简单的环形时间轮,而是多层级时间轮,每个时间轮由多个时间格组成,每个时间格为一个时间间隔,底层的时间格跨度较小,然后随着延迟任务延迟时间的长短逐层变大;如上图,底下的时间轮每个时间格为1ms,整个时间轮为10ms,而上面一层的时间轮中时间格为10ms,整个时间轮为100ms;
  
时间轮添加上级时间轮的规则为:当前currentTime为上级时间轮的startMs,当前interval为上级时间轮的tickDuration,每层ticksPerWheel相同;简单点说就是上层时间轮跨度为当前的M倍,时间格为当前的N倍;

Kafka中时间轮的实现

  Kafka中时间轮时间类为TimingWheel,该类结构为存储定时任务的环形队列,内部使用数组实现,数组是用于存放TimerTaskList对象,TimerTaskList环形双向链表,链表项TimerTaskEntry封装了定时任务TimerTask,TimerTaskList与TimerTaskEntry中均有超时时间字段,TimerTask中delayMs字段用于记录任务延迟时间;该三个类为Kafka时间轮实现的核心;
  
TimingWheel:表示一个时间轮,通常会有多层时间轮也就存在多个TimingWheel对象;
  
TimerTaskList:为数组对象用于存放延迟任务,一个TimerTaskList就代表一个时间格,一个时间格中能保存的任务到期时间只可在[t~t+10ms]区间(t为时间格到期时间,10ms时间格间格),每个时间格有个过期时间,时间格过期后时间格中的任务将向前移动存入前面时间格中;
  
TimerTask:表示延迟任务;
  
SystemTimer:kafka实现的定时器,内部封装了TimningWheel用于执行、管理定时任务;

  下面通过一个示例来介绍kafka时间轮的工作过程:

  时间轮初始化:初始时间轮中的格数、间隔、指针的初始化时间,创建时间格所对应的buckets数组,计算总间隔interval;
  
添加延迟任务:判断该任务是否已被取消、是否已经过期如已过期则把任务放入线程池中执行、根据时间轮总间隔与当前时间判断任务是否可存入当前层级时间轮否则添加上层时间轮并再次尝试往时间轮中添加该任务;

  时间轮降级:有一个定时任务再300ms后将执行,现层级时间轮每层有10个时间格,顶层时间轮的时间格间隔为1ms,整个时间轮为10ms,无法存下该任务。这时创建第二层时间轮,时间格间隔为10ms,整个时间轮为100ms,还是无法存该任务。接着创建第三层时间轮,时间格间隔为100ms,整个时间轮为1000ms,此时任务存入第三层时间轮的第三个时间格中;过了段时间,TimerTaskList到期(时间格)可该任务还有90ms,还无法执行。此时将再次把定时任务添加到时间轮中,顶层时间轮还是无法满足存入条件,往第二层时间轮添加,这时定时任务存入第二层时间轮第九个时间格当中;任务在时间轮中如此反复,直到任务过期时将放入线程池中执行;

关键实现方法

 public boolean add(TaskEntry e) {    synchronized (this) {        long expiration = e.getExpirationMs();         if(expiration<(currentTime+tickDuration)){            LOGGER.info("当前任务已过期");            return false;        }else if(expiration<(currentTime+interval)) {            //查找时间格的位置,过期时间/时间格%时间轮大小            long virtualId = expiration / tickDuration;             TaskEntryList taskEntryList = buckets.get((int) (virtualId % ticksPerWheel));            taskEntryList.add(e);             //设置EntryList过期时间            if(taskEntryList.setTime(virtualId * tickDuration)) {                 listDelayQueue.offer(taskEntryList);            if(overflowWheel==null){                 addOverflowWheel();            return overflowWheel.add(e); public void advanceClock(long timeMS){    if(timeMS>=(currentTime+tickDuration)){        currentTime=timeMS-(timeMS%tickDuration);    if (overflowWheel != null) overflowWheel.advanceClock(currentTime);public void add(TaskEntry taskEntry) {    if (!timingWheel.add(taskEntry)) {        System.out.println(String.format("任务已过期,开始执行 %s",taskEntry.getTimerTask()));        taskExecutor.execute(taskEntry.getTimerTask());

文章首发地址:Solinx
http://www.solinx.co/archives/989

相关文章

java定时器Timer 你还记得吗?_c#timer定时器的基本用法

java已经帮我们写了定时器的任务,我们只需要按照API的文档来实现就行。首先我们看下java帮我们实现的定时器类:java.lang.Timer我们先来看下Timer的构造方法:我们可以看到Time...

Java 底层大揭秘系列:如何实现定时任务

定时器已经是现代软件中不可缺少的一部分,例如每隔5秒去查询一下状态,是否有新邮件,实现一个闹钟等, Java 中已经有现成的 api 供使用,但是如果你想设计更高效,更精准的定时器任务,就需要了解底层...

java总结:8.正则表达式,匹配一天的指定时间段跑定时器

定时时间(每天早上3点到晚上23点,每5分钟执行):0 1/5 3-22 ? * * * 举例操作:定时器每20分钟执行一次,每天从3点执行,到5点结束 0 1/20 3-4 ? * * 执行的结...

面试突击34:如何使用线程池执行定时任务?

在 Java 语言中,有两个线程池可以执行定时任务:ScheduledThreadPool 和 SingleThreadScheduledExecutor,其中 SingleThreadSchedul...

JAVA架构师之路-教你如何去实现一个分布式定时任务

什么是分布式定时任务:首先,我们要了解计划任务这个概念,计划任务是指由计划的定时运行或者周期性运行的程序。我们最常见的就是Linux的‘crontab’和Windows的‘计划任务’。那么什么是分布式...

Java---定时任务的实现方式_java定时任务的实现方式

一 什么是定时任务见名知意,定时任务就是每隔一段时间执行一次这个任务,比如我们日常生活中的下课铃,或者是闹钟等等,就是在设置好的固定时间段去不断执行这个任务。二 如何实现定时任务功能这次我介绍两种执行...