精华内容
下载资源
问答
  • 一起写个Dubbo——1. 一最简单的实现

    千次阅读 多人点赞 2020-06-26 17:53:05
    面试问你RPC,一起写个Dubbo吧! 一起写个Dubbo第一章,一最简单的RPC框架实现

    本文原载于我的博客,地址:https://blog.guoziyang.top/archives/61/,项目地址:https://github.com/CN-GuoZiyang/My-RPC-Framework

    本章对应的commit为73aa960,完整项目为https://github.com/CN-GuoZiyang/My-RPC-Framework/tree/73aa960b0c457770859f81a3210de56370862439

    思路

    用(抄)一下Guide哥的一张图:

    RPC框架思路

    那么我们首先要思考,RPC框架的原理。

    原理很简单,客户端和服务端都可以访问到通用的接口,但是只有服务端有这个接口的实现类,客户端调用这个接口的方式,是通过网络传输,告诉服务端我要调用这个接口,服务端收到之后找到这个接口的实现类,并且执行,将执行的结果返回给客户端,作为客户端调用接口方法的返回值。

    原理很简单,但是实现值得商榷,例如客户端怎么知道服务端的地址?客户端怎么告诉服务端我要调用的接口?客户端怎么传递参数?只有接口客户端怎么生成实现类……等等等等。

    这一章,我们就来探讨一个最简单的实现。一个最简单的实现,基于这样一个假设,那就是客户端已经知道了服务端的地址,这部分会由后续的服务发现机制完善。

    通用接口

    我们先把通用的接口写好,然后再来看怎么实现客户端和服务端。

    接口如下:

    public interface HelloService {
        String hello(HelloObject object);
    }
    

    hello方法需要传递一个对象,HelloObject对象,定义如下:

    @Data
    @AllArgsConstructor
    public class HelloObject implements Serializable {
        private Integer id;
        private String message;
    }
    

    注意这个对象需要实现Serializable接口,因为它需要在调用过程中从客户端传递给服务端。

    接着我们在服务端对这个接口进行实现,实现的方式也很简单,返回一个字符串就行:

    public class HelloServiceImpl implements HelloService {
        private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
        @Override
        public String hello(HelloObject object) {
            logger.info("接收到:{}", object.getMessage());
            return "这是掉用的返回值,id=" + object.getId();
        }
    }
    

    传输协议

    严格来说,这并不能算是协议……但也大致算一个传输格式吧。

    我们来思考一下,服务端需要哪些信息,才能唯一确定服务端需要调用的接口的方法呢?

    首先,就是接口的名字,和方法的名字,但是由于方法重载的缘故,我们还需要这个方法的所有参数的类型,最后,客户端调用时,还需要传递参数的实际值,那么服务端知道以上四个条件,就可以找到这个方法并且调用了。我们把这四个条件写到一个对象里,到时候传输时传输这个对象就行了。即RpcRequest对象:

    @Data
    @Builder
    public class RpcRequest implements Serializable {
        /**
         * 待调用接口名称
         */
        private String interfaceName;
        /**
         * 待调用方法名称
         */
        private String methodName;
        /**
         * 调用方法的参数
         */
        private Object[] parameters;
        /**
         * 调用方法的参数类型
         */
        private Class<?>[] paramTypes;
    }
    

    参数类型我是直接使用Class对象,其实用字符串也是可以的。

    那么服务器调用完这个方法后,需要给客户端返回哪些信息呢?如果调用成功的话,显然需要返回值,如果调用失败了,就需要失败的信息,这里封装成一个RpcResponse对象:

    @Data
    public class RpcResponse<T> implements Serializable {
        /**
         * 响应状态码
         */
        private Integer statusCode;
        /**
         * 响应状态补充信息
         */
        private String message;
        /**
         * 响应数据
         */
        private T data;
      
        public static <T> RpcResponse<T> success(T data) {
            RpcResponse<T> response = new RpcResponse<>();
            response.setStatusCode(ResponseCode.SUCCESS.getCode());
            response.setData(data);
            return response;
        }
        public static <T> RpcResponse<T> fail(ResponseCode code) {
            RpcResponse<T> response = new RpcResponse<>();
            response.setStatusCode(code.getCode());
            response.setMessage(code.getMessage());
            return response;
        }
    }
    

    这里还多写了两个静态方法,用于快速生成成功与失败的响应对象。其中,statusCode属性可以自行定义,客户端服务端一致即可。

    客户端的实现——动态代理

    客户端方面,由于在客户端这一侧我们并没有接口的具体实现类,就没有办法直接生成实例对象。这时,我们可以通过动态代理的方式生成实例,并且调用方法时生成需要的RpcRequest对象并且发送给服务端。

    这里我们采用JDK动态代理,代理类是需要实现InvocationHandler接口的。

    public class RpcClientProxy implements InvocationHandler {
        private String host;
        private int port;
    
        public RpcClientProxy(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Class<T> clazz) {
            return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
        }
    }
    

    我们需要传递host和port来指明服务端的位置。并且使用getProxy()方法来生成代理对象。

    InvocationHandler接口需要实现invoke()方法,来指明代理对象的方法被调用时的动作。在这里,我们显然就需要生成一个RpcRequest对象,发送出去,然后返回从服务端接收到的结果即可:

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            RpcRequest rpcRequest = RpcRequest.builder()
                    .interfaceName(method.getDeclaringClass().getName())
                    .methodName(method.getName())
                    .parameters(args)
                    .paramTypes(method.getParameterTypes())
                    .build();
            RpcClient rpcClient = new RpcClient();
            return ((RpcResponse) rpcClient.sendRequest(rpcRequest, host, port)).getData();
        }
    

    生成RpcRequest很简单,我使用Builder模式来生成这个对象。发送的逻辑我使用了一个RpcClient对象来实现,这个对象的作用,就是将一个对象发过去,并且接受返回的对象。

    public class RpcClient {
    
        private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);
    
        public Object sendRequest(RpcRequest rpcRequest, String host, int port) {
            try (Socket socket = new Socket(host, port)) {
                ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
                objectOutputStream.writeObject(rpcRequest);
                objectOutputStream.flush();
                return objectInputStream.readObject();
            } catch (IOException | ClassNotFoundException e) {
                logger.error("调用时有错误发生:", e);
                return null;
            }
        }
    }
    

    我的实现很简单,直接使用Java的序列化方式,通过Socket传输。创建一个Socket,获取ObjectOutputStream对象,然后把需要发送的对象传进去即可,接收时获取ObjectInputStream对象,readObject()方法就可以获得一个返回的对象。

    服务端的实现——反射调用

    服务端的实现就简单多了,使用一个ServerSocket监听某个端口,循环接收连接请求,如果发来了请求就创建一个线程,在新线程中处理调用。这里创建线程采用线程池:

    public class RpcServer {
    
        private final ExecutorService threadPool;
        private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);
    
        public RpcServer() {
            int corePoolSize = 5;
            int maximumPoolSize = 50;
            long keepAliveTime = 60;
            BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
            ThreadFactory threadFactory = Executors.defaultThreadFactory();
            threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory);
        }
      
    }
    

    这里简化了一下,RpcServer暂时只能注册一个接口,即对外提供一个接口的调用服务,添加register方法,在注册完一个服务后立刻开始监听:

        public void register(Object service, int port) {
            try (ServerSocket serverSocket = new ServerSocket(port)) {
                logger.info("服务器正在启动...");
                Socket socket;
                while((socket = serverSocket.accept()) != null) {
                    logger.info("客户端连接!Ip为:" + socket.getInetAddress());
                    threadPool.execute(new WorkerThread(socket, service));
                }
            } catch (IOException e) {
                logger.error("连接时有错误发生:", e);
            }
        }
    

    这里向工作线程WorkerThread传入了socket和用于服务端实例service。

    WorkerThread实现了Runnable接口,用于接收RpcRequest对象,解析并且调用,生成RpcResponse对象并传输回去。run方法如下:

        @Override
        public void run() {
            try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
                 ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
                RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
                Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
                Object returnObject = method.invoke(service, rpcRequest.getParameters());
                objectOutputStream.writeObject(RpcResponse.success(returnObject));
                objectOutputStream.flush();
            } catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
                logger.error("调用或发送时有错误发生:", e);
            }
        }
    

    其中,通过class.getMethod方法,传入方法名和方法参数类型即可获得Method对象。如果你上面RpcRequest中使用String数组来存储方法参数类型的话,这里你就需要通过反射生成对应的Class数组了。通过method.invoke方法,传入对象实例和参数,即可调用并且获得返回值。

    测试

    服务端侧,我们已经在上面实现了一个HelloService的实现类HelloServiceImpl的实现类了,我们只需要创建一个RpcServer并且把这个实现类注册进去就行了:

    public class TestServer {
        public static void main(String[] args) {
            HelloService helloService = new HelloServiceImpl();
            RpcServer rpcServer = new RpcServer();
            rpcServer.register(helloService, 9000);
        }
    }
    

    服务端开放在9000端口。

    客户端方面,我们需要通过动态代理,生成代理对象,并且调用,动态代理会自动帮我们向服务端发送请求的:

    public class TestClient {
        public static void main(String[] args) {
            RpcClientProxy proxy = new RpcClientProxy("127.0.0.1", 9000);
            HelloService helloService = proxy.getProxy(HelloService.class);
            HelloObject object = new HelloObject(12, "This is a message");
            String res = helloService.hello(object);
            System.out.println(res);
        }
    }
    

    我们这里生成了一个HelloObject对象作为方法的参数。

    首先启动服务端,再启动客户端,服务端输出:

    服务器正在启动...
    客户端连接!Ip为:127.0.0.1
    接收到:This is a message
    

    客户端输出:

    这是调用的返回值,id=12
    

    欢迎关注我的微信公众号:楚狂声哥,更新各种有深度的八股文!

    展开全文
  • 一起写个Dubbo——0. 一些不得不说的话

    千次阅读 多人点赞 2020-06-26 17:49:54
    面试问你RPC?一起写个Dubbo吧! 一RPC框架的渐进式实现教程

    前言

    本文原载于我的博客,地址:https://blog.guoziyang.top/archives/60/,项目地址:https://github.com/CN-GuoZiyang/My-RPC-Framework

    差不多五月份左右,我开始准备秋招。除了每天刷题或者复习,就得开始准备准备项目了。因为我主要是Java后端,项目是很好找的,主要就是各种管理系统和商城项目。于此同时,我也在思考,怎么可以准备一个更加出彩的、与众不同的项目,可以直接引导你与面试官的聊天方向?直到,我看到了Guide哥RPC框架,眼前一亮。

    众所周知,用框架和写框架所需要的对框架的熟悉程度显然不在一个级别。如果你能亲手写一个RPC框架,那么几乎所有的RPC相关的问题就很难问倒你了(除非是具体到某个框架)。于是,我开始研究这个项目。

    苦于Guide哥没有写教程,我只能按照commits历史,一点一点揣测每一点变化的意义,并且临摹下来(照葫芦画瓢),当然,在临摹过程中,我还是有很多自己的想法的,例如自定义协议、以及序列化与负载均衡算法可配置、注解方式服务发现等。感兴趣的同学可以照着这个教程自行完成一个。

    介绍

    介绍其实没有什么,主要还是Readme里那一套,详见上一章Readme。

    这个教程和Guide哥的commits一样,是循序渐进的,从一个最简单的BIO + Java序列化开始,逐步完善成Netty + 多序列化方式的比较完整的框架,并且配置了Nacos服务发现(没用Zookeeper的原因仅仅是因为我不会)。

    在每一章节的开头,我都会放出该章节对应的commit地址,方便查看代码。

    整个项目位于https://github.com/CN-GuoZiyang/My-RPC-Framework

    项目结构

    项目结构

    • docs文件夹:这个文档的源文件
    • images文件夹:Readme所用到的图片(其实只有一张)
    • rpc-api文件夹:服务端与客户端的公共调用接口
    • rpc-common文件夹:项目中的一些通用的枚举类和工具类
    • rpc-core文件夹:框架的核心实现
    • test-client文件夹:测试用的客户端项目
    • test-server文件夹:测试用的服务端项目
    • .gitignore:就是.gitignore
    • .travis.yml:持续集成的脚本(其实什么也没干)
    • LICENSE:基于MIT开源协议哦
    • README.md:就是Readme
    • pom.xml:项目的总的pom

    开发环境

    • macOS Catalina 10.15.5
    • Java SE 1.8.0_231
    • JetBrain IntelliJ IDEA

    欢迎关注我的微信公众号:楚狂声哥,更新各种有深度的八股文!

    展开全文
  • 一起写个 Dubbo 第六节,实现服务自动注销和负载均衡策略

    本文原载于我的博客,地址:https://blog.guoziyang.top/archives/67/,项目地址:https://github.com/CN-GuoZiyang/My-RPC-Framework

    本文对应的 commit 截止到 cab84e1,完整的项目目录

    本节主要分为两部分,自动注销和负载均衡。

    服务自动注销

    上一节我们实现了服务的自动注册和发现,但是有些细心的同学就可能会发现,如果你启动完成服务端后把服务端给关闭了,并不会自动地注销 Nacos 中对应的服务信息,这样就导致了当客户端再次向 Nacos 请求服务时,会获取到已经关闭的服务端信息,最终就有可能因为连接不到服务器而调用失败。

    那么我们就需要一种办法,在服务端关闭之前自动向 Nacos 注销服务。但是有一个问题,我们不知道什么时候服务器会关闭,也就不知道这个方法调用的时机,就没有办法手工去调用。这时,我们就需要钩子。

    钩子是什么呢?是在某些事件发生后自动去调用的方法。那么我们只需要把注销服务的方法写到关闭系统的钩子方法里就行了。

    首先先写向 Nacos 注销所有服务的方法,这部分被放在了 NacosUtils 中作为一个静态方法,NacosUtils 是一个 Nacos 相关的工具类:

        public static void clearRegistry() {
            if(!serviceNames.isEmpty() && address != null) {
                String host = address.getHostName();
                int port = address.getPort();
                Iterator<String> iterator = serviceNames.iterator();
                while(iterator.hasNext()) {
                    String serviceName = iterator.next();
                    try {
                        namingService.deregisterInstance(serviceName, host, port);
                    } catch (NacosException e) {
                        logger.error("注销服务 {} 失败", serviceName, e);
                    }
                }
            }
        }
    

    所有的服务名称都被存储在 NacosUtils 类中的 serviceNames 中,在注销时只需要用迭代器迭代所有服务名,调用 deregisterInstance 即可。

    接着就是钩子了,新建一个类,ShutdownHook:

    public class ShutdownHook {
    
        private static final Logger logger = LoggerFactory.getLogger(ShutdownHook.class);
    
        private final ExecutorService threadPool = ThreadPoolFactory.createDefaultThreadPool("shutdown-hook");
        private static final ShutdownHook shutdownHook = new ShutdownHook();
    
        public static ShutdownHook getShutdownHook() {
            return shutdownHook;
        }
    
        public void addClearAllHook() {
            logger.info("关闭后将自动注销所有服务");
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                NacosUtil.clearRegistry();
                threadPool.shutdown();
            }));
        }
    
    }
    

    使用了单例模式创建其对象,在 addClearAllHook 中,Runtime 对象是 JVM 虚拟机的运行时环境,调用其 addShutdownHook 方法增加一个钩子函数,创建一个新线程调用 clearRegistry 方法完成注销工作。这个钩子函数会在 JVM 关闭之前被调用。

    这样在 RpcServer 启动之前,只需要调用 addClearAllHook,就可以注册这个钩子了。例如在 NettyServer 中:

                 ChannelFuture future = serverBootstrap.bind(host, port).sync();
    +            ShutdownHook.getShutdownHook().addClearAllHook();
                 future.channel().closeFuture().sync();
    

    启动服务端后再关闭,就会发现 Nacos 中的注册信息都被注销了。

    负载均衡策略

    负载均衡大家应该都熟悉,在上一节中客户端在 lookupService 方法中,从 Nacos 获取到的是所有提供这个服务的服务端信息列表,我们就需要从中选择一个,这便涉及到客户端侧的负载均衡策略。我们新建一个接口:LoadBalancer:

    public interface LoadBalancer {
        Instance select(List<Instance> instances);
    }
    

    接口中的 select 方法用于从一系列 Instance 中选择一个。这里我就实现两个比较经典的算法:随机和转轮。

    随机算法顾名思义,就是随机选一个,毫无技术含量:

    public class RandomLoadBalancer implements LoadBalancer {
    
        @Override
        public Instance select(List<Instance> instances) {
            return instances.get(new Random().nextInt(instances.size()));
        }
    
    }
    

    而转轮算法大家也应该了解,按照顺序依次选择第一个、第二个、第三个……这里就需要一个变量来表示当前选到了第几个:

    public class RoundRobinLoadBalancer implements LoadBalancer {
    
        private int index = 0;
    
        @Override
        public Instance select(List<Instance> instances) {
            if(index >= instances.size()) {
                index %= instances.size();
            }
            return instances.get(index++);
        }
    
    }
    

    index 就表示当前选到了第几个服务器,并且每次选择后都会自增一。

    最后在 NacosServiceRegistry 中集成就可以了,这里选择外部传入的方式传入 LoadBalancer:

    public class NacosServiceDiscovery implements ServiceDiscovery {
        private final LoadBalancer loadBalancer;
    
        public NacosServiceDiscovery(LoadBalancer loadBalancer) {
            if(loadBalancer == null) this.loadBalancer = new RandomLoadBalancer();
            else this.loadBalancer = loadBalancer;
        }
        
        public InetSocketAddress lookupService(String serviceName) {
            try {
                List<Instance> instances = NacosUtil.getAllInstance(serviceName);
                Instance instance = loadBalancer.select(instances);
                return new InetSocketAddress(instance.getIp(), instance.getPort());
            } catch (NacosException e) {
                logger.error("获取服务时有错误发生:", e);
            }
            return null;
        }
    }
    

    而这个负载均衡策略,也可以在创建客户端时指定,例如无参构造 NettyClient 时就用默认的策略,也可以有参构造传入策略,具体的实现留给大家。

    欢迎关注我的微信公众号:楚狂声哥,更新各种有深度的八股文!

    展开全文
  • 一起写个Dubbo——2. 注册多服务

    千次阅读 多人点赞 2020-08-02 16:17:07
    本文原载于我的博客,地址:...上一节中,我们使用 JDK 序列化和 Socket 实现了一最基本的 RPC 框架,服务端测试时是这样的: public class TestServer { public static void main(String[] arg

    本文原载于我的博客,地址:https://blog.guoziyang.top/archives/63/,项目地址:https://github.com/CN-GuoZiyang/My-RPC-Framework

    本文对应的commit为8467b19,完整的项目目录

    上一节中,我们使用 JDK 序列化和 Socket 实现了一个最基本的 RPC 框架,服务端测试时是这样的:

    public class TestServer {
        public static void main(String[] args) {
            HelloService helloService = new HelloServiceImpl();
            RpcServer rpcServer = new RpcServer();
            rpcServer.register(helloService, 9000);
        }
    }
    

    在注册完 helloService 后,服务器就自行启动了,也就是说,一个服务器只能注册一个服务,这个设计非常不好(毕竟是简易实现)。这一节,我们就将服务的注册和服务器启动分离,使得服务端可以提供多个服务。

    服务注册表

    我们需要一个容器,这个容器很简单,就是保存一些本地服务的信息,并且在获得一个服务名字的时候能够返回这个服务的信息。创建一个 ServiceRegistry 接口:

    package top.guoziyang.rpc.registry;
    
    public interface ServiceRegistry {
        <T> void register(T service);
        Object getService(String serviceName);
    }
    

    一目了然,一个register注册服务信息,一个getService获取服务信息。

    我们新建一个默认的注册表类 DefaultServiceRegistry 来实现这个接口,提供服务注册服务,如下:

    public class DefaultServiceRegistry implements ServiceRegistry {
    
        private static final Logger logger = LoggerFactory.getLogger(DefaultServiceRegistry.class);
    
        private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
        private final Set<String> registeredService = ConcurrentHashMap.newKeySet();
    
        @Override
        public synchronized <T> void register(T service) {
            String serviceName = service.getClass().getCanonicalName();
            if(registeredService.contains(serviceName)) return;
            registeredService.add(serviceName);
            Class<?>[] interfaces = service.getClass().getInterfaces();
            if(interfaces.length == 0) {
                throw new RpcException(RpcError.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE);
            }
            for(Class<?> i : interfaces) {
                serviceMap.put(i.getCanonicalName(), service);
            }
            logger.info("向接口: {} 注册服务: {}", interfaces, serviceName);
        }
    
        @Override
        public synchronized Object getService(String serviceName) {
            Object service = serviceMap.get(serviceName);
            if(service == null) {
                throw new RpcException(RpcError.SERVICE_NOT_FOUND);
            }
            return service;
        }
    }
    

    我们将服务名与提供服务的对象的对应关系保存在一个 ConcurrentHashMap 中,并且使用一个 Set 来保存当前有哪些对象已经被注册。在注册服务时,默认采用这个对象实现的接口的完整类名作为服务名,例如某个对象 A 实现了接口 X 和 Y,那么将 A 注册进去后,会有两个服务名 X 和 Y 对应于 A 对象。这种处理方式也就说明了某个接口只能有一个对象提供服务。

    获得服务的对象就更简单了,直接去 Map 里查找就行了。

    其他处理

    为了降低耦合度,我们不会把 ServiceRegistry 和某一个 RpcServer 绑定在一起,而是在创建 RpcServer 对象时,传入一个 ServiceRegistry 作为这个服务的注册表。

    那么 RpcServer 这个类现在就变成了这样:

    public class RpcServer {
    
        private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);
    
        private static final int CORE_POOL_SIZE = 5;
        private static final int MAXIMUM_POOL_SIZE = 50;
        private static final int KEEP_ALIVE_TIME = 60;
        private static final int BLOCKING_QUEUE_CAPACITY = 100;
        private final ExecutorService threadPool;
        private RequestHandler requestHandler = new RequestHandler();
        private final ServiceRegistry serviceRegistry;
    
        public RpcServer(ServiceRegistry serviceRegistry) {
            this.serviceRegistry = serviceRegistry;
            BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
            ThreadFactory threadFactory = Executors.defaultThreadFactory();
            threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, workingQueue, threadFactory);
        }
    
        public void start(int port) {
            try (ServerSocket serverSocket = new ServerSocket(port)) {
                logger.info("服务器启动……");
                Socket socket;
                while((socket = serverSocket.accept()) != null) {
                    logger.info("消费者连接: {}:{}", socket.getInetAddress(), socket.getPort());
                    threadPool.execute(new RequestHandlerThread(socket, requestHandler, serviceRegistry));
                }
                threadPool.shutdown();
            } catch (IOException e) {
                logger.error("服务器启动时有错误发生:", e);
            }
        }
    }
    

    在创建 RpcServer 时需要传入一个已经注册好服务的 ServiceRegistry,而原来的 register 方法也被改成了 start 方法,因为服务的注册已经不由 RpcServer 处理了,它只需要启动就行了。

    而在每一个请求处理线程(RequestHandlerThread)中也就需要传入 ServiceRegistry 了,这里把处理线程和处理逻辑分成了两个类:RequestHandlerThread 只是一个线程,从ServiceRegistry 获取到提供服务的对象后,就会把 RpcRequest 和服务对象直接交给 RequestHandler 去处理,反射等过程被放到了 RequestHandler 里。

    RequesthandlerThread.java:处理线程,接受对象等

    public class RequestHandlerThread implements Runnable {
    
        private static final Logger logger = LoggerFactory.getLogger(RequestHandlerThread.class);
    
        private Socket socket;
        private RequestHandler requestHandler;
        private ServiceRegistry serviceRegistry;
    
        public RequestHandlerThread(Socket socket, RequestHandler requestHandler, ServiceRegistry serviceRegistry) {
            this.socket = socket;
            this.requestHandler = requestHandler;
            this.serviceRegistry = serviceRegistry;
        }
    
        @Override
        public void run() {
            try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
                 ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
                RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
                String interfaceName = rpcRequest.getInterfaceName();
                Object service = serviceRegistry.getService(interfaceName);
                Object result = requestHandler.handle(rpcRequest, service);
                objectOutputStream.writeObject(RpcResponse.success(result));
                objectOutputStream.flush();
            } catch (IOException | ClassNotFoundException e) {
                logger.error("调用或发送时有错误发生:", e);
            }
        }
    }
    

    RequestHandler.java:通过反射进行方法调用

    public class RequestHandler {
    
        private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);
    
        public Object handle(RpcRequest rpcRequest, Object service) {
            Object result = null;
            try {
                result = invokeTargetMethod(rpcRequest, service);
                logger.info("服务:{} 成功调用方法:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
            } catch (IllegalAccessException | InvocationTargetException e) {
                logger.error("调用或发送时有错误发生:", e);
            } return result;
        }
        private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) throws IllegalAccessException, InvocationTargetException {
            Method method;
            try {
                method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
            } catch (NoSuchMethodException e) {
                return RpcResponse.fail(ResponseCode.METHOD_NOT_FOUND);
            }
            return method.invoke(service, rpcRequest.getParameters());
        }
    }
    

    在这种情况下,客户端完全不需要做任何改动。

    测试

    我比较懒,还是搞一个服务的,就是测试下兼容性而已(理论上没问题)。

    服务端的测试:

    public class TestServer {
        public static void main(String[] args) {
            HelloService helloService = new HelloServiceImpl();
            ServiceRegistry serviceRegistry = new DefaultServiceRegistry();
            serviceRegistry.register(helloService);
            RpcServer rpcServer = new RpcServer(serviceRegistry);
            rpcServer.start(9000);
        }
    }
    

    客户端不需要变动。

    执行后应当获得和上次相同的结果。

    这一节比较水,但是有不太好和别的章节合并,但又不能不讲……so,水一篇咯

    欢迎关注我的微信公众号:楚狂声哥,更新各种有深度的八股文!

    展开全文
  • 一起写个Dubbo——4. Kryo序列化

    千次阅读 2020-08-03 10:55:23
    上一节我们实现了一通用的序列化框架,使得序列化方式具有了较高的扩展性,并且实现了一基于 JSON 的序列化器。这一节我们就来实现一基于 Kryo 的序列化器。
  • 一起写个Dubbo——7. 服务端自动注册服务

    千次阅读 热门讨论 2020-08-04 17:04:21
    到目前为止,客户端看起来挺完美了,但是在服务端,我们却需要手动创建服务对象,并且手动进行注册,如果服务端提供了很多服务,这操作就会变得很繁琐。本节就会介绍如何基于注解进行服务的自动注册。

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 19,875
精华内容 7,950
关键字:

一起写个dubbo