-
2021-12-12 19:16:46
概念
参数回调就是dubbo的provider调用consumer代码.
参数回调分为同步回调和异步回调
同步回调官网文档地址:https://dubbo.apache.org/zh/docs/v2.7/user/examples/callback-parameter/
异步回调官网文档地址
https://dubbo.apache.org/zh/docs/v2.7/user/examples/async-call/
异步回调就是利用了Java的CompletableFuture
gitee代码地址
https://gitee.com/zjj19941/ZJJ_Dubbo.git 的callback项目就是代码,自己准备个zookeeper,然后改一下配置文件,先执行provider,再执行consumer,即可看到效果
使用
interface项目声明一个回调接口类
package com.zjj; // 回调参数 public interface DemoServiceListener { String changed(String msg) throws InterruptedException; }
interface项目接口添加一个回调方法
package com.zjj; public interface DemoService { // 添加回调逻辑,不然的话消费者无法进行回调 //参数2是key, 不同的key的回调函数可以不一样. 这个key就类似于 default String sayHello(String name, DemoServiceListener listener) throws InterruptedException { return null; }; }
provider编写回调方法
public String sayHello(String name, DemoServiceListener callback) 是回调方法
@Service也配置了一些参数回调相关的参数
package com.zjj.provider.service; import com.zjj.DemoService; import com.zjj.DemoServiceListener; import org.apache.dubbo.config.annotation.Argument; import org.apache.dubbo.config.annotation.Method; import org.apache.dubbo.config.annotation.Service; // DemoService的sayHello方法的index=1的参数是回调对象,服务消费者可以调用addListener方法来添加回调对象,服务提供者一旦执行回调对象的方法就会通知给服务消费者 //arguments = {@Argument(index = 1, callback = true)})} 的意思是 sayHello方法的第二个参数是回调参数 //callbacks = 3 的意思是这个服务最多支持多少个回调 @Service(version = "default", timeout =1000000 , methods = {@Method(name = "sayHello", arguments = {@Argument(index = 1, callback = true)})}, callbacks = 3) public class CallBackDemoService implements DemoService { /** * 回调方法 */ @Override public String sayHello(String name, DemoServiceListener callback) throws InterruptedException { System.out.println("1.开始执行回调服务,参数name是:" + name + " 毫秒值是 :" + System.currentTimeMillis()); //在执行回调函数 String rs = callback.changed("我是provider传过去的入参"); Thread.sleep(1000); System.out.println("3.回调的结果是: " + rs + " 毫秒值是 :" + System.currentTimeMillis()); return String.format( name); // 正常访问 } }
consumer编写回调类
这个回调类的逻辑会被provider调用
package com.zjj; //参数回调 public class DemoServiceListenerImpl implements DemoServiceListener { @Override public String changed(String msg) throws InterruptedException { System.out.println("2.被回调了:"+msg + " 毫秒值是 :" + System.currentTimeMillis()); Thread.sleep(1000); return "回调结果"; } }
consumer方法访问provider
参数2就是指定回调方法.指定的这个回调方法会被provider调用到.
String rs = demoService.sayHello(“周瑜”, new DemoServiceListenerImpl());
package com.zjj; import org.apache.dubbo.config.annotation.Reference; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplication public class DubboConsumerDemo { //@Reference注解就是用于标记这个服务具体使用了生产者的哪个接口实现 @Reference(version = "default",timeout = 100000000) private DemoService demoService; public static void main(String[] args) throws InterruptedException { ConfigurableApplicationContext context = SpringApplication.run(DubboConsumerDemo.class); DemoService demoService = context.getBean(DemoService.class); //等provider调用完了回调之后,这个方法才会给返回值结果. String rs = demoService.sayHello("周瑜", new DemoServiceListenerImpl()); System.out.println("4.rs:" + rs + " 毫秒值是 :" + System.currentTimeMillis()); } }
执行结果
provider控制台
1.开始执行回调服务,参数name是:周瑜 毫秒值是 :1639306614182 3.回调的结果是: 回调结果 毫秒值是 :1639306616205
consumer控制台
2.被回调了:我是provider传过去的入参 毫秒值是 :1639306614201 4.rs:周瑜 毫秒值是 :1639306616210
看控制台打印的毫秒值就能看出来方法执行顺序.
1.consumer先调用provider的sayHello方法,
2.provider调用consumer的回调方法
3.provider收到回调结果后方法执行完毕
4.consumer的sayHello执行完毕,获取到方法的返回结果
上面是同步回调的处理顺序
更多相关内容 -
dubbo 参数验证
2022-02-21 22:42:56dubbo 参数验证
dubbo 参数验证
官网:https://dubbo.apache.org/zh/docs/advanced/parameter-validation/
参数验证
相关依赖
<dependency> <groupId>jakarta.validation</groupId> <artifactId>jakarta.validation-api</artifactId> <version>3.0.1</version> </dependency> <dependency> <groupId>org.hibernate.validator</groupId> <artifactId>hibernate-validator</artifactId> <version>7.0.2.Final</version> </dependency>
pojo 层:标注验证注解
@Data public class ValidationParameter implements Serializable { private static final long serialVersionUID = 7158911668568000392L; @NotNull // 不允许为空 @Size(min = 1, max = 20) // 长度或大小范围 private String name; @NotNull(groups = ValidationService.Save.class) // 保存时不允许为空,更新时允许为空 ,表示不更新该字段 @Pattern(regexp = "^\\s*\\w+(?:\\.{0,1}[\\w-]+)*@[a-zA-Z0-9]+(?:[-.][a-zA-Z0-9]+)*\\.[a-zA-Z]+\\s*$") private String email; @Min(18) // 最小值 @Max(100) // 最大值 private int age; @Past // 必须为一个过去的时间 private Date loginDate; @Future // 必须为一个未来的时间 private Date expiryDate; }
service 层:对pojo参数分组验证
public interface ValidationService { // 缺省可按服务接口区分验证场景,如:@NotNull(groups = ValidationService.class) @interface Save{} //与方法同名接口,首字母大写,用于区分验证场景 //分组名也可以自定义,无大小写要求 //如:@NotNull(groups = ValidationService.Save.class),可选 void save(ValidationParameter parameter); void update(ValidationParameter parameter); }
servcie 层:定义分组验证顺序
@GroupSequence(value = {ValidationService.Save.class, ValidationService.Update.class}) public interface ValidationService { @interface Save{} void save(ValidationParameter parameter); @interface Update{} void update(ValidationParameter parameter); }
service 层:对方法参数进行验证
public interface ValidationService { void save(@NotNull ValidationParameter parameter); //参数不为空 void delete(@Min(1) int id); //验证基本类型参数 }
开启验证:如果不开启验证,不会检验参数
# 服务端开启验证 <dubbo:service id="validationService" interface="org.apache.dubbo.examples.validation.api.ValidationService" ref="validationService" validation="true" /> # 消费端开启验证 <dubbo:reference id="validationService" interface="org.apache.dubbo.examples.validation.api.ValidationService" validation="true" />
使用示例
************
服务端
application.yml
dubbo: application: name: dubbo-provider #register-mode: instance registry: address: localhost:2181 protocol: zookeeper group: dubbo protocol: name: dubbo #port: 20880
Product:需实现seriable接口
@Data public class Product implements Serializable { private Integer id; private String name; private Integer num; private Double price; private LocalDateTime createTime; private LocalDateTime updateTime; }
ProductService
public interface ProductService { Map<Integer,Product> save(Product product); Product updatePrice(Product product); void delete(Integer id); }
ProductServiceImpl
@DubboService public class ProductServiceImpl implements ProductService { private Map<Integer,Product> map = new HashMap<>(); @Override public Map<Integer,Product> save(Product product) { map.put(product.getId(),product); return map; } @Override public Product updatePrice(Product product) { Product object = map.get(product.getId()); object.setPrice(product.getPrice()); return map.get(product.getId()); } @Override public void delete(Integer id) { map.remove(id); } }
************
消费端
application.yml
dubbo: application: name: dubbo-consumer registry: protocol: zookeeper address: localhost:2181 group: dubbo #register-mode: instance protocol: name: dubbo #port: 20880 server: port: 8081
Product:添加验证注解,在消费端验证
@Data public class Product implements Serializable { @NotNull private Integer id; @NotNull(groups = ProductService.Save.class) private String name; @NotNull(groups = ProductService.Save.class) private Integer num; @NotNull(groups = ProductService.UpdatePrice.class) @DecimalMin(value = "1") private Double price; @Past(groups = ProductService.Save.class) @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") private LocalDateTime createTime; @Future(groups = ProductService.Save.class) @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") private LocalDateTime updateTime; }
ProductService:添加验证注解,对方法参数进行验证
public interface ProductService { @GroupSequence(value = {UpdatePrice.class}) @interface Save{} Map<Integer,Product> save(Product product); @interface UpdatePrice{} Product updatePrice(Product product); void delete(Integer id); }
HelloController
@RestController public class HelloController { @DubboReference(validation = "true") private ProductService productService; @RequestMapping("/save") public Map<Integer,Product> save(Product product){ return productService.save(product); } @RequestMapping("/update") public Product updatePrice(Product product){ return productService.updatePrice(product); } @RequestMapping("/delete") public void save(@Min(1) @NotNull Integer id){ productService.delete(id); } }
************
使用测试
save:时间不满足验证要求,报错
java.lang.ClassNotFoundException: javax.validation.ValidationException at org.apache.dubbo.validation.filter.ValidationFilter.invoke(ValidationFilter.java:88) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CallbackRegistrationInvoker.invoke(FilterChainBuilder.java:187) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invokeWithContext(AbstractClusterInvoker.java:364) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:80) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:332) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:99) ~[dubbo-3.0.5.jar:3.0.5] ...
save:所有参数均满足验证要求,可成功插入数据
update:更新数据时,price为空不满足验证规则报错
javax.validation.ValidationException: Failed to validate service: com.example.demo.service.ProductService, method: updatePrice, cause: [ConstraintViolationImpl{interpolatedMessage='不能为null', propertyPath=price, rootBeanClass=class com.example.demo.pojo.Product, messageTemplate='{javax.validation.constraints.NotNull.message}'}] at org.apache.dubbo.validation.filter.ValidationFilter.invoke(ValidationFilter.java:96) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CallbackRegistrationInvoker.invoke(FilterChainBuilder.java:187) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invokeWithContext(AbstractClusterInvoker.java:364) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:80) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:332) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:99) ~[dubbo-3.0.5.jar:3.0.5] ...
update:更新数据时,price不为空可正常插入
delete:数据删除时,id最小为1,不满足报错
javax.validation.ValidationException: Failed to validate service: com.example.demo.service.ProductService, method: delete, cause: [ConstraintViolationImpl{interpolatedMessage='最小不能小于2', propertyPath=deleteArgument0, rootBeanClass=class com.example.demo.service.ProductService_DeleteParameter_java.lang.Integer, messageTemplate='{javax.validation.constraints.Min.message}'}] at org.apache.dubbo.validation.filter.ValidationFilter.invoke(ValidationFilter.java:96) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CopyOfFilterChainNode.invoke(FilterChainBuilder.java:313) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder$CallbackRegistrationInvoker.invoke(FilterChainBuilder.java:187) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invokeWithContext(AbstractClusterInvoker.java:364) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:80) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:332) ~[dubbo-3.0.5.jar:3.0.5] at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:99) ~[dubbo-3.0.5.jar:3.0.5] ...
delete:数据删除时,id为1,可正常删除
-
Dubbo参数回调
2021-12-11 16:26:49Dubbo参数回调dubbo参数回调配置:
<!-- callbacks代表回调的实例数 --> <dubbo:service interface="com.tiger.dubbo.common.service.DubboCallbackService" ref="callbackService" callbacks="2"> <dubbo:method name="addListener"> <dubbo:argument callback="true" index="1"/> </dubbo:method> </dubbo:service>
在开发中会出现callback的实例数超限情况。对于每个连接,即每个channel默认回调的实例数为1.如果超过该值,则会报错。
回调处理类CallbackServiceCodec源代码分析:
客户端:
private static String exportOrUnexportCallbackService(Channel channel, URL url, Class clazz, Object inst, Boolean export) throws IOException { // 获取回调实例的hash code int instid = System.identityHashCode(inst); Map<String, String> params = new HashMap<>(3); // no need to new client again params.put(IS_SERVER_KEY, Boolean.FALSE.toString()); // mark it's a callback, for troubleshooting params.put(IS_CALLBACK_SERVICE, Boolean.TRUE.toString()); String group = (url == null ? null : url.getParameter(GROUP_KEY)); if (group != null && group.length() > 0) { params.put(GROUP_KEY, group); } // add method, for verifying against method, automatic fallback (see dubbo protocol) params.put(METHODS_KEY, StringUtils.join(Wrapper.getWrapper(clazz).getDeclaredMethodNames(), ",")); Map<String, String> tmpMap = new HashMap<>(url.getParameters()); tmpMap.putAll(params); tmpMap.remove(VERSION_KEY);// doesn't need to distinguish version for callback tmpMap.put(INTERFACE_KEY, clazz.getName()); URL exportUrl = new URL(DubboProtocol.NAME, channel.getLocalAddress().getAddress().getHostAddress(), channel.getLocalAddress().getPort(), clazz.getName() + "." + instid, tmpMap); // no need to generate multiple exporters for different channel in the same JVM, cache key cannot collide. // 该Channel关联的回调实例的key String cacheKey = getClientSideCallbackServiceCacheKey(instid); // 该Channel关联的回调实例的数量 String countKey = getClientSideCountKey(clazz.getName()); if (export) { // one channel can have multiple callback instances, no need to re-export for different instance. // 如果已经存在 if (!channel.hasAttribute(cacheKey)) { // 判断实例数是否超限 if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) { Invoker<?> invoker = PROXY_FACTORY.getInvoker(inst, clazz, exportUrl); // should destroy resource? Exporter<?> exporter = PROTOCOL.export(invoker); // this is used for tracing if instid has published service or not. channel.setAttribute(cacheKey, exporter); logger.info("Export a callback service :" + exportUrl + ", on " + channel + ", url is: " + url); increaseInstanceCount(channel, countKey); } } } else { if (channel.hasAttribute(cacheKey)) { Exporter<?> exporter = (Exporter<?>) channel.getAttribute(cacheKey); exporter.unexport(); channel.removeAttribute(cacheKey); decreaseInstanceCount(channel, countKey); } } return String.valueOf(instid); }
首先会根据回调对象实例得到一个唯一的instid,不同的回调对象有不同的instid。当第一次执行时:
callback.service.instid.com.tiger.service.CallbackListener.COUNT=1 callback.service.instid.1639044302= {DubboExporter@9643} "interface com.tiger.service.CallbackListener
当第二次调用时:
cacheKey=callback.service.instid.1862630585 countKey=callback.service.instid.com.tiger.service.CallbackListener.COUNT
由于cacheKey不存在,则会调用isInstancesOverLimit()方法,会将callback.service.instid.com.tiger.service.CallbackListener.COUNT中的值取出来和limit比较,如果大于等于limit,则抛出异常,否则callback.service.instid.com.tiger.service.CallbackListener.COUNT对应的值加1。如果回调对象每次都是同一个,则不会进入isInstancesOverLimit()方法。
服务端源代码:
private static Object referOrdestroyCallbackService(Channel channel, URL url, Class<?> clazz, Invocation inv, int instid, boolean isRefer) { Object proxy = null; String invokerCacheKey = getServerSideCallbackInvokerCacheKey(channel, clazz.getName(), instid); String proxyCacheKey = getServerSideCallbackServiceCacheKey(channel, clazz.getName(), instid); proxy = channel.getAttribute(proxyCacheKey); String countkey = getServerSideCountKey(channel, clazz.getName()); if (isRefer) { if (proxy == null) { URL referurl = URL.valueOf("callback://" + url.getAddress() + "/" + clazz.getName() + "?" + Constants.INTERFACE_KEY + "=" + clazz.getName()); referurl = referurl.addParametersIfAbsent(url.getParameters()).removeParameter(Constants.METHODS_KEY); // 判断Channel对应的实例数是否超限 if (!isInstancesOverLimit(channel, referurl, clazz.getName(), instid, true)) { @SuppressWarnings("rawtypes") Invoker<?> invoker = new ChannelWrappedInvoker(clazz, channel, referurl, String.valueOf(instid)); proxy = proxyFactory.getProxy(invoker); channel.setAttribute(proxyCacheKey, proxy); channel.setAttribute(invokerCacheKey, invoker); increaseInstanceCount(channel, countkey); //convert error fail fast . //ignore concurrent problem. Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY); if (callbackInvokers == null) { callbackInvokers = new ConcurrentHashSet<Invoker<?>>(1); callbackInvokers.add(invoker); channel.setAttribute(Constants.CHANNEL_CALLBACK_KEY, callbackInvokers); } logger.info("method " + inv.getMethodName() + " include a callback service :" + invoker.getUrl() + ", a proxy :" + invoker + " has been created."); } } } else { if (proxy != null) { Invoker<?> invoker = (Invoker<?>) channel.getAttribute(invokerCacheKey); try { Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY); if (callbackInvokers != null) { callbackInvokers.remove(invoker); } invoker.destroy(); } catch (Exception e) { logger.error(e.getMessage(), e); } // cancel refer, directly remove from the map channel.removeAttribute(proxyCacheKey); channel.removeAttribute(invokerCacheKey); decreaseInstanceCount(channel, countkey); } } return proxy; }
每个连接,即Channel绑定的callback实例数可以配置,默认为1。所以每个Channel绑定的实例数取决于最小的那个配置。
当前Dubbo尚未提供取消参数回调的方式。在开发中,如果回调实例数被限制,则回调实例可以被共享。
-
dubbo 参数回调
2022-02-24 10:01:05dubbo 参数回调
dubbo 参数回调
官网:https://dubbo.apache.org/zh/docs/advanced/callback-parameter/
参数回调
参数回调:服务端反向调用消费端方法
服务端:服务接口
public interface CallbackService { void addListener(String key, CallbackListener listener); }
服务端:回调接口,该接口回调客户端操作
public interface CallbackListener { void changed(String msg); }
服务端:服务接口实现类
public class CallbackServiceImpl implements CallbackService { public void addListener(String key, CallbackListener listener) { listener.changed(getChanged(key)); //调用客户端处理逻辑 } private String getChanged(String key) { return "Changed: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); } }
服务端:配置回调接口
<bean id="callbackService" class="com.callback.impl.CallbackServiceImpl" /> <dubbo:service interface="com.callback.CallbackService" ref="callbackService" connections="1" callbacks="1000"> <dubbo:method name="addListener"> <!--index从0开始计数--> <dubbo:argument index="1" callback="true" /> <!--也可以通过指定类型的方式--> <!--<dubbo:argument type="com.demo.CallbackListener" callback="true" />--> </dubbo:method> </dubbo:service>
消费端:提供回调接口实现
# 注册服务 <dubbo:reference id="callbackService" interface="com.callback.CallbackService" /> # 服务调用时提供接口实现 callbackService.addListener("foo.bar", new CallbackListener(){ public void changed(String msg) { System.out.println("callback1:" + msg); } });
实现原理
参数回调处理过程
# 消费端 消费端启动时会获取服务元数据,由于服务端配置了回调参数,客户端会获取回调信息; 消费端在发起请求时,异步执行以下操作: * 将回调参数对象的内存id存储在attachment中,key为sys_callback_arg-index(index 为实际索引值,如0) * 暴露回调接口的dubbo协议服务(复用消费端、客户端建立的tcp连接) sys_callback_arg-index会随请求一起发送给服务端; # 服务端 服务度端处理回调参数时,会将sys_callback_arg-index的值传递给消费端,key为:callback.service.instid # 消费端 消费端接到请求后,根据callback.service.instid查找对应的回调对象,执行对应操作
CallbackServiceCodec:客户端回调参数对象暴露
public class CallbackServiceCodec { public Object encodeInvocationArgument(Channel channel, RpcInvocation inv, int paraIndex) throws IOException { //将回调参数对象内存id存储在attachment中 // get URL directly URL url = inv.getInvoker() == null ? null : inv.getInvoker().getUrl(); byte callbackStatus = isCallBack(url, inv.getProtocolServiceKey(), inv.getMethodName(), paraIndex); Object[] args = inv.getArguments(); Class<?>[] pts = inv.getParameterTypes(); switch (callbackStatus) { case CallbackServiceCodec.CALLBACK_CREATE: //新建回调参数对象 inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, inv, url, pts[paraIndex], args[paraIndex], true)); //key:sys_callback_arg- paraIndex //value:回调参数对象内存id(System.identityHashCode(args[paraIndex])) //exportOrUnexportCallbackService暴露或者卸载回调参数服务 return null; case CallbackServiceCodec.CALLBACK_DESTROY: inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, inv, url, pts[paraIndex], args[paraIndex], false)); return null; default: return args[paraIndex]; } } /** * export or unexport callback service on client side //在客户端暴露或者卸载回调服务 * * @param channel * @param url * @param clazz * @param inst * @param export * @throws IOException */ @SuppressWarnings({"unchecked", "rawtypes"}) private String exportOrUnexportCallbackService(Channel channel, RpcInvocation inv, URL url, Class clazz, Object inst, Boolean export) throws IOException { int instid = System.identityHashCode(inst); Map<String, String> params = new HashMap<>(3); // no need to new client again params.put(IS_SERVER_KEY, Boolean.FALSE.toString()); // mark it's a callback, for troubleshooting params.put(IS_CALLBACK_SERVICE, Boolean.TRUE.toString()); String group = (url == null ? null : url.getGroup()); if (group != null && group.length() > 0) { params.put(GROUP_KEY, group); } // add method, for verifying against method, automatic fallback (see dubbo protocol) params.put(METHODS_KEY, StringUtils.join(Wrapper.getWrapper(clazz).getDeclaredMethodNames(), ",")); Map<String, String> tmpMap = new HashMap<>(); if (url != null) { Map<String, String> parameters = url.getParameters(); if (parameters != null && !parameters.isEmpty()) { tmpMap.putAll(parameters); } } tmpMap.putAll(params); tmpMap.remove(VERSION_KEY);// doesn't need to distinguish version for callback tmpMap.remove(Constants.BIND_PORT_KEY); //callback doesn't needs bind.port tmpMap.put(INTERFACE_KEY, clazz.getName()); URL exportUrl = new ServiceConfigURL(DubboProtocol.NAME, channel.getLocalAddress().getAddress().getHostAddress(), channel.getLocalAddress().getPort(), clazz.getName() + "." + instid, tmpMap); // no need to generate multiple exporters for different channel in the same JVM, cache key cannot collide. String cacheKey = getClientSideCallbackServiceCacheKey(instid); String countKey = getClientSideCountKey(clazz.getName()); if (export) { // one channel can have multiple callback instances, no need to re-export for different instance. if (!channel.hasAttribute(cacheKey)) { if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) { ModuleModel moduleModel; if (inv.getServiceModel() == null) { //TODO should get scope model from url? moduleModel = ApplicationModel.defaultModel().getDefaultModule(); logger.error("Unable to get Service Model from Invocation. Please check if your invocation failed! " + "This error only happen in UT cases! Invocation:" + inv); } else { moduleModel = inv.getServiceModel().getModuleModel(); } ServiceDescriptor serviceDescriptor = moduleModel.getServiceRepository().registerService(clazz); ServiceMetadata serviceMetadata = new ServiceMetadata(clazz.getName() + "." + instid, exportUrl.getGroup(), exportUrl.getVersion(), clazz); String serviceKey = BaseServiceMetadata.buildServiceKey(exportUrl.getPath(), group, exportUrl.getVersion()); ProviderModel providerModel = new ProviderModel(serviceKey, inst, serviceDescriptor, null, moduleModel, serviceMetadata); moduleModel.getServiceRepository().registerProvider(providerModel); exportUrl = exportUrl.setScopeModel(moduleModel); exportUrl = exportUrl.setServiceModel(providerModel); Invoker<?> invoker = proxyFactory.getInvoker(inst, clazz, exportUrl); // should destroy resource? Exporter<?> exporter = protocolSPI.export(invoker); // this is used for tracing if instid has published service or not. channel.setAttribute(cacheKey, exporter); logger.info("Export a callback service :" + exportUrl + ", on " + channel + ", url is: " + url); increaseInstanceCount(channel, countKey); } } } else { if (channel.hasAttribute(cacheKey)) { Exporter<?> exporter = (Exporter<?>) channel.getAttribute(cacheKey); exporter.unexport(); channel.removeAttribute(cacheKey); decreaseInstanceCount(channel, countKey); } } return String.valueOf(instid); }
InvokerInvocationHandler:服务端参数回调请求调用
public class InvokerInvocationHandler implements InvocationHandler { private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class); private final Invoker<?> invoker; private ServiceModel serviceModel; private URL url; private String protocolServiceKey; public static Field stackTraceField; static { try { stackTraceField = Throwable.class.getDeclaredField("stackTrace"); stackTraceField.setAccessible(true); } catch (NoSuchFieldException e) { // ignore } } public InvokerInvocationHandler(Invoker<?> handler) { this.invoker = handler; this.url = invoker.getUrl(); this.protocolServiceKey = this.url.getProtocolServiceKey(); this.serviceModel = this.url.getServiceModel(); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //回调参数请求调用 if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 0) { if ("toString".equals(methodName)) { return invoker.toString(); } else if ("$destroy".equals(methodName)) { invoker.destroy(); return null; } else if ("hashCode".equals(methodName)) { return invoker.hashCode(); } } else if (parameterTypes.length == 1 && "equals".equals(methodName)) { return invoker.equals(args[0]); } RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method, invoker.getInterface().getName(), protocolServiceKey, args); String serviceKey = url.getServiceKey(); rpcInvocation.setTargetServiceUniqueName(serviceKey); // invoker.getUrl() returns consumer url. RpcServiceContext.setRpcContext(url); //设置消费地址 if (serviceModel instanceof ConsumerModel) { rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel); rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method)); } return invoker.invoke(rpcInvocation).recreate(); } }
ChannelWrapper:服务端发起回调请求,将回调参数对象内存id添加到请求中
class ChannelWrappedInvoker<T> extends AbstractInvoker<T> { private final Channel channel; private final String serviceKey; //回调参数对象内存id private final ExchangeClient currentClient; ChannelWrappedInvoker(Class<T> serviceType, Channel channel, URL url, String serviceKey) { super(serviceType, url, new String[]{GROUP_KEY, TOKEN_KEY}); this.channel = channel; this.serviceKey = serviceKey; this.currentClient = new HeaderExchangeClient(new ChannelWrapper(this.channel), false); } @Override protected Result doInvoke(Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; // use interface's name as service path to export if it's not found on client side inv.setAttachment(PATH_KEY, getInterface().getName()); inv.setAttachment(CALLBACK_SERVICE_KEY, serviceKey); //将回调参数对象内存id附加到attachment上,key为:callback.service.instid try { if (RpcUtils.isOneway(getUrl(), inv)) { // may have concurrency issue currentClient.send(inv, getUrl().getMethodParameter(invocation.getMethodName(), SENT_KEY, false)); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv).thenApply(obj -> (AppResponse) obj); return new AsyncRpcResult(appResponseFuture, inv); } } catch (RpcException e) { throw e; } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, e.getMessage(), e); } catch (Throwable e) { // here is non-biz exception, wrap it. throw new RpcException(e.getMessage(), e); } }
HeaderExchangeHandler:消费端处理回调请求
public class HeaderExchangeHandler implements ChannelHandlerDelegate { void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null) { msg = null; } else if (data instanceof Throwable) { msg = StringUtils.toString((Throwable) data); } else { msg = data.toString(); } res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); channel.send(res); return; } // find handler by message class. Object msg = req.getData(); //获取请求数据 try { CompletionStage<Object> future = handler.reply(channel, msg); //处理请求 future.whenComplete((appResult, t) -> { try { if (t == null) { res.setStatus(Response.OK); res.setResult(appResult); } else { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(t)); } channel.send(res); } catch (RemotingException e) { logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e); } }); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } }
JavassistProxyFactory:创建回调参数对象wrapper对象,调用对应方法
public class JavassistProxyFactory extends AbstractProxyFactory { @Override @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } @Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
使用示例
************
服务端
application.yml
dubbo: application: name: dubbo-provider #register-mode: instance registry: address: localhost:2181 protocol: zookeeper group: dubbo protocol: name: dubbo #port: 20880
HelloService
public interface HelloService { String hello(CustomListener listener); }
CustomListener
public interface CustomListener { void listen(LocalDateTime localDateTime); }
HelloServiceImpl
public class HelloServiceImpl implements HelloService { @Override public String hello(CustomListener listener) { listener.listen(LocalDateTime.now()); return "hello"; } }
DemoApplication
@EnableDubbo @SpringBootApplication @ImportResource("classpath:dubbo/provider.xml") //导入配置文件 public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }
dubbo/provider.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <bean id="helloServiceImpl" class="com.example.demo.service.impl.HelloServiceImpl"/> <dubbo:service id="helloService" ref="helloServiceImpl" interface="com.example.demo.service.HelloService"> <dubbo:method name="hello"> <dubbo:argument index="0" callback="true"/> </dubbo:method> </dubbo:service> </beans>
************
消费端
application.yml
dubbo: application: name: dubbo-consumer registry: protocol: zookeeper address: localhost:2181 group: dubbo #register-mode: instance protocol: name: dubbo #port: 20880 server: port: 8081
HelloService
public interface HelloService { String hello(CustomListener listener); }
CustomListener
public interface CustomListener { void listen(LocalDateTime localDateTime); }
HelloController
@RestController public class HelloController { @DubboReference private HelloService helloService; @RequestMapping("/hello") public String hello(){ return helloService.hello(localDateTime -> System.out.println("当前时间为:"+localDateTime)); //调用服务时提供回调接口实现 } }
************
使用测试
localhost:8080/hello,控制台输出:
2022-02-24 10:59:22.527 INFO 1436 --- [ main] com.example.demo.DemoApplication : Started DemoApplication in 2.839 seconds (JVM running for 3.403) 2022-02-24 10:59:25.232 INFO 1436 --- [nio-8081-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet' 2022-02-24 10:59:25.233 INFO 1436 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet' 2022-02-24 10:59:25.233 INFO 1436 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 0 ms 当前时间为:2022-02-24T10:59:25.281622
消费端执行任务,不在服务端执行
-
Dubbo参数验证
2019-09-02 16:10:43参数验证功能 [备注1]是基于 JSR303 实现的,用户只需标识 JSR303 标准的验证 annotation,并通过声明 filter 来实现验证 [备注2]。 Maven 依赖 <dependency> <groupId>javax.validation</... -
自己实现dubbo参数校验
2020-06-03 15:00:26自己实现dubbo调用参数验证背景原理自定义实现 背景 因为工作中经常需要做参数校验,在springboot项目中使用@Valid+@NotNull、@NotBlank…注解开发API接口非常丝滑,相反在开发RPC接口时却还是需要编写大量的参数... -
springboot+dubbo+validation 进行rpc参数校验的实现方法
2020-08-25 14:17:59主要介绍了springboot+dubbo+validation 进行rpc参数校验的实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 -
dubbo 参数配置详解
2019-09-25 03:22:11前提:前几天在查询接口超时时,查看日志时,发现某个接口调用了很多次,最终发现是因为dubbo的重试次数,导致接口超时时,会重试多次; 一、dubbo常用配置 <dubbo:service/> 服务配置,用于暴露一个服务,... -
Dubbo参数配置解析
2018-11-22 16:14:26Dubbo参数配置解析 注意 在dubbo中,group,version,interface是服务的匹配条件,也只有这三个参数来决定是不是同一个服务,其他的配置均为调优和治理参数。 所有的配置项分为三大类:(参见下表中的“作用”一列) ... -
Dubbo参数验证(五)
2019-06-03 18:38:071、参考 ...Dubbo中的参数验证基于JSR-303,它是JAVA EE 6中的一项子规范,通过注解的方式用来对 Java Bean中字段的值进行简单验证。Consumer端要调用Provider端的接口,调用接口的话就会有参数... -
dubbo参数配置解析
2017-03-28 19:59:23dubbo参数配置解析注意在dubbo中,group,version,interface是服务的匹配条件,也只有这三个参数来决定是不是同一个服务,其他的配置均为调优和治理参数。所有的配置项分为三大类:(参见下表中的“作用”一列) 服务... -
dubbo参数调优
2018-05-09 22:29:15dubbo中配置优先级规律:方法级配置优先级高于接口级,consumer的优先级高于provider。详细:consumer的method配置 > provider的method配置consumer的reference配置 > provider的service配置consumer... -
Spingboot-Dubbo-zookeeper实例-带参数
2017-11-29 14:06:21在原作者代码上增加了参数 大家可以试着区分下不同,说明下,本来想免费的,但是现在C网发布必须有资源分。。。哪位知道怎么发免费资源 可私聊,多谢! -
dubbo参数调优说明
2016-07-13 22:28:50但很多朋友在使用dubbo的时候,只是简单的参考官方说明进行搭建,并没有过多的去思考一些关键参数的意义(也可能是时间紧任务多,没空出来研究),最终做出来的效果不可预知。 这里我根据目前我们项目的使用情况列出... -
dubbo之参数验证
2021-11-10 18:26:08dubbo的参数验证基于JSR303规范就是定义了校验注解,错误信息如何提示等的规范,我们只需要使用规定的注解,并声明Filter就可以了。 源码。 1:改造rpc-service-api 1.1:引入依赖 <dependencies> <!-- ... -
容器化JVM系统Dubbo参数配置 经验总结
2019-07-09 16:27:57JDK 1.8 JVM参数 对于无大量代理类和需要装载大量第三方外部包的情况下,元空间需要设置一个上限值,并且建议MetaspaceSize值等于MaxMetaspaceSize值,防止触发容器被直接kill掉,具体原因可参考... -
dubbo的参数配置
2021-01-09 16:40:39特别注意dubbo提供者的threads,这个参数很重要,确定了并发的能力。 详情见这个链接:https://blog.csdn.net/youaremoon/article/details/51884644 参数名 作用范围 默认值 说明 备注 actives ... -
Dubbo配置参数优先级
2020-08-14 18:35:23配置之间的关系 标签 用途 解释 < dubbo:service/> 服务配置 用于暴露一个服务,定义服务的元信息,一个服务可以用多个协议暴露,... dubbo:reference/>... dubbo:protocol/>... dubbo:application/> -
【dubbo】参数校验
2019-03-10 19:46:00dubbo的SPI扩展是有关于参数校验的SP扩展,校难的原理其实也是用的拦截器原理。dubbo包有一个 ValidationFilter的拦截器实现。dubbo是有自己的校验器实现的JValidator,如果需要自定义的话,则需要实现 ... -
(十)Dubbo性能调优参数
2022-03-07 17:43:01Dubbo性能调优参数需要根据各自业务进行调整。 建议多在provider端配置属性,原因如下: 作为服务的提供方,比服务消费方更清楚服务的性能参数,如调用的超时时间、合理的重试次数等 在 Provider 端配置后,... -
dubbo 参数杂谈
2018-05-29 19:02:09dubbo 可以通过register group 进行zk 空间隔离, register 隔离是物理隔离, 也就是在zk 上的节点隔离, 不在的情况下默认是在dubbo 节点下面。 2). 服务提供和服务发现的 service group 和 reference group 进行... -
DUBBO RPC Service参数校验
2020-10-13 17:59:47接上篇文章... 完善service参数校验 service-api模块依赖 <dependencies> <!-- 参数校验相关依赖 --> <dependency> <groupId>javax.validation</groupId> &l -
Dubbo进阶(十四)- Dubbo中参数回调 Callback 实现深究
2019-09-20 22:18:06Dubbo 中 参数回调Callback 到底有什么用? 到底是个怎么样的代码逻辑? -
Dubbo之参数配置(一)
2019-03-21 09:55:20目录 启动时检查 集群容错 负载均衡 线程模型 直连提供者 服务只订阅,不注册 ... dubbo默认会在启动时检查依赖的服务是否可用,不可用会抛出异常 1)xml中配置 没有提供者时报错 关闭某个服务的启动... -
Dubbo微服务调用时公共参数的传递
2022-04-21 13:38:32但当该A服务调用RPC接口请求B服务时,http请求中的Header并不会随RPC请求带入到B服务中,这时可以通过dubbo的RpcContext进行统一参数传递。 RpcContext本质上是一个ThreadLocal,当接收到RPC请求或发起RPC请求时,... -
Dubbo之参数配置(三)
2019-03-22 15:57:12参数回调 事件通知 本地调用 使用场景 本地调用使用Injvm协议,是一个伪协议,它不开启端口,不发起远程调用,只在JVM内直接关联,但执行Dubbo的Filter链。 配置 //定义injvm协议 <dubbo:protocol name=... -
dubbo--参数回调
2019-07-21 20:54:30在服务调用方,传递服务的实现类。在服务提供方可执行实现类中的方法。目前只支持配置文件的方式。 定义两个接口: public interface CallbackListener { void changed(String msg); } ...public interface ...