Spring Cloud-Ribbon 客户端负载均衡

Spring Cloud-Ribbon 客户端负载均衡

第一节 简介

1.1 负载均衡

  负载均衡是高可用架构的一个关键组件,主要用来提高性能和可用性,通过负载均衡将流量分发到多个服务器,同时多服务器能够消除这部分的单点故障。

1.2 两种方案

  目前主流的负载方案有两种:一种是集中式负载均衡,在消费者和服务提供方中间使用独立的代理方式进行负载,有硬件(F5)也有软件(Nginx)。另一种是客户端做负载均衡,根据自己的请求情况做负载,Ribbon就是客户端负载均衡。

  Ribbon是Netflix开源的一款用于客户端负载均衡的软件。GitHub地址:https://github.com/Netflix/ribbon。

1.3 Ribbon模块

Ribbon模块如下:

  • ribbon-loadbalancer: 负载均衡模块,可独立使用,也可以和别的模块一起使用。Ribbon内置的负载均衡算法都实现在其中。
  • ribbon-eureka: 基于Eureka封装的模块,能够快速、方便地集成Eureka。
  • ribbon-transport: 基于Netty实现多协议的支持,比如HTTP、TCP、UDP等。
  • ribbon-httpclient: 基于Apache HttpClient封装的REST客户端,集成了负载均衡模块,可以直接在项目中使用来调用接口。
  • ribbon-example: Ribbon使用代码示例,通过这些示例能够让你的学习事半功倍。
  • ribbon-core: 一些比较核心且具有通用性的代码,客户端API的一些配置和其他API的定义。

第二节 实战练习

2.1 使用Ribbon

  这里演示一下Ribbon如何去做负载操作,调用接口用的是最底层的HttpURLConnection,也可以用其他客户端,或者直接用Ribbon Client执行。

  依赖如下,只添加Ribbon相关依赖。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependency>
<groupId>com.netflix.ribbon</groupId>
<artifactId>ribbon</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>com.netflix.ribbon</groupId>
<artifactId>ribbon-core</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>com.netflix.ribbon</groupId>
<artifactId>ribbon-loadbalancer</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.0.10</version>
</dependency>

  生产者选用eureka-client-user-service的/user/hello服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static void main(String[] args){
// 服务列表
List<Server> serverList = Arrays.asList(
new Server("localhost", 8081),
new Server("localhost", 8083));

// 构建负载实例
ILoadBalancer loadBalancer = LoadBalancerBuilder.newBuilder().buildFixedServerListLoadBalancer(serverList);

// 调用 5 次来测试效果
for (int i = 0; i < 5; i++) {
String result = LoadBalancerCommand.<String>builder()
.withLoadBalancer(loadBalancer)
.build()
.submit(new ServerOperation<String>() {
@Override
public Observable<String> call(Server server) {
try {
String addr = "http://" + server.getHost() + ":" + server.getPort() + "/user/hello";
System.out.println(" 调用地址:" + addr);
URL url = new URL(addr);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.connect();
InputStream in = conn.getInputStream();
byte[] data = new byte[in.available()];
in.read(data);
return Observable.just(new String(data));
}catch (Exception e){
return Observable.error(e);
}
}
}).toBlocking().first();
System.out.println(" 调用结果:" + result);
}
}

  复制一份eureka-client-user-service项目并配置端口8083启动,如下。

两个相同服务

  执行测试代码,结果如下,负载起作用了,8083端口调用了3次,8081调用了2次。

1
2
3
4
5
6
7
8
9
10
调用地址:http://localhost:8083/user/hello
调用结果:hello
调用地址:http://localhost:8081/user/hello
调用结果:hello
调用地址:http://localhost:8083/user/hello
调用结果:hello
调用地址:http://localhost:8081/user/hello
调用结果:hello
调用地址:http://localhost:8083/user/hello
调用结果:hello

2.2 RestTemplate结合Ribbon

2.2.1 案例

  Spring Cloud对Ribbon进行了一层封装,集成好了一些配置。RestTemplate是Spring提供的一种简单便捷的模板类来进行API的调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Configuration
public class BeanConfiguration {
@Bean
public RestTemplate getRestTemplate(){
return new RestTemplate();
}
}

