随着微服务的流行,为微服务提供支持的各种组件也日益丰富,而Consul就是其中一款常用的服务注册和发现工具。.NET Core Consul是一套基于.NET Core的API,提供了对Consul注册中心的简便访问,使用.NET Core Consul可以轻松地实现服务的注册、发现和负载均衡等操作。
一、Consul基础概念介绍
在使用.NET Core Consul前,先对Consul的基础概念做简单介绍:
- 服务注册:在Consul上注册服务,Consul会使用心跳机制检查服务是否健康。
- 服务发现:通过Consul的服务发现功能,可以实现轻松发现和访问已注册的服务。
- 健康检查:在服务注册时需要提供服务的健康检查地址,Consul会通过该地址定时发送检查请求,以检查服务的健康状况。
- 分布式锁:Consul提供了分布式锁,用于处理在分布式环境下的并发操作。
- 事件处理:Consul提供了事件处理机制,可以在特定的事件触发时执行预定的操作。
二、与.NET Core集成
使用.NET Core Consul需要先安装其依赖包——ConsulSharp。
dotnet add package ConsulSharp
然后在Startup.cs文件中进行配置:
services.AddSingleton<ConsulService>();
services.Configure<ConsulConfig>(Configuration.GetSection("ConsulConfig"));
这其中,ConsulConfig用于配置Consul的相关参数,可以在appsettings.json文件中定义:
{
"ConsulConfig": {
"Address": "http://localhost:8500",
"Datacenter": "dc1",
"Token": ""
}
}
三、服务注册
首先定义一个用于服务注册的接口:
public interface IServiceRegistry
{
Task<bool> RegisterService(ServiceRegistration registration);
}
其中,ServiceRegistration就是服务注册的相关信息,包括服务名称、IP、端口、健康检查地址等。下面是IServiceRegistry的具体实现:
public class ConsulServiceRegistry : IServiceRegistry
{
private readonly IConsulClient _consul;
private readonly ConsulConfig _consulConfig;
public ConsulServiceRegistry(IOptions<ConsulConfig> consulConfig, IConsulClient consul)
{
_consul = consul;
_consulConfig = consulConfig.Value;
}
public async Task<bool> RegisterService(ServiceRegistration registration)
{
var registrationRequest = new AgentServiceRegistration
{
Name = registration.Name,
ID = registration.ID,
Address = registration.Address,
Port = registration.Port,
Check = new AgentServiceCheck
{
HTTP = registration.HealthCheckURL,
Interval = TimeSpan.FromSeconds(registration.HealthCheckIntervalSeconds),
Timeout = TimeSpan.FromSeconds(registration.HealthCheckTimeoutSeconds)
}
};
var result = await _consul.Agent.ServiceRegister(registrationRequest);
return result.StatusCode == HttpStatusCode.OK;
}
}
使用IServiceRegistry进行服务注册:
var registration = new ServiceRegistration
{
Name = "serviceName",
ID = "serviceId",
Address = "localhost",
Port = 5000,
HealthCheckIntervalSeconds = 10,
HealthCheckTimeoutSeconds = 5,
HealthCheckURL = "http://localhost:5000/health"
};
await _serviceRegistry.RegisterService(registration);
四、服务发现
Consul提供三种服务发现机制:DNS、HTTP和gRPC,其中HTTP方式最为常用,下面以此为例讲解。
定义IServiceDiscovery接口:
public interface IServiceDiscovery
{
Task<List<Uri>> DiscoverServiceUri(string serviceName);
}
ConsulServiceDiscovery的实现:
public class ConsulServiceDiscovery : IServiceDiscovery
{
private readonly IConsulClient _consul;
private readonly ConsulConfig _consulConfig;
public ConsulServiceDiscovery(IOptions<ConsulConfig> consulConfig, IConsulClient consul)
{
_consul = consul;
_consulConfig = consulConfig.Value;
}
public async Task<List<Uri>> DiscoverServiceUri(string serviceName)
{
var queryResult = await _consul.Catalog.Service(serviceName);
return queryResult.Response.Select(service => new Uri($"http://{service.ServiceAddress}:{service.ServicePort}/")).ToList();
}
}
使用IServiceDiscovery进行服务发现:
var serviceUris = await _serviceDiscovery.DiscoverServiceUri("serviceName");
五、负载均衡
对于服务集群,常常需要进行负载均衡,.NET Core Consul提供了两种负载均衡方式:随机模式和轮询模式。
定义IServiceLoadBalancer接口:
public interface IServiceLoadBalancer
{
Task<Uri> ChooseServer(List<Uri> servers);
}
ConsulServiceLoadBalancer的实现:
public class ConsulServiceLoadBalancer : IServiceLoadBalancer
{
private readonly ConcurrentDictionary<string, int> _indexDictionary = new ConcurrentDictionary<string, int>();
public async Task<Uri> ChooseServer(List<Uri> servers)
{
var serviceName = servers.First().AbsolutePath.Split('/')[1];
var index = _indexDictionary.AddOrUpdate(serviceName, 0, (_, i) => (i + 1) % servers.Count);
return servers[index];
}
}
使用IServiceLoadBalancer进行负载均衡:
var chosenServer = await _serviceLoadBalancer.ChooseServer(serviceUris);
六、完整示例代码
以下是一个演示.NET Core Consul的完整示例代码:
using Consul;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
namespace Sample
{
public class ConsulServiceRegistry : IServiceRegistry
{
private readonly IConsulClient _consul;
private readonly ConsulConfig _consulConfig;
public ConsulServiceRegistry(IOptions<ConsulConfig> consulConfig, IConsulClient consul)
{
_consul = consul;
_consulConfig = consulConfig.Value;
}
public async Task<bool> RegisterService(ServiceRegistration registration)
{
var registrationRequest = new AgentServiceRegistration
{
Name = registration.Name,
ID = registration.ID,
Address = registration.Address,
Port = registration.Port,
Check = new AgentServiceCheck
{
HTTP = registration.HealthCheckURL,
Interval = TimeSpan.FromSeconds(registration.HealthCheckIntervalSeconds),
Timeout = TimeSpan.FromSeconds(registration.HealthCheckTimeoutSeconds)
}
};
var result = await _consul.Agent.ServiceRegister(registrationRequest);
return result.StatusCode == HttpStatusCode.OK;
}
}
public class ConsulServiceDiscovery : IServiceDiscovery
{
private readonly IConsulClient _consul;
private readonly ConsulConfig _consulConfig;
public ConsulServiceDiscovery(IOptions<ConsulConfig> consulConfig, IConsulClient consul)
{
_consul = consul;
_consulConfig = consulConfig.Value;
}
public async Task<List<Uri>> DiscoverServiceUri(string serviceName)
{
var queryResult = await _consul.Catalog.Service(serviceName);
return queryResult.Response.Select(service => new Uri($"http://{service.ServiceAddress}:{service.ServicePort}/")).ToList();
}
}
public class ConsulServiceLoadBalancer : IServiceLoadBalancer
{
private readonly ConcurrentDictionary<string, int> _indexDictionary = new ConcurrentDictionary<string, int>();
public async Task<Uri> ChooseServer(List<Uri> servers)
{
var serviceName = servers.First().AbsolutePath.Split('/')[1];
var index = _indexDictionary.AddOrUpdate(serviceName, 0, (_, i) => (i + 1) % servers.Count);
return servers[index];
}
}
public interface IServiceRegistry
{
Task<bool> RegisterService(ServiceRegistration registration);
}
public interface IServiceDiscovery
{
Task<List<Uri>> DiscoverServiceUri(string serviceName);
}
public interface IServiceLoadBalancer
{
Task<Uri> ChooseServer(List<Uri> servers);
}
public class ServiceRegistration
{
public string ID { get; set; }
public string Name { get; set; }
public string Address { get; set; }
public int Port { get; set; }
public string HealthCheckURL { get; set; }
public int HealthCheckIntervalSeconds { get; set; } = 10;
public int HealthCheckTimeoutSeconds { get; set; } = 5;
}
public class ConsulConfig
{
public string Address { get; set; }
public string Datacenter { get; set; }
public string Token { get; set; }
}
public class ConsulService
{
private readonly IServiceRegistry _serviceRegistry;
private readonly IServiceDiscovery _serviceDiscovery;
private readonly IServiceLoadBalancer _serviceLoadBalancer;
public ConsulService(IServiceRegistry serviceRegistry, IServiceDiscovery serviceDiscovery, IServiceLoadBalancer serviceLoadBalancer)
{
_serviceRegistry = serviceRegistry;
_serviceDiscovery = serviceDiscovery;
_serviceLoadBalancer = serviceLoadBalancer;
}
public async Task RegisterService(ServiceRegistration registration)
{
await _serviceRegistry.RegisterService(registration);
}
public async Task<Uri> GetServiceUri(string serviceName)
{
var serviceUris = await _serviceDiscovery.DiscoverServiceUri(serviceName);
var chosenServer = await _serviceLoadBalancer.ChooseServer(serviceUris);
return chosenServer;
}
}
class Program
{
static async Task Main(string[] args)
{
var consulConfig = new ConsulConfig
{
Address = "http://localhost:8500",
Datacenter = "dc1",
Token = ""
};
using var consul = new ConsulClient(config =>
{
config.Address = new Uri(consulConfig.Address);
});
var serviceRegistry = new ConsulServiceRegistry(Options.Create(consulConfig), consul);
var serviceDiscovery = new ConsulServiceDiscovery(Options.Create(consulConfig), consul);
var serviceLoadBalancer = new ConsulServiceLoadBalancer();
var consulService = new ConsulService(serviceRegistry, serviceDiscovery, serviceLoadBalancer);
var registration = new ServiceRegistration
{
Name = "serviceName",
ID = "serviceId",
Address = "localhost",
Port = 5000,
HealthCheckIntervalSeconds = 10,
HealthCheckTimeoutSeconds = 5,
HealthCheckURL = "http://localhost:5000/health"
};
await consulService.RegisterService(registration);
var serviceUri = await consulService.GetServiceUri("serviceName");
Console.WriteLine(serviceUri);
}
}
}