SkyWalking 源码解析 —— Agent 发送 Trace 数据

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】
免费领取10G资料包与项目实战视频资料

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/agent-send-trace/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 SkyWalking 3.2.6 正式版


1. 概述

分布式链路追踪系统,链路的追踪大体流程如下:

  1. Agent 收集 Trace 数据。
  2. Agent 发送 Trace 数据给 Collector
  3. Collector 接收 Trace 数据。
  4. Collector 存储 Trace 数据到存储器,例如,数据库。

本文主要分享【第二部分】 SkyWalking Agent 发送 Trace 数据

考虑到减少外部组件的依赖,Agent 收集到 Trace 数据后,不是写入外部消息队列( 例如,Kafka )或者日志文件,而是 Agent 写入内存消息队列后台线程异步】发送给 Collector 。

本文涉及的类非常少,如下图所示:

2. TraceSegmentServiceClient

org.skywalking.apm.agent.core.remote.TraceSegmentServiceClient ,TraceSegment 发送服务客户端。它是一个服务,也是一个客户端,负责将 TraceSegment 异步发送到 Collector 。

我们先来看看 TraceSegmentServiceClient 的属性

  • TIMEOUT 静态属性,发送等待超时时长,单位:毫秒。
  • lastLogTime 属性,最后打印日志时间。该属性主要用于开发调试。
  • segmentUplinkedCounter 属性,TraceSegment 发送数量。
  • segmentAbandonedCounter 属性,TraceSegment 被丢弃数量。在 Agent 未连接上 Collector 时,产生的 TraceSegment 将被丢弃。
  • carrier 属性,内存队列。在 《SkyWalking 源码分析 —— DataCarrier 异步处理库》 有对 DataCarrier 的详细解析。
  • serviceStub 属性,非阻塞 Stub 。
  • status 属性,连接状态。

下面,我们来介绍 TraceSegmentServiceClient 实现的接口以及对应的方法。

2.1 实现 BootService 接口

#beforeBoot() 方法,代码如下:

  • 第 86 行:调用 GRPCChannelManager#addChannelListener(this) 方法,将自己添加到 GRPCChannelManager 中,作为一个监听器,从而调用 #statusChanged(GRPCChannelStatus) 方法,实现对连接状态( status )的监听处理。

#boot() 方法,代码如下:

  • 第 95 至 97 行:创建 DataCarrier 对象,作为内存队列,并设置自己作为消费者,从而调用 #consume(List<TraceSegment> ) 方法,实现异步发送 TraceSegment 到 Collector 。

#afterBoot() 方法,代码如下:

  • 第 102 行:调用 TracingContext.ListenerManager#add(this) 方法,将自己添加到 ListenerManager 中,作为一个监听器,从而调用 #afterFinished(TraceSegment) 方法,实现收集到新的 TraceSegment ,添加到内存队列。

#shutdown() 方法,代码如下:

  • 第 107 行:调用 DataCarrier#shutdownConsumers() 方法,停止消费。

2.2 实现 GRPCChannelListener 接口

#statusChanged(GRPCChannelStatus) 方法,代码如下:

  • 第 211 至 214 行:连接成功,创建 Stub 对象。
  • 第 215 行:记录连接状态。

2.3 实现 TracingContextListener 接口

#afterFinished(TraceSegment) 方法,代码如下:

  • 第 197 至 199 行:当 TraceSegment.ignore = true 时,忽略该 TraceSegment 。
  • 第 201 行:提交 TraceSegment 到内存队列。

2.4 实现 IConsumer 接口

#consume(List<TraceSegment>) 方法,代码如下:

ps:目前 DataCarrier 最长每 20 秒消费一次。

#onError(List<TraceSegment>, Throwable) 方法,当消费发生异常时,打印日志。

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » SkyWalking 源码解析 —— Agent 发送 Trace 数据

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