精华内容
下载资源
问答
  • 手写RPC

    2021-04-10 20:32:36
    手写RPC手写RPC1.RPC流程1.1 RPC设计流程1.2 底层调用流程2. RPC工程实现2.1 工程设计2.2 工程结构2.3 RPC之公用组件实现2.3.1 RPC接口注解2.3.2 接口交互信息封装2.3.3. protostuff序列化实现2.3.4. 其他公用封装...

    手写RPC

    1.RPC流程

    1.1 RPC设计流程

    在这里插入图片描述

    1.2 底层调用流程

    在这里插入图片描述

    2. RPC工程实现

    2.1 工程设计

    在这里插入图片描述
    模块说明:
    RPC-CLIENT(客户端工程): 主要包括服务订阅、订单服务接口的调用,以及负载均衡功能。
    RPC-COMMON(公用组件模块):主要包括RPC组件、序列化等公用组件。
    RPC-INTERFACE-API (RPC接口模块) :主要负责RPC接口的定义, 这里提供了订单服务接口用于
    测试。
    RPC-SERVER (RPC服务端) : 包括服务注册、编码解码、负责对订单服务接口的实现。

    2.2 工程结构

    1. 父级工程
      职责:
      根级工程, 统一管理所有工程的依赖配置。
      POM依赖:
    <dependencyManagement> 
    <dependencies> 
    <!-- zookeeper客户端组件依赖 --><dependency> 
    <groupId>com.github.sgroschupf</groupId> 
    <artifactId>zkclient</artifactId> 
    <version>0.1</version> 
    </dependency> 
    <!-- Netty 组件依赖 --> 
    <dependency> 
    <groupId>io.netty</groupId> 
    <artifactId>netty-all</artifactId> 
    <version>4.1.42.Final</version> 
    </dependency> 
    <!-- 实例化组件依赖 --> 
    <dependency> 
    <groupId>org.objenesis</groupId> 
    <artifactId>objenesis</artifactId> 
    <version>2.6</version> 
    </dependency> 
    <!-- protostuff 核心依赖 --> 
    <dependency> 
    <groupId>com.dyuproject.protostuff</groupId> 
    <artifactId>protostuff-core</artifactId> 
    <version>1.0.8</version> 
    </dependency> 
    <!-- protostuff 运行时依赖 --> 
    <dependency> 
    <groupId>com.dyuproject.protostuff</groupId> 
    <artifactId>protostuff-runtime</artifactId> 
    <version>1.0.8</version> 
    </dependency> 
    <!-- spring 上下文组件依赖 --> 
    <dependency> 
    <groupId>org.springframework</groupId> 
    <artifactId>spring-context</artifactId> 
    <version>${spring.version}</version> 
    </dependency> 
    <!-- lombok 代码简化依赖 --> 
    <dependency> 
    <groupId>org.projectlombok</groupId> 
    <artifactId>lombok</artifactId> 
    <version>1.16.22</version> 
    </dependency> 
    <!-- 日志组件依赖 --> 
    <dependency> 
    <groupId>org.slf4j</groupId> 
    <artifactId>slf4j-log4j12</artifactId> 
    <version>1.7.25</version> 
    </dependency> 
    <!-- Google Guava 核心扩展库--> 
    <dependency> 
    <groupId>com.google.guava</groupId> 
    <artifactId>guava</artifactId> 
    <version>19.0</version> 
    </dependency> 
    <!-- Apache 集合 扩展依赖 --> 
    <dependency> 
    <groupId>commons-collections</groupId> 
    <artifactId>commons-collections</artifactId> 
    <version>3.2.2</version></dependency> 
    <!-- Apache lang 包扩展依赖 --> 
    <dependency> 
    <groupId>org.apache.commons</groupId> 
    <artifactId>commons-lang3</artifactId> 
    <version>3.6</version> 
    </dependency> 
    <!-- Apache BeanUtils 辅助工具依赖 --> 
    <dependency> 
    <groupId>commons-beanutils</groupId> 
    <artifactId>commons-beanutils</artifactId> 
    <version>1.9.3</version> 
    </dependency> 
    <!-- cglib动态代理依赖--> 
    <dependency> 
    <groupId>cglib</groupId> 
    <artifactId>cglib</artifactId> 
    <version>3.1</version> 
    </dependency> 
    <!-- Java元数据分析反射依赖--> 
    <dependency> 
    <groupId>org.reflections</groupId> 
    <artifactId>reflections</artifactId> 
    <version>0.9.10</version> 
    </dependency> 
    </dependencies> 
    </dependencyManagement> 
    
    
    1. 公用组件工程
      在这里插入图片描述
      职责:
      主要封装公用的组件信息,RPC通用注解, 序列化组件等, 便于各工程模块的调用。
      POM依赖:
    <dependencies> 
    <!-- spring 上下文组件依赖 --> 
    <dependency> 
    <groupId>org.springframework</groupId> 
    <artifactId>spring-context</artifactId> 
    <version>${spring.version}</version></dependency> 
    <!-- 实例化组件依赖 --> 
    <dependency> 
    <groupId>org.objenesis</groupId> 
    <artifactId>objenesis</artifactId> 
    </dependency> 
    <!-- protostuff 核心依赖 --> 
    <dependency> 
    <groupId>com.dyuproject.protostuff</groupId> 
    <artifactId>protostuff-core</artifactId> 
    </dependency> 
    <!-- protostuff 运行时依赖 --> 
    <dependency> 
    <groupId>com.dyuproject.protostuff</groupId> 
    <artifactId>protostuff-runtime</artifactId> 
    </dependency> 
    <!-- Apache 集合 扩展依赖 --> 
    <dependency> 
    <groupId>commons-collections</groupId> 
    <artifactId>commons-collections</artifactId> 
    </dependency> 
    <!-- Apache lang 包扩展依赖 --> 
    <dependency> 
    <groupId>org.apache.commons</groupId> 
    <artifactId>commons-lang3</artifactId> 
    </dependency> 
    <!-- Apache BeanUtils 辅助工具依赖 --> 
    <dependency> 
    <groupId>commons-beanutils</groupId> 
    <artifactId>commons-beanutils</artifactId> 
    </dependency> 
    <!-- Google Guava 核心扩展库--> 
    <dependency> 
    <groupId>com.google.guava</groupId> 
    <artifactId>guava</artifactId> 
    </dependency> 
    <!-- 日志组件依赖 --> 
    <dependency> 
    <groupId>org.slf4j</groupId> 
    <artifactId>slf4j-log4j12</artifactId> 
    </dependency> 
    </dependencies> 
    
    
    1. 公用RPC接口工程
      在这里插入图片描述
      职责:
      负责RPC接口的定义, 由服务端去做具体实现, 客户端引入接口, 根据注册信息去调用对应服
      务。
      POM依赖
    <dependencies> 
    <!-- 公用模块依赖 --> 
    <dependency> 
    <groupId>com.itcast.rpc</groupId> 
    <artifactId>rpc-common</artifactId> 
    <version>${project.version}</version> 
    </dependency> 
    </dependencies> 
    
    
    1. 客户端工程
      在这里插入图片描述
      职责:
      负责RPC客户端的实现,包含服务订阅、动态代理、基于Netty的同步调用等。
      POM依赖:
    <dependencies> 
    <!-- RPC接口模块依赖 --> 
    <dependency> 
    <groupId>com.itcast.rpc</groupId> 
    <artifactId>rpc-interface-api</artifactId> 
    <version>${project.version}</version> 
    </dependency> 
    <!-- RPC公用组件依赖 --> 
    <dependency> 
    <groupId>com.itcast.rpc</groupId> 
    <artifactId>rpc-common</artifactId> 
    <version>${project.version}</version> 
    </dependency> 
    <!-- Netty 组件依赖 --> 
    <dependency> 
    <groupId>io.netty</groupId> 
    <artifactId>netty-all</artifactId> 
    </dependency> 
    <!-- zookeeper客户端组件依赖 --> 
    <dependency> 
    <groupId>com.github.sgroschupf</groupId> 
    <artifactId>zkclient</artifactId> 
    </dependency> 
    <!-- cglib动态代理依赖--> 
    <dependency> 
    <groupId>cglib</groupId> 
    <artifactId>cglib</artifactId> 
    </dependency> 
    <!-- Java元数据分析反射依赖--> 
    <dependency> 
    <groupId>org.reflections</groupId> 
    <artifactId>reflections</artifactId> 
    </dependency> 
    <!-- Spring Boot 组件依赖 --> 
    <dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-web</artifactId> 
    <version>${spring.boot.version}</version> 
    </dependency> 
    <!-- 日志组件依赖 --> 
    <dependency> 
    <groupId>org.slf4j</groupId> 
    <artifactId>slf4j-log4j12</artifactId> 
    </dependency> 
    </dependencies> 
    
    
    1. 服务端工程

    在这里插入图片描述
    职责:
    负责RPC服务端的实现, 包含服务的注册、RPC接口的实现、基于Netty的RPC服务端通讯等。
    POM依赖:

    <dependencies> 
    <!-- 公用RPC接口模块依赖 --> 
    <dependency> 
    <groupId>com.itcast.rpc</groupId> 
    <artifactId>rpc-interface-api</artifactId> 
    <version>${project.version}</version> 
    </dependency> 
    <!-- 公用封装组件依赖 --> 
    <dependency> 
    <groupId>com.itcast.rpc</groupId> 
    <artifactId>rpc-common</artifactId> 
    <version>${project.version}</version> 
    </dependency> 
    <!-- Netty 通讯依赖--> 
    <dependency> 
    <groupId>io.netty</groupId> 
    <artifactId>netty-all</artifactId> 
    </dependency> 
    <!-- zookeeper客户端依赖 --> 
    <dependency> 
    <groupId>com.github.sgroschupf</groupId> 
    <artifactId>zkclient</artifactId> 
    </dependency> 
    <!-- spring boot 依赖 --> 
    <dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-web</artifactId> 
    <version>${spring.boot.version}</version> 
    </dependency> 
    <!-- 日志组件依赖 --> 
    <dependency> 
    <groupId>org.slf4j</groupId> 
    <artifactId>slf4j-log4j12</artifactId>
    </dependency> 
    </dependencies>
    
    

    2.3 RPC之公用组件实现

    2.3.1 RPC接口注解

    在这里插入图片描述
    RPC客户端接口注解(@RpcClient):
    用于标识RPC接口,动态代理扫描时, 需要依据此注解, 创建代理类。
    RPC服务端接口注解(@RpcService):
    用于声明服务端RPC接口实现, 并且在服务注册时, 会根据注解扫描所有对应接口, 将信息注册
    至服务中心

    2.3.2 接口交互信息封装

    在这里插入图片描述
    请求参数封装对象(RpcRequest):
    对请求参数信息做统一封装, 便于接口的扩展调用处理, 包括请求ID,请求参数, 请求接口等信息。
    返回信息封装对象(RpcResponse):
    对返回信息做统一封装, 在序列化、请求回调与动态代理时做统一处理, 不用再去根据每个接口不同的返回信息做适配。

    2.3.3. protostuff序列化实现

    谷歌的protobuf, 需要写proto文件, 使用工具来编译生成Java文件会比较复杂;protostuff也是Google推出的, 它是基于protobuf做的封装, 可以很好地解决这个问题, 不需要再去编写proto脚本文件, 能够自动扫描对象信息, 并实现序列化。

    • protostuff序列化工具实现:
    
    
    import com.dyuproject.protostuff.LinkedBuffer;
    import com.dyuproject.protostuff.ProtobufIOUtil;
    import com.dyuproject.protostuff.Schema;
    import com.dyuproject.protostuff.runtime.RuntimeSchema;
    import org.objenesis.Objenesis;
    import org.objenesis.ObjenesisStd;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class ProtoSerializerUtil {
        /**
         * 序列化对象信息缓存
         */
        private static Map<Class<?>, Schema<?>> classSchemaMap = new
                ConcurrentHashMap<>();
        /**
         * 负责实例化对象, 支持缓存
         */
        private static Objenesis objenesis = new ObjenesisStd(true);
    
        /**
         * 序列化对象接口
         *
         * @param t
         * @param <T>
         * @return
         */
        public static <T> byte[] serialize(T t) {
            Class<T> cls = (Class<T>) t.getClass();
            LinkedBuffer buffer =
                    LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
            try {
                Schema<T> schema = getClassSchema(cls);
                return ProtobufIOUtil.toByteArray(t, schema, buffer);
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            } finally {
                buffer.clear();
            }
        }
    
        /**
         * 反序列化对象接口
         *
         * @param bytes
         * @param cls
         * @param <T>
         * @return
         */
        public static <T> T deserialize(byte[] bytes, Class<T> cls) {
            try {
                Schema<T> schema = getClassSchema(cls);
                T message = objenesis.newInstance(cls);
                ProtobufIOUtil.mergeFrom(bytes, message, schema);
                return message;
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
    
        /**
         * 获取序列化对象的信息
         *
         * @param cls
         * @param <T>
         * @return
         */
        private static <T> Schema<T> getClassSchema(Class<T> cls) {
            Schema<T> classSchema = null;
            if (classSchemaMap.containsKey(cls)) {
                classSchema = (Schema<T>) classSchemaMap.get(cls);
            } else {
                classSchema = RuntimeSchema.getSchema(cls);
                if (classSchema != null) {
                    classSchemaMap.put(cls, classSchema);
                }
            }
            return classSchema;
        }
    }
    
    
    • 开辟用户空间缓存,LinkedBuffer:

    申请用户空间缓存,默认空间大小DEFAULT_BUFFER_SIZE为512,用于提升序列化处理性能。
    LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

    • 序列化对象信息缓存, classSchemaMap:

    缓存对象序列化的类信息,主要是类的结构化信息, 比如属性、方法、权限信息等。 通过protostuff-runtime组件扫描获取。
    private static Map<Class, Schema> classSchemaMap = newConcurrentHashMap<>();

    • 序列化接口, serialize:

    分配用户缓存空间, 获取类结构信息, 然后调用protostuff组件实现对象的序列化。

    • 反序列化接口,deserialize:

    先获取类结构概要信息, 然后通过objenesis组件创建序列化对象,最后通过mergeFrom进行反序列化。

    • protoStuff底层序列化实现机制
      序列化接口ProtobufIOUtil.toByteArray:
    public static<T> byte[]toByteArray(T message,Schema<T> schema,
            LinkedBuffer buffer)
            {
    // 1. 判断缓存是否完全占用 
            if(buffer.start!=buffer.offset)
            throw new IllegalArgumentException("Buffer previously used and 
            had not been reset."); 
    // 2. 创建Protobuf输出对象final ProtobufOutput output = new ProtobufOutput(buffer); 
            try
            {
    // 3. 通过Schema对象实现序列化 
            schema.writeTo(output,message);
            }
            catch(IOException e)
            {
            throw new RuntimeException("Serializing to a byte array threw an 
            IOException" + 
            "(should never happen).",e);
            }
            return output.toByteArray();
            } 
    
    

    schema.writeTo的内部调用:
    在这里插入图片描述
    实质调用的MappedSchema的writeTo方法, 它会遍历所有属性, 逐一进行序列化处理。
    在这里插入图片描述
    继续查看writeString方法实现

    public void writeString(int fieldNumber,String value,boolean repeated)
            throws IOException
            {
    // 采用UTF-8编码转为字节数组 
            tail=writeUTF8VarDelimited(
            value,
            this,
            writeRawVarInt32(makeTag(fieldNumber,
            WIRETYPE_LENGTH_DELIMITED),this,tail));
            } 
    
    
    • protoStuff反序列化实现机制

    反序列化接口ProtobufIOUtil.mergeFrom:

    public static<T> void mergeFrom(byte[]data,T message,Schema<T> schema)
            {
    // 指定字节数组的偏移量和长度, 实现反序列化 
            IOUtil.mergeFrom(data,0,data.length,message,schema,false);
            }
    
    

    IOUtil.mergeFrom接口实现

    static<T> void mergeFrom(byte[]data,int offset,int length,T message,
            Schema<T> schema,boolean decodeNestedMessageAsGroup)
            {
            try
            {
    // 创建字节数组记录对象 
    final ByteArrayInput input=new ByteArrayInput(data,offset,
            length,
            decodeNestedMessageAsGroup);
    // 调用schema接口,实现反序列化 
            schema.mergeFrom(input,message);
    // 校验完整性 
            input.checkLastTagWas(0);
            }
            ... 
    
    

    调用MappedSchema的mergeFrom接口:逐一对Field进行反序列化处理。
    在这里插入图片描述

    2.3.4. 其他公用封装

    • IP封装工具
      用于服务注册时, 提供服务端的IP信息。
      IpUtil工具:
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.net.InetAddress;
    import java.net.NetworkInterface;
    import java.net.SocketException;
    import java.net.UnknownHostException;
    import java.util.Enumeration;
    
    public class IpUtil {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(IpUtil.class);
    
        public static String getHostAddress() {
            String host = null;
            try {
                host = InetAddress.getLocalHost().getHostAddress();
                return host;
            } catch (UnknownHostException e) {
                LOGGER.error("Cannot get server host.", e);
            }
            return host;
        }
    
        /**
         * 获取实际的IP
         * @return
         */
        public static String getRealIp()  {
            String localIp = null;
            String netIp = null;
            
            try {
            // 获取当前主机所有网卡信息 
                Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
                boolean finded = false;
                InetAddress ip = null;
                // 遍历所有网卡信息
                while (networkInterfaces.hasMoreElements() && !finded) {
                    NetworkInterface networkInterface = networkInterfaces.nextElement();
                    Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
                    // 遍历IP信息
                    while (addresses.hasMoreElements()) {
                        ip = addresses.nextElement();
                        if (!ip.isSiteLocalAddress() && !ip.isLoopbackAddress() && ip.getHostAddress().indexOf(":") == -1) {                          // 获取真实IP
                            netIp = ip.getHostAddress();
                            finded = true;
                            break;
                        } else if (ip.isSiteLocalAddress() && !ip.isLoopbackAddress() && ip.getHostAddress().indexOf(":") == -1) {
    //当IP地址不是地区本地地址, 直接取主机IP
                            localIp = ip.getHostAddress();
                        }
                    }
                }
    
                if (netIp != null && !"".equals(netIp)) {
                    return netIp;
                } else {
                    return localIp;
                }
            } catch (SocketException ex) {
                throw new RpcException(ex);
            }
        }
    }
    
    
    • 全局分布式ID封装
      snowflake算法:
      snowflake是Twitter开源的分布式ID生成算法,结果是一个long型的ID。其核心思想是:使用41bit作为毫秒数,10bit作为机器的ID(5个bit是数据中心,5个bit的机器ID),12bit作为毫秒内的流水号(意味着每个节点在每毫秒可以产生 4096 个 ID),最后还有一个符号位,永远是0。
      在这里插入图片描述
    • 第一位:

    占用1bit,其值始终是0,预置为0。

    • 时间戳

    占用41bit,精确到毫秒,总共可以容纳约140年的时间。

    • 工作机器ID

    占用10bit,其中高位5bit是数据中心ID(datacenterId),低位5bit是工作节点ID(workerId),做多可以容纳1024个节点。

    • 序列号

    占用12bit,这个值在同一毫秒同一节点上从0开始不断累加,最多可以累加到4095。
    snowflake同一秒内能生成的全局唯一不重复ID为2^10 * 2^12 = 1024 * 4096 = 4194304, 完全可以满足高并发的场景使用要求。
    代码实现, GlobalIDGenerator:

    
    import java.lang.management.ManagementFactory;
    import java.net.InetAddress;
    import java.net.NetworkInterface;
    
    /**
     * <p>名称:GlobalIDGenerator.java</p>
     * <p>描述:分布式自增长ID</p>
     * <pre>
     *     Twitter的 Snowflake JAVA实现方案
     * </pre>
     * 核心代码为其GlobalIDGenerator这个类实现,其原理结构如下,我分别用一个0表示一位,用—分割开部分的作用:
     * 1||0---0000000000 0000000000 0000000000 0000000000 0 --- 00000 ---00000 ---000000000000
     * 在上面的字符串中,第一位为未使用(实际上也可作为long的符号位),接下来的41位为毫秒级时间,
     * 然后5位datacenter标识位,5位机器ID(并不算标识符,实际是为线程标识),
     * 然后12位该毫秒内的当前毫秒内的计数,加起来刚好64位,为一个Long型。
     * 这样的好处是,整体上按照时间自增排序,并且整个分布式系统内不会产生ID碰撞(由datacenter和机器ID作区分),
     * 并且效率较高,经测试,snowflake每秒能够产生26万ID左右,完全满足需要。
     * <p>
     * 64位ID (42(毫秒)+5(机器ID)+5(业务编码)+12(重复累加))
     * 生成ID示例;
     * 1154628864413139070
     * 1154628864413139071
     * 1154628864413139072
     * 1154628864413139073
     */
    public class GlobalIDGenerator {
    
        // 时间起始标记点,作为基准,一般取系统的最近时间(一旦确定不能变动)
        private final static long twepoch = 1288834974657L;
        // 机器标识位数
        private final static long workerIdBits = 5L;
        // 数据中心标识位数
        private final static long datacenterIdBits = 5L;
        // 机器ID最大值
        private final static long maxWorkerId = -1L ^ (-1L << workerIdBits);
        // 数据中心ID最大值
        private final static long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
        // 毫秒内自增位
        private final static long sequenceBits = 12L;
        // 机器ID偏左移12位
        private final static long workerIdShift = sequenceBits;
        // 数据中心ID左移17位
        private final static long datacenterIdShift = sequenceBits + workerIdBits;
        // 时间毫秒左移22位
        private final static long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
    
        private final static long sequenceMask = -1L ^ (-1L << sequenceBits);
        /* 上次生产id时间戳 */
        private static long lastTimestamp = -1L;
        // 0,并发控制
        private long sequence = 0L;
    
        // 机器标识id, 分布式服务需设置不同编号, 不能超过31
        private final long workerId;
        // 数据标识id部分, 业务编码, 不能超过31
        private final long datacenterId;
    
        private static class SingletonGlobalIDGenerator {
            // 单例实现, 默认workerId为1, datacenterId为1
            private static GlobalIDGenerator instance = new GlobalIDGenerator(1, 1);
        }
    
    
        public GlobalIDGenerator() {
            this.datacenterId = getDatacenterId(maxDatacenterId);
            this.workerId = getMaxWorkerId(datacenterId, maxWorkerId);
        }
    
        /**
         * @param workerId     工作机器ID
         * @param datacenterId 序列号
         */
        public GlobalIDGenerator(long workerId, long datacenterId) {
            if (workerId > maxWorkerId || workerId < 0) {
                throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
            }
            if (datacenterId > maxDatacenterId || datacenterId < 0) {
                throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
            }
            this.workerId = workerId;
            this.datacenterId = datacenterId;
        }
    
        /**
         * 获取单例实例
         * @return
         */
        public static GlobalIDGenerator getInstance() {
            return SingletonGlobalIDGenerator.instance;
        }
    
        /**
         * 获取下一个ID
         *
         * @return
         */
        public synchronized long nextId() {
            long timestamp = timeGen();
            if (timestamp < lastTimestamp) {
                throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
            }
    
            if (lastTimestamp == timestamp) {
                // 当前毫秒内,则+1
                sequence = (sequence + 1) & sequenceMask;
                if (sequence == 0) {
                    // 当前毫秒内计数满了,则等待下一秒
                    timestamp = tilNextMillis(lastTimestamp);
                }
            } else {
                sequence = 0L;
            }
            lastTimestamp = timestamp;
            // ID偏移组合生成最终的ID,并返回ID
            long nextId = ((timestamp - twepoch) << timestampLeftShift)
                    | (datacenterId << datacenterIdShift)
                    | (workerId << workerIdShift) | sequence;
    
            return nextId;
        }
    
        /**
         * 获取下一个ID, 字符形式
         * @return
         */
        public synchronized  String nextStrId(){
            return String.valueOf(nextId());
        }
    
        private long tilNextMillis(final long lastTimestamp) {
            long timestamp = this.timeGen();
            while (timestamp <= lastTimestamp) {
                timestamp = this.timeGen();
            }
            return timestamp;
        }
    
        private long timeGen() {
            return System.currentTimeMillis();
        }
    
        /**
         * <p>
         * 获取 maxWorkerId
         * </p>
         */
        protected static long getMaxWorkerId(long datacenterId, long maxWorkerId) {
            StringBuffer mpid = new StringBuffer();
            mpid.append(datacenterId);
            String name = ManagementFactory.getRuntimeMXBean().getName();
            if (!name.isEmpty()) {
                /*
                 * GET jvmPid
                 */
                mpid.append(name.split("@")[0]);
            }
            /*
             * MAC + PID 的 hashcode 获取16个低位
             */
            return (mpid.toString().hashCode() & 0xffff) % (maxWorkerId + 1);
        }
    
        /**
         * <p>
         * 数据标识id部分
         * </p>
         */
        protected static long getDatacenterId(long maxDatacenterId) {
            long id = 0L;
            try {
                InetAddress ip = InetAddress.getLocalHost();
                NetworkInterface network = NetworkInterface.getByInetAddress(ip);
                if (network == null) {
                    id = 1L;
                } else {
                    byte[] mac = network.getHardwareAddress();
                    id = ((0x000000FF & (long) mac[mac.length - 1])
                            | (0x0000FF00 & (((long) mac[mac.length - 2]) << 8))) >> 6;
                    id = id % (maxDatacenterId + 1);
                }
            } catch (Exception e) {
                System.out.println(" getDatacenterId: " + e.getMessage());
            }
            return id;
        }
    
        public static void main(String[] args) {
            GlobalIDGenerator id = new GlobalIDGenerator(0, 1);
            for (int i = 0; i < 100000; i++) {
                System.err.println(id.nextId());
            }
        }
    }
    

    2.4 RPC之公用接口实现

    负责所有RPC接口的定义,客户端与服务端工程都需要引用, 这里提供了订单服务接口。
    需要增加@RpcClient客户端注解标识, 用于客户端工程对RPC接口的扫描
    在这里插入图片描述

    2.5 RPC之客户端实现

    客户端整体功能实现:
    在这里插入图片描述

    2.5.1 客户端Netty通讯配置

    RPC底层采用Netty实现通讯,实现高性能传输, 具体的代码实现:
    RpcRequestManager发送客户端请求接口:

    /**
     * 发送客户端请求
     *
     * @param rpcRequest
     * @throws InterruptedException
     * @throws RpcException
     */
    public static RpcResponse sendRequest(RpcRequest rpcRequest)throws
            InterruptedException,RpcException{
    // 1. 从缓存中获取RPC服务列表信息 
            List<ProviderService> providerServices=
            SERVICE_ROUTE_CACHE.getServiceRoutes(rpcRequest.getClassName());
    // 2. 从服务列表中获取第一个服务信息 
            ProviderService targetServiceProvider=providerServices.get(0);
            if(targetServiceProvider!=null){
            String requestId=rpcRequest.getRequestId();
    // 3. 发起远程调用 
            RpcResponse response=requestByNetty(rpcRequest,
            targetServiceProvider);
            LOGGER.info("Send request[{}:{}] to service provider successfully",
            requestId,rpcRequest.toString());
            return response;
            }else{
            throw new RpcException(StatusEnum.NOT_FOUND_SERVICE_PROVINDER);
            }
            }
    
    

    远程调用requestByNetty具体实现:

    /**
         * 采用Netty进行远程调用
         */
        public static RpcResponse requestByNetty(RpcRequest rpcRequest, ProviderService providerService) {
    
            // 1. 创建Netty连接配置
            EventLoopGroup worker = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(providerService.getServerIp(), providerService.getNetworkPort())
                    .handler(rpcClientInitializer);
            try {
                // 2. 建立连接
                ChannelFuture future = bootstrap.connect().sync();
                if (future.isSuccess()) {
                    ChannelHolder channelHolder = ChannelHolder.builder()
                            .channel(future.channel())
                            .eventLoopGroup(worker)
                            .build();
                    LOGGER.info("Construct a connector with service provider[{}:{}] successfully",
                            providerService.getServerIp(),
                            providerService.getNetworkPort()
                    );
    
                    // 3. 创建请求回调对象
                    final RequestFuture<RpcResponse> responseFuture = new SyncRequestFuture(rpcRequest.getRequestId());
                    // 4. 将请求回调放置缓存
                    SyncRequestFuture.syncRequest.put(rpcRequest.getRequestId(), responseFuture);
                    // 5. 根据连接通道, 下发请求信息
                    ChannelFuture channelFuture = channelHolder.getChannel().writeAndFlush(rpcRequest);
                    // 6. 建立回调监听
                    channelFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            // 7. 设置是否成功的标记
                            responseFuture.setWriteResult(future.isSuccess());
                            if(!future.isSuccess()) {
                                // 调用失败,清除连接缓存
                                SyncRequestFuture.syncRequest.remove(responseFuture.requestId());
                            }
                        }
                    });
                    // 8. 阻塞等待3秒
                    RpcResponse result = responseFuture.get(3, TimeUnit.SECONDS);
                    // 9. 移除连接缓存
                    SyncRequestFuture.syncRequest.remove(rpcRequest.getRequestId());
    
                    return result;
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
    
            return null;
        }
    

    这里是采用同步方式进行调用, 定义SyncRequestFuture请求回调对象, 继承Future接口:
    在上面的RPC同步请求中需要使用, 并且会通过缓存方式记录回调信息。

    package com.itcast.rpc.client.runner;
    
    import com.itcast.common.data.RpcResponse;
    
    import java.util.Map;
    import java.util.concurrent.*;
    
    /**
     * <p>Description: </p>
     * @date 
     * @author 
     * @version 1.0
     * <p>Copyright:Copyright(c)2020</p>
     */
    public class SyncRequestFuture implements RequestFuture<RpcResponse> {
    
        // 请求回调缓存
        public static Map<String, RequestFuture> syncRequest = new ConcurrentHashMap<String, RequestFuture>();
        // 计数器
        private CountDownLatch latch = new CountDownLatch(1);
        // 标记开始时间, 判断是否超时
        private final long begin = System.currentTimeMillis();
        // 超时时间设定
        private long timeout;
        // rpc响应对象
        private RpcResponse response;
        // 请求ID
        private final String requestId;
        // 标记是否有回调结果
        private boolean writeResult;
        // 调用异常记录
        private Throwable cause;
        // 标记调用是否超时
        private boolean isTimeout = false;
    
        public SyncRequestFuture(String requestId) {
            this.requestId = requestId;
        }
    
        /**
         * 构造方法
         * @param requestId
         * @param timeout
         */
        public SyncRequestFuture(String requestId, long timeout) {
            this.requestId = requestId;
            this.timeout = timeout;
            writeResult = true;
            isTimeout = false;
        }
    
        /**
         * 获取异常栈信息
         * @return
         */
        public Throwable cause() {
            return cause;
        }
    
        /**
         * 设置异常栈信息
         * @param cause
         */
        public void setCause(Throwable cause) {
            this.cause = cause;
        }
    
        /**
         * 标记是否成功接收到回调结果
         * @return
         */
        public boolean isWriteSuccess() {
            return writeResult;
        }
    
        /**
         * 标记回调结果
         * @param result
         */
        public void setWriteResult(boolean result) {
            this.writeResult = result;
        }
    
        /**
         * 获取请求ID
         * @return
         */
        public String requestId() {
            return requestId;
        }
    
        /**
         * 获取响应结果
         * @return
         */
        public RpcResponse response() {
            return response;
        }
    
        /**
         * 设置响应结果信息
         * @param response
         */
        public void setResponse(RpcResponse response) {
            this.response = response;
            latch.countDown();
        }
    
        /**
         * 取消调用
         * @param mayInterruptIfRunning
         * @return
         */
        public boolean cancel(boolean mayInterruptIfRunning) {
            return true;
        }
    
        /**
         * 标记是否取消
         * @return
         */
        public boolean isCancelled() {
            return false;
        }
    
        /**
         * 标记是否完成
         * @return
         */
        public boolean isDone() {
            return false;
        }
    
        /**
         * 获取响应结果(阻塞式)
         * @return
         * @throws InterruptedException
         * @throws ExecutionException
         */
        public RpcResponse get() throws InterruptedException, ExecutionException {
            latch.wait();
            return response;
        }
    
        /**
         * 获取响应结果(指定阻塞等待时间)
         * @param timeout
         * @param unit
         * @return
         * @throws InterruptedException
         * @throws ExecutionException
         * @throws TimeoutException
         */
        public RpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            if (latch.await(timeout, unit)) {
                return response;
            }
            return null;
        }
    
        /**
         * 标记请求调用是否超时
         * @return
         */
        public boolean isTimeout() {
            if (isTimeout) {
                return isTimeout;
            }
            return System.currentTimeMillis() - begin > timeout;
        }
    }
    
    

    请求调用处理完成之后, 还需要在客户端接收数据时, 通知请求回调完成。
    RpcResponseHandler的代码实现:

    
    import com.itcast.common.data.RpcResponse;
    import com.itcast.rpc.client.runner.RpcRequestPool;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Rpc数据接收响应处理器
     */
    @Component
    @ChannelHandler.Sharable
    public class RpcResponseHandler extends SimpleChannelInboundHandler<RpcResponse> {
    
        @Autowired
        private RpcRequestPool requestPool;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcResponse znsResponse) throws Exception {
            // 数据接收, 通知请求回调
            requestPool.notifyRequest(znsResponse.getRequestId(), znsResponse);
        }
    }
    

    上面就是客户端的整个调用流程实现, 请求数据是如何实现序列化传输的?

    序列化编码,RpcClientEncodeHandler实现, 这里是继承了MessageToByteEncoder对象:

    import com.itcast.common.data.RpcRequest;
    import com.itcast.common.utils.ProtoSerializerUtil;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    /**
     * Rpc客户端编码器
     */
    public class RpcClientEncodeHandler extends MessageToByteEncoder<RpcRequest> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, RpcRequest RpcRequest, ByteBuf in) throws Exception {
            // 调用封装的protostuff公用组件, 实现序列化
            byte[] bytes = ProtoSerializerUtil.serialize(RpcRequest);
            in.writeInt(bytes.length);
            in.writeBytes(bytes);
        }
    }
    

    反序列化, RpcClientDecodeHandler实现, 继承ByteToMessageDecoder对象

    import com.itcast.common.data.RpcResponse;
    import com.itcast.common.utils.JsonSerializerUtil;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    import java.util.List;
    
    /**
     * 客户端解码器
     */
    public class RpcClientDecodeHandler extends ByteToMessageDecoder {
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> list) throws Exception {
            if (in.readableBytes() <= 4) {
                return;
            }
    
            int length = in.readInt();
            in.markReaderIndex();
            if (in.readableBytes() < length) {
                in.resetReaderIndex();
            } else {
                byte[] bytes = new byte[in.readableBytes()];
                in.readBytes(bytes);
                RpcResponse znsResponse = JsonSerializerUtil.deserialize(bytes, RpcResponse.class);
                list.add(znsResponse);
            }
        }
    }
    
    

    如果将来要修改, 采用其他序列化方式, 可以统一修改ProtoSerializerUtil.deserialize实现即可。

    2.5.2 动态代理配置实现

    这里动态代理是采用cglib做的实现, 负责对RPC接口进行代理与方法拦截。
    动态代理拦截处理器, ProxyHelper:
    提供通过代理创建实例的接口, 内部通过拦截方式,发起远程RPC调用

    import com.itcast.common.data.RpcRequest;
    import com.itcast.common.data.RpcResponse;
    import com.itcast.common.utils.RequestIdUtil;
    import com.itcast.rpc.client.runner.RpcRequestManager;
    import com.itcast.rpc.client.runner.RpcRequestPool;
    import net.sf.cglib.proxy.Enhancer;
    import net.sf.cglib.proxy.MethodInterceptor;
    import net.sf.cglib.proxy.MethodProxy;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.lang.reflect.Method;
    
    /**
     * 动态代理拦截处理
     */
    @Component
    public class ProxyHelper {
    
        @Autowired
        private RpcRequestPool rpcRequestPool;
    
        public <T> T newProxyInstance(Class<T> cls) {
            Enhancer enhancer = new Enhancer();
            enhancer.setSuperclass(cls);
            enhancer.setCallback(new ProxyCallBackHandler());
            return (T) enhancer.create();
        }
    
        class ProxyCallBackHandler implements MethodInterceptor {
    
            @Override
            public Object intercept(Object o, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
                return doIntercept(method, args);
            }
    
            /**
             * 拦截RCP接口调用
             * @param method
             * @param parameters
             * @return
             * @throws Throwable
             */
            private Object doIntercept(Method method, Object[] parameters) throws Throwable {
                String requestId = RequestIdUtil.requestId();
                String className = method.getDeclaringClass().getName();
                String methodName = method.getName();
                Class<?>[] parameterTypes = method.getParameterTypes();
                // 1. 构建RPC请求信息
                RpcRequest znsRequest = RpcRequest.builder()
                        .requestId(requestId)
                        .className(className)
                        .methodName(methodName)
                        .parameterTypes(parameterTypes)
                        .parameters(parameters)
                        .build();
                // 2. 采用异步方式请求调用
                RpcRequestManager.sendRequest(znsRequest);
                // 3. 通过RCP请求连接记录, 获取调用结果
                RpcResponse znsResponse = rpcRequestPool.fetchResponse(requestId);
                if (znsResponse == null) {
                    return null;
                }
    
                if (znsResponse.isError()) {
                    throw znsResponse.getCause();
                }
                // 4. 返回请求调用结果
                return znsResponse.getResult();
            }
        }
    }
    
    

    实现流程:

    1. 构建RPC请求信息
    2. 采用同步方式请求调用
    3. 返回请求调用结果
      有了动态的实现, 还需要动态代理扫描管理器, ServiceProxyManager:
    import com.itcast.common.annotation.RpcClient;
    import com.itcast.common.utils.SpringBeanFactory;
    import com.itcast.rpc.client.config.RpcClientConfiguration;
    import org.apache.commons.collections.CollectionUtils;
    import org.reflections.Reflections;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.support.DefaultListableBeanFactory;
    import org.springframework.stereotype.Component;
    
    import java.util.Set;
    
    /**
     * 动态代理管理器
     */
    @Component
    public class ServiceProxyManager {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProxyManager.class);
    
        @Autowired
        private RpcClientConfiguration configuration;
    
        @Autowired
        private ProxyHelper proxyHelper;
    
        public void initServiceProxyInstance() {
            Reflections reflections = new Reflections(configuration.getRpcClientApiPackage());
            Set<Class<?>> typesAnnotatedWith = reflections.getTypesAnnotatedWith(RpcClient.class);
            if (CollectionUtils.isEmpty(typesAnnotatedWith)) {
                return;
            }
    
            DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) SpringBeanFactory.context()
                    .getAutowireCapableBeanFactory();
            for (Class<?> cls : typesAnnotatedWith) {
                RpcClient znsClient = cls.getAnnotation(RpcClient.class);
                String serviceName = cls.getName();
                beanFactory.registerSingleton(serviceName, proxyHelper.newProxyInstance(cls));
            }
    
            LOGGER.info("Initialize proxy for service successfully");
        }
    }
    
    

    主要实现流程:

    1. 创建反射信息, 指定RPC扫描信息
    2. 根据PRC注解标识, 获取类信息
    3. 获取容器bean工厂
    4. 遍历扫描的RPC类信息
    5. 获取RpcClient注解信息
    6. 初始化并注册实例

    2.5.3 ZK服务订阅实现

    客户端服务订阅的具体代码实现:

    import com.itcast.common.annotation.RpcClient;
    import com.itcast.rpc.client.cache.ServiceRouteCache;
    import com.itcast.rpc.client.channel.ProviderService;
    import com.itcast.rpc.client.config.RpcClientConfiguration;
    import org.apache.commons.collections.CollectionUtils;
    import org.reflections.Reflections;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    import java.util.Set;
    
    /**
     * 注册服务拉取管理器
     */
    @Component
    public class ServicePullManager {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServicePullManager.class);
    
        @Autowired
        private ZKit zKit;
    
        @Autowired
        private ServiceRouteCache serviceRouteCache;
    
        @Autowired
        private RpcClientConfiguration configuration;
    
        public void pullServiceFromZK() {
            Reflections reflections = new Reflections(configuration.getRpcClientApiPackage());
            Set<Class<?>> typesAnnotatedWith = reflections.getTypesAnnotatedWith(RpcClient.class);
            if (CollectionUtils.isEmpty(typesAnnotatedWith)) {
                return;
            }
            for (Class<?> cls : typesAnnotatedWith) {
                String serviceName = cls.getName();
    
                // Cache service provider list into local
                List<ProviderService> providerServices = zKit.getServiceInfos(serviceName);
                serviceRouteCache.addCache(serviceName, providerServices);
    
                // Add listener for service node
                zKit.subscribeZKEvent(serviceName);
            }
    
            LOGGER.info("Pull service address list from zookeeper successfully");
        }
    }
    
    

    这里为了提升服务信息的获取性,避免每次与ZK服务端建立连接,加入了缓存处理。

    1. 扫描获取RPC接口
    2. 将拉取ZK服务信息放置缓存
    3. 注册订阅ZK服务节点事件
      ZK客户端的实现
    mport com.google.common.collect.Lists;
    import com.itcast.rpc.client.cache.ServiceRouteCache;
    import com.itcast.rpc.client.channel.ProviderService;
    import com.itcast.rpc.client.config.RpcClientConfiguration;
    import org.I0Itec.zkclient.IZkChildListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.apache.commons.collections.CollectionUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    import java.util.stream.Collectors;
    
    @Component
    public class ZKit {
    
        @Autowired
        private RpcClientConfiguration configuration;
    
        @Autowired
        private ZkClient zkClient;
    
        @Autowired
        private ServiceRouteCache serviceRouteCache;
    
        /**
         * 服务订阅
         * @param serviceName
         */
        public void subscribeZKEvent(String serviceName) {
            String path = configuration.getZkRoot() + "/" + serviceName;
            zkClient.subscribeChildChanges(path, new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> list) throws Exception {
                    if (CollectionUtils.isNotEmpty(list)) {
                        List<ProviderService> providerServices = convertToProviderService(list);
                        serviceRouteCache.updateCache(serviceName, providerServices);
                    }
                }
            });
        }
    
        public List<ProviderService> getServiceInfos(String serviceName) {
            String path = configuration.getZkRoot() + "/" + serviceName;
            List<String> children = zkClient.getChildren(path);
    
            List<ProviderService> providerServices = convertToProviderService(children);
            return providerServices;
        }
    
        private List<ProviderService> convertToProviderService(List<String> list) {
            if (CollectionUtils.isEmpty(list)) {
                return Lists.newArrayListWithCapacity(0);
            }
            List<ProviderService> providerServices = list.stream().map(v -> {
                String[] serviceInfos = v.split(":");
                return ProviderService.builder()
                        .serverIp(serviceInfos[0])
                        .serverPort(Integer.parseInt(serviceInfos[1]))
                        .networkPort(Integer.parseInt(serviceInfos[2]))
                        .build();
            }).collect(Collectors.toList());
            return providerServices;
        }
    }
    
    

    实现流程:

    1. 组装服务节点信息。
    2. 订阅服务节点。
    3. 判断获取的节点信息,是否为空。
    4. 将服务端获取的信息, 转换为服务记录对象。
    5. 更新服务记录缓存信息。

    3. RPC之服务端实现

    在这里插入图片描述

    3.1服务端配置

    定义服务端配置, 声明ZK客户端:

    import org.I0Itec.zkclient.ZkClient;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class BeanConfig {
    
        /**
         * RPC服务端配置
         */
        @Autowired
        private RpcServerConfiguration rpcServerConfiguration;
    
        /**
         * 声音ZK客户端
         * @return
         */
        @Bean
        public ZkClient zkClient() {
            return new ZkClient(rpcServerConfiguration.getZkAddr(), rpcServerConfiguration.getConnectTimeout());
        }
    }
    
    

    RPC服务端配置信息:

    
    import lombok.Data;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    @Data
    @Component
    public class RpcServerConfiguration {
    
        /**
         * ZK根节点名称
         */
        @Value("${rpc.server.zk.root}")
        private String zkRoot;
    
        /**
         * ZK地址信息
         */
        @Value("${rpc.server.zk.addr}")
        private String zkAddr;
    
    
        /**
         * RPC通讯端口
         */
        @Value("${rpc.network.port}")
        private int networkPort;
    
        /**
         * Spring Boot 服务端口
         */
        @Value("${server.port}")
        private int serverPort;
    
        /**
         * ZK连接超时时间配置
         */
        @Value("${rpc.server.zk.timeout:10000}")
        private int connectTimeout;
    }
    

    3.2 服务端Netty通讯配置

    1. 服务端Netty数据接收处理器

    import com.itcast.common.data.RpcRequest;
    import com.itcast.common.data.RpcResponse;
    import com.itcast.common.utils.SpringBeanFactory;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    
    /**
     * Rpc的服务端数据接收处理
     */
    @Component
    @ChannelHandler.Sharable
    public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcRequest> {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(RpcRequestHandler.class);
    
        /**
         * 数据接收处理
         * @param ctx
         * @param request
         * @throws Exception
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
            //3、封装响应对象->RpcResponse
            RpcResponse response = new RpcResponse();
    
            //1、解析传过来的数据RpcRequest->ClassName->com.itcast.rpc...->Method->parameter..
            String requestId = request.getRequestId();
            String className = request.getClassName();
            String methodName = request.getMethodName();
            Class<?>[] parameterTypes = request.getParameterTypes();
            Object[] parameters = request.getParameters();
    
            try {
                //2、找到Class对应的方法执行反射调用
                Object targetClass = SpringBeanFactory.getBean(Class.forName(className));
                Method targetMethod = targetClass.getClass().getMethod(methodName, parameterTypes);
                Object result = targetMethod.invoke(targetClass, parameters);
    
                //3、封装响应对象->RpcResponse
                response.setRequestId(requestId);
                response.setResult(result);
            } catch (Throwable e) {
                response.setCause(e);
            }
            //4、响应结果
            ctx.writeAndFlush(response);
        }
    }
    
    

    实现流程:

    1. 定义RPC返回对象

    2. 定义请求数据信息

    3. 获取服务实现类

    4. 获取实现类的方法

    5. 通过反射机制调用方法

    6. 设置返回结果

    7. 设置异常信息

    8. 输出返回结果

    9. 服务Netty初始化配置

    
    import com.itcast.rpc.server.connector.handler.RpcRequestHandler;
    import com.itcast.rpc.server.connector.handler.RpcServerDecodeHandler;
    import com.itcast.rpc.server.connector.handler.RpcServerEncodeHandler;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * 服务端Netty连接初始化配置
     */
    @Component
    @ChannelHandler.Sharable
    public class RpcServerInitializer extends ChannelInitializer<Channel> {
    
        @Autowired
        private RpcRequestHandler znsRequestHandler;
    
        /**
         * 初始化连接通道
         * @param channel
         * @throws Exception
         */
        @Override
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline()
                    .addLast(new RpcServerDecodeHandler())
                    .addLast(new RpcServerEncodeHandler())
                    .addLast(znsRequestHandler);
        }
    }
    
    1. 服务编码配置
    import com.itcast.common.data.RpcResponse;
    import com.itcast.common.utils.ProtoSerializerUtil;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    /**
     * 服务端编码器
     */
    public class RpcServerEncodeHandler extends MessageToByteEncoder<RpcResponse> {
    
        /**
         * 编码接口
         * @param ctx
         * @param znsResponse
         * @param byteBuf
         * @throws Exception
         */
        @Override
        protected void encode(ChannelHandlerContext ctx, RpcResponse znsResponse, ByteBuf byteBuf)
                throws Exception {
            // 通过Protostuff实现编码接口
            byte[] bytes = ProtoSerializerUtil.serialize(znsResponse);
            byteBuf.writeInt(bytes.length);
            byteBuf.writeBytes(bytes);
        }
    }
    
    
    1. 服务解码配置
    import com.itcast.common.data.RpcRequest;
    import com.itcast.common.utils.ProtoSerializerUtil;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    import java.util.List;
    
    /**
     * 服务端解码器
     */
    public class RpcServerDecodeHandler extends ByteToMessageDecoder {
    
        /**
         * 解码接口实现
         * @param ctx
         * @param in
         * @param list
         * @throws Exception
         */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> list) throws
                Exception {
            if (in.readableBytes() <= 4) {
                return;
            }
    
            int length = in.readInt();
            in.markReaderIndex();
            if (in.readableBytes() < length) {
                in.resetReaderIndex();
            } else {
                byte[] bytes = new byte[in.readableBytes()];
                in.readBytes(bytes);
                // 通过Protostuff实现解码
                RpcRequest znsRequest = ProtoSerializerUtil.deserialize(bytes, RpcRequest.class);
                list.add(znsRequest);
            }
        }
    }
    
    1. 服务启动配置
    import com.itcast.common.utils.SpringBeanFactory;
    import com.itcast.rpc.server.config.RpcServerConfiguration;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Rpc服务端连接接收器
     */
    public class RpcServerAcceptor implements Runnable {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(RpcServerAcceptor.class);
    
        private EventLoopGroup boss = new NioEventLoopGroup();
        private EventLoopGroup worker = new NioEventLoopGroup();
    
        private RpcServerConfiguration znsServerConfiguration;
        private RpcServerInitializer znsServerInitializer;
    
        public RpcServerAcceptor() {
            this.znsServerConfiguration = SpringBeanFactory.getBean(RpcServerConfiguration.class);
            this.znsServerInitializer = SpringBeanFactory.getBean(RpcServerInitializer.class);
        }
    
        /**
         * Netty通讯服务启动
         */
        @Override
        public void run() {
            // 1. Netty服务配置
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(znsServerInitializer);
    
            try {
                LOGGER.info("ZnsServer acceptor startup at port[{}] successfully", znsServerConfiguration.getNetworkPort());
                // 2. 绑定端口, 启动服务
                ChannelFuture future = bootstrap.bind(znsServerConfiguration.getNetworkPort()).sync();
                // 3. 服务同步阻塞方式运行
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                LOGGER.error("ZnsServer acceptor startup failure!", e);
                e.printStackTrace();
            } finally {
                boss.shutdownGracefully().syncUninterruptibly();
                worker.shutdownGracefully().syncUninterruptibly();
            }
        }
    }
    

    实现流程:

    1. Netty服务配置
    2. 绑定端口, 启动服务
    3. 服务同步阻塞方式运行

    3.3 服务端ZK注册实现

    1. 服务端注册实现
    import com.itcast.common.annotation.RpcService;
    import com.itcast.common.utils.IpUtil;
    import com.itcast.common.utils.SpringBeanFactory;
    import com.itcast.rpc.server.config.RpcServerConfiguration;
    import org.apache.commons.collections.MapUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    /**
     * Zookeeper服务连接注册管理
     */
    @Component
    public class ServicePushManager {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServicePushManager.class);
    
        @Autowired
        private ZKit zKit;
    
        @Autowired
        private RpcServerConfiguration configuration;
    
        /**
         * 服务注册接口
         */
        public void registerIntoZK() {
            //1.找到所有有@RpcService注解的类
            Map<String, Object> beanListByAnnotationClass =
                    SpringBeanFactory.getBeanListByAnnotationClass(RpcService.class);
    
            if(!MapUtils.isEmpty(beanListByAnnotationClass)){
                //根节点创建
                zKit.createRootNode();
    
                for (Object bean : beanListByAnnotationClass.values()) {
                    //2.获取每个类上的注解@RpcService.cls的值
                    RpcService annotation = bean.getClass().getAnnotation(RpcService.class);
                    Class<?> clazz = annotation.cls();
    
                    //3.获取cls的之后,将它的name作为节点名字,注册到Zookeeper中(在rpc节点下创建一个子节点)
                    String serviceName = clazz.getName();
    
                    //创建接口对应的节点
                    zKit.createPersistentNode(serviceName);
    
                    //3.同时为每个节点创建一个子节点  IP:HttpPort:RpcPort
                    String serviceAddress =
                            IpUtil.getRealIp()+
                            ":"+configuration.getServerPort()+
                            ":"+configuration.getNetworkPort();
                    zKit.createNode(serviceName+"/"+serviceAddress);
                }
            }
    
        }
    
    }
    

    流程:

    1. 扫描所有Rpc接口服务。
    2. 将接口服务信息注册至ZK。

    4.流程回顾

    在这里插入图片描述

    在这里插入图片描述

    展开全文
  • 手写rpc

    2019-01-07 14:52:04
    模拟rpc系统:  注册中心:map保存服务提供者信息  协议:http协议,netty协议  消费者:customer  提供者:provider pom: &lt;dependency&gt; &lt;groupId&gt;io.netty&lt;/...

    模拟rpc系统:

      注册中心:map保存服务提供者信息

      协议:http协议,netty协议

     消费者:customer

     提供者:provider

    pom:

    <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.32.Final</version>
                <scope>system</scope>
                <systemPath>${project.basedir}/lib/netty-all-4.1.32.Final.jar</systemPath>
            </dependency>
    
            <!-- 内嵌的tomcat -->
            <dependency>
                <groupId>org.apache.tomcat.embed</groupId>
                <artifactId>tomcat-embed-core</artifactId>
                <version>9.0.12</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.8.1</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.6</version>
            </dependency>

    tomcat服务:结构

    <?xml version="1.0" encoding="UTF-8"?>
    
    <Server port="8005" shutdown="SHUTDOWN">
    
      <Service name="Catalina">
    
      
        <Connector port="8080" protocol="HTTP/1.1"
                   connectionTimeout="20000"
                   redirectPort="8443" />
    
        <!-- Define an AJP 1.3 Connector on port 8009 -->
        <Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />
    
        <Engine name="Catalina" defaultHost="localhost">
    
          <Realm className="org.apache.catalina.realm.LockOutRealm">
          
            <Realm className="org.apache.catalina.realm.UserDatabaseRealm"
                   resourceName="UserDatabase"/>
          </Realm>
    
          <Host name="localhost"  appBase="webapps"
                unpackWARs="true" autoDeploy="true">
            <Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
                   prefix="localhost_access_log" suffix=".txt"
                   pattern="%h %l %u %t &quot;%r&quot; %s %b" />
    
          </Host>
        </Engine>
      </Service>
    </Server>

    编写tomcat服务器:

    package protocol.http;
    
    import org.apache.catalina.*;
    import org.apache.catalina.connector.Connector;
    import org.apache.catalina.core.StandardContext;
    import org.apache.catalina.core.StandardEngine;
    import org.apache.catalina.core.StandardHost;
    import org.apache.catalina.startup.Tomcat;
    
    /**
     * @program: rpcdemo
     * @description:
     * @author: z.hw
     * @create: 2019-01-06 00:33
     **/
    public class HttpServer {
    
        public void start(String hostname,Integer port){
    
            System.out.println("定制tomcat启动服务");
            Tomcat tomcat=new Tomcat();
    
            Server server = tomcat.getServer();
    
            Service service = server.findService("Tomcat");
    
            Connector connector=new Connector();
    
            connector.setPort(port);
    
            Engine engine=new StandardEngine();
    
            engine.setDefaultHost(hostname);
    
    
            Host host=new StandardHost();
    
            host.setName(hostname);
    
            String contextPath="";
    
            Context context=new StandardContext();
            context.setPath(contextPath);
            context.addLifecycleListener(new Tomcat.FixContextListener());
    
    
            host.addChild(context);
    
            engine.addChild(host);
    
            service.setContainer(engine);
            service.addConnector(connector);
    
            tomcat.addServlet(contextPath,"dispatcher",new DispartceServlet()); //怎加处理的服务器类
    
            context.addServletMappingDecoded("/*","dispatcher");//增加容器上下文处理路径
    
            try {
                tomcat.start();//初始化
                tomcat.getServer().await();//等待
            } catch (LifecycleException e) {
                e.printStackTrace();
            } finally {
            }
    
    
        }
    
    
    
    }
    

    编写Netty服务器:

    package protocol.dubbo;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    
    import java.net.InetSocketAddress;
    
    
    /**
     * @program: rpcdemo
     * @description:
     * @author: z.hw
     * @create: 2019-01-06 03:03
     **/
    public class NettyServer {
    
    
        public void start(String hostname,int port){
            NioEventLoopGroup nioEventLoopGroup=new NioEventLoopGroup();
            try {
                final ServerBootstrap serverBootstrap=new ServerBootstrap();
                serverBootstrap.group(nioEventLoopGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline=socketChannel.pipeline();
                                pipeline.addLast("decoder",new ObjectDecoder(ClassResolvers
                                        .weakCachingConcurrentResolver(this.getClass().getClassLoader())
                                ));
                                pipeline.addLast("encoder",new ObjectEncoder());
                                pipeline.addLast("handler",new NettyServerHandler());
                            }
                        });
                        ChannelFuture future = serverBootstrap.bind(hostname,port).sync();
                        System.out.println("Server start listen at " + port );
                        future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                nioEventLoopGroup.shutdownGracefully();
            }
        }
    
    }
    

    相关代码在:点击

    展开全文
  • 手写rpc框架

    2020-02-06 16:51:30
    文章目录手写rpc框架rpc概念rpc是什么为什么要用rpcrpc核心概念术语rpc的流程开发rpc框架设计客户端代理对象生成发现者协议层网络层实现客户端代理对象生成发现者协议层网络层设计服务端...RPC 框架的要素 ...

    手写rpc框架

    rpc概念

    rpc是什么

    远程过程调用(Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用远程方法调用

    为什么要用rpc

    RPC框架介于传输层和应用中间,它会帮助处理:

    • 服务化
    • 可重用
    • 系统间交互调用

    rpc核心概念术语

    • Client、Server、calls、replies、service、programs、procedures、version、marshalling(编组)、unmarshalling(解组)
    • 一个网络服务由一个或多个远程程序集构成
    • 一个远程程序实现一个或多个远程过程
    • 过程、过程的参数、结果在程序协议说明书中定义说明
    • 为兼容程序协议变更,一个服务端可能支持多个版本的远程程序

    rpc的流程

    在这里插入图片描述

    1. 客户端处理过程中调用Client Stub(就像调用本地方法一样),传递参数;

    2. Client Stub 将参数编组为消息,然后通过系统调用向服务端发送消息;

    3. 客户端本地操作系统将消息从客户端机器发送到服务端机器;

    4. 服务端操作系统将接收到的数据包传递给Server Stub;

    5. Server Stub解组消息为参数;

    6. Server Stub再调用服务端的过程,过程执行结果以反方向的相同步骤响应给客户端。

    开发rpc框架

    用户使用rpc框架的步骤如下:

    1. 定义过程定义接口

    2. 服务端实现过程

    3. 客户端使用生成的stub代理对象

    所以在开发rpc框架中,需实现客户端和服务端。

    设计客户端

    代理对象生成

    首先考虑客户端如何生成过程接口的代理对象。在设计中,设计客户端代理工厂,用JDK动态代理即可生成接口的代理对象。类图如下图所示。

    在这里插入图片描述

    发现者

    设计客户端的时候,在ClientStubInvocationHandler中需要完成的两件事为编组消息和发送网络请求,而将请求的内容编组为消息这件事就交由客户端的stub代理,它除了消息协议和网络层的事务以外,可能还存在一个服务信息发现。此外消息协议可能也是会存在变化的,我们也需要去支持多种协议。此时我们需要得知某服务用的是什么协议,所以我们需要引入一个服务发现者。

    在这里插入图片描述

    协议层

    想要做到支持多种协议,类该如何设计(面向接口,策略模式,组合)。

    在这里插入图片描述

    此时又存在一些问题,单纯依靠编组和解组的方法是不够的,编组和解组的操作对象是请求、响应,但是它们的内容是不同的,此时我们又需要定义框架标准的请求、响应类。

    在这里插入图片描述

    此时协议层扩展为4个方法。将消息协议独立为一层,因为客户端和服务端都需要使用。

    在这里插入图片描述

    网络层

    网络层的工作主要是发送请求和获得响应,此时我们如果需要发起网络请求必定先要知道服务地址,此时我们利用下图中serviceInfo对象作为必须依赖,setRequest()方法里面会存在发送数据,还有发送给谁。

    在这里插入图片描述

    实现客户端

    按照之前的类图设计,进行填码。

    代理对象生成

    public class ClientStubProxyFactory {
    
    	private ServiceInfoDiscoverer sid;
    
    	private Map<String, MessageProtocol> supportMessageProtocols;
    
    	private NetClient netClient;
    
    	private Map<Class<?>, Object> objectCache = new HashMap<>();
    
    	public <T> T getProxy(Class<T> interf) {
    		T obj = (T) this.objectCache.get(interf);
    		if (obj == null) {
    			obj = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[] { interf },
    					new ClientStubInvocationHandler(interf));
    			this.objectCache.put(interf, obj);
    		}
    
    		return obj;
    	}
    
    	public ServiceInfoDiscoverer getSid() {
    		return sid;
    	}
    
    	public void setSid(ServiceInfoDiscoverer sid) {
    		this.sid = sid;
    	}
    
    	public Map<String, MessageProtocol> getSupportMessageProtocols() {
    		return supportMessageProtocols;
    	}
    
    	public void setSupportMessageProtocols(Map<String, MessageProtocol> supportMessageProtocols) {
    		this.supportMessageProtocols = supportMessageProtocols;
    	}
    
    	public NetClient getNetClient() {
    		return netClient;
    	}
    
    	public void setNetClient(NetClient netClient) {
    		this.netClient = netClient;
    	}
    
    	private class ClientStubInvocationHandler implements InvocationHandler {
    		private Class<?> interf;
    
    		private Random random = new Random();
    
    		public ClientStubInvocationHandler(Class<?> interf) {
    			super();
    			this.interf = interf;
    		}
    
    		@Override
    		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    
    			if (method.getName().equals("toString")) {
    				return proxy.getClass().toString();
    			}
    
    			if (method.getName().equals("hashCode")) {
    				return 0;
    			}
    
    			// 1、获得服务信息
    			String serviceName = this.interf.getName();
    			List<ServiceInfo> sinfos = sid.getServiceInfo(serviceName);
    
    			if (sinfos == null || sinfos.size() == 0) {
    				throw new Exception("远程服务不存在!");
    			}
    
    			// 随机选择一个服务提供者(软负载均衡)
    			ServiceInfo sinfo = sinfos.get(random.nextInt(sinfos.size()));
    
    			// 2、构造request对象
    			Request req = new Request();
    			req.setServiceName(sinfo.getName());
    			req.setMethod(method.getName());
    			req.setPrameterTypes(method.getParameterTypes());
    			req.setParameters(args);
    
    			// 3、协议层编组
    			// 获得该方法对应的协议
    			MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
    			// 编组请求
    			byte[] data = protocol.marshallingRequest(req);
    
    			// 4、调用网络层发送请求
    			byte[] repData = netClient.sendRequest(data, sinfo);
    
    			// 5解组响应消息
    			Response rsp = protocol.unmarshallingResponse(repData);
    
    			// 6、结果处理
    			if (rsp.getException() != null) {
    				throw rsp.getException();
    			}
    
    			return rsp.getReturnValue();
    		}
    	}
    }
    

    发现者

    public class ServiceInfo {
    
    	private String name;
    
    	private String protocol;
    
    	private String address;
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public String getProtocol() {
    		return protocol;
    	}
    
    	public void setProtocol(String protocol) {
    		this.protocol = protocol;
    	}
    
    	public String getAddress() {
    		return address;
    	}
    
    	public void setAddress(String address) {
    		this.address = address;
    	}
    
    }
    
    
    public interface ServiceInfoDiscoverer {
    	List<ServiceInfo> getServiceInfo(String name);
    }
    

    zookeeper的服务发现实现如下:

    public class ZookeeperServiceInfoDiscoverer implements ServiceInfoDiscoverer {
    
    	ZkClient client;
    
    	private String centerRootPath = "/Rpc-framework";
    
    	public ZookeeperServiceInfoDiscoverer() {
    		String addr = PropertiesUtils.getProperties("zk.address");
    		client = new ZkClient(addr);
    		client.setZkSerializer(new MyZkSerializer());
    	}
    
    	@Override
    	public List<ServiceInfo> getServiceInfo(String name) {
    		String servicePath = centerRootPath + "/" + name + "/service";
    		List<String> children = client.getChildren(servicePath);
    		List<ServiceInfo> resources = new ArrayList<ServiceInfo>();
    		for (String ch : children) {
    			try {
    				String deCh = URLDecoder.decode(ch, "UTF-8");
    				ServiceInfo r = JSON.parseObject(deCh, ServiceInfo.class);
    				resources.add(r);
    			} catch (UnsupportedEncodingException e) {
    				e.printStackTrace();
    			}
    		}
    		return resources;
    	}
    
    }
    

    协议层

    public interface MessageProtocol {
    
    	byte[] marshallingRequest(Request req) throws Exception;
    
    	Request unmarshallingRequest(byte[] data) throws Exception;
    
    	byte[] marshallingResponse(Response rsp) throws Exception;
    
    	Response unmarshallingResponse(byte[] data) throws Exception;
    }
    
    public class JavaSerializeMessageProtocol implements MessageProtocol {
    
    	private byte[] serialize(Object obj) throws Exception {
    		ByteArrayOutputStream bout = new ByteArrayOutputStream();
    		ObjectOutputStream out = new ObjectOutputStream(bout);
    		out.writeObject(obj);
    
    		return bout.toByteArray();
    	}
    
    	@Override
    	public byte[] marshallingRequest(Request req) throws Exception {
    
    		return this.serialize(req);
    	}
    
    	@Override
    	public Request unmarshallingRequest(byte[] data) throws Exception {
    		ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
    		return (Request) in.readObject();
    	}
    
    	@Override
    	public byte[] marshallingResponse(Response rsp) throws Exception {
    		return this.serialize(rsp);
    	}
    
    	@Override
    	public Response unmarshallingResponse(byte[] data) throws Exception {
    		ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
    		return (Response) in.readObject();
    	}
    
    }
    
    public class Request implements Serializable {
    
    	/**
    	 * 
    	 */
    	private static final long serialVersionUID = -5200571424236772650L;
    
    	private String serviceName;
    
    	private String method;
    
    	private Map<String, String> headers = new HashMap<String, String>();
    
    	private Class<?>[] prameterTypes;
    
    	private Object[] parameters;
    
    	public String getServiceName() {
    		return serviceName;
    	}
    
    	public void setServiceName(String serviceName) {
    		this.serviceName = serviceName;
    	}
    
    	public String getMethod() {
    		return method;
    	}
    
    	public void setMethod(String method) {
    		this.method = method;
    	}
    
    	public Map<String, String> getHeaders() {
    		return headers;
    	}
    
    	public void setHeaders(Map<String, String> headers) {
    		this.headers = headers;
    	}
    
    	public Class<?>[] getPrameterTypes() {
    		return prameterTypes;
    	}
    
    	public void setPrameterTypes(Class<?>[] prameterTypes) {
    		this.prameterTypes = prameterTypes;
    	}
    
    	public void setParameters(Object[] prameters) {
    		this.parameters = prameters;
    	}
    
    	public String getHeader(String name) {
    		return this.headers == null ? null : this.headers.get(name);
    	}
    
    	public Object[] getParameters() {
    		return this.parameters;
    	}
    
    }
    
    
    public class Response implements Serializable {
    
    	/**
    	 * 
    	 */
    	private static final long serialVersionUID = -4317845782629589997L;
    
    	private Status status;
    
    	private Map<String, String> headers = new HashMap<>();
    
    	private Object returnValue;
    
    	private Exception exception;
    
    	public Response() {
    	};
    
    	public Response(Status status) {
    		this.status = status;
    	}
    
    	public void setStatus(Status status) {
    		this.status = status;
    	}
    
    	public void setHeaders(Map<String, String> headers) {
    		this.headers = headers;
    	}
    
    	public void setReturnValue(Object returnValue) {
    		this.returnValue = returnValue;
    	}
    
    	public void setException(Exception exception) {
    		this.exception = exception;
    	}
    
    	public Status getStatus() {
    		return status;
    	}
    
    	public Map<String, String> getHeaders() {
    		return headers;
    	}
    
    	public Object getReturnValue() {
    		return returnValue;
    	}
    
    	public Exception getException() {
    		return exception;
    	}
    
    	public String getHeader(String name) {
    		return this.headers == null ? null : this.headers.get(name);
    	}
    
    	public void setHaader(String name, String value) {
    		this.headers.put(name, value);
    
    	}
    }
    
    
    public enum Status {
    	SUCCESS(200, "SUCCESS"), ERROR(500, "ERROR"), NOT_FOUND(404, "NOT FOUND");
    
    	private int code;
    
    	private String message;
    
    	private Status(int code, String message) {
    		this.code = code;
    		this.message = message;
    	}
    
    	public int getCode() {
    		return code;
    	}
    
    	public String getMessage() {
    		return message;
    	}
    }
    

    网络层

    public interface NetClient {
    	byte[] sendRequest(byte[] data, ServiceInfo sinfo) throws Throwable;
    }
    
    public class NettyNetClient implements NetClient {
    
    	private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class);
    
    	@Override
    	public byte[] sendRequest(byte[] data, ServiceInfo sinfo) throws Throwable {
    
    		String[] addInfoArray = sinfo.getAddress().split(":");
    
    		SendHandler sendHandler = new SendHandler(data);
    		byte[] respData = null;
    		// 配置客户端
    		EventLoopGroup group = new NioEventLoopGroup();
    		try {
    			Bootstrap b = new Bootstrap();
    
    			b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
    					.handler(new ChannelInitializer<SocketChannel>() {
    						@Override
    						public void initChannel(SocketChannel ch) throws Exception {
    							ChannelPipeline p = ch.pipeline();
    							p.addLast(sendHandler);
    						}
    					});
    
    			// 启动客户端连接
    			b.connect(addInfoArray[0], Integer.valueOf(addInfoArray[1])).sync();
    			respData = (byte[]) sendHandler.rspData();
    			logger.info("sendRequest get reply: " + respData);
    
    		} finally {
    			// 释放线程组资源
    			group.shutdownGracefully();
    		}
    
    		return respData;
    	}
    
    	private class SendHandler extends ChannelInboundHandlerAdapter {
    
    		private CountDownLatch cdl = null;
    		private Object readMsg = null;
    		private byte[] data;
    
    		public SendHandler(byte[] data) {
    			cdl = new CountDownLatch(1);
    			this.data = data;
    		}
    
    		@Override
    		public void channelActive(ChannelHandlerContext ctx) throws Exception {
    			logger.info("连接服务端成功:" + ctx);
    			ByteBuf reqBuf = Unpooled.buffer(data.length);
    			reqBuf.writeBytes(data);
    			logger.info("客户端发送消息:" + reqBuf);
    			ctx.writeAndFlush(reqBuf);
    		}
    
    		public Object rspData() {
    
    			try {
    				cdl.await();
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    
    			return readMsg;
    		}
    
    		@Override
    		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    			logger.info("client read msg: " + msg);
    			ByteBuf msgBuf = (ByteBuf) msg;
    			byte[] resp = new byte[msgBuf.readableBytes()];
    			msgBuf.readBytes(resp);
    			readMsg = resp;
    			cdl.countDown();
    		}
    
    		@Override
    		public void channelReadComplete(ChannelHandlerContext ctx) {
    			ctx.flush();
    		}
    
    		@Override
    		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    			// Close the connection when an exception is raised.
    			cause.printStackTrace();
    			logger.error("发生异常:" + cause.getMessage());
    			ctx.close();
    		}
    	}
    }
    

    设计服务端

    RPCServer

    客户端请求过来了,服务端首先通过RPCServer接收请求。在RPCServer中需开启网络服务。

    在这里插入图片描述

    RequestHandler

    RPCServer接收到请求以后,将请求交给RequestHandler处理,RequestHandler调用协议层来解组请求消息为Request对象,然后调用过程。消息协议层是复用客户端设计的。

    在这里插入图片描述

    ServiceRegister

    ServiceRegister模块实现服务注册、发布。

    在这里插入图片描述

    实现服务端

    RPCServer

    public abstract class RpcServer {
    
    	protected int port;
    
    	protected String protocol;
    
    	protected RequestHandler handler;
    
    	public RpcServer(int port, String protocol, RequestHandler handler) {
    		super();
    		this.port = port;
    		this.protocol = protocol;
    		this.handler = handler;
    	}
    
    	/**
    	 * 开启服务
    	 */
    	public abstract void start();
    
    	/**
    	 * 停止服务
    	 */
    	public abstract void stop();
    
    	public int getPort() {
    		return port;
    	}
    
    	public void setPort(int port) {
    		this.port = port;
    	}
    
    	public String getProtocol() {
    		return protocol;
    	}
    
    	public void setProtocol(String protocol) {
    		this.protocol = protocol;
    	}
    
    	public RequestHandler getHandler() {
    		return handler;
    	}
    
    	public void setHandler(RequestHandler handler) {
    		this.handler = handler;
    	}
    
    }
    
    public class NettyRpcServer extends RpcServer {
    	private static Logger logger = LoggerFactory.getLogger(NettyRpcServer.class);
    
    	private Channel channel;
    
    	public NettyRpcServer(int port, String protocol, RequestHandler handler) {
    		super(port, protocol, handler);
    	}
    
    	@Override
    	public void start() {
    		// 配置服务器
    		EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    		EventLoopGroup workerGroup = new NioEventLoopGroup();
    		try {
    			ServerBootstrap b = new ServerBootstrap();
    			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
    					.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
    						@Override
    						public void initChannel(SocketChannel ch) throws Exception {
    							ChannelPipeline p = ch.pipeline();
    							p.addLast(new ChannelRequestHandler());
    						}
    					});
    
    			// 启动服务
    			ChannelFuture f = b.bind(port).sync();
    			logger.info("完成服务端端口绑定与启动");
    			channel = f.channel();
    			// 等待服务通道关闭
    			f.channel().closeFuture().sync();
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			// 释放线程组资源
    			bossGroup.shutdownGracefully();
    			workerGroup.shutdownGracefully();
    		}
    	}
    
    	@Override
    	public void stop() {
    		this.channel.close();
    	}
    
    	private class ChannelRequestHandler extends ChannelInboundHandlerAdapter {
    
    		@Override
    		public void channelActive(ChannelHandlerContext ctx) throws Exception {
    			logger.info("激活");
    		}
    
    		@Override
    		public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    			logger.info("服务端收到消息:" + msg);
    			ByteBuf msgBuf = (ByteBuf) msg;
    			byte[] req = new byte[msgBuf.readableBytes()];
    			msgBuf.readBytes(req);
    			byte[] res = handler.handleRequest(req);
    			logger.info("发送响应:" + msg);
    			ByteBuf respBuf = Unpooled.buffer(res.length);
    			respBuf.writeBytes(res);
    			ctx.write(respBuf);
    		}
    
    		@Override
    		public void channelReadComplete(ChannelHandlerContext ctx) {
    			ctx.flush();
    		}
    
    		@Override
    		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    			// Close the connection when an exception is raised.
    			cause.printStackTrace();
    			logger.error("发生异常:" + cause.getMessage());
    			ctx.close();
    		}
    	}
    
    }
    
    

    RequestHandler

    public class RequestHandler {
    	private MessageProtocol protocol;
    
    	private ServiceRegister serviceRegister;
    
    	public RequestHandler(MessageProtocol protocol, ServiceRegister serviceRegister) {
    		super();
    		this.protocol = protocol;
    		this.serviceRegister = serviceRegister;
    	}
    
    	public byte[] handleRequest(byte[] data) throws Exception {
    		// 1、解组消息
    		Request req = this.protocol.unmarshallingRequest(data);
    
    		// 2、查找服务对象
    		ServiceObject so = this.serviceRegister.getServiceObject(req.getServiceName());
    
    		Response rsp = null;
    
    		if (so == null) {
    			rsp = new Response(Status.NOT_FOUND);
    		} else {
    			// 3、反射调用对应的过程方法
    			try {
    				Method m = so.getInterf().getMethod(req.getMethod(), req.getPrameterTypes());
    				Object returnValue = m.invoke(so.getObj(), req.getParameters());
    				rsp = new Response(Status.SUCCESS);
    				rsp.setReturnValue(returnValue);
    			} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException
    					| InvocationTargetException e) {
    				rsp = new Response(Status.ERROR);
    				rsp.setException(e);
    			}
    		}
    
    		// 4、编组响应消息
    		return this.protocol.marshallingResponse(rsp);
    	}
    
    	public MessageProtocol getProtocol() {
    		return protocol;
    	}
    
    	public void setProtocol(MessageProtocol protocol) {
    		this.protocol = protocol;
    	}
    
    	public ServiceRegister getServiceRegister() {
    		return serviceRegister;
    	}
    
    	public void setServiceRegister(ServiceRegister serviceRegister) {
    		this.serviceRegister = serviceRegister;
    	}
    
    }
    
    

    ServiceRegister

    public interface ServiceRegister {
    
    	void register(ServiceObject so, String protocol, int port) throws Exception;
    
    	ServiceObject getServiceObject(String name) throws Exception;
    }
    
    public class ServiceObject {
    
    	private String name;
    
    	private Class<?> interf;
    
    	private Object obj;
    
    	public ServiceObject(String name, Class<?> interf, Object obj) {
    		super();
    		this.name = name;
    		this.interf = interf;
    		this.obj = obj;
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public Class<?> getInterf() {
    		return interf;
    	}
    
    	public void setInterf(Class<?> interf) {
    		this.interf = interf;
    	}
    
    	public Object getObj() {
    		return obj;
    	}
    
    	public void setObj(Object obj) {
    		this.obj = obj;
    	}
    
    }
    
    public class DefaultServiceRegister implements ServiceRegister {
    
    	private Map<String, ServiceObject> serviceMap = new HashMap<>();
    
    	@Override
    	public void register(ServiceObject so, String protocolName, int port) throws Exception {
    		if (so == null) {
    			throw new IllegalArgumentException("参数不能为空");
    		}
    
    		this.serviceMap.put(so.getName(), so);
    	}
    
    	@Override
    	public ServiceObject getServiceObject(String name) {
    		return this.serviceMap.get(name);
    	}
    
    }
    
    /**
     * Zookeeper方式获取远程服务信息类。
     * 
     * ZookeeperServiceInfoDiscoverer
     */
    public class ZookeeperExportServiceRegister extends DefaultServiceRegister implements ServiceRegister {
    
    	private ZkClient client;
    
    	private String centerRootPath = "/Rpc-framework";
    
    	public ZookeeperExportServiceRegister() {
    		String addr = PropertiesUtils.getProperties("zk.address");
    		client = new ZkClient(addr);
    		client.setZkSerializer(new MyZkSerializer());
    	}
    
    	@Override
    	public void register(ServiceObject so, String protocolName, int port) throws Exception {
    		super.register(so, protocolName, port);
    		ServiceInfo soInf = new ServiceInfo();
    
    		String host = InetAddress.getLocalHost().getHostAddress();
    		String address = host + ":" + port;
    		soInf.setAddress(address);
    		soInf.setName(so.getInterf().getName());
    		soInf.setProtocol(protocolName);
    		this.exportService(soInf);
    
    	}
    
    	private void exportService(ServiceInfo serviceResource) {
    		String serviceName = serviceResource.getName();
    		String uri = JSON.toJSONString(serviceResource);
    		try {
    			uri = URLEncoder.encode(uri, "UTF-8");
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    		String servicePath = centerRootPath + "/" + serviceName + "/service";
    		if (!client.exists(servicePath)) {
    			client.createPersistent(servicePath, true);
    		}
    		String uriPath = servicePath + "/" + uri;
    		if (client.exists(uriPath)) {
    			client.delete(uriPath);
    		}
    		client.createEphemeral(uriPath);
    	}
    }
    

    实现高并发 RPC 框架的要素

    实现高并发 RPC 框架的要素,总结起来有三个要点:

    1. 选择高性能的 I/O 模型,这里推荐使用同步多路 I/O 复用模型;
    2. 调试网络参数,这里面有一些经验值的推荐。比如将 tcp_nodelay 设置为 true,也有一些参数需要在运行中来调试,比如接受缓冲区和发送缓冲区的大小,客户端连接请求缓冲队列的大小(back log)等等;
    3. 序列化协议依据具体业务来选择。如果对性能要求不高可以选择 JSON,否则可以从 Thrift 和 Protobuf 中选择其一。
    展开全文
  • 手写RPC框架

    2020-04-20 23:28:06
    手写RPC框架 一、前言 二、简介 三、手写RPC框架 在rpc远程调用服务中,存在生产者和消费者,建立生产者rpcserver工程,消费者rpcclient工程; 生产者工程目录结构: 接口作为将要发布的服务 package com.server.rpc...

    手写RPC框架

    一、前言

    二、简介

    三、手写RPC框架

    在rpc远程调用服务中,存在生产者和消费者,建立生产者rpcserver工程,消费者rpcclient工程;
    生产者工程目录结构在这里插入图片描述

    接口作为将要发布的服务

    package com.server.rpc;
    //将要发布的服务接口
    public interface SayHello {
        public String sayHello(String args);
    }
    

    服务调用间传输类

    package com.server.rpc;
    
    import java.io.Serializable;
    import java.util.Arrays;
    
    public class RpcRequestBean implements Serializable {
        private String className;
        private String methodName;
        private Object[] params;
    
        @Override
        public String toString() {
            return "RpcRequestBean{" +
                    "className='" + className + '\'' +
                    ", methodName='" + methodName + '\'' +
                    ", params=" + Arrays.toString(params) +
                    '}';
        }
    
        public String getClassName() {
            return className;
        }
    
        public void setClassName(String className) {
            this.className = className;
        }
    
        public String getMethodName() {
            return methodName;
        }
    
        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }
    
        public Object[] getParams() {
            return params;
        }
    
        public void setParams(Object[] params) {
            this.params = params;
        }
    }
    
    

    生产者工程中rpc-server-provide模块需引入rpc-server-api工程jar包实现接口

     <dependency>
            <groupId>com.soft.rpc</groupId>
            <artifactId>rpc-server-api</artifactId>
            <version>1.0-SNAPSHOT</version>
     </dependency>
    
    服务实现类
    
    ```java
    package com.server.rpc;
    
    public class SayHelloImpl implements SayHello {
        public String sayHello(String args) {
            return "hello: First Rpc demo";
        }
    }
    

    创建服务发布类

    package com.server.rpc;
    
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class RpcProxyServer {
        private ExecutorService executorService= Executors.newCachedThreadPool();
        /**
         *
         * @param service  要发布的服务
         * @param port      暴露的端口号
         */
        public void publisher(Object service, int port) throws IOException {
            ServerSocket serverSocket=null;
            try {
                serverSocket = new ServerSocket(port);
                while (true) {
                    final Socket socket = serverSocket.accept();
    //                socket.getInputStream();  阻塞IO
                    //进入下一步骤,说明有客户端连接进来
                   executorService.execute(new ProcessHandle(socket,service));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                if (serverSocket != null) {
                    serverSocket.close();
                }
            }
    
        }
    }
    ```创建线程执行客户端请求
    
    ```java
    package com.server.rpc;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import java.net.Socket;
    
    public class ProcessHandle implements Runnable {
        private Socket socket;
        private Object service;
    
        public ProcessHandle(Socket socket, Object service) {
            this.socket = socket;
            this.service = service;
        }
    
        public void run() {
            ObjectInputStream objectInputStream = null;
            ObjectOutputStream objectOutputStream = null;
            try {
                objectInputStream = new ObjectInputStream(socket.getInputStream());
                //请求的方法,参数,名称,目标类,反序列化
                RpcRequestBean rpcRequestBean = (RpcRequestBean) objectInputStream.readObject();
                Object result = invoke(rpcRequestBean);
                objectOutputStream=new ObjectOutputStream(socket.getOutputStream());
                objectOutputStream.writeObject(result);//序列化,写入到通信管道
                objectOutputStream.flush();
    
    
              objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                if (objectInputStream != null) {
                    try {
                        objectInputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (objectOutputStream != null) {
                    try {
                        objectInputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        private Object invoke(RpcRequestBean rpcRequestBean) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
            Object[] args = rpcRequestBean.getParams();//请求参数
            Class<?>[] types =null;
            if (args.length > 0) {
                types = new Class[args.length];
                for (int i = 0; i < args.length; i++) {
                    types[i]=args[i].getClass();
                }
            }
            //反射加载对应的类
            Class<?> clazz = Class.forName(rpcRequestBean.getClassName());
            //通过反射找到对应class中的方法
            Method method = clazz.getMethod(rpcRequestBean.getMethodName(), types);
            Object result = method.invoke(service, args);
            return result;
    
        }
    }
    
    ```创建服务启动类,发布服务
    
    ```java
    package com.server.rpc;
    
    import java.io.IOException;
    
    public class SendServer {
        public static void main(String[] args) throws IOException {
            SayHelloImpl sayHello = new SayHelloImpl();
            RpcProxyServer rpcProxyServer = new RpcProxyServer();
            rpcProxyServer.publisher(sayHello,8888);
    
        }
    }
    

    消费者工程目录结构:*在这里插入图片消费者工程描述
    消费者工程引入生产者工程依赖

    <dependency>
            <groupId>com.soft.rpc</groupId>
            <artifactId>rpc-server-api</artifactId>
            <version>1.0-SNAPSHOT</version>
       </dependency>
    

    请求服务启动类

    package com.rpc;
    
    import com.server.rpc.SayHello;
    
    public class GetService {
        public static void main(String[] args) {
            RpcProxyClient rpcProxyClient = new RpcProxyClient();
            SayHello sayHello=rpcProxyClient.clientProxy(SayHello.class, "localhost", 8888);
            System.out.println(sayHello.sayHello("s"));
        }
    }
    

    动态代理请求

    package com.rpc;
    
    import java.lang.reflect.Proxy;
    
    public class RpcProxyClient {
        public <T> T clientProxy(final Class<T> interFaceClas, final String host
    ,final int port) {//interface com.server.rpc.SayHello
            return (T) Proxy.newProxyInstance(interFaceClas.getClassLoader(),
             new Class<?>[]{interFaceClas}, new RemoteInvocationHandle(host,port));
        }
    }
    

    代理类请求服务结果处理

    package com.rpc;
    
    import com.server.rpc.RpcRequestBean;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    
    public class RemoteInvocationHandle implements InvocationHandler {
        private String host;
        private int port;
    
        public RemoteInvocationHandle(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            System.out.println("begin" + host + "->" + port);
            RpcRequestBean rpcRequestBean = new RpcRequestBean();
            rpcRequestBean.setClassName(method.getDeclaringClass().getName());//com.server.rpc.SayHello
            rpcRequestBean.setMethodName(method.getName());//sayHello
            rpcRequestBean.setParams(args);
            RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
            Object result = rpcNetTransport.send(rpcRequestBean);
            return result;
        }
                }
    

    构建服务调用类

    package com.rpc;
    
    import com.server.rpc.RpcRequestBean;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.Socket;
    
    
    public class RpcNetTransport {
        private String host;
        private int port;
    
        public RpcNetTransport(String host, int port) {
            this.host = host;
            this.port = port;
        }
        public Object send(RpcRequestBean rpcRequestBean)  {//RpcRequestBean{className='com.server.rpc.SayHello', methodName='sayHello', params=[s]}
            //构建服务端请求,把request写入到服务端
            Socket socket=null;
            ObjectOutputStream objectOutputStream = null;
            ObjectInputStream objectInputStream = null;
            Object result =null;
            try {
                socket = new Socket(host, port);
                objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                objectOutputStream.writeObject(rpcRequestBean);
                objectOutputStream.flush();
                //得到服务端返回结果
               objectInputStream = new ObjectInputStream(socket.getInputStream());
               result = objectInputStream.readObject();//hello: First Rpc demo
    
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                if (objectInputStream != null) {
                    try {
                        objectInputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (objectOutputStream != null) {
                    try {
                        objectInputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            return result;
    
        }
    }
    

    分别启动生产者工程和消费者工程,消费者端运行结果显示:

    展开全文
  • 基于netty的手写rpc框架。
  • rpc远程过程调用的手写简单源码 学习rpc通信的可以下载下来看看 很有帮助 包括客户端和服务端的网络调用 通信 序列化....等等
  • 手写rpc问题清单

    2020-02-03 09:19:30
    手写rpc问题清单参考链接运行过程 参考链接 轻量级分布式 RPC 框架 一个轻量级分布式RPC框架–NettyRpc 运行过程
  • RPC是一种远程调用的通信协议,例如dubbo、thrift等,我们在互联网高并发应用开发时候都会使用到类似的服务。本专题主要通过三个章节实现一个rpc通信的基础功能,来学习RPC服务...- 手写RPC框架第三章《RPC中间件》
  • 手写RPC框架功能快捷键 一. RPC是什么 RPC全称remote procedure call,翻译过来就是远程过程调用。在分布式系统中,一个模块像调用本地方法一样调用远程方法的过程,就叫RPC。 我们耳熟能详的webservice、restful...
  • 手写RPC两个工程文件 rpcServer rpcClient 博客地址:https://blog.csdn.net/qq_36963950/article/details/106459616
  • 手写篇:如何手写RPC框架? 首先我们讲下什么是RPC? RPC(Remote Procedure Call)远程过程调用协议,他是一种通过网络从远程计算机程序请求服务。简单的来说,就是通过网络进行远程执行,并返回结果。 像阿里的...
  • 基于Netty手写 RPC

    2019-05-27 19:42:35
    手写RPC 整体分析 RPC(Remote Procedure Call),即远程过程调用,它是一种通过网络从远程计算机程序 上请求服务,而不需要了解底层网络实现的技术。常见的RPC 框架有: 源自阿里的Dubbo, Spring 旗下的Spring Cloud...
  • 基于 Netty 手写 RPC

    2019-05-27 11:46:49
    手写RPC 整体分析 RPC(Remote Procedure Call),即远程过程调用,它是一种通过网络从远程计算机程序 上请求服务,而不需要了解底层网络实现的技术。常见的RPC 框架有: 源自阿里的Dubbo, Spring 旗下的Spring Cloud...
  • 手写rpc的项目

    2019-01-08 15:02:44
    自己动手手写一个rpc框架,只是简单介绍原理,希望能够学习rpc的你给予一定的帮助。
  • 手写RPC机制--理解Dubbo核心内容1、Dubbo框架的概述2、RPC如何理解更加准确3、手写JAVA版RPC机制4、理解分布式微服务体系的痛点问题一、Dubbo概念介绍二、RPC概述1、什么是RPC2、RPC 核心功能三、手写JAVA版RPC机制1...
  • 手写RPC从零开始

    2018-07-03 21:02:00
    前言:现在随着微服务、分布式的流行,基本大点的项目必用RPC框架,比如阿里的dubbo,Thrift等,现在我将一步步来手写rpc,我们来慢慢熟悉这个过程,也便于看dubbo的源码,不过在这之间肯定也会遇到很多问题,希望可以和大家...
  • 手写RPC第一版

    2020-06-26 10:55:03
    手写RPC第一版前言创建项目goods-apigoods-provider如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一...
  • 从零开始手写RPC框架

    2020-11-30 21:22:43
    提示:文章写完后,目录可以...在此记录一下从头手写RPC框架的全过程 一、项目实现功能 实现多个服务之间的远程过程调用,使用起来非常简单。 二、使用步骤 1.引入库 Maven依赖如下(示例,最新版本请到Maven仓库获
  • 简单实现 手写RPC通讯框架前言设计思路代码实现功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右...
  • 文章目录RPC介绍Fegin原理@FeignClientDubbo理解手写rpc框架时序图个人的实战启动zkmaven编写rpc-provider请求类编写rpc-consumer调用远程接口手写框架相关改造动态代理调用远程接口返回结果@Component理解源码地址 ...
  • 上一篇我们介绍了整个手写RPC的设计思路,本篇我们来动手实现手写rpc框架的代码。这里我们先从框架层开始写起。 我们先来回顾一下之前的工程结构: 我们分别从客户端框架、服务端框架、公共模块,一直到应用层模块...
  • 【诗90:10】我们一生的年日是七十岁,若是强壮可到八十岁;...在第一篇「手写RPC(一)基于BIO」我们已经完成了 RPC 的实现,那么服务的发布和调用怎样交给 spring 管理呢? 这正是下面要探讨完成的事情
  • 所以沉下心看清代码本质很重要,这次给大家带来的是手写RPC框架。 完整代码以及说明文档,点我跳跃~ 1. 什么是RPC? RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用...
  • 手写Rpc(一) 1、框架模型 2、开工 2.1 代理对象大概框架 要像调用本地方法一样调用远程方法,那么我们需要对本地调用的方法进行动态代理。 @Slf4j public class InvokeProxy { public static <T> T proxy...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,366
精华内容 546
关键字:

手写rpc