@RestController
public class HouseController {
@GetMapping("/house/data")
public HouseInfo getData(@RequestParam("name") String name) {
return new HouseInfo(1L,"上海","虹口","东体小区");
}

@GetMapping("/house/data/{name}")
public String getData2(@PathVariable("name") String name) {
return name;
}
}

@RestController
public class HouseClientController {
@Autowired
private RestTemplate restTemplate;

@GetMapping("/call/data")
public HouseInfo getData(@RequestParam("name") String name) {
return restTemplate.getForObject("http://localhost:8081/house/data?name=" + name, HouseInfo.class);
}

@GetMapping("/call/data/{name}")
public String getData2(@PathVariable("name") String name) {
return restTemplate.getForObject("http://localhost:8081/house/{name}", String.class, name);
}
}

  运行并观察结果。

结果
结果

2.2.2 getForObject

  getForObject有三个重载实现,url可以是字符串或URI对象,responseType是返回值类型,uriVariables是Path Variable参数,有两种方式,一种是可变参数,另一种是Map形式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Nullable
public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {
RequestCallback requestCallback = this.acceptHeaderRequestCallback(responseType);
HttpMessageConverterExtractor<T> responseExtractor = new HttpMessageConverterExtractor(responseType, this.getMessageConverters(), this.logger);
return this.execute(url, HttpMethod.GET, requestCallback, responseExtractor, (Object[])uriVariables);
}

@Nullable
public <T> T getForObject(String url, Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException {
RequestCallback requestCallback = this.acceptHeaderRequestCallback(responseType);
HttpMessageConverterExtractor<T> responseExtractor = new HttpMessageConverterExtractor(responseType, this.getMessageConverters(), this.logger);
return this.execute(url, HttpMethod.GET, requestCallback, responseExtractor, (Map)uriVariables);
}

@Nullable
public <T> T getForObject(URI url, Class<T> responseType) throws RestClientException {
RequestCallback requestCallback = this.acceptHeaderRequestCallback(responseType);
HttpMessageConverterExtractor<T> responseExtractor = new HttpMessageConverterExtractor(responseType, this.getMessageConverters(), this.logger);
return this.execute(url, HttpMethod.GET, requestCallback, responseExtractor);
}

2.2.3 getForEntity

  getForEntity也可以用来获取数据,可以获取返回的状态码、请求头等信息,通过getBody获取响应的内容,和getForObject类似也有三种重载实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {
RequestCallback requestCallback = this.acceptHeaderRequestCallback(responseType);
ResponseExtractor<ResponseEntity<T>> responseExtractor = this.responseEntityExtractor(responseType);
return (ResponseEntity)nonNull(this.execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables));
}

public <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException {
RequestCallback requestCallback = this.acceptHeaderRequestCallback(responseType);
ResponseExtractor<ResponseEntity<T>> responseExtractor = this.responseEntityExtractor(responseType);
return (ResponseEntity)nonNull(this.execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables));
}

public <T> ResponseEntity<T> getForEntity(URI url, Class<T> responseType) throws RestClientException {
RequestCallback requestCallback = this.acceptHeaderRequestCallback(responseType);
ResponseExtractor<ResponseEntity<T>> responseExtractor = this.responseEntityExtractor(responseType);
return (ResponseEntity)nonNull(this.execute(url, HttpMethod.GET, requestCallback, responseExtractor));
}

  运行如下代码并观察结果。

1
2
3
4
5
6
7
8
9
@GetMapping("/call/dataEntity")
public HouseInfo getForEntity(@RequestParam("name") String name) {
ResponseEntity<HouseInfo> responseEntity = restTemplate.getForEntity(
"http://localhost:8081/house/data?name=" + name, HouseInfo.class);
if(responseEntity.getStatusCodeValue() == 200){
return responseEntity.getBody();
}
return null;
}

结果

2.2.4 postForObject

  当然如果是Post请求,则有对应的postForObject函数,如下代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class HouseController {

......

@PostMapping("/house/save")
public Long addData(@RequestBody HouseInfo houseInfo) {
System.out.println(houseInfo.getName());
return 1001L;
}
}

