SkyWalking 源码解析 —— Collector Queue 队列组件

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/collector-queue-module/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 SkyWalking 3.2.6 正式版


1. 概述

本文主要分享 SkyWalking Collector Queue Module,队列组件。该组件被 Collector Streaming Module 流式处理使用,提供异步执行的特性。

友情提示:建议先阅读 《SkyWalking 源码分析 —— Collector 初始化》 ,以了解 Collector 组件体系。

Cluster Module 在 SkyWalking 架构图处于如下位置( 红框 ) :

FROM https://github.com/apache/incubating-skywalking

下面我们来看看整体的项目结构,如下图所示 :

  • collector-queue-define :定义队列组件接口。
  • collector-queue-datacarrier-provider :基于 apm-datacarrier 的队列组件实现。目前暂未完成
  • collector-queue-zookeeper-provider :基于 Disruptor 的队列组件实现。

下面,我们从接口到实现的顺序进行分享。

2. collector-queue-define

collector-queue-define :定义队列组件接口。项目结构如下 :

2.1 QueueModule

org.skywalking.apm.collector.queue.QueueModule ,实现 Module 抽象类,队列 Module 。

#name() 实现方法,返回模块名为 "queue"

#services() 实现方法,返回 Service 类名:QueueCreatorService 。

2.2 QueueCreatorService

org.skywalking.apm.collector.queue.service.QueueCreatorService ,继承 Service 接口,队列创建服务接口

#create(queueSize, executor) 接口方法,创建队列处理器。

2.3 MessageHolder

org.skywalking.apm.collector.queue.base.MessageHolder ,消息持有者。

  • message 属性,持有的消息。
  • #reset() 方法,清空消息。为什么会有这个方法,下文胖友会看到。

2.4 QueueEventHandler

org.skywalking.apm.collector.queue.base.QueueEventHandler,队列处理器接口。它定义了 #tell(message) 接口方法,输入消息给自己。最终,QueueEventHandler 会”提交“消息给 org.skywalking.apm.collector.queue.base.QueueExecutor,执行处理该消息。

LocalAsyncWorkerRef 实现 QueueEventHandler 接口,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)》「3.1.2 LocalAsyncWorkerRef」 有详细解析。

2.5 DaemonThreadFactory

org.skywalking.apm.collector.queue.base.DaemonThreadFactory,守护进程线程工厂,被用于创建消息处理器的线程。

3. collector-queue-disruptor-provider

collector-queue-disruptor-provider ,基于 Disruptor 的队列组件实现。

项目结构如下 :

默认配置,在 application-default.yml 已经配置如下:

queue:
  disruptor:

3.1 QueueModuleDisruptorProvider

org.skywalking.apm.collector.queue.disruptor.CQueueModuleDisruptorProvider ,实现 ModuleProvider 抽象类,基于 Disruptor 的队列服务提供者。

#name() 实现方法,返回组件服务提供者名为 "disruptor"

module() 实现方法,返回组件类为 QueueModule 。

#requiredModules() 实现方法,返回依赖组件为空。


#prepare(Properties) 实现方法,执行准备阶段逻辑。

  • 第 44 行 :创建 DisruptorQueueCreatorService 对象,并调用 #registerServiceImplementation() 父类方法,注册到 services

#start() 实现方法,方法为空。

#notifyAfterCompleted() 实现方法,方法为空。

2.2 DisruptorQueueCreatorService

org.skywalking.apm.collector.queue.disruptor.service.DisruptorQueueCreatorService ,实现 QueueCreatorService 接口,基于 Disruptor 的队列创建服务实现类

#create(queueSize, executor) 实现方法,调用 DisruptorQueueCreator#register(queueSize, executor) 方法,创建队列处理器。

3.2.1 DisruptorQueueCreator

友情提示:如果胖友对 Disruptor 暂时不了解,建议先使用 Disruptor 写个小 Demo 。

如下是笔者阅读的文章:

org.skywalking.apm.collector.queue.disruptor.base.DisruptorQueueCreator ,实现 QueueCreator 接口,基于 Disruptor 的队列创建器实现类

#create(queueSize, executor) 实现方法,代码如下:

  • 第 42 至 45 行:校验队列大小为 2 的指数,否则创建 Disruptor 对象会报 "bufferSize must be a power of 2" 的异常,参见 AbstractSequencer 的代码。
  • 第 49 行:创建 Disruptor 对象。
  • 第 51 至 64 行:设置 Disruptor 对象的默认异常处理器
  • 第 67 至 70 行:创建 DisruptorEventHandler 对象,并设置为 Disruptor 对象的事件处理器
  • 第 74 行:启动 Disruptor 对象。

为什么 Disruptor 要求队列大小为 2 的指数呢?如下是相关资料,感兴趣的同学可以看看( 可跳过 ):

  • SingleProducerSequencer#hasAvailableCapacity(requiredCapacity) 方法,代码如下:

3.3 DisruptorEventHandler

org.skywalking.apm.collector.queue.disruptor.base.DisruptorEventHandler ,基于 Disruptor 的队列处理器实现类

4. collector-queue-datacarrier-provider

collector-queue-datacarrier-provider :基于 apm-datacarrier 的队列组件实现。

目前暂未完成

赞(0) 打赏

如未加特殊说明,此网站文章均为原创,转载必须注明出处。Java 技术驿站 » SkyWalking 源码解析 —— Collector Queue 队列组件
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

关注【Java 技术驿站】公众号,每天早上 8:10 为你推送一篇技术文章

扫描二维码关注我!


关注【Java 技术驿站】公众号 回复 “VIP”,获取 VIP 地址永久关闭弹出窗口

免费获取资源

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