通过自定义feignclient 的LoadBalancerFeignClient实现灵活的负载均衡策略

通过自定义feignclient 的LoadBalancerFeignClient 或IRule 能实现完全自定义的负载均衡策略,本文主要是通过实现自定义的LoadBalancerFeignClient而达到自定义的负载均衡策略

示例代码实现如下:

package cn.zuowenjun.demo;

import com.netflix.loadbalancer.Server;
import feign.Client;
import feign.Request;
import feign.Response;
import org.apache.commons.collections4.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.cloud.netflix.feign.ribbon.CachingSpringLoadBalancerFactory;
import org.springframework.cloud.netflix.feign.ribbon.LoadBalancerFeignClient;
import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import java.io.IOException;
import java.net.URL;
import java.util.*;
import java.util.stream.Collectors;

/**
 * FeignClient 服务BEAN(含自定义负载均衡请求机制),注意这里指定了configuration的类型
 */
@FeignClient(value ="zuowenjun-demo" , configuration = {DemoProviderDispatchService.Config.class})
public interface DemoProviderDispatchService {
    Logger LOGGER = LoggerFactory.getLogger(FileFmsDispatchService.class);

    //要RPC请求的接口,需要自定义负载均衡
    @RequestMapping(value = "/fileContent/compare", method = RequestMethod.POST)
    ResponseData<Integer> compare(@RequestBody FileBillCompareBO billCompareBO);

    /**
     * DemoProviderDispatchService 专用的配置类,在这个配置类里面添加的BEAN均可替换全局默认的BEAN
     * 注意:此处不能加上@Configuration,否则将变成全局配置了
     */
    static class Config {
        /**
         * 为FileFmsDispatchClient 重新定义专用的Client,在里面实现自定义的URL请求
         *
         * @param cachingFactory
         * @param clientFactory
         * @return
         */
        @Bean
        public Client requestClient(CachingSpringLoadBalancerFactory cachingFactory, SpringClientFactory clientFactory) {

            //获取当前节点的ID与端口
            String currentIpAndPort = CommonUtils.getCurrentIpAndPort();

            //构建返回一个完全自定义的LoadBalancerFeignClient
            return new LoadBalancerFeignClient(new Client.Default(null, null) {
                @Override
                public Response execute(Request request, Request.Options options) throws IOException {
                    //通过重新创建Request,将Request 中的url指向自定义负载均衡的选中的URL
                    Request newRequest = reCreateRequestForLoadBalance(request);
                    if (newRequest == null) {
                        return Response.builder().reason("服务器繁忙,当前无可用服务节点").status(204).build();
                    }

                    return super.execute(newRequest, options);
                }

                private Request reCreateRequestForLoadBalance(Request request) throws IOException {
                    URL url = new URL(request.url());
                    //获取可用的服务实例节点列表
                    List<Server> upServers = clientFactory.getLoadBalancer(“zuowenjun-demo”).getReachableServers();
                    Server bestServer = choose(url, upServers, url.getFile());
                    if (bestServer == null) {
                        //找不到最佳可用服务器节点,说明所有服务节点都压力山大
                        return null;
                    }

                    url = new URL(url.getProtocol(), bestServer.getHost(), bestServer.getPort(), url.getFile());
                    return Request.create(request.method(), url.toString(), request.headers(), request.body(), request.charset());
                }


                /**
                 * 选择最优的服务节点
                 * @param url
                 * @param upServers
                 * @param apiPath
                 * @return
                 */
                private Server choose(URL url, List<Server> upServers, String apiPath) {

                    if (CollectionUtils.isEmpty(upServers)) {
                        throw new ApplicationException(500, "从注册中心获取不到可用的服务节点信息");
                    }

                    apiPath=apiPath.startsWith("/")?apiPath.substring(1):apiPath;
                    String hashKey = Constants.LOADBALANCE_API_PREFIX + apiPath.replace("/", "_");
                    Boolean existRequest = RedisUtils.existHashKey(hashKey, String.format("%s:%s", url.getHost(), url.getPort()));
                    Server bestServer = null;
                    if (!Boolean.TRUE.equals(existRequest)) {
                        //如果当前即将请求的URL的节点之前没有缓存标记请求处理中时,则可直接复用返回
                        bestServer = upServers.stream().filter(s -> s.getHost().equals(url.getHost()) && s.getPort() == url.getPort()).findFirst().orElse(null);
                        if (bestServer != null) {
                            return bestServer;
                        }
                    }

                    //先从缓存中找出当前API 的请求中的节点列表
                    Map<Object, Object> existRequestMap = RedisUtils.getHashEntries(hashKey);
                    Set<Object> existRequestIpAndPorts = new HashSet<>();
                    if (MapUtils.isNotEmpty(existRequestMap)) {
                        existRequestIpAndPorts.addAll(existRequestMap.keySet());
                    }

                    //排除API请求中的节点,保留空闲节点列表
                    upServers = upServers.stream().filter(s -> !existRequestIpAndPorts.contains(s.getHostPort()) && !s.getHostPort().equals(currentIpAndPort)).collect(Collectors.toList());
                    if (CollectionUtils.isEmpty(upServers)) {
                        //说明当前所有节点全部都有处理请求中,无空闲节点
                        return null;
                    }

                    //从空闲节点列表中随机返回一个节点
                    int rndNo = new Random().nextInt(upServers.size());
                    bestServer = upServers.get(rndNo);
                    LOGGER.debug("DemoProviderDispatchService.Config.requestClient.Client#choose {}", bestServer.getHostPort());
                    return bestServer;
                }

            }, cachingFactory, clientFactory);
        }


    }

}

调用时就正常注入DemoProviderDispatchService BEAN,并使用:demoProviderDispatchService.compare(…) 即可实现在自定义负载均衡的策略下请求远程服务API,这种自定义的负载均衡策略可以满足特定的性能要求

来源链接:https://www.cnblogs.com/zuowj/p/18579778

© 版权声明
THE END
支持一下吧
点赞9 分享
评论 抢沙发
头像
请文明发言!
提交
头像

昵称

取消
昵称表情代码

    暂无评论内容