public class HouseClientController {
......

@GetMapping("/call/save")
public Long add() {
HouseInfo houseInfo = new HouseInfo();
houseInfo.setCity(" 上海 ");
houseInfo.setRegion(" 虹口 ");
houseInfo.setName("XXX");
Long id = restTemplate.postForObject("http://localhost:8081/house/save", houseInfo, Long.class);
return id;
}
}

结果

  还有Put、Delete等Http Method不再赘述。exchange函数也是一个比较常用的方法,可以执行get、post、put、delete这四种请求方式。

2.2.5 集成Ribbon

  Spring Cloud项目中集成Ribbon只要添加如下依赖,也可以不用配置,Eureka中已经引用了Ribbon。

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>

2.2.6 动态发现服务并负载均衡

  RestTemplate可以结合Eureka来动态发现服务并进行负载均衡的调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class BeanConfiguration {
@Bean
@LoadBalanced //使RestTemplate具有负载均衡能力
public RestTemplate getRestTemplate(){
return new RestTemplate();
}
}

//将ID+PORT改为服务名称,即注册到Eureka的名字
@GetMapping("/call/data")
public HouseInfo getData(@RequestParam("name") String name) {
return restTemplate.getForObject("http://eureka-client-user-service/house/data?name=" + name, HouseInfo.class);
}

结果

2.2.7 LoadBalanced注解原理

  Spring Cloud封装了大量的底层工作,主要是给RestTemplate增加拦截器,在请求之前对请求的地址进行替换或根据具体的负载策略选择服务器地址,然后再去调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// 自定义RestTemplate拦截器
public class MyLoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancerClient;
private LoadBalancerRequestFactory requestFactory;

public MyLoadBalancerInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
this.loadBalancerClient = loadBalancerClient;
this.requestFactory = requestFactory;
}

public MyLoadBalancerInterceptor(LoadBalancerClient loadBalancerClient) {
this(loadBalancerClient, new LoadBalancerRequestFactory(loadBalancerClient));
}

@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
final URI originalUri = httpRequest.getURI();
String serviceName = originalUri.getHost();
System.out.println("进入自定义的请求拦截器中" + serviceName);
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancerClient.execute(serviceName, requestFactory.createRequest(httpRequest, bytes, clientHttpRequestExecution));
}
}


// 自定义注解
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface MyLoadBalanced {
}

// 给RestTemplate注入拦截器
@Configuration
public class MyLoadBalancerAutoConfiguration {
@MyLoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();

@Bean
public MyLoadBalancerInterceptor myLoadBalancerInterceptor(){
return new MyLoadBalancerInterceptor(new RibbonLoadBalancerClient(new SpringClientFactory()));
}

@Bean
public SmartInitializingSingleton myLoadBalancedRestTemplateInitializer(){
return new SmartInitializingSingleton() {
@Override
public void afterSingletonsInstantiated() {
for (RestTemplate restTemplate : MyLoadBalancerAutoConfiguration.this.restTemplates){
List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
list.add(myLoadBalancerInterceptor());
restTemplate.setInterceptors(list);
}
}
};
}
}

// 使用自定义拦截注解
@Configuration
public class BeanConfiguration {
@Bean
@MyLoadBalanced //使RestTemplate具有负载均衡能力
public RestTemplate getRestTemplate(){
return new RestTemplate();
}
}

  重启服务,并观察控制台输出。

  更多内容参考:LoadBalanced注解原理

2.2.8 Ribbon API使用

  当我们有一些特殊需求需要Ribbon来获取对应服务信息(ribbon-eureka-demo)时,可以使用LoadRibbonClient来获取,假设我们想要获取一个服务的服务地址,可以通过LoadRibbonClient的choose函数来获取

1
2
3
4
5
6
7
private LoadBalancerClient loadBalancer;

@GetMapping("/choose")
public Object chooseUrl(){
ServiceInstance instance = loadBalancer.choose("ribbon-eureka-demo");
return instance;
}

