您的位置:

手写rpc实现详解

一、手写rpc框架

RPC(Remote Procedure Call,远程过程调用),是一种用于客户端和服务器端进行通信的协议。通过RPC可以实现跨语言、跨平台、不同机器之间的服务调用,让远程的服务器和本地的客户端就像本地调用一样方便地进行通信。手写rpc框架的实现,可以让我们更加深入地了解RPC背后的原理,同时也能够提高我们的编程能力。

下面,我们将从手写rpc框架的搭建和实现两个方面,展开对手写rpc的详细阐述。

二、手写rpc框架搭建

1、搭建服务端

在搭建服务端时,我们需要考虑如何处理请求、如何暴露服务、如何启动服务等问题。

private Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

public void start() throws IOException {
    ServerSocket serverSocket = new ServerSocket(port);
    while (true) {
        Socket socket = serverSocket.accept();
        executor.execute(new RequestHandlerRunnable(socket, service));
    }
}

private static class RequestHandlerRunnable implements Runnable {
    private final Socket socket;
    private final Object service;

    private RequestHandlerRunnable(Socket socket, Object service) {
        this.socket = socket;
        this.service = service;
    }

    @Override
    public void run() {
        try {
            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
            try {
                String methodName = input.readUTF();
                Class[] parameterTypes = (Class[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();

                Method method = service.getClass().getMethod(methodName, parameterTypes);
                Object result = method.invoke(service, arguments);

                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                output.writeObject(result);
            } catch (Throwable t) {
                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                output.writeObject(t);
            } finally {
                input.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2、搭建客户端

在搭建客户端时,我们需要考虑如何处理请求、如何发送请求、如何接收返回值等问题。

public class RpcClient {
    private final String host;
    private final int port;

    public RpcClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public Object call(String methodName, Class[] parameterTypes, Object[] arguments) throws Throwable {
        Socket socket = new Socket(host, port);
        try {
            ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
            output.writeUTF(methodName);
            output.writeObject(parameterTypes);
            output.writeObject(arguments);

            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
            Object result = input.readObject();

            if (result instanceof Throwable) {
                throw (Throwable) result;
            }
            return result;
        } finally {
            socket.close();
        }
    }
}

三、手写rpc框架实现

1、服务注册与发现

服务注册和发现是指将提供的服务注册到注册中心,并通过查询注册中心获取已注册服务的IP和端口,从而实现客户端的远程调用。

public interface ServiceRegistry {
    /**
     * Register a service to the registry with its host and port
     *
     * @param serviceName  the name of service to register
     * @param serviceHost  the host of the service provider
     * @param servicePort  the port of the service provider
     */
    void register(String serviceName, String serviceHost, int servicePort) throws Exception;

    /**
     * Lookup for available service providers of the {@code serviceName}
     *
     * @param serviceName the name of service that given the service providers provides
     * @return a list of all service providers' {@link ServiceEndPoint} of the given service name
     */
    List lookup(String serviceName) throws Exception;
}

  

2、动态代理和序列化

客户端利用动态代理,将需要远程调用的接口动态生成代码,实现远程接口的本地代理。在本地代理方法中,需要将接口、方法、参数等信息序列化并发送到服务端,同时需要接收服务端返回的数据,并进行反序列化,将结果返回给调用方。

public class RpcProxy implements InvocationHandler, Serializable {
    private static final long serialVersionUID = 9130440123381758788L;
    private final Class
    interfaceClazz;
    private final String serviceName;

    public RpcProxy(Class
     interfaceClazz, String serviceName) {
        this.interfaceClazz = interfaceClazz;
        this.serviceName = serviceName;
    }

    @SuppressWarnings("unchecked")
    public T getProxy() {
        return (T) Proxy.newProxyInstance(interfaceClazz.getClassLoader(), new Class[] {interfaceClazz}, this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        ServiceDiscovery serviceDiscovery = new ServiceDiscoveryImpl();
        InetSocketAddress serviceAddress = serviceDiscovery.lookup(serviceName);

        try {
            RpcClient rpcClient = new RpcClient(serviceAddress.getAddress().getHostAddress(), serviceAddress.getPort());
            RpcRequest request = new RpcRequest();
            request.setServiceName(serviceName);
            request.setMethodName(method.getName());
            request.setParameterTypes(method.getParameterTypes());
            request.setArguments(args);

            return rpcClient.call(request);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

public class RpcDecoder extends ByteToMessageDecoder {
    private Class genericClass;

    public RpcDecoder(Class genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List
      out) throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }

        byte[] data = new byte[dataLength];
        in.readBytes(data);

        Object obj = SerializationUtil.deserialize(data, genericClass);
        out.add(obj);
    }
}

public class RpcEncoder extends MessageToByteEncoder {
    private Class genericClass;

    public RpcEncoder(Class genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
            byte[] data = SerializationUtil.serialize(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}


以上是手写rpc框架的主要实现内容,注释较为详细,可供参考。

文章目录
顶部