精华内容
下载资源
问答
  • dubbo参数回调机制
    2021-12-12 19:16:46

    概念

    参数回调就是dubbo的provider调用consumer代码.
    参数回调分为同步回调和异步回调

    同步回调官网文档地址:https://dubbo.apache.org/zh/docs/v2.7/user/examples/callback-parameter/
    异步回调官网文档地址
    https://dubbo.apache.org/zh/docs/v2.7/user/examples/async-call/
    异步回调就是利用了Java的CompletableFuture

    gitee代码地址

    https://gitee.com/zjj19941/ZJJ_Dubbo.gitcallback项目就是代码,自己准备个zookeeper,然后改一下配置文件,先执行provider,再执行consumer,即可看到效果

    使用

    interface项目声明一个回调接口类

    package com.zjj;
    // 回调参数
    public interface DemoServiceListener {
        String changed(String msg) throws InterruptedException;
    }
    
    

    interface项目接口添加一个回调方法

    package com.zjj;
    
    
    public interface DemoService {
    
        // 添加回调逻辑,不然的话消费者无法进行回调
        //参数2是key, 不同的key的回调函数可以不一样. 这个key就类似于
        default String sayHello(String name,  DemoServiceListener listener) throws InterruptedException {
            return null;
        };
    }
    
    

    provider编写回调方法

    public String sayHello(String name, DemoServiceListener callback) 是回调方法

    @Service也配置了一些参数回调相关的参数

    package com.zjj.provider.service;
    
    import com.zjj.DemoService;
    import com.zjj.DemoServiceListener;
    import org.apache.dubbo.config.annotation.Argument;
    import org.apache.dubbo.config.annotation.Method;
    import org.apache.dubbo.config.annotation.Service;
    
    
    // DemoService的sayHello方法的index=1的参数是回调对象,服务消费者可以调用addListener方法来添加回调对象,服务提供者一旦执行回调对象的方法就会通知给服务消费者
    //arguments = {@Argument(index = 1, callback = true)})} 的意思是 sayHello方法的第二个参数是回调参数
    //callbacks = 3 的意思是这个服务最多支持多少个回调
    @Service(version = "default", timeout =1000000  ,  methods = {@Method(name = "sayHello", arguments = {@Argument(index = 1, callback = true)})}, callbacks = 3)
    public class CallBackDemoService implements DemoService {
    
    
    
        /**
         * 回调方法
         */
        @Override
        public String sayHello(String name,  DemoServiceListener callback) throws InterruptedException {
            System.out.println("1.开始执行回调服务,参数name是:" + name + "   毫秒值是 :" + System.currentTimeMillis());
    
            //在执行回调函数
            String rs = callback.changed("我是provider传过去的入参");
            Thread.sleep(1000);
    
    
            System.out.println("3.回调的结果是: " + rs  + "   毫秒值是 :" + System.currentTimeMillis());
            return String.format( name);  // 正常访问
        }
    
    }
    
    

    consumer编写回调类

    这个回调类的逻辑会被provider调用

    package com.zjj;
    
    //参数回调
    public class DemoServiceListenerImpl implements DemoServiceListener {
    
        @Override
        public String changed(String msg) throws InterruptedException {
    
            System.out.println("2.被回调了:"+msg  + "   毫秒值是 :" + System.currentTimeMillis());
            Thread.sleep(1000);
            return "回调结果";
        }
    }
    
    

    consumer方法访问provider

    参数2就是指定回调方法.指定的这个回调方法会被provider调用到.

    String rs = demoService.sayHello(“周瑜”, new DemoServiceListenerImpl());

    package com.zjj;
    
    import org.apache.dubbo.config.annotation.Reference;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    
    @SpringBootApplication
    public class DubboConsumerDemo  {
    
      //@Reference注解就是用于标记这个服务具体使用了生产者的哪个接口实现
        @Reference(version = "default",timeout = 100000000)
        private DemoService demoService;
    
        public static void main(String[] args) throws InterruptedException {
            ConfigurableApplicationContext context = SpringApplication.run(DubboConsumerDemo.class);
            DemoService demoService = context.getBean(DemoService.class);
            //等provider调用完了回调之后,这个方法才会给返回值结果.
            String rs = demoService.sayHello("周瑜",  new DemoServiceListenerImpl());
            System.out.println("4.rs:" + rs  + "   毫秒值是 :" + System.currentTimeMillis());
        }
    
    }
    
    

    执行结果

    provider控制台

    1.开始执行回调服务,参数name是:周瑜   毫秒值是 :1639306614182
    3.回调的结果是: 回调结果   毫秒值是 :1639306616205
    

    consumer控制台

    2.被回调了:我是provider传过去的入参   毫秒值是 :1639306614201
    4.rs:周瑜   毫秒值是 :1639306616210
    

    看控制台打印的毫秒值就能看出来方法执行顺序.

    1.consumer先调用provider的sayHello方法,
    2.provider调用consumer的回调方法
    3.provider收到回调结果后方法执行完毕
    4.consumer的sayHello执行完毕,获取到方法的返回结果

    上面是同步回调的处理顺序

    更多相关内容
  • dubbo 参数验证

    2022-02-21 22:42:56
    dubbo 参数验证

    dubbo 参数验证

              

    官网:https://dubbo.apache.org/zh/docs/advanced/parameter-validation/

                   

                 

                                      

    参数验证

               

    相关依赖

    <dependency>
        <groupId>jakarta.validation</groupId>
        <artifactId>jakarta.validation-api</artifactId>
        <version>3.0.1</version>
    </dependency>
    
    <dependency>
        <groupId>org.hibernate.validator</groupId>
        <artifactId>hibernate-validator</artifactId>
        <version>7.0.2.Final</version>
    </dependency>
    

                    

    pojo 层:标注验证注解

    @Data
    public class ValidationParameter implements Serializable {
        private static final long serialVersionUID = 7158911668568000392L;
     
        @NotNull // 不允许为空
        @Size(min = 1, max = 20) // 长度或大小范围
        private String name;
     
        @NotNull(groups = ValidationService.Save.class) // 保存时不允许为空,更新时允许为空 ,表示不更新该字段
        @Pattern(regexp = "^\\s*\\w+(?:\\.{0,1}[\\w-]+)*@[a-zA-Z0-9]+(?:[-.][a-zA-Z0-9]+)*\\.[a-zA-Z]+\\s*$")
        private String email;
     
        @Min(18) // 最小值
        @Max(100) // 最大值
        private int age;
     
        @Past // 必须为一个过去的时间
        private Date loginDate;
     
        @Future // 必须为一个未来的时间
        private Date expiryDate;
    
    }

                  

    service 层:对pojo参数分组验证

    public interface ValidationService { // 缺省可按服务接口区分验证场景,如:@NotNull(groups = ValidationService.class)   
        @interface Save{}   //与方法同名接口,首字母大写,用于区分验证场景
                            //分组名也可以自定义,无大小写要求
                            //如:@NotNull(groups = ValidationService.Save.class),可选
        void save(ValidationParameter parameter);
        void update(ValidationParameter parameter);
    }
    

                     

    servcie 层:定义分组验证顺序

    @GroupSequence(value = {ValidationService.Save.class, ValidationService.Update.class})
    public interface ValidationService {   
    
        @interface Save{}
        void save(ValidationParameter parameter);
     
        @interface Update{} 
        void update(ValidationParameter parameter);
    }

                  

    service 层:对方法参数进行验证

    public interface ValidationService {
    
        void save(@NotNull ValidationParameter parameter);  //参数不为空
        void delete(@Min(1) int id);      //验证基本类型参数
    }

                  

    开启验证:如果不开启验证,不会检验参数

    # 服务端开启验证
    <dubbo:service id="validationService" 
    interface="org.apache.dubbo.examples.validation.api.ValidationService" 
    ref="validationService" validation="true" />
    
    # 消费端开启验证
    <dubbo:reference id="validationService" 
    interface="org.apache.dubbo.examples.validation.api.ValidationService"
    validation="true" />

                   

                     

                                      

    使用示例

            

    ************

    服务端

                

                             

                

    application.yml

    dubbo:
      application:
        name: dubbo-provider
        #register-mode: instance
      registry:
        address: localhost:2181
        protocol: zookeeper
        group: dubbo
      protocol:
        name: dubbo
        #port: 20880

           

    Product:需实现seriable接口

    @Data
    public class Product implements Serializable {
    
        private Integer id;
        private String name;
        private Integer num;
        private Double price;
        private LocalDateTime createTime;
        private LocalDateTime updateTime;
    }
    

               

    ProductService

    public interface ProductService {
    
        Map<Integer,Product> save(Product product);
        Product updatePrice(Product product);
        void delete(Integer id);
    }
    

             

    ProductServiceImpl

    @DubboService
    public class ProductServiceImpl implements ProductService {
    
        private Map<Integer,Product> map = new HashMap<>();
    
        @Override
        public Map<Integer,Product> save(Product product) {
            map.put(product.getId(),product);
    
            return map;
        }
    
        @Override
        public Product updatePrice(Product product) {
            Product object = map.get(product.getId());
            object.setPrice(product.getPrice());
    
            return map.get(product.getId());
        }
    
        @Override
        public void delete(Integer id) {
            map.remove(id);
        }
    }
    

               

    ************

    消费端

         

                             

                          

    application.yml

    dubbo:
      application:
        name: dubbo-consumer
      registry:
        protocol: zookeeper
        address: localhost:2181
        group: dubbo
        #register-mode: instance
      protocol:
        name: dubbo
        #port: 20880
    
    server:
      port: 8081

               

    Product:添加验证注解,在消费端验证

    @Data
    public class Product implements Serializable {
    
        @NotNull
        private Integer id;
    
        @NotNull(groups = ProductService.Save.class)
        private String name;
    
        @NotNull(groups = ProductService.Save.class)
        private Integer num;
    
        @NotNull(groups = ProductService.UpdatePrice.class)
        @DecimalMin(value = "1")
        private Double price;
    
        @Past(groups = ProductService.Save.class)
        @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
        private LocalDateTime createTime;
    
        @Future(groups = ProductService.Save.class)
        @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
        private LocalDateTime updateTime;
    }
    

             

     ProductService:添加验证注解,对方法参数进行验证

    public interface ProductService {
    
        @GroupSequence(value = {UpdatePrice.class})
        @interface Save{}
        Map<Integer,Product> save(Product product);
    
        @interface UpdatePrice{}
        Product updatePrice(Product product);
    
        void delete(Integer id);
    }
    

              

    HelloController

    @RestController
    public class HelloController {
    
        @DubboReference(validation = "true")
        private ProductService productService;
    
        @RequestMapping("/save")
        public Map<Integer,Product> save(Product product){
            return productService.save(product);
        }
    
        @RequestMapping("/update")
        public Product updatePrice(Product product){
            return productService.updatePrice(product);
        }
    
        @RequestMapping("/delete")
        public void save(@Min(1) @NotNull Integer id){
            productService.delete(id);
        }
    }
    

                

    ************

    使用测试

           

    save:时间不满足验证要求,报错

            

    java.lang.ClassNotFoundException: javax.validation.ValidationException
    	at org.apache.dubbo.validation.filter.ValidationFilter.invoke(ValidationFilter.java:88) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CallbackRegistrationInvoker.invoke(FilterChainBuilder.java:187) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invokeWithContext(AbstractClusterInvoker.java:364) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:80) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:332) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:99) ~[dubbo-3.0.5.jar:3.0.5]
        ...

                

    save:所有参数均满足验证要求,可成功插入数据

            

                  

    update:更新数据时,price为空不满足验证规则报错

            

    javax.validation.ValidationException: Failed to validate service: com.example.demo.service.ProductService, method: updatePrice, cause: [ConstraintViolationImpl{interpolatedMessage='不能为null', propertyPath=price, rootBeanClass=class com.example.demo.pojo.Product, messageTemplate='{javax.validation.constraints.NotNull.message}'}]
    	at org.apache.dubbo.validation.filter.ValidationFilter.invoke(ValidationFilter.java:96) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CallbackRegistrationInvoker.invoke(FilterChainBuilder.java:187) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invokeWithContext(AbstractClusterInvoker.java:364) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:80) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:332) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:99) ~[dubbo-3.0.5.jar:3.0.5]
        ...

                

    update:更新数据时,price不为空可正常插入

            

                

     delete:数据删除时,id最小为1,不满足报错

            

    javax.validation.ValidationException: Failed to validate service: com.example.demo.service.ProductService, method: delete, cause: [ConstraintViolationImpl{interpolatedMessage='最小不能小于2', propertyPath=deleteArgument0, rootBeanClass=class com.example.demo.service.ProductService_DeleteParameter_java.lang.Integer, messageTemplate='{javax.validation.constraints.Min.message}'}]
    	at org.apache.dubbo.validation.filter.ValidationFilter.invoke(ValidationFilter.java:96) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CallbackRegistrationInvoker.invoke(FilterChainBuilder.java:187) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invokeWithContext(AbstractClusterInvoker.java:364) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:80) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:332) ~[dubbo-3.0.5.jar:3.0.5]
    	at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:99) ~[dubbo-3.0.5.jar:3.0.5]
        ...

                    

    delete:数据删除时,id为1,可正常删除

            

                 

                    

    展开全文
  • Dubbo参数回调

    千次阅读 2021-12-11 16:26:49
    Dubbo参数回调

    dubbo参数回调配置:

    <!-- callbacks代表回调的实例数 -->
    <dubbo:service interface="com.tiger.dubbo.common.service.DubboCallbackService" ref="callbackService" callbacks="2">
            <dubbo:method name="addListener">
                <dubbo:argument callback="true" index="1"/>
            </dubbo:method>
        </dubbo:service>

    在开发中会出现callback的实例数超限情况。对于每个连接,即每个channel默认回调的实例数为1.如果超过该值,则会报错。

     回调处理类CallbackServiceCodec源代码分析:

    客户端:

    private static String exportOrUnexportCallbackService(Channel channel, URL url, Class clazz, Object inst, Boolean export) throws IOException {
        // 获取回调实例的hash code
        int instid = System.identityHashCode(inst);
    
        Map<String, String> params = new HashMap<>(3);
        // no need to new client again
        params.put(IS_SERVER_KEY, Boolean.FALSE.toString());
        // mark it's a callback, for troubleshooting
        params.put(IS_CALLBACK_SERVICE, Boolean.TRUE.toString());
        String group = (url == null ? null : url.getParameter(GROUP_KEY));
        if (group != null && group.length() > 0) {
            params.put(GROUP_KEY, group);
        }
        // add method, for verifying against method, automatic fallback (see dubbo protocol)
        params.put(METHODS_KEY, StringUtils.join(Wrapper.getWrapper(clazz).getDeclaredMethodNames(), ","));
    
        Map<String, String> tmpMap = new HashMap<>(url.getParameters());
        tmpMap.putAll(params);
        tmpMap.remove(VERSION_KEY);// doesn't need to distinguish version for callback
        tmpMap.put(INTERFACE_KEY, clazz.getName());
        URL exportUrl = new URL(DubboProtocol.NAME, channel.getLocalAddress().getAddress().getHostAddress(), channel.getLocalAddress().getPort(), clazz.getName() + "." + instid, tmpMap);
    
        // no need to generate multiple exporters for different channel in the same JVM, cache key cannot collide.
        // 该Channel关联的回调实例的key
        String cacheKey = getClientSideCallbackServiceCacheKey(instid);
        // 该Channel关联的回调实例的数量
        String countKey = getClientSideCountKey(clazz.getName());
        if (export) {
            // one channel can have multiple callback instances, no need to re-export for different instance.
            // 如果已经存在
            if (!channel.hasAttribute(cacheKey)) {
                 // 判断实例数是否超限
                if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) {
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(inst, clazz, exportUrl);
                    // should destroy resource?
                    Exporter<?> exporter = PROTOCOL.export(invoker);
                    // this is used for tracing if instid has published service or not.
                    channel.setAttribute(cacheKey, exporter);
                    logger.info("Export a callback service :" + exportUrl + ", on " + channel + ", url is: " + url);
                    increaseInstanceCount(channel, countKey);
                }
            }
        } else {
            if (channel.hasAttribute(cacheKey)) {
                Exporter<?> exporter = (Exporter<?>) channel.getAttribute(cacheKey);
                exporter.unexport();
                channel.removeAttribute(cacheKey);
                decreaseInstanceCount(channel, countKey);
            }
        }
        return String.valueOf(instid);
    }

    首先会根据回调对象实例得到一个唯一的instid,不同的回调对象有不同的instid。当第一次执行时:

    callback.service.instid.com.tiger.service.CallbackListener.COUNT=1 
    
    callback.service.instid.1639044302=
    
    {DubboExporter@9643} "interface com.tiger.service.CallbackListener

    当第二次调用时: 

    cacheKey=callback.service.instid.1862630585
    countKey=callback.service.instid.com.tiger.service.CallbackListener.COUNT

    由于cacheKey不存在,则会调用isInstancesOverLimit()方法,会将callback.service.instid.com.tiger.service.CallbackListener.COUNT中的值取出来和limit比较,如果大于等于limit,则抛出异常,否则callback.service.instid.com.tiger.service.CallbackListener.COUNT对应的值加1。如果回调对象每次都是同一个,则不会进入isInstancesOverLimit()方法。

    服务端源代码:

    private static Object referOrdestroyCallbackService(Channel channel, URL url, Class<?> clazz, Invocation inv, int instid, boolean isRefer) {
        Object proxy = null;
        String invokerCacheKey = getServerSideCallbackInvokerCacheKey(channel, clazz.getName(), instid);
        String proxyCacheKey = getServerSideCallbackServiceCacheKey(channel, clazz.getName(), instid);
        proxy = channel.getAttribute(proxyCacheKey);
        String countkey = getServerSideCountKey(channel, clazz.getName());
        if (isRefer) {
            if (proxy == null) {
                URL referurl = URL.valueOf("callback://" + url.getAddress() + "/" + clazz.getName() + "?" + Constants.INTERFACE_KEY + "=" + clazz.getName());
                referurl = referurl.addParametersIfAbsent(url.getParameters()).removeParameter(Constants.METHODS_KEY);
                // 判断Channel对应的实例数是否超限
                if (!isInstancesOverLimit(channel, referurl, clazz.getName(), instid, true)) {
                    @SuppressWarnings("rawtypes")
                    Invoker<?> invoker = new ChannelWrappedInvoker(clazz, channel, referurl, String.valueOf(instid));
                    proxy = proxyFactory.getProxy(invoker);
                    channel.setAttribute(proxyCacheKey, proxy);
                    channel.setAttribute(invokerCacheKey, invoker);
                    increaseInstanceCount(channel, countkey);
    
                    //convert error fail fast .
                    //ignore concurrent problem. 
                    Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY);
                    if (callbackInvokers == null) {
                        callbackInvokers = new ConcurrentHashSet<Invoker<?>>(1);
                        callbackInvokers.add(invoker);
                        channel.setAttribute(Constants.CHANNEL_CALLBACK_KEY, callbackInvokers);
                    }
                    logger.info("method " + inv.getMethodName() + " include a callback service :" + invoker.getUrl() + ", a proxy :" + invoker + " has been created.");
                }
            }
        } else {
            if (proxy != null) {
                Invoker<?> invoker = (Invoker<?>) channel.getAttribute(invokerCacheKey);
                try {
                    Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY);
                    if (callbackInvokers != null) {
                        callbackInvokers.remove(invoker);
                    }
                    invoker.destroy();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
                // cancel refer, directly remove from the map
                channel.removeAttribute(proxyCacheKey);
                channel.removeAttribute(invokerCacheKey);
                decreaseInstanceCount(channel, countkey);
            }
        }
        return proxy;
    }

     每个连接,即Channel绑定的callback实例数可以配置,默认为1。所以每个Channel绑定的实例数取决于最小的那个配置。

    当前Dubbo尚未提供取消参数回调的方式。在开发中,如果回调实例数被限制,则回调实例可以被共享。

    展开全文
  • dubbo 参数回调

    2022-02-24 10:01:05
    dubbo 参数回调

    dubbo 参数回调

                

    官网:https://dubbo.apache.org/zh/docs/advanced/callback-parameter/

                     

                       

                                          

    参数回调

               

    参数回调:服务端反向调用消费端方法

            

    服务端:服务接口

    public interface CallbackService {
        void addListener(String key, CallbackListener listener);
    }
    

                  

    服务端:回调接口,该接口回调客户端操作

    public interface CallbackListener {
        void changed(String msg);
    }
    

               

    服务端:服务接口实现类

    public class CallbackServiceImpl implements CallbackService {
         
        public void addListener(String key, CallbackListener listener) {
            listener.changed(getChanged(key)); //调用客户端处理逻辑
        }
         
        private String getChanged(String key) {
            return "Changed: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        }
    }

               

    服务端:配置回调接口

    <bean id="callbackService" class="com.callback.impl.CallbackServiceImpl" />
    <dubbo:service interface="com.callback.CallbackService" ref="callbackService" connections="1" callbacks="1000">
        <dubbo:method name="addListener">
            <!--index从0开始计数-->
            <dubbo:argument index="1" callback="true" />
            
            <!--也可以通过指定类型的方式-->
            <!--<dubbo:argument type="com.demo.CallbackListener" callback="true" />-->
        </dubbo:method>
    </dubbo:service>
    

               

    消费端:提供回调接口实现

    # 注册服务
    <dubbo:reference id="callbackService" interface="com.callback.CallbackService" />
    
    # 服务调用时提供接口实现
    callbackService.addListener("foo.bar", new CallbackListener(){
        public void changed(String msg) {
            System.out.println("callback1:" + msg);
        }
    });

                    

                     

                                          

    实现原理

          

    参数回调处理过程

    # 消费端
    消费端启动时会获取服务元数据,由于服务端配置了回调参数,客户端会获取回调信息;
    消费端在发起请求时,异步执行以下操作:
     * 将回调参数对象的内存id存储在attachment中,key为sys_callback_arg-index(index
    为实际索引值,如0)
     * 暴露回调接口的dubbo协议服务(复用消费端、客户端建立的tcp连接)
    sys_callback_arg-index会随请求一起发送给服务端;
    
    # 服务端
    服务度端处理回调参数时,会将sys_callback_arg-index的值传递给消费端,key为:callback.service.instid
    
    # 消费端
    消费端接到请求后,根据callback.service.instid查找对应的回调对象,执行对应操作

                 

    CallbackServiceCodec:客户端回调参数对象暴露

    public class CallbackServiceCodec {
    
        public Object encodeInvocationArgument(Channel channel, RpcInvocation inv, int paraIndex) throws IOException {
                      //将回调参数对象内存id存储在attachment中
            // get URL directly
            URL url = inv.getInvoker() == null ? null : inv.getInvoker().getUrl();
            byte callbackStatus = isCallBack(url, inv.getProtocolServiceKey(), inv.getMethodName(), paraIndex);
            Object[] args = inv.getArguments();
            Class<?>[] pts = inv.getParameterTypes();
            switch (callbackStatus) {
                case CallbackServiceCodec.CALLBACK_CREATE:  //新建回调参数对象
                    inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, inv,  url, pts[paraIndex], args[paraIndex], true));
                                    //key:sys_callback_arg- paraIndex
                                    //value:回调参数对象内存id(System.identityHashCode(args[paraIndex]))
                                    //exportOrUnexportCallbackService暴露或者卸载回调参数服务
                    return null;
                case CallbackServiceCodec.CALLBACK_DESTROY:
                    inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, inv,  url, pts[paraIndex], args[paraIndex], false));
                    return null;
                default:
                    return args[paraIndex];
            }
        }
    
        /**
         * export or unexport callback service on client side
              //在客户端暴露或者卸载回调服务
         *
         * @param channel
         * @param url
         * @param clazz
         * @param inst
         * @param export
         * @throws IOException
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        private String exportOrUnexportCallbackService(Channel channel, RpcInvocation inv, URL url, Class clazz, Object inst, Boolean export) throws IOException {
            int instid = System.identityHashCode(inst);
    
            Map<String, String> params = new HashMap<>(3);
            // no need to new client again
            params.put(IS_SERVER_KEY, Boolean.FALSE.toString());
            // mark it's a callback, for troubleshooting
            params.put(IS_CALLBACK_SERVICE, Boolean.TRUE.toString());
            String group = (url == null ? null : url.getGroup());
            if (group != null && group.length() > 0) {
                params.put(GROUP_KEY, group);
            }
            // add method, for verifying against method, automatic fallback (see dubbo protocol)
            params.put(METHODS_KEY, StringUtils.join(Wrapper.getWrapper(clazz).getDeclaredMethodNames(), ","));
    
            Map<String, String> tmpMap = new HashMap<>();
            if (url != null) {
                Map<String, String> parameters = url.getParameters();
                if (parameters != null && !parameters.isEmpty()) {
                    tmpMap.putAll(parameters);
                }
            }
            tmpMap.putAll(params);
    
            tmpMap.remove(VERSION_KEY);// doesn't need to distinguish version for callback
            tmpMap.remove(Constants.BIND_PORT_KEY); //callback doesn't needs bind.port
            tmpMap.put(INTERFACE_KEY, clazz.getName());
            URL exportUrl = new ServiceConfigURL(DubboProtocol.NAME, channel.getLocalAddress().getAddress().getHostAddress(),
                channel.getLocalAddress().getPort(), clazz.getName() + "." + instid, tmpMap);
    
            // no need to generate multiple exporters for different channel in the same JVM, cache key cannot collide.
            String cacheKey = getClientSideCallbackServiceCacheKey(instid);
            String countKey = getClientSideCountKey(clazz.getName());
            if (export) {
                // one channel can have multiple callback instances, no need to re-export for different instance.
                if (!channel.hasAttribute(cacheKey)) {
                    if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) {
                        ModuleModel moduleModel;
                        if (inv.getServiceModel() == null) {
                            //TODO should get scope model from url?
                            moduleModel = ApplicationModel.defaultModel().getDefaultModule();
                            logger.error("Unable to get Service Model from Invocation. Please check if your invocation failed! " +
                                "This error only happen in UT cases! Invocation:" + inv);
                        } else {
                            moduleModel = inv.getServiceModel().getModuleModel();
                        }
    
                        ServiceDescriptor serviceDescriptor = moduleModel.getServiceRepository().registerService(clazz);
                        ServiceMetadata serviceMetadata = new ServiceMetadata(clazz.getName() + "." + instid, exportUrl.getGroup(), exportUrl.getVersion(), clazz);
                        String serviceKey = BaseServiceMetadata.buildServiceKey(exportUrl.getPath(), group, exportUrl.getVersion());
                        ProviderModel providerModel = new ProviderModel(serviceKey, inst, serviceDescriptor, null, moduleModel, serviceMetadata);
                        moduleModel.getServiceRepository().registerProvider(providerModel);
    
                        exportUrl = exportUrl.setScopeModel(moduleModel);
                        exportUrl = exportUrl.setServiceModel(providerModel);
                        Invoker<?> invoker = proxyFactory.getInvoker(inst, clazz, exportUrl);
                        // should destroy resource?
                        Exporter<?> exporter = protocolSPI.export(invoker);
                        // this is used for tracing if instid has published service or not.
                        channel.setAttribute(cacheKey, exporter);
                        logger.info("Export a callback service :" + exportUrl + ", on " + channel + ", url is: " + url);
                        increaseInstanceCount(channel, countKey);
                    }
                }
            } else {
                if (channel.hasAttribute(cacheKey)) {
                    Exporter<?> exporter = (Exporter<?>) channel.getAttribute(cacheKey);
                    exporter.unexport();
                    channel.removeAttribute(cacheKey);
                    decreaseInstanceCount(channel, countKey);
                }
            }
            return String.valueOf(instid);
        }
    

                  

                          

                                       

    InvokerInvocationHandler:服务端参数回调请求调用

    public class InvokerInvocationHandler implements InvocationHandler {
        private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
        private final Invoker<?> invoker;
        private ServiceModel serviceModel;
        private URL url;
        private String protocolServiceKey;
    
        public static Field stackTraceField;
    
        static {
            try {
                stackTraceField = Throwable.class.getDeclaredField("stackTrace");
                stackTraceField.setAccessible(true);
            } catch (NoSuchFieldException e) {
                // ignore
            }
        }
    
        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
            this.url = invoker.getUrl();
            this.protocolServiceKey = this.url.getProtocolServiceKey();
            this.serviceModel = this.url.getServiceModel();
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                      //回调参数请求调用
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length == 0) {
                if ("toString".equals(methodName)) {
                    return invoker.toString();
                } else if ("$destroy".equals(methodName)) {
                    invoker.destroy();
                    return null;
                } else if ("hashCode".equals(methodName)) {
                    return invoker.hashCode();
                }
            } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
                return invoker.equals(args[0]);
            }
            RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method, invoker.getInterface().getName(), protocolServiceKey, args);
            String serviceKey = url.getServiceKey();
            rpcInvocation.setTargetServiceUniqueName(serviceKey);
    
            // invoker.getUrl() returns consumer url.
            RpcServiceContext.setRpcContext(url);  //设置消费地址
    
            if (serviceModel instanceof ConsumerModel) {
                rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
                rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
            }
    
            return invoker.invoke(rpcInvocation).recreate();
        }
    }
    

                  

                  

                        

    ChannelWrapper:服务端发起回调请求,将回调参数对象内存id添加到请求中

    class ChannelWrappedInvoker<T> extends AbstractInvoker<T> {
    
        private final Channel channel;
        private final String serviceKey;   //回调参数对象内存id
        private final ExchangeClient currentClient;
    
        ChannelWrappedInvoker(Class<T> serviceType, Channel channel, URL url, String serviceKey) {
            super(serviceType, url, new String[]{GROUP_KEY, TOKEN_KEY});
            this.channel = channel;
            this.serviceKey = serviceKey;
            this.currentClient = new HeaderExchangeClient(new ChannelWrapper(this.channel), false);
        }
    
        @Override
        protected Result doInvoke(Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            // use interface's name as service path to export if it's not found on client side
            inv.setAttachment(PATH_KEY, getInterface().getName());
            inv.setAttachment(CALLBACK_SERVICE_KEY, serviceKey);  
                //将回调参数对象内存id附加到attachment上,key为:callback.service.instid
    
            try {
                if (RpcUtils.isOneway(getUrl(), inv)) { // may have concurrency issue
                    currentClient.send(inv, getUrl().getMethodParameter(invocation.getMethodName(), SENT_KEY, false));
                    return AsyncRpcResult.newDefaultAsyncResult(invocation);
                } else {
                    CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv).thenApply(obj -> (AppResponse) obj);
                    return new AsyncRpcResult(appResponseFuture, inv);
                }
            } catch (RpcException e) {
                throw e;
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, e.getMessage(), e);
            } catch (Throwable e) { // here is non-biz exception, wrap it.
                throw new RpcException(e.getMessage(), e);
            }
        }
    

                                                  

                                  

    HeaderExchangeHandler:消费端处理回调请求

    public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    
        void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
            Response res = new Response(req.getId(), req.getVersion());
            if (req.isBroken()) {
                Object data = req.getData();
    
                String msg;
                if (data == null) {
                    msg = null;
                } else if (data instanceof Throwable) {
                    msg = StringUtils.toString((Throwable) data);
                } else {
                    msg = data.toString();
                }
                res.setErrorMessage("Fail to decode request due to: " + msg);
                res.setStatus(Response.BAD_REQUEST);
    
                channel.send(res);
                return;
            }
            // find handler by message class.
            Object msg = req.getData();   //获取请求数据
            try {
                CompletionStage<Object> future = handler.reply(channel, msg);  //处理请求
                future.whenComplete((appResult, t) -> {
                    try {
                        if (t == null) {
                            res.setStatus(Response.OK);
                            res.setResult(appResult);
                        } else {
                            res.setStatus(Response.SERVICE_ERROR);
                            res.setErrorMessage(StringUtils.toString(t));
                        }
                        channel.send(res);
                    } catch (RemotingException e) {
                        logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
                    }
                });
            } catch (Throwable e) {
                res.setStatus(Response.SERVICE_ERROR);
                res.setErrorMessage(StringUtils.toString(e));
                channel.send(res);
            }
        }
    

                    

                      

     JavassistProxyFactory:创建回调参数对象wrapper对象,调用对应方法

    public class JavassistProxyFactory extends AbstractProxyFactory {
    
        @Override
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
        }
    
        @Override
        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
    
    }
    

                    

              

                        

                                          

    使用示例

          

    ************

    服务端

            

                             

               

    application.yml

    dubbo:
      application:
        name: dubbo-provider
        #register-mode: instance
      registry:
        address: localhost:2181
        protocol: zookeeper
        group: dubbo
      protocol:
        name: dubbo
        #port: 20880

              

    HelloService

    public interface HelloService {
    
        String hello(CustomListener listener);
    }
    

                

    CustomListener

    public interface CustomListener {
    
        void listen(LocalDateTime localDateTime);
    }
    

            

    HelloServiceImpl

    public class HelloServiceImpl implements HelloService {
    
        @Override
        public String hello(CustomListener listener) {
            listener.listen(LocalDateTime.now());
    
            return "hello";
        }
    }
    

                  

    DemoApplication

    @EnableDubbo
    @SpringBootApplication
    @ImportResource("classpath:dubbo/provider.xml")  //导入配置文件
    public class DemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }
    
    }
    

                   

    dubbo/provider.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
           xmlns="http://www.springframework.org/schema/beans"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
           http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    
        <bean id="helloServiceImpl" class="com.example.demo.service.impl.HelloServiceImpl"/>
        <dubbo:service id="helloService" ref="helloServiceImpl"
                         interface="com.example.demo.service.HelloService">
            <dubbo:method name="hello">
                <dubbo:argument index="0" callback="true"/>
            </dubbo:method>
        </dubbo:service>
    </beans>
    

                     

    ************

    消费端

         

                             

                

    application.yml

    dubbo:
      application:
        name: dubbo-consumer
      registry:
        protocol: zookeeper
        address: localhost:2181
        group: dubbo
        #register-mode: instance
      protocol:
        name: dubbo
        #port: 20880
    
    server:
      port: 8081

             

    HelloService

    public interface HelloService {
    
        String hello(CustomListener listener);
    }
    

               

    CustomListener

    public interface CustomListener {
    
        void listen(LocalDateTime localDateTime);
    }
    

                

    HelloController

    @RestController
    public class HelloController {
    
        @DubboReference
        private HelloService helloService;
    
        @RequestMapping("/hello")
        public String hello(){
            return helloService.hello(localDateTime -> System.out.println("当前时间为:"+localDateTime));
                                     //调用服务时提供回调接口实现
        }
    }
    

                   

    ************

    使用测试

         

    localhost:8080/hello,控制台输出:

    2022-02-24 10:59:22.527  INFO 1436 --- [           main] com.example.demo.DemoApplication         : Started DemoApplication in 2.839 seconds (JVM running for 3.403)
    2022-02-24 10:59:25.232  INFO 1436 --- [nio-8081-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
    2022-02-24 10:59:25.233  INFO 1436 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
    2022-02-24 10:59:25.233  INFO 1436 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 0 ms
    当前时间为:2022-02-24T10:59:25.281622
    

    消费端执行任务,不在服务端执行

        

             

    展开全文
  • Dubbo参数验证

    千次阅读 2019-09-02 16:10:43
    参数验证功能 [备注1]是基于 JSR303 实现的,用户只需标识 JSR303 标准的验证 annotation,并通过声明 filter 来实现验证 [备注2]。 Maven 依赖 <dependency> <groupId>javax.validation</...
  • 自己实现dubbo参数校验

    千次阅读 2020-06-03 15:00:26
    自己实现dubbo调用参数验证背景原理自定义实现 背景 因为工作中经常需要做参数校验,在springboot项目中使用@Valid+@NotNull、@NotBlank…注解开发API接口非常丝滑,相反在开发RPC接口时却还是需要编写大量的参数...
  • 主要介绍了springboot+dubbo+validation 进行rpc参数校验的实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • dubbo 参数配置详解

    2019-09-25 03:22:11
    前提:前几天在查询接口超时时,查看日志时,发现某个接口调用了很多次,最终发现是因为dubbo的重试次数,导致接口超时时,会重试多次; 一、dubbo常用配置 <dubbo:service/> 服务配置,用于暴露一个服务,...
  • Dubbo参数配置解析

    千次阅读 2018-11-22 16:14:26
    Dubbo参数配置解析 注意 在dubbo中,group,version,interface是服务的匹配条件,也只有这三个参数来决定是不是同一个服务,其他的配置均为调优和治理参数。 所有的配置项分为三大类:(参见下表中的“作用”一列) ...
  • Dubbo参数验证(五)

    千次阅读 2019-06-03 18:38:07
    1、参考 ...Dubbo中的参数验证基于JSR-303,它是JAVA EE 6中的一项子规范,通过注解的方式用来对 Java Bean中字段的值进行简单验证。Consumer端要调用Provider端的接口,调用接口的话就会有参数...
  • dubbo参数配置解析

    千次阅读 2017-03-28 19:59:23
    dubbo参数配置解析注意在dubbo中,group,version,interface是服务的匹配条件,也只有这三个参数来决定是不是同一个服务,其他的配置均为调优和治理参数。所有的配置项分为三大类:(参见下表中的“作用”一列) 服务...
  • dubbo参数调优

    千次阅读 2018-05-09 22:29:15
    dubbo中配置优先级规律:方法级配置优先级高于接口级,consumer的优先级高于provider。详细:consumer的method配置 &gt; provider的method配置consumer的reference配置 &gt; provider的service配置consumer...
  • 在原作者代码上增加了参数 大家可以试着区分下不同,说明下,本来想免费的,但是现在C网发布必须有资源分。。。哪位知道怎么发免费资源 可私聊,多谢!
  • dubbo参数调优说明

    万次阅读 2016-07-13 22:28:50
    但很多朋友在使用dubbo的时候,只是简单的参考官方说明进行搭建,并没有过多的去思考一些关键参数的意义(也可能是时间紧任务多,没空出来研究),最终做出来的效果不可预知。 这里我根据目前我们项目的使用情况列出...
  • dubbo参数验证

    2021-11-10 18:26:08
    dubbo参数验证基于JSR303规范就是定义了校验注解,错误信息如何提示等的规范,我们只需要使用规定的注解,并声明Filter就可以了。 源码。 1:改造rpc-service-api 1.1:引入依赖 <dependencies> <!-- ...
  • JDK 1.8 JVM参数 对于无大量代理类和需要装载大量第三方外部包的情况下,元空间需要设置一个上限值,并且建议MetaspaceSize值等于MaxMetaspaceSize值,防止触发容器被直接kill掉,具体原因可参考...
  • dubbo参数配置

    2021-01-09 16:40:39
    特别注意dubbo提供者的threads,这个参数很重要,确定了并发的能力。 详情见这个链接:https://blog.csdn.net/youaremoon/article/details/51884644 参数名 作用范围 默认值 说明 备注 actives ...
  • Dubbo配置参数优先级

    2020-08-14 18:35:23
    配置之间的关系 标签 用途 解释 < dubbo:service/> 服务配置 用于暴露一个服务,定义服务的元信息,一个服务可以用多个协议暴露,... dubbo:reference/>... dubbo:protocol/>... dubbo:application/>
  • dubbo参数校验

    千次阅读 2019-03-10 19:46:00
    dubbo的SPI扩展是有关于参数校验的SP扩展,校难的原理其实也是用的拦截器原理。dubbo包有一个 ValidationFilter的拦截器实现。dubbo是有自己的校验器实现的JValidator,如果需要自定义的话,则需要实现 ...
  • Dubbo性能调优参数需要根据各自业务进行调整。 建议多在provider端配置属性,原因如下: 作为服务的提供方,比服务消费方更清楚服务的性能参数,如调用的超时时间、合理的重试次数等 在 Provider 端配置后,...
  • dubbo 参数杂谈

    2018-05-29 19:02:09
    dubbo 可以通过register group 进行zk 空间隔离, register 隔离是物理隔离, 也就是在zk 上的节点隔离, 不在的情况下默认是在dubbo 节点下面。 2). 服务提供和服务发现的 service group 和 reference group 进行...
  • 接上篇文章... 完善service参数校验 service-api模块依赖 <dependencies> <!-- 参数校验相关依赖 --> <dependency> <groupId>javax.validation</groupId> &l
  • Dubbo参数回调Callback 到底有什么用? 到底是个怎么样的代码逻辑?
  • Dubbo参数配置(一)

    千次阅读 2019-03-21 09:55:20
    目录 启动时检查 集群容错 负载均衡 线程模型 直连提供者 服务只订阅,不注册 ... dubbo默认会在启动时检查依赖的服务是否可用,不可用会抛出异常 1)xml中配置 没有提供者时报错 关闭某个服务的启动...
  • 但当该A服务调用RPC接口请求B服务时,http请求中的Header并不会随RPC请求带入到B服务中,这时可以通过dubbo的RpcContext进行统一参数传递。 RpcContext本质上是一个ThreadLocal,当接收到RPC请求或发起RPC请求时,...
  • 参数回调 事件通知 本地调用 使用场景 本地调用使用Injvm协议,是一个伪协议,它不开启端口,不发起远程调用,只在JVM内直接关联,但执行Dubbo的Filter链。 配置 //定义injvm协议 <dubbo:protocol name=...
  • dubbo--参数回调

    2019-07-21 20:54:30
    在服务调用方,传递服务的实现类。在服务提供方可执行实现类中的方法。目前只支持配置文件的方式。 定义两个接口: public interface CallbackListener { void changed(String msg); } ...public interface ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 55,557
精华内容 22,222
关键字:

dubbo参数

友情链接: RecyclerViewTest.zip