​ 启动,访问接口,可以看到如下信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
serviceId: "ribbon-eureka-demo",
server: {
host: "localhost",
port: 8081,
id: "localhost:8081",
zone: "UNKNOWN",
readyToServer: true,
alive: true,
hostPort: "localhost:8081",
metaInfo: {
serverGroup: null,
serviceForDiscovery: null,
instanceId: "localhost:8081",
appName: null
}
},
secure: false,
metadata: { },
host: "localhost",
port: 8081,
uri: "http://localhost:8081"
}

2.2.9 Ribbon饥饿加载

​ 在日常使用时经常会遇到一个问题:在网络不佳的场景下进行服务调用时,第一次调用会超时。一些解决方案是,比如把超时时间改长,或者直接禁用超时等。Spring Cloud在其更新中解决了此问题。

​ Ribbon的客户端是在第一次请求时初始化的,如果超时时间比较短的话,初始化Client时间再加上请求接口的时间,就导致了第一次请求超时。Spring Cloud的解决方案是eager-load方式。

1
2
3
4
# 开启Ribbon的饥饿加载模式
ribbon.eager-load.enabled=true
# 指定需要饥饿加载的服务名,即需要调用的服务,多个可以用逗号隔开
ribbon.eager-load.clients=ribbon-eureka-demo

​ 通过调试源码的方式可以来验证,而不需模拟网络不佳的环境,org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration源码如下,在return设置断点调试,如果执行则证明配置成功。

1
2
3
4
5
@Bean
@ConditionalOnProperty({"ribbon.eager-load.enabled"})
public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
return new RibbonApplicationContextInitializer(this.springClientFactory(), this.ribbonEagerLoadProperties.getClients());
}

第三节 负载均衡策略

​ Ribbon作为一款客户端负载均衡框架,默认的负载策略是轮询,但同时也提供了多种其它策略来选择。

负载均衡策略:

  • BestAvailableRule:选择一个最小的并发请求Server,逐个考察Server,若Server被标记为错误,则跳过,然后再选择ActiveRequestCount中最小的Server。
  • AvailabilityFilteringRule:过滤掉那些一直连接失败的且被标记为circuit tripped的后端Server,并过滤掉那些高并发的后端Server或使用一个AvailabilityPredicate来包含过滤Server的逻辑。其实就是检查Status里记录的各个Server的运行状态。
  • ZoneAvoidanceRule:使用ZoneAvoidancePredicate和AvailabilityPredicate来判断是否选择某个Server,前者判定一个Zone的运行性能是否可用,剔除不可用的Zone,后者用于过滤掉连接数过多的Server。
  • RandomRule:随机选择一个Server。
  • RoundRobinRule:轮询选择,轮询index,选择index对应位置的Server。
  • RetryRule:对选定的负载均衡策略再加上重试机制,即选定了某个策略进行请求负载时在一个配置的时间段内若选择Server不成功,则一直尝试使用subRule的方式选择一个可用的Server。
  • ResponseTimeWeightedRule:后改名为WeightedResponseTimeRule,作用同WeightedResponseTimeRule。
  • WeightedResponseTimeRule:根据响应时间分配一个Weight(权重),响应时间越长,Weight越小,被选中的可能性越低。

第四节 自定义负载策略

​ 通过实现IRule接口可以自定义负载策略,主要的选择服务策略是choose方法,此处只演示如何自定义,具体的选择策略则忽略,直接返回服务列表中的第一个服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class MyRule implements IRule {
private ILoadBalancer lb;

@Override
public Server choose(Object key) {
List<Server> servers = lb.getAllServers();
for (Server server : servers) {
System.out.println(server.getHostPort());
}
return servers.get(0);
}

@Override
public void setLoadBalancer(ILoadBalancer lb) {
this.lb = lb;
}

@Override
public ILoadBalancer getLoadBalancer() {
return lb;
}
}

​ 然后再配置文件中添加如下配置。

1
ribbon-config-demo.ribbon.NFLoadBalancerRuleClassName=xxx.xxx.xxx.rule.MyRule

​ 重启服务,访问调用服务的接口,可以看到控制台输出信息,每次都是调用第一个服务。


第五节 常用配置

5.1 配置介绍

5.1.1 禁用Eureka

​ Eureka中的服务信息会被拉取到客户端本地,如果不想和Eureka集成,可以通过配置禁用。禁用后需要通过服务地址访问服务。

