    龗孖
    2024-02-14

    How to do gray releases with Spring Cloud Gateway based on Nacos

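    # Nacos metadata

    When we register a service with Nacos Server, we often attach some metadata; see the chapter "Dubbo 融合 Nacos 成为注册中心" (Dubbo with Nacos as the registry) in the official documentation. Making good use of a service instance's metadata opens up many interesting practices. Related information such as a version number or a feature name can all go into the metadata.

    The load balancer can then use that metadata to route requests to different service instances.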

    spring.cloud.nacos.discovery.metadata.version=1.15
    spring.cloud.nacos.discovery.metadata.advance=true
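
As a rough illustration of what "routing by metadata" means, the sketch below splits a list of instances into gray and regular groups using the `advance` key from the properties above. `MetadataInstance` and `MetadataPartition` are hypothetical stand-ins written for this example, not Nacos or Spring classes; a real gateway would read the map from `ServiceInstance.getMetadata()`.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for a registered service instance (not a Nacos or Spring type).
final class MetadataInstance {
    final String host;
    final Map<String, String> metadata;

    MetadataInstance(String host, Map<String, String> metadata) {
        this.host = host;
        this.metadata = metadata;
    }
}

final class MetadataPartition {
    // Key true  -> instances registered with advance=true (gray instances).
    // Key false -> every other instance (regular instances).
    static Map<Boolean, List<MetadataInstance>> byAdvanceFlag(List<MetadataInstance> instances) {
        return instances.stream().collect(Collectors.partitioningBy(
                i -> "true".equals(i.metadata.get("advance"))));
    }
}
```

An instance that does not set `advance` at all lands in the regular group, which matches the `getOrDefault("advance", "")` check used later in this article.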
    

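    # Preparation

    For reference, the setup used here is: Nacos deployed, the gateway deployed, and two instances of service A deployed.

    # Getting started

    # Tracing the code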

        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            URI url = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
            String schemePrefix = (String)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
            if (url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix))) {
                ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
                if (log.isTraceEnabled()) {
                    log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
                }
    
                return this.choose(exchange).doOnNext((response) -> {
                    if (!response.hasServer()) {
                        throw NotFoundException.create(this.properties.isUse404(), "Unable to find instance for " + url.getHost());
                    } else {
                        ServiceInstance retrievedInstance = (ServiceInstance)response.getServer();
                        URI uri = exchange.getRequest().getURI();
                        String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
                        if (schemePrefix != null) {
                            overrideScheme = url.getScheme();
                        }
    
                        DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);
                        URI requestUrl = this.reconstructURI(serviceInstance, uri);
                        if (log.isTraceEnabled()) {
                            log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
                        }
    
                        exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
                    }
                }).then(chain.filter(exchange));
            } else {
                return chain.filter(exchange);
            }
        }
    
        protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
            return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
        }
    
        private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
            URI uri = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
            ReactorLoadBalancer<ServiceInstance> loadBalancer = (ReactorLoadBalancer)this.clientFactory.getInstance(uri.getHost(), ReactorServiceInstanceLoadBalancer.class);
            if (loadBalancer == null) {
                throw new NotFoundException("No loadbalancer available for " + uri.getHost());
            } else {
                return loadBalancer.choose(this.createRequest());
            }
        }
    
    	@SuppressWarnings("rawtypes")
    	@Override
    	// see original
    	// https://github.com/Netflix/ocelli/blob/master/ocelli-core/
    	// src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.java
    	public Mono<Response<ServiceInstance>> choose(Request request) {
    		// TODO: move supplier to Request?
    		// Temporary conditional logic till deprecated members are removed.
    		if (serviceInstanceListSupplierProvider != null) {
    			ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
    					.getIfAvailable(NoopServiceInstanceListSupplier::new);
    			return supplier.get().next().map(this::getInstanceResponse);
    		}
    		ServiceInstanceSupplier supplier = this.serviceInstanceSupplier
    				.getIfAvailable(NoopServiceInstanceSupplier::new);
    		return supplier.get().collectList().map(this::getInstanceResponse);
    	}
    
    	private Response<ServiceInstance> getInstanceResponse(
    			List<ServiceInstance> instances) {
    		if (instances.isEmpty()) {
    			log.warn("No servers available for service: " + this.serviceId);
    			return new EmptyResponse();
    		}
    		// TODO: enforce order?
    		int pos = Math.abs(this.position.incrementAndGet());
    
    		ServiceInstance instance = instances.get(pos % instances.size());
    
    		return new DefaultResponse(instance);
    	}
    

    Tracing through ReactiveLoadBalancerClientFilter and RoundRobinLoadBalancer shows that, in the end, we only need to rework getInstanceResponse to cover everything we need.
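
One detail in the traced `getInstanceResponse` is worth a closer look before copying it: the index is computed with `Math.abs(counter.incrementAndGet())`. `Math.abs(Integer.MIN_VALUE)` is still negative in Java, so after the counter overflows the modulo can produce a negative index. The sketch below contrasts that with masking off the sign bit. `RoundRobinIndex` is a hypothetical class for illustration only, not part of Spring Cloud.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinIndex {
    private final AtomicInteger position;

    RoundRobinIndex(int seed) {
        this.position = new AtomicInteger(seed);
    }

    // Same shape as the traced code: Math.abs(Integer.MIN_VALUE) stays negative,
    // so this can return a negative index right after the counter overflows.
    int nextWithAbs(int size) {
        return Math.abs(position.incrementAndGet()) % size;
    }

    // Alternative: clearing the sign bit keeps the value in [0, Integer.MAX_VALUE],
    // so the index is always in [0, size).
    int nextWithMask(int size) {
        return (position.incrementAndGet() & Integer.MAX_VALUE) % size;
    }
}
```

With a seed of `Integer.MAX_VALUE`, the first call to `nextWithAbs` lands on the overflow case, while `nextWithMask` keeps returning a valid list index.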

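    Let's get to work!

    # Modifying the code

    We only need to add one GlobalFilter, AdvanceReactiveLoadBalancerClientFilter, that runs before ReactiveLoadBalancerClientFilter, and make a small change to the LoadBalancer's getInstanceResponse.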
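
"Runs before" here comes down to the filter's order value: Spring sorts `Ordered` components so that a lower value runs earlier, and the filter shown later in this article uses `10150 - 1` to sit just ahead of ReactiveLoadBalancerClientFilter. The sketch below models only that sorting rule with plain Java. `OrderedStep` and `FilterOrdering` are hypothetical names for this example, and the value 10150 for the built-in filter is taken from the constant used later in the article.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a filter with an order value.
final class OrderedStep {
    final String name;
    final int order;

    OrderedStep(String name, int order) {
        this.name = name;
        this.order = order;
    }
}

final class FilterOrdering {
    // Returns the step names in the sequence they would run: lowest order value first.
    static List<String> executionOrder(List<OrderedStep> steps) {
        List<OrderedStep> sorted = new ArrayList<>(steps);
        sorted.sort(Comparator.comparingInt(s -> s.order));
        List<String> names = new ArrayList<>();
        for (OrderedStep step : sorted) {
            names.add(step.name);
        }
        return names;
    }
}
```

Because the gray filter runs first, it can rewrite the request URL to a concrete gray instance before the built-in filter ever looks at the `lb` scheme.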

    # Copy the contents of RoundRobinLoadBalancer and modify the getInstanceResponse() logic

    package top.lingma.gateway.loadbalancer;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.beans.factory.ObjectProvider;
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.cloud.client.loadbalancer.reactive.DefaultResponse;
    import org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse;
    import org.springframework.cloud.client.loadbalancer.reactive.Request;
    import org.springframework.cloud.client.loadbalancer.reactive.Response;
    import org.springframework.cloud.loadbalancer.core.*;
    import reactor.core.publisher.Mono;
    
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;
    
    public class AdvanceRoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    
        private static final Log log = LogFactory.getLog(AdvanceRoundRobinLoadBalancer.class);
    
        private final AtomicInteger position;
        private final AtomicInteger positionAdvance;
    
        @Deprecated
        private ObjectProvider<ServiceInstanceSupplier> serviceInstanceSupplier;
    
        private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    
        private final String serviceId;
    
    
        @Deprecated
        public AdvanceRoundRobinLoadBalancer(String serviceId, ObjectProvider<ServiceInstanceSupplier> serviceInstanceSupplier) {
            this(serviceId, serviceInstanceSupplier, new Random().nextInt(1000));
        }
    
        public AdvanceRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
            this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000));
        }
    
    
        public AdvanceRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, int seedPosition) {
            this.serviceId = serviceId;
            this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
            this.position = new AtomicInteger(seedPosition);
            this.positionAdvance = new AtomicInteger(seedPosition);
        }
    
        @Deprecated
        public AdvanceRoundRobinLoadBalancer(String serviceId, ObjectProvider<ServiceInstanceSupplier> serviceInstanceSupplier, int seedPosition) {
            this.serviceId = serviceId;
            this.serviceInstanceSupplier = serviceInstanceSupplier;
            this.position = new AtomicInteger(seedPosition);
            this.positionAdvance = new AtomicInteger(seedPosition);
        }
    
    
        @Override
    
        public Mono<Response<ServiceInstance>> choose(Request request) {
    
    
            if (serviceInstanceListSupplierProvider != null) {
                ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
                return supplier.get().next().map((instances) -> {
                    // The selection logic was modified here
                    if (request instanceof AdvanceRequestContext) {
                        List<ServiceInstance> advanceInstance = instances.stream().filter(s -> s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());
                        return getInstanceResponse(advanceInstance, request);
                    } else {
                        List<ServiceInstance> routineInstance = instances.stream().filter(s -> !s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());
                        return getInstanceResponse(routineInstance, request);
                    }
    
                });
            }
            ServiceInstanceSupplier supplier = this.serviceInstanceSupplier.getIfAvailable(NoopServiceInstanceSupplier::new);
            return supplier.get().collectList().map((instances) -> {
                if (request instanceof AdvanceRequestContext) {
                    // The selection logic was modified here
                    List<ServiceInstance> advanceInstance = instances.stream().filter(s -> s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());
                    return getInstanceResponse(advanceInstance, request);
                } else {
                    List<ServiceInstance> instance = instances.stream().filter(s -> !s.getMetadata().getOrDefault("advance", "").equals("true")).collect(Collectors.toList());
                    return getInstanceResponse(instance, request);
                }
    
            });
        }
    
        private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, Request request) {
            if (instances.isEmpty()) {
                if (request instanceof AdvanceRequestContext) {
                    return new AdvanceEmptyResponse();
                }
                log.warn("No servers available for service: " + this.serviceId);
                return new EmptyResponse();
            }
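            // Gray-release selection logic: gray and regular traffic keep separate counters.
            // Masking off the sign bit keeps the index non-negative even after the counter
            // overflows, which Math.abs does not guarantee for Integer.MIN_VALUE.
            int pos;
            if (request instanceof AdvanceRequestContext) {
                pos = this.positionAdvance.incrementAndGet() & Integer.MAX_VALUE;
            } else {
                pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
            }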
            ServiceInstance instance = instances.get(pos % instances.size());
            return new DefaultResponse(instance);
    
        }
    
    }
    
    
    

    # The AdvanceEmptyResponse class marks the case where no gray-release server exists, so the request can fall through to the regular servers

    package top.lingma.gateway.loadbalancer;
    
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.cloud.client.loadbalancer.reactive.CompletionContext;
    import org.springframework.cloud.client.loadbalancer.reactive.Response;
    
    public class AdvanceEmptyResponse extends org.springframework.cloud.client.loadbalancer.EmptyResponse implements Response<ServiceInstance> {
        public AdvanceEmptyResponse() {
        }
    
        public void onComplete(CompletionContext completionContext) {
        }
    }
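
The fall-through that this marker class enables can be pictured as a two-stage decision. The sketch below is a simplified model in plain Java, not the gateway code: `GrayFallback` is a hypothetical class, hosts are plain strings, and each stage just picks the first host instead of doing round-robin. An empty `Optional` plays the role that `AdvanceEmptyResponse` plays in this article.

```java
import java.util.List;
import java.util.Optional;

final class GrayFallback {
    // Stage 1 (gray selection): a gray host if one exists, otherwise empty.
    static Optional<String> chooseGray(List<String> grayHosts) {
        return grayHosts.isEmpty() ? Optional.empty() : Optional.of(grayHosts.get(0));
    }

    // Stage 2 (regular selection) is reached when the user is not a gray user,
    // or when stage 1 found no gray host.
    static String route(boolean grayUser, List<String> grayHosts, List<String> regularHosts) {
        if (grayUser) {
            Optional<String> gray = chooseGray(grayHosts);
            if (gray.isPresent()) {
                return gray.get();
            }
        }
        if (regularHosts.isEmpty()) {
            throw new IllegalStateException("no instance available");
        }
        return regularHosts.get(0);
    }
}
```

The point of the separate marker is that "no gray instance" is not an error for a gray user, while "no instance at all" still is.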
    

    # AdvanceRequestContext exists to pass information from the GlobalFilter to the LoadBalancer

    package top.lingma.gateway.loadbalancer;
    
    import org.springframework.cloud.client.loadbalancer.reactive.Request;
    import org.springframework.web.server.ServerWebExchange;
    
    public class AdvanceRequestContext<T> implements Request {
    
        private T exchange;
    
        public AdvanceRequestContext(T exchange) {
            this.exchange = exchange;
        }
    
        @Override
        public T getContext() {
            return exchange;
        }
    }
    
    

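    # AdvanceReactiveLoadBalancerClientFilter is copied from ReactiveLoadBalancerClientFilter

    Two things to note. First, gray-server selection has to happen before ReactiveLoadBalancerClientFilter, hence LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150 - 1. Second, createRequest was changed to pass the exchange along, so that the LoadBalancer can process it later.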

    package top.lingma.gateway.loadbalancer;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools;
    import org.springframework.cloud.client.loadbalancer.reactive.Response;
    import org.springframework.cloud.gateway.config.LoadBalancerProperties;
    import org.springframework.cloud.gateway.filter.GatewayFilterChain;
    import org.springframework.cloud.gateway.filter.GlobalFilter;
    import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
    import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
    import org.springframework.cloud.gateway.support.NotFoundException;
    import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
    import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
    import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
    import org.springframework.core.Ordered;
    import org.springframework.stereotype.Component;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Mono;
    
    import java.net.URI;
    import java.util.List;
    
    import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.*;
    
    @Component
    public class AdvanceReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {
    
        private static final Log log = LogFactory.getLog(ReactiveLoadBalancerClientFilter.class);
    
        private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150 - 1;
    
        private final LoadBalancerClientFactory clientFactory;
    
        private LoadBalancerProperties properties;
    
        public AdvanceReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
            this.clientFactory = clientFactory;
            this.properties = properties;
        }
    
        @Override
        public int getOrder() {
            return LOAD_BALANCER_CLIENT_FILTER_ORDER;
        }
    
        @Override
        @SuppressWarnings("Duplicates")
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            // Dedicated servers for gray users: check whether this is a gray user with gray access; if not, go straight to the next filter
            List<String> secChUa = exchange.getRequest().getHeaders().get("sec-ch-ua");
            if (secChUa == null || secChUa.isEmpty() || !secChUa.stream().findFirst().map(r -> r.contains("Edge")).orElse(false)) {
                return chain.filter(exchange);
            }
    
            URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
            String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
            if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
                return chain.filter(exchange);
            }
            // preserve the original url
            addOriginalRequestUrl(exchange, url);
    
            if (log.isTraceEnabled()) {
                log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
            }
    
            return choose(exchange).doOnNext(response -> {
                if (response instanceof AdvanceEmptyResponse) {
                    return;
                }
                if (!response.hasServer()) {
                    throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
                }
    
                ServiceInstance retrievedInstance = response.getServer();
    
                URI uri = exchange.getRequest().getURI();
    
                // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
                // if the loadbalancer doesn't provide one.
                String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
                if (schemePrefix != null) {
                    overrideScheme = url.getScheme();
                }
    
                DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);
    
                URI requestUrl = reconstructURI(serviceInstance, uri);
    
                if (log.isTraceEnabled()) {
                    log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
                }
                exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
            }).then(chain.filter(exchange));
        }
    
        protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
            return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
        }
    
        @SuppressWarnings("deprecation")
        private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
            URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
            ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(uri.getHost(), ReactorServiceInstanceLoadBalancer.class);
            if (loadBalancer == null) {
                throw new NotFoundException("No loadbalancer available for " + uri.getHost());
            }
            return loadBalancer.choose(createRequest(exchange));
        }
    
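        /**
         * Modified here: the exchange is passed in so that the LoadBalancer can use it later.
         * @param exchange the current server web exchange
         * @return a request context that wraps the exchange
         */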
        @SuppressWarnings("deprecation")
        private AdvanceRequestContext<ServerWebExchange> createRequest(ServerWebExchange exchange) {
            return new AdvanceRequestContext(exchange);
        }
    
    }
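
The gray-user test at the top of `filter` is the part most likely to change from project to project, so it can help to isolate it. The sketch below restates the same rule as a standalone predicate: only the first `sec-ch-ua` value is inspected, and the user counts as gray if it contains "Edge". `GrayUserCheck` is a hypothetical helper for illustration; a real rollout would more likely key on a user ID, a cookie, or a dedicated header.

```java
import java.util.List;

final class GrayUserCheck {
    // Mirrors the check in the filter: a missing or empty header means "not a gray user",
    // otherwise only the first header value is examined.
    static boolean isGrayUser(List<String> secChUaValues) {
        if (secChUaValues == null || secChUaValues.isEmpty()) {
            return false;
        }
        String first = secChUaValues.get(0);
        return first != null && first.contains("Edge");
    }
}
```

Keeping the rule in one small method makes it easy to swap the criterion without touching the load-balancing code.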
    
    

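    # The essential parts of the gray release are now done. Next comes the AutoConfiguration. Note that this class must not be picked up by Spring's component scan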

    package top.lingma.gateway.loadbalancer;
    
    import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
    import org.springframework.cloud.client.ConditionalOnDiscoveryEnabled;
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
    import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
    import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.core.env.Environment;
    
    @ConditionalOnDiscoveryEnabled
    public class AdvanceLoadBalancerAutoConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
            String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
            return new AdvanceRoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
        }
    }
    

    # Finally, set defaultConfiguration on @LoadBalancerClients in the application class

    
    @SpringBootApplication()
    @LoadBalancerClients(defaultConfiguration = AdvanceLoadBalancerAutoConfiguration.class)
    public class LingmaGatewayApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(LingmaGatewayApplication.class, args);
        }
    
    }
    
    
    Last updated: 2024/11/04, 10:04:09