精华内容
下载资源
问答
  • AWS Load Balancer Controller是一个控制器,可帮助管理Kubernetes集群的Elastic Load Balancer。 通过配置它可以满足Kubernetes。 它通过配置满足Kubernetes。 该项目以前称为“ AWS ALB Ingress Controller”,...
  • 现在,Load Balancer是一个单独的存储库,而不是praqma / LearnKubernetes内的子目录 该负载平衡器将模仿例如来自的云负载平衡器提供的功能。 Google和AWS以监视apiServer事件的方式,并在需要时自动重新配置。 此...
  • 这个Lua库可以与balancer_by_lua*一起使用。 概要 lua_package_path " /path/to/lua-resty-chash/lib/?.lua;; " ; lua_package_cpath " /path/to/lua-resty-chash/?.so;; " ; init_by_lua_block { local resty_...
  • NULL 博文链接:https://57832638.iteye.com/blog/2003855
  • kubernetes-traefik 使用Traefik作为Network Ingress和LoadBalancer的开源Kubernetes部署。
  • LoadBalancer.js LoadBalancer.js是一个粘性会话TCP负载平衡器,已针对与实时框架(支持HTTP长轮询回退)进行了优化。 它捕获来自指定端口的原始TCP连接,并将其转发到各种目标(定义为主机和端口组合)。 它根据...
  • VMware Advanced Load Balancer(以前称为Avi)Ansible集合 Ansible版本兼容性 该集合已针对以下Ansible版本进行了测试: > = 2.9.10 。 安装及使用 必须安装Ansible pip install ansible 使用ansible-galaxy CLI...
  • 包含一个proxy和多个providers LoadBalancer为kubernetes应用程序提供了外部流量负载平衡。 proxy是一个入口控制器,它监视入口资源以提供允许入站连接访问群集服务的访问。 provider是集群的入口,为与代理(入口...
  • server_loadbalancer-源码

    2021-04-22 17:59:33
    负载平衡器:平衡提供的服务器之间的负载。 用法:./loadbalancer -R r -N n clientport httpserverport1 httpserverport2 ......使用示例:./ loadbalancer 1234 8080 8081 ./ loadbalancer 1234 8080 8081 -R 5 -N 6
  • loadbalancer-源码

    2021-05-22 07:48:36
    Rancher负载均衡器 ...对于问题,评论,更正,建议等,请在打开一个问题,标题以[Load Balancer]开头。 或只是[单击此处](// github.com/rancher/rancher/issues/new?title=[Load Balancer%5D%20)创建新问题。
  • hdfs balancer

    2018-08-16 17:04:07
    hadoop hdfs balancer数据均衡,在集群扩容或数据缺失的情况下,可以重新均衡数据
  • OpenShift跨集群负载均衡器 这是一个tcp负载平衡器,它知道多个OpenShift群集及其导出的路由。 还使用Pod筛选器来确定HA-Proxy在何处运行。 这是我的硕士论文期间创建的原型,旨在证明一种无需中断时间和风险即可...
  • Maglev - A Fast and Reliable Software Network Load Balancer Maglev - A Fast and Reliable Software Network Load Balancer
  • 去运行loadbalancer / loadbalancer.go 现在,当您转到localhost:8080时。 基于启动的服务器,每次刷新页面时,它应该显示一个新端口。 方法 它通过使用轮询方法遍历所有可用服务器,然后将请求路由到适当的...
  • 服务库(例如Task Runner,LoadBalancer,HealthCheck或Retry)支持Go1.9+ 。 1 2 2.1 2.2 2.2.1 2.2.2 1.安装 $ go get -u github.com/xgfone/go-service 2.例子 2.1任务执行器 package main import ( "context...
  • Metallb部署 安装 1.验证需求 描述的要求 2.通过允许以下内容的strictARP编辑kube-proxy apiVersion: kubeproxy.config.k8s.io/v1alpha1 kind: KubeProxyConfiguration mode: "ipvs" ipvs: strictARP: true ...
  • kubernetes-http-loadbalancer 使用 nginx 和 kubernetes 的 HTTP 负载均衡器的完整工作示例。 它用: :基于 nginx + confd 的负载均衡器(在这种情况下,etcd 用作后端)。 : 侦听 kubernetes pod 事件并根据...
  • DPDK技术峰会讲稿分享,DPDK开发者大会讲稿分享,DPDK Accelerated Load Balancer 演讲者Lei CHEN @ IQIYI.com, DPVS: open source HIGH PERFORMANCE l4 Load balancer BASED ON DPDK DPVS是基于DPDK的高性能四层...
  • 用于 Apache Hadoop HDFS 的 DataNode 卷重新平衡工具 该项目旨在填补和系列的空白:当一个硬盘驱动器在 Datanode 上死机并被替换时,没有真正的方法将块从最常用的硬盘移动到新添加的硬盘上——因此是空的。...
  • MongoDB之balancer

    2021-11-26 14:10:04
    前言:使用MongoDB当你对Chunk、Split、Balancer(甚至于jumbo chunk、autosplit)有一定了解后,Mongo对于你就不在是一个整体了,分分合合、动态平衡的视角应该在你的脑海中呈现。 一、balancer(均衡器)简介 1、...

    可以参考分片集群的介绍  这里

    可以参见chunk预分片的介绍  这里

    关于均衡及官网介绍 这里 这里

    前言:使用MongoDB当你对Chunk、Split、Balancer(甚至于jumbo chunk、autosplit)有一定了解后,Mongo对于你就不在是一个整体了,分分合合、动态平衡的视角应该在你的脑海中呈现。

    一、balancer(均衡器)简介

    1、简介

    ①Balancer是一个监视各个shard上的chunk数的后台进程;

    ②他运行在Config Server副本集的primary节点上。

    ③当给定分片上的chunk数达到特定的 migration thresholds(迁移阈值) 时,均衡器会尝试在分片之间自动进行chunk迁移,以使得每个分片上的chunk数“相同”(注:也并非完全的一个不差)。

    ④分片集群的平balance过程对用户和应用程序层都是完全透明的,只不过在执行该过程时可能会对数据库性能产生一些影响。默认情况下balanced进程是一直开启的。

    2、chunk迁移会对数据库性能有负面影响

            chunk迁移会在带宽和工作负载方面会带来一定的开销,这两方面都会影响数据库性能。均衡器通过以下方式将影响降至最低:1)限制分片在任何给定时间最多只有一个迁移;也就是说一个shard不能同时参与多个块迁移。举个例子:为了从一个shard迁移多个块,均衡器会一次迁移一个块进行多次迁移,而不是多个块同时迁移。2)仅当分片集合中块数最多的分片与该集合中块数最少的分片之间的块数差异达到迁移阈值时,才触发平衡操作。

    注:从MongoDB 3.4开始,MongoDB可以执行并行chunk迁移。观察到一个shard一次最多只能参与一次迁移的限制,对于一个有n个shard的shard集群,MongoDB最多可以同时进行n/2(向下舍入)块迁移。

    另外用户可以暂时停用平衡器进行维护。有关详细信息,请参阅禁用平衡器。

    默认情况下balancer始终处于开启状态。

    二、balancer(均衡器)相关指令

    1、balancer基本指令

    (1)查看balance状态(是否开启)

    sh.getBalancerState()

    注:也可以在mongos上执行sh.status()查看balance状态。

    (2)查看balancer是否在工作(是否正在有数据迁移)

    sh.isBalancerRunning()

     (3)开启balancer功能

    sh.setBalancerState(true)

    (4)关闭balancer( 停止balancer)

    sh.stopBalancer()

    注:关闭balancer要注意确保没有任务处于执行状态,如果正在执行块迁移指定关闭命令可能引发数据不一致。如下提供了一个工具:在运行就是输出“waiting...”,此时等待一下;如果直接退出则说明没有chunk在迁移,此时可以关闭balancer。

    while( sh.isBalancerRunning() ) {
              print("waiting...");
              sleep(1000);
    }

    2、balancer支持窗口时间

            均衡器在执行块迁移操作时候会占用实例中节点的资源,对业务多少会有些影响。为了避免块迁移给业务带来影响我们可以设置均衡器的活动窗口,让其在指定的时间段内工作。步骤如下:

    (1)mongos命令行切换至config数据库

    use config

    (2)执行如下命令设置balancer的活动窗口

    db.settings.update(
       { _id: "balancer" },
       { $set: { activeWindow : { start : "<start-time>", stop : "<stop-time>" } } },
       { upsert: true }
    )
    
    
    db.settings.update(
       { _id: "balancer" },
       { $set: { activeWindow : { start : "00:00", stop : "06:00" } } },
       { upsert: true }
    )
    <start-time>:开始时间,时间格式为HH:MM(实例所在地域的当地时间),HH取值范围为00 - 23,MM取值范围为00 - 59。
    <stop-time>:结束时间,时间格式为HH:MM(实例所在地域的当地时间),HH取值范围为00 - 23,MM取值范围为00 - 59。

     MongDB会以config数据库的primary节点的时间为参考,开始执行chunk的迁移。

    (3)可通过sh.status()查看balancer的活动窗口

    (4)移除balancer搬迁时间窗口

    use config
    db.settings.update({ _id : "balancer" }, { $unset : { activeWindow : true } })

    (5)备份和balancer。

             MongoDB中,不要在备份的时候启用balancer,否则,备份的数据将会不一致。通常情况下,备份的时间窗口要和balancer的时间窗口错开,如果balancer没有设置时间窗口,则在备份的时候,关闭balancer。

            要确保时间窗口足够搬迁完所有的数据,否则数据库将一直处于不平衡的状态。

    三、balancer效果验证

    1、关闭balancer查看效果

            当前某集群存在jumbo chunk的问题,另外业务量也相对较大,综合表现为cpu占用过高。

            现在尝试关闭balancer看看cpu是否能降低。另外再尝试关闭 auto split观察效果如何。

    2021年11月26日14:56执行关闭指令,对比30分钟后监控。

    结论:好像有一点点作用,但十分有限;大概就是98%变成96%这个样子。也就是说cpu高并不是因为balancer,从这个角度可以知道balancer为降低对业务的影响确实“很克制”。针对cpu高的问题,可能就是autosplit导致的了。

    2、关闭autosplit查看效果——就是它

    2021年11月26日15:40关闭了autosplit。首先分片0的cpu占用就立马下来了;十分钟后分片1的占用也下来了;再过20分钟后分片2的也降下来了。关于这一点是因为虽然关闭autosplit能立即生效但是对于正在执行的过程需要等执行结束,是完全可以说通的。可以参考  Mongodb预分片(pre-split)和autosplit(chunk相关)_mijichui2153的博客-CSDN博客

    注:测试下来关闭autosplit到具体生效可能确实需要一定时间,0~30分钟量级。

    显然效果很显著,如下。

    分析与结论:

            究竟是正常的的写入操作引发的split是的cpu过高,还是因为jumbo chunk 的原因导致mongodb不断尝试split但是又一直split不了导致的cpu过高呢??为此观察chunk数量。

    ①首先把autosplit打开,马上cpu又飙升上来了;

     ②然后2021年11月28日10:50统计chunk数为6658,如下:

    ③过了三十分钟发现依然是6658,由此可见并没有chunk被真正的split,如下:

    结论:从测试结果来看腾讯云MongoDB针对jumbo chunk跳过split的机制不够彻底,mongos发出split请求后依然还会尝试做不少事情(每次有这个块的数据写入就会触发),以至于占用过高cpu影响数据库性能。也就是说我们要尽量避免jumbo chunk,否则数据库会被无效split拖垮。对于这种情况关闭autosplit貌似还可以接受;但是切记关闭autosplit要非常慎重。

    展开全文
  • 平衡器交换 发展 笔记 这是第一代SOR /前端的Exchange代理。 可以在找到更新的版本。 环境配置 复制.env.example-> .env 配置备份节点网址 # Backup node url REACT_APP_RPC_URL_1=...# Supported Network ID (e.g....
  • 具有企业解决方案的高性能和易于使用的开源负载均衡器。
  • gs-spring-cloud-loadbalancer
  • ribbon-loadbalancer-2.2.5.jar
  • 前端开源库-load-balancer负载平衡器,Nodejs Worker的负载平衡器
  • 码头工人-负载平衡器 自动基于haproxy的docker负载均衡器。 此容器实现容器负载平衡器(在同一主机中运行的容器)。 负载均衡器是自动配置的。 用例 假设您有5个nginx容器,并且想要平衡它们之间的传入流量,则需要...
  • sdn_load_balancer 描述 SDN负载平衡器。 具有6个客户端的示例方案<->交换机(透明代理,负载均衡器)<-> 6个服务器的池。 控制器应用程序(POX,Python)连接到交换机,以便修改流规则并平衡所有服务器之间...
  • 服务负载均衡-Spring Cloud LoadBalancer

    千次阅读 2020-12-20 17:40:33
    使用Spring Cloud LoadBalancer进行客户端负载均衡

    关键词:负载均衡    Spring Cloud LoadBalancer    


    Spring Cloud不仅提供了使用Ribbon进行客户端负载均衡,还提供了Spring Cloud LoadBalancer。相比较于Ribbon,Spring Cloud LoadBalancer不仅能够支持RestTemplate,还支持WebClient。WeClient是Spring Web Flux中提供的功能,可以实现响应式异步请求,因此学习Spring Cloud LoadBalancer之前,建议先了解下Spring Web Flux。


    █ 使用

    公共依赖

    Spring Cloud依赖,版本是Hoxton.RELEASE。

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>Hoxton.RELEASE</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>

    搭建服务注册中心-Eureka

    关于Eureka的使用,请戳《服务注册与发现-Spring Cloud Netflix-Eureka

    搭建服务端

    (1)引入依赖:

    <dependency>
        <!-- 使用web,使用Spring MVC对外提供服务   -->
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <!-- Eureka客户端,用于向Eureka服务端注册服务 -->
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>

    (2)创建Controller,对外提供访问接口:

    @RestController
    @RequestMapping("/base")
    public class BaseController {
    
        @Value("${server.port}")
        private Integer port;
    
        @GetMapping("/string")
        public String getString() {
            return "my server get string ---->"+port;
        }
    
    }

    (3)编写application.yml配置:

    # 服务端口号
    server:
      port: 8081
    
    # 服务名
    spring:
      application:
        name: myServer
    
    # Eureka
    eureka:
      client:
        # 不从Eureka注册中心获取服务列表
        fetch-registry: false
        # 向Eureka注册中心注册当前服务
        register-with-eureka: true
        # Eureka服务地址
        service-url:
          defaultZone: http://localhost:8899/eureka

    (4)分别以两个不同的端口号启动服务端项目,比如:8081、8082

    搭建客户端

    (1)引入依赖:

    <dependency>
        <!-- Spring Cloud loadbalancer 负载均衡-->
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-loadbalancer</artifactId>
    </dependency>
    <dependency>
        <!-- Eureka客户端,用于向Eureka服务端获取已注册服务列表 -->
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>

    (2)编写application.yml配置:

    server:
      port: 80
    
    eureka:
      client:
        fetch-registry: true
        register-with-eureka: false
        service-url:
          defaultZone: http://localhost:8899/eureka

    在《服务负载均衡-Spring Cloud Netflix-Ribbon》中,Spring Cloud提供了@LoadBalanced注解配合RestTemplate使用Spring Cloud Ribbon实现了客户端的负载均衡。对于Spring Cloud LoadBalancer也可以使用相同的方式。另外Spring Cloud LoadBalancer也提供了接合Spring Web Flux的负载均衡,如果你客户端使用Spring Web Flux完成服务端的访问,也只需相似的配置就能实现客户端负载均衡。

    如果在项目的类路径下存在Spring Cloud Ribbon相关的类,需要通过配置关闭Ribbon功能,因为Spring Cloud默认优先使用Ribbon:

    spring:
      cloud:
        loadbalancer:
          ribbon:
            enabled: false

    (3)客户端调用服务端接口

    客户端可以使用两种方式访问服务端接口,一种是使用RestTemplate,一种是WebClient。

    • RestTemplate

    ①引入依赖:

    <dependency>
        <!-- 使用web,使用Spring MVC对外提供服务   -->
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    ②编写配置,创建RestTemplate类型的Bean:

    需要在方法上标注@LoadBalanced注解。

    @Configuration
    public class LoadbalanceConfiguration {
    
        @Bean
        @LoadBalanced
        public RestTemplate restTemplate() {
            return new RestTemplate();
        }
    
    }

    ③编写Controller,客户端实现访问服务端资源,并对外提供访问接口:

    通过@Autowired,会注入第①个步骤中创建的RestTemplate实例。

    @RestController
    @RequestMapping("/rt/client")
    public class RestClient {
    
        // myServer是服务端的服务名,spring.application.name的配置
        private static final String BASE_URL = "http://myServer";
    
        @Autowired
        private RestTemplate restTemplate;
    
        @GetMapping("/getString")
        public String getString() {
            // 调用服务端提供的接口
            return restTemplate.getForObject(BASE_URL+"/base/string", String.class);
        }
    
    }

    ④启动服务端,调用接口:localhost:80/rt/client/getString

    访问客户端接口,客户端通过负载均衡,选择一个可用的服务端接口调用。

    • WebClient

    WebClient是Spring Web Flux中提供的类,通过使用WebClient可以通过响应式编程的方式异步访问服务端接口。

    WebClient.Bulider是WebClient的内部类,也是编程的入口。

    ①引入依赖:

    <dependency>
        <!-- Spring Webflux响应式web编程-->
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    ②编写配置,创建WebClient.Bulider类型的Bean:

    同样需要在方法上标注@LoadBalanced注解。

    @Configuration
    public class LoadbalanceConfiguration {
    
        @Bean
        @LoadBalanced
        public WebClient.Builder builder() {
            return WebClient.builder();
        }
        
    }

    ③编写Controller,客户端实现访问服务端资源,并对外提供访问接口:

    @RestController
    @RequestMapping("/rx/client")
    public class ReactiveClient {
    
        private static final String BASE_URL = "http://myServer";
    
        @Autowired
        private WebClient.Builder clientBuilder;
    
        @GetMapping("/getString")
        public Mono<String> getServerString() {
            return clientBuilder.baseUrl(BASE_URL).build().get().uri("/base/string").retrieve().bodyToMono(String.class);
        }
    
    }

    █ 原理-RestTemplate

    分别从RestTemplate与WebClient查看实现负载均衡的原理,两者的实现原理思想相同,都是通过对客户端工具类添加相应的拦截器,在拦截器中完成负载均衡的。

    和使用Spring Cloud Ribbon进行负载均衡一样,Spring Cloud LoadBalancer也是通过对RestTemplate添加拦截器实现的。RestTemplate提供了一个方法setInterceptors,用于设置拦截器,拦截器需要实现ClientHttpRequestInterceptor接口即可,在实际远程去请求服务端接口之前会先调用拦截器的intercept方法逻辑。这里的拦截器可以理解成相当于Servlet技术中的Filter功能。

    // RestTemplate#setInterceptors
    public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {
       // Take getInterceptors() List as-is when passed in here
       if (this.interceptors != interceptors) {
          this.interceptors.clear();
          this.interceptors.addAll(interceptors);
          AnnotationAwareOrderComparator.sort(this.interceptors);
       }
    }

    (1)LoadBalancerInterceptor

    LoadBalancerInterceptor实现了ClientHttpRequestInterceptor接口,实现intercept方法,用于实现负载均衡的拦截处理:

    public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
       // 负载均衡客户端
       private LoadBalancerClient loadBalancer;
    
       private LoadBalancerRequestFactory requestFactory;
    
       public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
             LoadBalancerRequestFactory requestFactory) {
          this.loadBalancer = loadBalancer;
          this.requestFactory = requestFactory;
       }
    
       public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
          // for backwards compatibility
          this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
       }
    
       @Override
       public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
             final ClientHttpRequestExecution execution) throws IOException {
          final URI originalUri = request.getURI();
          String serviceName = originalUri.getHost();
          Assert.state(serviceName != null,
                "Request URI does not contain a valid hostname: " + originalUri);
          return this.loadBalancer.execute(serviceName,
                this.requestFactory.createRequest(request, body, execution));
       }
    
    }

    (2)LoadBalancerClient

    负载均衡客户端,用于进行负载均衡逻辑,从服务列表中选择出一个服务地址进行调用。对于Spring Cloud Ribbon,在LoadBalancerInterceptor中持有的LoadBalancerClient实现对象是RibbonLoadBalancerClient。在Spring Cloud LoadBalancer中,类型则是BlockingLoadBalancerClient。调用BlockingLoadBalancerClient的execute方法:

    public class BlockingLoadBalancerClient implements LoadBalancerClient {
       
       // 通过工厂类,获取具体的负载均衡器
       private final LoadBalancerClientFactory loadBalancerClientFactory;
    
       @Override
       public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
             throws IOException {
          ServiceInstance serviceInstance = choose(serviceId);
          if (serviceInstance == null) {
             throw new IllegalStateException("No instances available for " + serviceId);
          }
          return execute(serviceId, serviceInstance, request);
       }
       ......
    }

    (3)LoadBalancerClientFactory

    BlockingLoadBalancerClient中持有的LoadBalancerClientFactory,通过调用其getInstance方法获取具体的负载均衡器。负载均衡器实现了不同的负载均衡算法,比如轮询、随机等。在Spring Cloud Ribbon中,RibbonLoadBalancerClient中持有的是SpringClientFactory。LoadBalancerClientFactory与SpringClientFactory共同继承了抽象类NamedContextFactory。都是根据请求的url中服务名创建一个ApplicationContext,将其作为Spring ApplicationConetxt的子容器:

    public class LoadBalancerClientFactory
          extends NamedContextFactory<LoadBalancerClientSpecification>
          implements ReactiveLoadBalancer.Factory<ServiceInstance> {
    
       /**
        * Property source name for load balancer.
        */
       public static final String NAMESPACE = "loadbalancer";
    
       /**
        * Property for client name within the load balancer namespace.
        */
       public static final String PROPERTY_NAME = NAMESPACE + ".client.name";
    
       public LoadBalancerClientFactory() {
          super(LoadBalancerClientConfiguration.class, NAMESPACE, PROPERTY_NAME);
       }
    
       public String getName(Environment environment) {
          return environment.getProperty(PROPERTY_NAME);
       }
    
       @Override
       public ReactiveLoadBalancer<ServiceInstance> getInstance(String serviceId) {
          return getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
       }
    
    }

    (4)ReactiveLoadBalancer

    负载均衡器,实现选择服务进行调用。

    public interface ReactiveLoadBalancer<T> {
    
       Request REQUEST = new DefaultRequest();
    
       Publisher<Response<T>> choose(Request request);
    
       default Publisher<Response<T>> choose() { // conflicting name
          return choose(REQUEST);
       }
    
       @FunctionalInterface
       interface Factory<T> {
    
          ReactiveLoadBalancer<T> getInstance(String serviceId);
    
       }
    
    }

    Spring Cloud LoadBalancer提供了ReactiveLoadBalancer子接口ReactorLoadBalancer,ReactorLoadBalancer的子接口ReactorServiceInstanceLoadBalancer。提供了一个默认的实现类RoundRobinLoadBalancer,实现了轮询负载均衡算法。可见Spring Cloud LoadBalancer在Hoxton.RELEASE版本对WebClient默认只提供了一种负载均衡算法。

    (5)LoadBalancerRequestFactory

    LoadBalancerRequest工厂类,用于创建LoadBalancerRequest,调用createRequest方法。在内部持有LoadBalancerClient属性对象,在Spring Cloud Ribbon中是RibbonLoadBalancerClient,在Spring Cloud LoadBalancer中是BlockingLoadBalancerClient

    public class LoadBalancerRequestFactory {
       // 负载均衡客户端
       private LoadBalancerClient loadBalancer;
    
       private List<LoadBalancerRequestTransformer> transformers;
       ......
       public LoadBalancerRequest<ClientHttpResponse> createRequest(
             final HttpRequest request, final byte[] body,
             final ClientHttpRequestExecution execution) {
          // 返回LoadBalancerRequest,这里的书写方式是使用了lamda表达式,创建匿名内部类。       
          return instance -> {
             HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
                   this.loadBalancer);
             if (this.transformers != null) {
                for (LoadBalancerRequestTransformer transformer : this.transformers) {
                   serviceRequest = transformer.transformRequest(serviceRequest,
                         instance);
                }
             }
             return execution.execute(serviceRequest, body);
          };
       }
    
    }

    (6)LoadBalancerClientConfiguration

    主要就是创建负载均衡器ReactorLoadBalancer,默认的全局配置类。会在根据接口地址url中服务名创建ApplicationContext时完成加载配置。

    @Bean
    @ConditionalOnMissingBean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(
          Environment environment,
          LoadBalancerClientFactory loadBalancerClientFactory) {
       String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
       return new RoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name,
             ServiceInstanceListSupplier.class), name);
    }

    自动配置:

    (1)在org.springframework.cloud:spring-cloud-loadbalancer的jar包下META-INF文件夹的spring.factories中:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration,\
    org.springframework.cloud.loadbalancer.config.BlockingLoadBalancerClientAutoConfiguration

    LoadBalancerAutoConfiguration

    无论是Spring Cloud Ribbon,还是Spring Cloud LoadBalancer for RestTemplate,还是Spring Cloud LoadBalancer for WebClient,这个配置类都是公用的。

    作用是创建LoadBalancerClientFactory的Bean。其中的属性configurations会根据注解@LoadBalancerClient或@LoadBalancerClients的配置值创建LoadBalancerClientSpecification对象,并通过LoadBalancerAutoConfiguration的构造器注入,最后传递给LoadBalancerClientFactory:

    @Configuration(proxyBeanMethods = false)
    @LoadBalancerClients
    @AutoConfigureBefore({ ReactorLoadBalancerClientAutoConfiguration.class,
          LoadBalancerBeanPostProcessorAutoConfiguration.class,
          ReactiveLoadBalancerAutoConfiguration.class })
    public class LoadBalancerAutoConfiguration {
    
       private final ObjectProvider<List<LoadBalancerClientSpecification>> configurations;
    
       public LoadBalancerAutoConfiguration(
             ObjectProvider<List<LoadBalancerClientSpecification>> configurations) {
          this.configurations = configurations;
       }
    
       @Bean
       public LoadBalancerClientFactory loadBalancerClientFactory() {
          LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory();
          clientFactory.setConfigurations(
                this.configurations.getIfAvailable(Collections::emptyList));
          return clientFactory;
       }
    
    }

    BlockingLoadBalancerClientAutoConfiguration

    主要职责就是创建BlockingLoadBalancerClient,前提是存在RestTemplate,并且spring.cloud.loadbalancer.ribbon.enabled=false以及不存在类org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient,也就是没有引入Spring Cloud Ribbon相关依赖。从这里可见,如果同时存在Spring Cloud Ribbon和Spring Cloud LoadBalancer的相关类,会优先使用Ribbon做负载均衡。

    @Configuration(proxyBeanMethods = false)
    @LoadBalancerClients
    @AutoConfigureAfter(LoadBalancerAutoConfiguration.class)
    @AutoConfigureBefore({
          org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration.class,
          AsyncLoadBalancerAutoConfiguration.class })
    public class BlockingLoadBalancerClientAutoConfiguration {
        ......
        @Configuration(proxyBeanMethods = false)
       @ConditionalOnClass(RestTemplate.class)
       @Conditional(OnNoRibbonDefaultCondition.class)
       protected static class BlockingLoadbalancerClientConfig {
    
           @Bean
           @ConditionalOnBean(LoadBalancerClientFactory.class)
           @Primary
           public BlockingLoadBalancerClient blockingLoadBalancerClient(
                LoadBalancerClientFactory loadBalancerClientFactory) {
              return new BlockingLoadBalancerClient(loadBalancerClientFactory);
           }
    
       }
       ......
    }

    (2)在org.springframework.cloud:spring-cloud-commons的jar包下META-INF文件夹的spring.factories中:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration

    LoadBalancerAutoConfiguration

    LoadBalancerAutoConfiguration用于对RestTemplate设置拦截器。Spring Cloud Ribbon同样也是使用这个配置类完成同样的功能的。

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(RestTemplate.class)
    @ConditionalOnBean(LoadBalancerClient.class)
    @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
    public class LoadBalancerAutoConfiguration {
       
       // 通过@LoadBalanced与@Autowired,会自动注入被@LoadBalanced注解标注的RestTemplate对象
       @LoadBalanced
       @Autowired(required = false)
       private List<RestTemplate> restTemplates = Collections.emptyList();
    
       @Autowired(required = false)
       private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
    
       // Spring在初始化bean的时候,会调用SmartInitializingSingleton的afterSingletonsInstantiated方法
       @Bean
       public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
             final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
          // 实现afterSingletonsInstantiated方法:       
          return () -> restTemplateCustomizers.ifAvailable(customizers -> {
             for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                   // 调用customize方法 
                   customizer.customize(restTemplate);
                }
             }
          });
       }
    
       @Bean
       @ConditionalOnMissingBean
       public LoadBalancerRequestFactory loadBalancerRequestFactory(
             LoadBalancerClient loadBalancerClient) {
          return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
       }
    
       @Configuration(proxyBeanMethods = false)
       @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
       static class LoadBalancerInterceptorConfig {
          
          // 构建LoadBalancerInterceptor的bean,通过参数自动注入当前Spring ApplicationContext中
          // 具有的LoadBalancerClient与LoadBalancerRequestFactory类型的bean
          @Bean
          public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
             return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
          }
    
          @Bean
          @ConditionalOnMissingBean
          public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
             return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                      restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                // 通过setInterceptors方法设置拦截器
                restTemplate.setInterceptors(list);
             };
          }
    
       }
    }   

    █ 原理-WebClient

    与Spring Cloud Ribbon不同的是,Spring Cloud LoadBalancer还支持响应式客户端的负载均衡,即Spring Cloud LoadBalancer结合Spring Web Flux实现客户端负载均衡调用。

    首先来看看如何使用WebClient完成对服务端接口的调用:

    Mono<String> mono = WebClient.builder().baseUrl("localhost:8080/").build().get().uri("/base/string").retrieve().bodyToMono(String.class);

    代码看上去是不是一气呵成,流式编程便是响应式编程的一种代码风格。通过上面的代码只是完成了请求内容的构建,实际上并没有发起接口的请求,通过调用Mono的subscribe方法触发异步请求。

    // 通过WebClient获取Builder对象
    WebClient.builder().
        // 服务器地址
        baseUrl("localhost:8080/").build().
        // get请求
        get().
        // 接口地址
        uri("/base/string").retrieve().
        // 接口响应的内容是String类型的 
        bodyToMono(String.class);

    Spring Cloud LoadBalancer实现对WebClient的负载均衡,也是通过设置拦截器实现的。通过WebClient.Builder的filter设置拦截器:

    public Builder filter(ExchangeFilterFunction filter) {
        Assert.notNull(filter, "ExchangeFilterFunction must not be null");
        this.initFilters().add(filter);
        return this;
    }

    (1)ReactorLoadBalancerExchangeFilterFunction

    ReactorLoadBalancerExchangeFilterFunction实现了ExchangeFilterFunction接口,通过实现filter方法完成负载均衡的功能:

    public class ReactorLoadBalancerExchangeFilterFunction implements ExchangeFilterFunction {
    
       private static final Log LOG = LogFactory
             .getLog(ReactorLoadBalancerExchangeFilterFunction.class);
    
       private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory;
    
       public ReactorLoadBalancerExchangeFilterFunction(
             ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory) {
          this.loadBalancerFactory = loadBalancerFactory;
       }
    
       @Override
       public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
          URI originalUrl = request.url();
          String serviceId = originalUrl.getHost();
          if (serviceId == null) {
             String message = String.format(
                   "Request URI does not contain a valid hostname: %s",
                   originalUrl.toString());
             if (LOG.isWarnEnabled()) {
                LOG.warn(message);
             }
             return Mono.just(
                   ClientResponse.create(HttpStatus.BAD_REQUEST).body(message).build());
          }
          return choose(serviceId).flatMap(response -> {
             ServiceInstance instance = response.getServer();
             if (instance == null) {
                String message = serviceInstanceUnavailableMessage(serviceId);
                if (LOG.isWarnEnabled()) {
                   LOG.warn(message);
                }
                return Mono.just(ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE)
                      .body(serviceInstanceUnavailableMessage(serviceId)).build());
             }
    
             if (LOG.isDebugEnabled()) {
                LOG.debug(String.format(
                      "Load balancer has retrieved the instance for service %s: %s",
                      serviceId, instance.getUri()));
             }
             ClientRequest newRequest = buildClientRequest(request,
                   LoadBalancerUriTools.reconstructURI(instance, originalUrl));
             return next.exchange(newRequest);
          });
       }
    
       private Mono<Response<ServiceInstance>> choose(String serviceId) {
          ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerFactory
                .getInstance(serviceId);
          if (loadBalancer == null) {
             return Mono.just(new EmptyResponse());
          }
          return Mono.from(loadBalancer.choose());
       }
    
       private String serviceInstanceUnavailableMessage(String serviceId) {
          return "Load balancer does not contain an instance for the service " + serviceId;
       }
    
       private ClientRequest buildClientRequest(ClientRequest request, URI uri) {
          return ClientRequest.create(request.method(), uri)
                .headers(headers -> headers.addAll(request.headers()))
                .cookies(cookies -> cookies.addAll(request.cookies()))
                .attributes(attributes -> attributes.putAll(request.attributes()))
                .body(request.body()).build();
       }
    
    }

    (2)ReactiveLoadBalancer.Factory

    ReactiveLoadBalancer.Factory是ReactiveLoadBalancer的内部工厂类,用于获取ReactiveLoadBalancer

    @FunctionalInterface
    interface Factory<T> {
    
       ReactiveLoadBalancer<T> getInstance(String serviceId);
    
    }

    (3)ReactiveLoadBalancer

    与RestTemplate中一样。

    自动配置:

    在org.springframework.cloud:spring-cloud-commons的jar包下META-INF文件夹的spring.factories中:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerBeanPostProcessorAutoConfiguration,\
    org.springframework.cloud.client.loadbalancer.reactive.ReactorLoadBalancerClientAutoConfiguration

    (1)ReactorLoadBalancerClientAutoConfiguration

    创建ReactorLoadBalancerExchangeFilterFunction对象bean。

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(WebClient.class)
    @ConditionalOnBean(ReactiveLoadBalancer.Factory.class)
    public class ReactorLoadBalancerClientAutoConfiguration {
        ......
        @Configuration(proxyBeanMethods = false)
        @Conditional(OnNoRibbonDefaultCondition.class)
        protected static class ReactorLoadBalancerExchangeFilterFunctionConfig {
    
           @Bean
           public ReactorLoadBalancerExchangeFilterFunction loadBalancerExchangeFilterFunction(
                 ReactiveLoadBalancer.Factory loadBalancerFactory) {
              return new ReactorLoadBalancerExchangeFilterFunction(loadBalancerFactory);
           }
    
        }
    }

    (2)LoadBalancerBeanPostProcessorAutoConfiguration

    将创建的ReactorLoadBalancerExchangeFilterFunction拦截器设置到WebClient.Builder对象中。

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(WebClient.class)
    @Conditional(LoadBalancerBeanPostProcessorAutoConfiguration.OnAnyLoadBalancerImplementationPresentCondition.class)
    public class LoadBalancerBeanPostProcessorAutoConfiguration {
       
       // 根据下面的方法创建的DeferringLoadBalancerExchangeFilterFunction来创建LoadBalancerWebClientBuilderBeanPostProcessor
       @Bean
       public LoadBalancerWebClientBuilderBeanPostProcessor loadBalancerWebClientBuilderBeanPostProcessor(
             DeferringLoadBalancerExchangeFilterFunction deferringExchangeFilterFunction,
             ApplicationContext context) {
          return new LoadBalancerWebClientBuilderBeanPostProcessor(
                deferringExchangeFilterFunction, context);
       }
    
       @Configuration(proxyBeanMethods = false)
       @Conditional(ReactorLoadBalancerClientAutoConfiguration.OnNoRibbonDefaultCondition.class)
       @ConditionalOnBean(ReactiveLoadBalancer.Factory.class)
       protected static class ReactorDeferringLoadBalancerFilterConfig {
          
          // 根据容器中的ReactorLoadBalancerExchangeFilterFunction,创建DeferringLoadBalancerExchangeFilterFunction
          @Bean
          @Primary
          DeferringLoadBalancerExchangeFilterFunction<ReactorLoadBalancerExchangeFilterFunction> reactorDeferringLoadBalancerExchangeFilterFunction(
                ObjectProvider<ReactorLoadBalancerExchangeFilterFunction> exchangeFilterFunctionProvider) {
             return new DeferringLoadBalancerExchangeFilterFunction<>(
                   exchangeFilterFunctionProvider);
          }
    
       }
    }   

    LoadBalancerWebClientBuilderBeanPostProcessor

    后置处理器,执行拦截器的设置。

    public class LoadBalancerWebClientBuilderBeanPostProcessor implements BeanPostProcessor {
       ......
       @Override
       public Object postProcessBeforeInitialization(Object bean, String beanName)
             throws BeansException {
          if (bean instanceof WebClient.Builder) {
             if (context.findAnnotationOnBean(beanName, LoadBalanced.class) == null) {
                return bean;
             }
             // 调用filter方法设置
             ((WebClient.Builder) bean).filter(exchangeFilterFunction);
          }
          return bean;
       }
    
    }

    █ 自定义配置

    自定义负载均衡器,实现自定义的负载均衡算法。通过注解@LoadBalancerClient@LoadBalancerClients。在Spring Cloud Ribbon中是通过注解@RibbonClient@RibbonClients完成的。在《服务负载均衡-Spring Cloud Netflix-Ribbon》提到,若自定义的配置时放在@ComponentScan的扫描范围内的话,该配置类会被设置成全局配置,因为会被放进Spring ApplicaionContext中。若不在@ComponentScan的扫描范围内,会被放进根据请求的url中服务名创建的对应的ApplicationContext中,只对当前服务名有效,此时@LoadBalancerClient中的name或value属性必须与请求的url中的服务名相同,否则找不到对应的上下文配置。

    自定义负载均衡器:

    自定义负载均衡器,不能通过只实现ReactiveLoadBalancer接口,需要去实现它的子接口ReactorServiceInstanceLoadBalancer,因为去获取负载均衡器实例的时候,是通过去容器中查找ReactorServiceInstanceLoadBalancer类型的bean来实现的:

    // LoadBalancerClientFactory#ReactiveLoadBalancer
    @Override
    public ReactiveLoadBalancer<ServiceInstance> getInstance(String serviceId) {
       return getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
    }

    给MyServer服务定制化负载均衡器:

    (1)新建负载均衡器,简单实现了随机算法:

    public class CustomRandomLoadBalancerClient implements ReactorServiceInstanceLoadBalancer {
    
        // 服务列表
        private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    
        public CustomRandomLoadBalancerClient(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider) {
            this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
        }
    
        @Override
        public Mono<Response<ServiceInstance>> choose(Request request) {
            ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable();
            return supplier.get().next().map(this::getInstanceResponse);
        }
    
        /**
         * 使用随机数获取服务
         * @param instances
         * @return
         */
        private Response<ServiceInstance> getInstanceResponse(
                List<ServiceInstance> instances) {
            System.out.println("进来了");
            if (instances.isEmpty()) {
                return new EmptyResponse();
            }
    
            System.out.println("进行随机选取服务");
            // 随机算法
            int size = instances.size();
            Random random = new Random();
            ServiceInstance instance = instances.get(random.nextInt(size));
    
            return new DefaultResponse(instance);
        }
    
    }

    (2)创建配置类:

    @Configuration
    public class CustomLoadBalancerClientConfiguration {
    
        // 参数 serviceInstanceListSupplierProvider 会自动注入
        @Bean
        public ReactorServiceInstanceLoadBalancer customLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider) {
            return new CustomRandomLoadBalancerClient(serviceInstanceListSupplierProvider);
        }
    
    }

    (3)在项目启动类上添加@LoadBalancerClient注解:

    name值一定要使用服务端配置的服务名(spring.application.name),通过configuration指定自定义的配置

    @SpringBootApplication
    @LoadBalancerClient(name = "myServer", configuration = CustomLoadBalancerClientConfiguration.class)
    public class BlockClientApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(BlockClientApplication.class, args);
        }
    
    }

    Spring Cloud LoadBalancer与Spring Cloud Ribbon通过RestTemplate做负载均衡的比较:

    (1)都是使用LoadBalancerInterceptor作为RestTemplate的拦截器。

    (2)在LoadBalancerInterceptor中持有LoadBalancerClient对象,在Spring Cloud LoadBalancer中是BlockingLoadBalancerClient,在Spring Cloud Ribbon中是RibbonLoadBalancerClient。

    (3)LoadBalancerClient中持有NamedContextFactory对象,在Spring Cloud LoadBalancer中是LoadBalancerClientFactory,在Spring Cloud Ribbon中是SpringClientFactory。

    (4)Spring Cloud LoadBalancer通过实现ReactorServiceInstanceLoadBalancer接口自定义负载均衡器,Spring Cloud Ribbon通过实现ILoadBalancer接口。

    (5)Spring Cloud LoadBalancer通过注解@LoadBalancerClient或@LoadBalancerClients实现自定义配置,Spring Cloud Ribbon也可以使用这两个注解,另外还可以使用@RibbonClient或@RibbonClients。

    (6)Spring Cloud LoadBalancer支持响应式编程负载均衡,即结合Spring Web Flux使用,Spring Cloud Ribbon是不支持的。

    展开全文
  • spring-cloud-loadbalancer BlockingLoadBalancerClient public class BlockingLoadBalancerClient implements LoadBalancerClient { //负载均衡器工厂 private final LoadBalancerClientFactory ...

    spring-cloud-loadbalancer

    BlockingLoadBalancerClient

    public class BlockingLoadBalancerClient implements LoadBalancerClient {
        //负载均衡器工厂
        private final LoadBalancerClientFactory loadBalancerClientFactory;
     
        public BlockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory) {
            this.loadBalancerClientFactory = loadBalancerClientFactory;
        }
     
        public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
            //根据servicerId选择服务后端服务节点
            ServiceInstance serviceInstance = this.choose(serviceId);
            if (serviceInstance == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            } else {
                return this.execute(serviceId, serviceInstance, request);
            }
        }
     
        public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
            try {
                //调用LoadBalancerRequest的apply方法,在该方法里进行远程请求调用
                return request.apply(serviceInstance);
            } catch (IOException var5) {
                throw var5;
            } catch (Exception var6) {
                ReflectionUtils.rethrowRuntimeException(var6);
                return null;
            }
        }
        //重构URI
        public URI reconstructURI(ServiceInstance serviceInstance, URI original) {
            return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
        }
     
        //根据serviceId选择后端服务节点
        public ServiceInstance choose(String serviceId) {
            //根据serviceId获取对应的负载均衡器
            ReactiveLoadBalancer<ServiceInstance> loadBalancer = this.loadBalancerClientFactory.getInstance(serviceId);
            if (loadBalancer == null) {
                return null;
            } else {
                //负载据衡器进行choose,选择一个ServicerInstance并返回
                Response<ServiceInstance> loadBalancerResponse = (Response)Mono.from(loadBalancer.choose()).block();
                return loadBalancerResponse == null ? null : (ServiceInstance)loadBalancerResponse.getServer();
            }
        }
    }
    

    LoadBalancerClientFactory

    public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification> implements Factory<ServiceInstance> {
        public static final String NAMESPACE = "loadbalancer";
        public static final String PROPERTY_NAME = "loadbalancer.client.name";
    
        public LoadBalancerClientFactory() {
            super(LoadBalancerClientConfiguration.class, "loadbalancer", "loadbalancer.client.name");
        }
    
        public String getName(Environment environment) {
            return environment.getProperty("loadbalancer.client.name");
        }
        //根据ServiceId获取对应的LoadBalancer
        public ReactiveLoadBalancer<ServiceInstance> getInstance(String serviceId) {
            return (ReactiveLoadBalancer)this.getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
        }
    }
    

    ReactorLoadBalancer

    public interface ReactorLoadBalancer<T> extends ReactiveLoadBalancer<T> {
        Mono<Response<T>> choose(Request request);
    
        default Mono<Response<T>> choose() {
            return this.choose(REQUEST);
        }
    }
    

    RoundRobinLoadBalancer

    public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {
        private static final Log log = LogFactory.getLog(RoundRobinLoadBalancer.class);
        private final AtomicInteger position;
        private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
        private final String serviceId;
    
    
        public RoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
            this(serviceInstanceListSupplierProvider, serviceId, (new Random()).nextInt(1000));
        }
    
        public RoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, int seedPosition) {
            this.serviceId = serviceId;
            this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
            this.position = new AtomicInteger(seedPosition);
        }
    
        public Mono<Response<ServiceInstance>> choose(Request request) {
            if (this.serviceInstanceListSupplierProvider != null) {
                ServiceInstanceListSupplier supplier = (ServiceInstanceListSupplier)this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
                return ((Flux)supplier.get()).next().map(this::getInstanceResponse);
            } else {
                ServiceInstanceSupplier supplier = (ServiceInstanceSupplier)this.serviceInstanceSupplier.getIfAvailable(NoopServiceInstanceSupplier::new);
                return ((Flux)supplier.get()).collectList().map(this::getInstanceResponse);
            }
        }
    
        private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
            if (instances.isEmpty()) {
                log.warn("No servers available for service: " + this.serviceId);
                return new EmptyResponse();
            } else {
                int pos = Math.abs(this.position.incrementAndGet());
                ServiceInstance instance = (ServiceInstance)instances.get(pos % instances.size());
                return new DefaultResponse(instance);
            }
        }
    }
    
    

    ServiceInstanceListSupplier

    DiscoveryClientServiceInstanceListSupplier

    public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstanceListSupplier {
        public static final String SERVICE_DISCOVERY_TIMEOUT = "spring.cloud.loadbalancer.service-discovery.timeout";
        private static final Log LOG = LogFactory.getLog(DiscoveryClientServiceInstanceListSupplier.class);
        private Duration timeout = Duration.ofSeconds(30L);
        private final String serviceId;
        private final Flux<List<ServiceInstance>> serviceInstances;
    
        public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) {
            this.serviceId = environment.getProperty("loadbalancer.client.name");
            this.resolveTimeout(environment);
            this.serviceInstances = Flux.defer(() -> {
                //通过DiscoveryClient从注册中心获取服务实例
                return Flux.just(delegate.getInstances(this.serviceId));
            }).subscribeOn(Schedulers.boundedElastic()).timeout(this.timeout, Flux.defer(() -> {
                this.logTimeout();
                return Flux.just(new ArrayList());
            })).onErrorResume((error) -> {
                this.logException(error);
                return Flux.just(new ArrayList());
            });
        }
    
        public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate, Environment environment) {
            this.serviceId = environment.getProperty("loadbalancer.client.name");
            this.resolveTimeout(environment);
            this.serviceInstances = Flux.defer(() -> {
                //通过DiscoveryClient从注册中心获取服务实例
                return delegate.getInstances(this.serviceId).collectList().flux().timeout(this.timeout, Flux.defer(() -> {
                    this.logTimeout();
                    return Flux.just(new ArrayList());
                })).onErrorResume((error) -> {
                    this.logException(error);
                    return Flux.just(new ArrayList());
                });
            });
        }
    
        public Flux<List<ServiceInstance>> get() {
            return this.serviceInstances;
        }
    
    }
    

    DelegatingServiceInstanceListSupplier

    代理模式,委托内部的ServiceInstanceListSupplier提供服务

    public abstract class DelegatingServiceInstanceListSupplier implements ServiceInstanceListSupplier, InitializingBean, DisposableBean {
        protected final ServiceInstanceListSupplier delegate;
    
        public DelegatingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate) {
            Assert.notNull(delegate, "delegate may not be null");
            this.delegate = delegate;
        }
    
        public ServiceInstanceListSupplier getDelegate() {
            return this.delegate;
        }
    }
    

    HealthCheckServiceInstanceListSupplier

    带有健康检查功能的服务实例提供者,对代理的服务提供者进行服务健康检查

    public class HealthCheckServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier implements InitializingBean, DisposableBean {
        private static final Log LOG = LogFactory.getLog(HealthCheckServiceInstanceListSupplier.class);
        private final HealthCheck healthCheck;
        private final WebClient webClient;
        private final String defaultHealthCheckPath;
        private final Flux<List<ServiceInstance>> aliveInstancesReplay;
        private Disposable healthCheckDisposable;
    
        public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, HealthCheck healthCheck, WebClient webClient) {
            super(delegate);
            this.healthCheck = healthCheck;
            //默认的健康检查路径
            this.defaultHealthCheckPath = (String)healthCheck.getPath().getOrDefault("default", "/actuator/health");
            this.webClient = webClient;
            this.aliveInstancesReplay = Flux.defer(delegate).delaySubscription(Duration.ofMillis((long)healthCheck.getInitialDelay())).switchMap((serviceInstances) -> {
                return this.healthCheckFlux(serviceInstances).map((alive) -> {
                    return Collections.unmodifiableList(new ArrayList(alive));
                });
            }).replay(1).refCount(1);
        }
    
    
        protected Flux<List<ServiceInstance>> healthCheckFlux(List<ServiceInstance> instances) {
            return Flux.defer(() -> {
                List<Mono<ServiceInstance>> checks = new ArrayList(instances.size());
                Iterator var3 = instances.iterator();
    
                while(var3.hasNext()) {
                    ServiceInstance instance = (ServiceInstance)var3.next();
                    Mono<ServiceInstance> alive = this.isAlive(instance).onErrorResume((error) -> {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(String.format("Exception occurred during health check of the instance for service %s: %s", instance.getServiceId(), instance.getUri()), error);
                        }
    
                        return Mono.empty();
                    }).timeout(this.healthCheck.getInterval(), Mono.defer(() -> {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(String.format("The instance for service %s: %s did not respond for %s during health check", instance.getServiceId(), instance.getUri(), this.healthCheck.getInterval()));
                        }
    
                        return Mono.empty();
                    })).handle((isHealthy, sink) -> {
                        if (isHealthy) {
                            sink.next(instance);
                        }
    
                    });
                    checks.add(alive);
                }
    
                List<ServiceInstance> result = new ArrayList();
                return Flux.merge(checks).map((alivex) -> {
                    result.add(alivex);
                    return result;
                }).defaultIfEmpty(result);
            }).repeatWhen((restart) -> {
                return restart.delayElements(this.healthCheck.getInterval());
            });
        }
    
        public Flux<List<ServiceInstance>> get() {
            return this.aliveInstancesReplay;
        }
    
        protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
            String healthCheckPropertyValue = (String)this.healthCheck.getPath().get(serviceInstance.getServiceId());
            String healthCheckPath = healthCheckPropertyValue != null ? healthCheckPropertyValue : this.defaultHealthCheckPath;
            return this.webClient.get().uri(UriComponentsBuilder.fromUri(serviceInstance.getUri()).path(healthCheckPath).build().toUri()).exchange().flatMap((clientResponse) -> {
                return clientResponse.releaseBody().thenReturn(HttpStatus.OK.value() == clientResponse.rawStatusCode());
            });
        }
    }
    

    在这里插入图片描述

    LoadBalancerClientConfiguration

    当从服务注册中心获取服务时的配置

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @ConditionalOnProperty(
        value = {"spring.cloud.discovery.enabled"},
        matchIfMissing = true
    )
    public @interface ConditionalOnDiscoveryEnabled {
    }
    
    @Configuration(
        proxyBeanMethods = false
    )
    //服务发现条件注解
    @ConditionalOnDiscoveryEnabled
    public class LoadBalancerClientConfiguration {
        private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 193827465;
    
        public LoadBalancerClientConfiguration() {
        }
        //new LoadBalancer,默认是RoundRobinLoadBalancer
        @Bean
        @ConditionalOnMissingBean
        public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
            String name = environment.getProperty("loadbalancer.client.name");
            return new RoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
        }
    
        @Configuration(
            proxyBeanMethods = false
        )
        //阻塞式服务发现条件注解
        @ConditionalOnBlockingDiscoveryEnabled
        @Order(193827466)
        public static class BlockingSupportConfiguration {
            public BlockingSupportConfiguration() {
            }
    
            @Bean
            @ConditionalOnBean({DiscoveryClient.class})
            @ConditionalOnMissingBean
            @ConditionalOnProperty(
                value = {"spring.cloud.loadbalancer.configurations"},
                havingValue = "default",
                matchIfMissing = true
            )
            public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(ConfigurableApplicationContext context) {
                return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching().build(context);
            }
    
            //
            @Bean
            @ConditionalOnBean({DiscoveryClient.class})
            @ConditionalOnMissingBean
            @ConditionalOnProperty(
                value = {"spring.cloud.loadbalancer.configurations"},
                havingValue = "zone-preference"
            )
            public ServiceInstanceListSupplier zonePreferenceDiscoveryClientServiceInstanceListSupplier(ConfigurableApplicationContext context) {
                return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withZonePreference().withCaching().build(context);
            }
    
            @Bean
            @ConditionalOnBean({DiscoveryClient.class})
            @ConditionalOnMissingBean
            @ConditionalOnProperty(
                value = {"spring.cloud.loadbalancer.configurations"},
                havingValue = "health-check"
            )
            public ServiceInstanceListSupplier healthCheckDiscoveryClientServiceInstanceListSupplier(ConfigurableApplicationContext context) {
                return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withHealthChecks().withCaching().build(context);
            }
    
            @Bean
            @ConditionalOnBean({DiscoveryClient.class})
            @ConditionalOnMissingBean
            public ServiceInstanceSupplier discoveryClientServiceInstanceSupplier(DiscoveryClient discoveryClient, Environment env, ApplicationContext context) {
                DiscoveryClientServiceInstanceSupplier delegate = new DiscoveryClientServiceInstanceSupplier(discoveryClient, env);
                ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context.getBeanProvider(LoadBalancerCacheManager.class);
                return (ServiceInstanceSupplier)(cacheManagerProvider.getIfAvailable() != null ? new CachingServiceInstanceSupplier(delegate, (CacheManager)cacheManagerProvider.getIfAvailable()) : delegate);
            }
        }
    
        @Configuration(
            proxyBeanMethods = false
        )
        //启动服务发现条件注解
        @ConditionalOnReactiveDiscoveryEnabled
        @Order(193827465)
        public static class ReactiveSupportConfiguration {
            public ReactiveSupportConfiguration() {
            }
            //从服务注册中心发现服务,且进行缓存配置
            @Bean
            @ConditionalOnBean({ReactiveDiscoveryClient.class})
            @ConditionalOnMissingBean
            @ConditionalOnProperty(
                value = {"spring.cloud.loadbalancer.configurations"},
                havingValue = "default",
                matchIfMissing = true
            )
            public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(ConfigurableApplicationContext context) {
                return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context);
            }
            //从服务注册中心发现服务,且区域优先配置
            @Bean
            @ConditionalOnBean({ReactiveDiscoveryClient.class})
            @ConditionalOnMissingBean
            @ConditionalOnProperty(
                value = {"spring.cloud.loadbalancer.configurations"},
                havingValue = "zone-preference"
            )
            public ServiceInstanceListSupplier zonePreferenceDiscoveryClientServiceInstanceListSupplier(ConfigurableApplicationContext context) {
                return ServiceInstanceListSupplier.builder().withDiscoveryClient().withZonePreference().withCaching().build(context);
            }
            //从注册中心发现服务,且进行健康检查配置
            @Bean
            @ConditionalOnBean({ReactiveDiscoveryClient.class})
            @ConditionalOnMissingBean
            @ConditionalOnProperty(
                value = {"spring.cloud.loadbalancer.configurations"},
                havingValue = "health-check"
            )
            public ServiceInstanceListSupplier healthCheckDiscoveryClientServiceInstanceListSupplier(ConfigurableApplicationContext context) {
                return ServiceInstanceListSupplier.builder().withDiscoveryClient().withHealthChecks().withCaching().build(context);
            }
            ......省略......
        }
    }
    

    参考https://blog.csdn.net/weixin_42189048/article/details/117843278

    展开全文
  • 这个项目即将为您提供一个用 JAVA 编写的简单(但功能强大)的负载均衡器。 “基于 Java 的负载均衡器的性能可能永远无法与用 C/C++ 编写的良好负载均衡器相比,但随着平台的进一步发展,它只会变得更好。...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 56,615
精华内容 22,646
关键字:

balancer

友情链接: kernel-aodv.rar