1
2
# 禁用Eureka
ribbon.eureka.enabled=false

5.1.2 配置接口地址列表

​ 此配置针对具体服务,如上ribbon-config-demo,配置后即可继续使用服务名称来调用接口。

1
2
# 禁用Eureka后手动配置服务地址
ribbon-config-demo.ribbon.listOfServers=localhost:8081,localhost:8083

5.1.3 配置负载均衡策略

1
2
# 配置负载均衡策略
ribbon-config-demo.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RandomRule

5.1.4 超时时间

​ Ribbon中有两种和时间相关的设置,分别是请求连接的超时时间和请求处理的超时时间,设置规则如下:

1
2
3
4
5
6
7
8
# 请求连接的超时时间
ribbon.ConnectTimeOut=2000
# 请求处理的超时时间
ribbon.ReadTimeout=5000

# 也可以针对每个客户端设置不同的超时时间
ribbon-config-demo.ribbon.ConnectTimeOut=2000
ribbon-config-demo.ribbon.ReadTimeout=5000

5.1.5 并发参数

1
2
3
4
# 最大连接数
ribbon.MaxTotalConnections=500
# 每个Host最大连接数
ribbon.MaxConnectionPerHost=500

5.2 代码配置

​ 除了配置文件,也可以通过代码进行配置。首先创建一个配置类,初始化自定义的策略。

1
2
3
4
5
6
7
@Configuration
public class BeanConfiguration {
@Bean
public MyRule rule() {
return new MyRule();
}
}

​ 创建一个Ribbon客户端的配置类,管理BeanConfiguration,用name来指定调用的服务名称。

1
2
3
4
@RibbonClient(name = "ribbon-config-demo", configuration = BeanConfiguration.class)
public class RibbonClientConfig {

}

5.3 文件配置

1
2
3
4
5
<clientName>.ribbon.NFLoadBalancerClassName: Should implement ILoadBalancer(负载均衡操作接口)
<clientName>.ribbon.NFLoadBalancerRuleClassName: Should implement IRule(负载均衡算法)
<clientName>.ribbon.NFLoadBalancerPingClassName: Should implement IPing(服务可用性检查)
<clientName>.ribbon.NIWSServerListClassName: Should implement ServerList(服务列表获取)
<clientName>.ribbon.NIWSServerListFilterClassName: Should implement ServerListFilter(服务列表的过滤)

第六节 重试机制

​ 故障是难以避免的,当用Nginx做负载均衡时,若应用是无状态的、可以滚动发布的,也就是需要一台台去重启应用,这样对用户的影响比较小,因为Nginx在转发请求失败后会重新将该请求转发到别的实例上。

​ 由于Eureka是基于AP原则构建的,牺牲了数据的一致性,每个Eureka服务都会保存注册的服务信息,当注册的客户端与Eureka的心跳无法保持时,有可能是网络原因,也可能是服务挂掉了。在这种情况下,Eureka中还会在一段时间内保存注册信息。这个时候客户端有可能拿到已经挂掉的服务信息,所以Ribbon可能会拿到已经失效的服务信息,导致失败的请求。

​ 这种情况可以通过重试机制来避免,即当Ribbon发现请求不可到达时,重新请求另外的服务。

6.1 RetryRule重试

​ 最简单的解决方式就是开启Ribbon自带的重试策略。

1
ribbon-config-demo.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RetryRule

6.2 Spring Retry重试

​ 还可以通过集成Spring Retry来进行重试操作。

​ 添加依赖。

1
2
3
4
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>

​ 然后配置重试次数等信息。

1
2
3
4
5
6
7
8
# 对当前实例的重试次数
ribbon.maxAutoRetries=1
# 切换实例的重试次数
ribbon.maxAutoRetriesNextServer=3
# 对所有操作请求都进行重试
ribbon.okToRetryOnAllOperations=true
# 对Http响应码进行重试
ribbon.retryableStatusCodes=500,404,502

参考博客和文章书籍等:

《Spring Cloud微服务-入门、实战和进阶》

因博客主等未标明不可引用,若部分内容涉及侵权请及时告知,我会尽快修改和删除相关内容