|
19 | 19 | import java.io.IOException; |
20 | 20 | import java.net.URI; |
21 | 21 | import java.nio.charset.StandardCharsets; |
| 22 | +import java.time.Duration; |
22 | 23 | import java.util.ArrayList; |
23 | 24 | import java.util.Arrays; |
24 | 25 | import java.util.Collection; |
|
37 | 38 | import org.apache.servicecomb.governance.processor.injection.FaultInjectionDecorators; |
38 | 39 | import org.apache.servicecomb.governance.processor.injection.FaultInjectionDecorators.FaultInjectionDecorateCheckedSupplier; |
39 | 40 | import org.apache.servicecomb.http.client.common.HttpUtils; |
40 | | -import org.apache.servicecomb.service.center.client.DiscoveryEvents.PullInstanceEvent; |
41 | 41 | import org.slf4j.Logger; |
42 | 42 | import org.slf4j.LoggerFactory; |
43 | 43 | import org.springframework.cloud.client.ServiceInstance; |
|
60 | 60 | import com.huaweicloud.common.context.InvocationContext; |
61 | 61 | import com.huaweicloud.common.context.InvocationContextHolder; |
62 | 62 | import com.huaweicloud.common.context.InvocationStage; |
63 | | -import com.huaweicloud.common.event.EventManager; |
64 | 63 | import com.huaweicloud.common.disovery.InstanceIDAdapter; |
| 64 | +import com.huaweicloud.common.event.EventManager; |
65 | 65 | import com.huaweicloud.governance.adapters.loadbalancer.RetryContext; |
| 66 | +import com.huaweicloud.governance.event.InstanceIsolatedEvent; |
66 | 67 |
|
67 | 68 | import io.github.resilience4j.bulkhead.Bulkhead; |
68 | 69 | import io.github.resilience4j.bulkhead.BulkheadFullException; |
@@ -161,15 +162,15 @@ private Response decorateWithFault(Request request, Options options, URI origina |
161 | 162 | headers.put("Content-Type", Arrays.asList("application/json")); |
162 | 163 | if (result == null) { |
163 | 164 | return Response.builder().status(200) |
164 | | - .request(request) |
165 | | - .headers(headers) |
166 | | - .build(); |
| 165 | + .request(request) |
| 166 | + .headers(headers) |
| 167 | + .build(); |
167 | 168 | } |
168 | 169 | return Response.builder().status(200) |
169 | | - .request(request) |
170 | | - .headers(headers) |
171 | | - .body(HttpUtils.serialize(result).getBytes( |
172 | | - StandardCharsets.UTF_8)).build(); |
| 170 | + .request(request) |
| 171 | + .headers(headers) |
| 172 | + .body(HttpUtils.serialize(result).getBytes( |
| 173 | + StandardCharsets.UTF_8)).build(); |
173 | 174 | } |
174 | 175 | } catch (Throwable e) { |
175 | 176 | throw new RuntimeException(e); |
@@ -336,42 +337,47 @@ private Response executeWithInstanceIsolation(GovernanceRequestExtractor governa |
336 | 337 | org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse, |
337 | 338 | Set<LoadBalancerLifecycle> supportedLifecycleProcessors, boolean useRawStatusCodes) { |
338 | 339 |
|
339 | | - try { |
340 | | - CircuitBreakerPolicy circuitBreakerPolicy = instanceIsolationHandler.matchPolicy(governanceRequest); |
341 | | - if (circuitBreakerPolicy != null && circuitBreakerPolicy.isForceOpen()) { |
342 | | - return Response.builder().status(503) |
343 | | - .reason("Policy " + circuitBreakerPolicy.getName() + " forced open and deny requests").request(feignRequest) |
344 | | - .build(); |
| 340 | + CircuitBreakerPolicy circuitBreakerPolicy = instanceIsolationHandler.matchPolicy(governanceRequest); |
| 341 | + if (circuitBreakerPolicy != null && circuitBreakerPolicy.isForceOpen()) { |
| 342 | + return Response.builder().status(503) |
| 343 | + .reason("Policy " + circuitBreakerPolicy.getName() + " forced open and deny requests").request(feignRequest) |
| 344 | + .build(); |
| 345 | + } |
| 346 | + |
| 347 | + if (circuitBreakerPolicy != null && !circuitBreakerPolicy.isForceClosed()) { |
| 348 | + CircuitBreaker circuitBreaker = instanceIsolationHandler.getActuator(governanceRequest); |
| 349 | + if (circuitBreaker == null) { |
| 350 | + return executeWithInstanceBulkhead(governanceRequest, feignClient, options, feignRequest, lbRequest, |
| 351 | + lbResponse, supportedLifecycleProcessors, useRawStatusCodes); |
345 | 352 | } |
346 | 353 |
|
347 | | - if (circuitBreakerPolicy != null && !circuitBreakerPolicy.isForceClosed()) { |
348 | | - CircuitBreaker circuitBreaker = instanceIsolationHandler.getActuator(governanceRequest); |
349 | | - if (circuitBreaker == null) { |
350 | | - return executeWithInstanceBulkhead(governanceRequest, feignClient, options, feignRequest, lbRequest, |
351 | | - lbResponse, supportedLifecycleProcessors, useRawStatusCodes); |
352 | | - } |
| 354 | + CheckedFunction0<Response> next = () -> executeWithInstanceBulkhead(governanceRequest, feignClient, options, |
| 355 | + feignRequest, lbRequest, lbResponse, |
| 356 | + supportedLifecycleProcessors, useRawStatusCodes); |
353 | 357 |
|
354 | | - CheckedFunction0<Response> next = () -> executeWithInstanceBulkhead(governanceRequest, feignClient, options, |
355 | | - feignRequest, lbRequest, lbResponse, |
356 | | - supportedLifecycleProcessors, useRawStatusCodes); |
| 358 | + DecorateCheckedSupplier<Response> dcs = Decorators.ofCheckedSupplier(next); |
| 359 | + dcs.withCircuitBreaker(circuitBreaker); |
357 | 360 |
|
358 | | - DecorateCheckedSupplier<Response> dcs = Decorators.ofCheckedSupplier(next); |
359 | | - dcs.withCircuitBreaker(circuitBreaker); |
| 361 | + try { |
360 | 362 | return dcs.get(); |
361 | | - } |
| 363 | + } catch (Throwable e) { |
| 364 | + if (e instanceof CallNotPermittedException) { |
| 365 | + // when instance isolated, request to pull instances. |
| 366 | + LOG.error("instance isolated [{}], [{}]", governanceRequest.instanceId(), e.getMessage()); |
| 367 | + EventManager.post(new InstanceIsolatedEvent(governanceRequest.instanceId(), |
| 368 | + Duration.parse(circuitBreakerPolicy.getWaitDurationInOpenState()))); |
| 369 | + return Response.builder().status(503).reason("instance isolated.").request(feignRequest).build(); |
| 370 | + } |
362 | 371 |
|
363 | | - return executeWithInstanceBulkhead(governanceRequest, feignClient, options, feignRequest, lbRequest, |
364 | | - lbResponse, supportedLifecycleProcessors, useRawStatusCodes); |
365 | | - } catch (Throwable e) { |
366 | | - if (e instanceof CallNotPermittedException) { |
367 | | - // when instance isolated, request to pull instances. |
368 | | - LOG.error("instance isolated [{}]", governanceRequest.instanceId()); |
369 | | - EventManager.post(new PullInstanceEvent()); |
370 | | - return Response.builder().status(503).reason("instance isolated.").request(feignRequest).build(); |
| 372 | + if (e instanceof RuntimeException) { |
| 373 | + throw (RuntimeException) e; |
| 374 | + } |
| 375 | + throw new RuntimeException(e); |
371 | 376 | } |
372 | | - |
373 | | - throw new RuntimeException(e); |
374 | 377 | } |
| 378 | + |
| 379 | + return executeWithInstanceBulkhead(governanceRequest, feignClient, options, feignRequest, lbRequest, |
| 380 | + lbResponse, supportedLifecycleProcessors, useRawStatusCodes); |
375 | 381 | } |
376 | 382 |
|
377 | 383 | private Response executeWithInstanceBulkhead(GovernanceRequestExtractor governanceRequest, Client feignClient, |
|
0 commit comments