您的位置:

深入理解SkyAPM

一、简介

SkyAPM是提供分布式应用程序性能管理监控解决方案的开源项目。它可以跟踪整个分布式系统的调用链,提供分布式事务追踪,应用程序性能管理和分析,以支持运维人员进行问题排查和性能优化。

二、使用

SkyAPM的使用需要引入SkyWalking的Agent和Collector,分别用于在本地应用收集性能指标数据,和将收集到的数据上报到服务端进行分析处理与可视化。

  
    # 引入SkyWalking的Agent
    java -javaagent:/path/to/skywalking-agent.jar -Dskywalking.agent.service_name=myapp -jar myapp.jar
  
  
    # 启动SkyWalking OAP Server作为收集器以处理数据并生成报表
    docker run --name skywalking -d -p 12800:12800 apache/skywalking-oap-server:8.5.0-es7
  

三、核心概念

1. 服务实例

服务实例是指运行着同一服务的单个应用程序或进程。SkyAPM跟踪服务实例之间的请求和响应,以分析整个分布式系统的性能。

  
    // Java Spring Boot应用的服务实例配置
    spring.application.name=myapp
    server.port=8080
  

2. Spans

Span是指一次请求的一部分处理或事件,例如HTTP请求处理、SQL查询、RPC调用等。每个Span都包含一个时间戳和其他属性,可以用于调试、故障排除和性能优化。

  
    Span span = tracer.createLocalSpan("http: /api/user/1");
    span.tag("url", "/api/user/1");
    span.start();
    ...
    span.finish();
  

3. Traces

Trace是多个Span的有向无环图,代表整个请求的调用链,包括客户端和服务器端的Span。SkyAPM利用Trace来给一个完整的请求进行性能分析,以便检测并诊断问题。

  
    val traceSegment: TraceSegmentObject = ContextManager.capture()
    segmentObserver.beforeSerialize(traceSegment)
    buffer.write(traceSegment.toByteArray())
  

四、插件开发

SkyAPM提供了各种插件,例如Kafka、Redis、Dubbo等,以便于您更好地监控和分析分布式系统的各个方面。插件API可以让您扩展SkyAPM并添加自己的插件,以支持监控不同类型的应用程序和服务。

以下代码是一个自定义的SkyWalking插件样例,用于跟踪ActiveMQ的生产者和消费者的消息发送和接收:

  
    @Component
    public class AmqInstrumentation implements SpanObserver, EnhanceRequireObjectCache {
      private static final Logger logger = LoggerFactory.getLogger(AmqInstrumentation.class);
    
      private static final String SEND_OPERATION_NAME = "activemq-send";
      private static final String CONSUME_OPERATION_NAME = "activemq-consume";
    
      private final ConcurrentMap
    sendCache = new ConcurrentHashMap<>();
      private final ConcurrentMap
     consumeCache = new ConcurrentHashMap<>();
    
      @Override
      public void afterFinished(Span span) {
        if (span.getOperationName().equals(SEND_OPERATION_NAME)) {
          SendCache cache = sendCache.remove(span.getSpanId());
          if (cache != null) {
            span.tag("activemq-broker-url", cache.brokerUrl);
            span.tag("activemq-destination", cache.destination);
            span.tag("activemq-message-queue-size", String.valueOf(cache.size));
          }
        } else if (span.getOperationName().equals(CONSUME_OPERATION_NAME)) {
          ConsumeCache cache = consumeCache.remove(span.getSpanId());
          if (cache != null) {
            span.tag("activemq-broker-url", cache.brokerUrl);
            span.tag("activemq-destination", cache.destination);
          }
        }
      }
    
      public void onSend(@Advice.Local("sendCache") SendCache cache,
                         @FieldName(className = "org.apache.activemq.command.ActiveMQDestination", value = "destinationType") byte destinationType,
                         @FieldValue(className = "org.apache.activemq.command.ActiveMQDestination", value = "physicalName") String physicalName,
                         @TargetObject Object messageProducer) {
        Span span = ContextManager.createExitSpan(SEND_OPERATION_NAME, ContextManager.getRemotePeer(messageProducer));
        if (span == null) {
          return;
        }
    
        cache.size = ((MessageProducer) messageProducer).getSendQueue().size();
        cache.brokerUrl = JmsHelper.getBrokerUrlFromConnection(JmsHelper.getJmsConnectionFromSession(JmsHelper.getSessionFromProducer((MessageProducer) messageProducer)));
        cache.destination = physicalName;
        cache.spanId = span.getSpanId();
        sendCache.put(span.getSpanId(), cache);
        ContextManager.continued(span);
      }
    
      public void onConsume(@Advice.Local("consumeCache") ConsumeCache cache,
                            @FieldName(className = "org.apache.activemq.command.ActiveMQDestination", value = "destinationType") byte destinationType,
                            @FieldValue(className = "org.apache.activemq.command.ActiveMQDestination", value = "physicalName") String physicalName,
                            @TargetObject Object messageConsumer) {
        Span span = ContextManager.createEntrySpan(CONSUME_OPERATION_NAME, new ActiveMqConsumerPeer((MessageConsumer) messageConsumer));
        if (span == null) {
          return;
        }
        cache.brokerUrl = JmsHelper.getBrokerUrlFromConnection(JmsHelper.getJmsConnectionFromSession(JmsHelper.getSessionFromConsumer((MessageConsumer) messageConsumer)));
        cache.destination = physicalName;
        cache.spanId = span.getSpanId();
        consumeCache.put(span.getSpanId(), cache);
        ContextManager.continued(span);
      }
    
      static class SendCache {
        String brokerUrl;
        String destination;
        int size;
        int spanId;
      }
    
      static class ConsumeCache {
        String brokerUrl;
        String destination;
        int spanId;
      }
    }