Skip to content

Commit 7eaada6

Browse files
evelynweipeerless1024
authored andcommitted
fix: nearby cache refresh bugfix
1 parent 8352d01 commit 7eaada6

File tree

12 files changed

+247
-38
lines changed

12 files changed

+247
-38
lines changed

pkg/flow/base_flow.go

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222

2323
"github.com/hashicorp/go-multierror"
24+
2425
"github.com/polarismesh/polaris-go/pkg/flow/data"
2526
"github.com/polarismesh/polaris-go/pkg/log"
2627
"github.com/polarismesh/polaris-go/pkg/model"
@@ -158,31 +159,45 @@ func getAndLoadCacheValues(registry localregistry.LocalRegistry,
158159
}
159160
if trigger.EnableDstRoute {
160161
routeRule := registry.GetServiceRouteRule(dstService, false)
162+
nearbyRouteRule := registry.GetServiceNearByRouteRule(dstService, false)
163+
164+
// 同时设置自定义路由规则和就近路由规则到不同的字段
165+
// 这样两种路由规则可以独立判断是否生效
161166
if routeRule.IsInitialized() {
162167
request.SetDstRoute(routeRule)
163-
trigger.EnableDstRoute = false
164168
}
165-
nearbyRouteRule := registry.GetServiceNearByRouteRule(dstService, false)
166169
if nearbyRouteRule.IsInitialized() {
167-
request.SetDstRoute(nearbyRouteRule)
170+
request.SetDstNearbyRoute(nearbyRouteRule)
171+
}
172+
173+
// 如果至少有一个规则已初始化,则标记为已启用
174+
if routeRule.IsInitialized() || nearbyRouteRule.IsInitialized() {
168175
trigger.EnableDstRoute = false
169176
}
170-
if load && (routeRule.IsCacheLoaded() || !routeRule.IsInitialized()) {
171-
dstRouterKey := &ContextKey{ServiceKey: dstService, Operation: keyDstRoute}
172-
log.GetBaseLogger().Debugf("value not initialized, scheduled context %s", dstRouterKey)
173-
notifier, err := registry.LoadServiceRouteRule(dstService)
174-
if err != nil {
175-
return nil, err.(model.SDKError)
176-
}
177-
notifiers = append(notifiers, NewSingleNotifyContext(dstRouterKey, notifier))
178177

179-
dstRouterKey = &ContextKey{ServiceKey: dstService, Operation: keyDstNearByRouteRule}
180-
log.GetBaseLogger().Infof("value not initialized, scheduled context %s", dstRouterKey)
181-
notifier, err = registry.LoadServiceNearByRouteRule(dstService)
182-
if err != nil {
183-
return nil, err.(model.SDKError)
178+
// 同时检查 routeRule 和 nearbyRouteRule 的状态来决定是否需要加载
179+
// 只有当两者都未初始化或需要从缓存加载时才触发加载
180+
needLoadRouteRule := routeRule.IsCacheLoaded() || !routeRule.IsInitialized()
181+
needLoadNearbyRouteRule := nearbyRouteRule.IsCacheLoaded() || !nearbyRouteRule.IsInitialized()
182+
if load && (needLoadRouteRule || needLoadNearbyRouteRule) {
183+
if needLoadRouteRule {
184+
dstRouterKey := &ContextKey{ServiceKey: dstService, Operation: keyDstRoute}
185+
log.GetBaseLogger().Debugf("value not initialized, scheduled context %s", dstRouterKey)
186+
notifier, err := registry.LoadServiceRouteRule(dstService)
187+
if err != nil {
188+
return nil, err.(model.SDKError)
189+
}
190+
notifiers = append(notifiers, NewSingleNotifyContext(dstRouterKey, notifier))
191+
}
192+
if needLoadNearbyRouteRule {
193+
dstRouterKey := &ContextKey{ServiceKey: dstService, Operation: keyDstNearByRouteRule}
194+
log.GetBaseLogger().Infof("value not initialized, scheduled context %s", dstRouterKey)
195+
notifier, err := registry.LoadServiceNearByRouteRule(dstService)
196+
if err != nil {
197+
return nil, err.(model.SDKError)
198+
}
199+
notifiers = append(notifiers, NewSingleNotifyContext(dstRouterKey, notifier))
184200
}
185-
notifiers = append(notifiers, NewSingleNotifyContext(dstRouterKey, notifier))
186201
}
187202
}
188203
if trigger.EnableDstRateLimit {

pkg/flow/data/object.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ func (br *BaseRequest) SetDstRoute(rule model.ServiceRule) {
138138
// do nothing
139139
}
140140

141+
// SetDstNearbyRoute 设置就近路由规则
142+
func (br *BaseRequest) SetDstNearbyRoute(rule model.ServiceRule) {
143+
// do nothing
144+
}
145+
141146
// SetDstRateLimit 设置ratelimit
142147
func (br *BaseRequest) SetDstRateLimit(rule model.ServiceRule) {
143148
// do nothing
@@ -382,6 +387,11 @@ func (c *CommonInstancesRequest) SetDstRoute(rule model.ServiceRule) {
382387
c.RouteInfo.DestRouteRule = rule
383388
}
384389

390+
// SetDstNearbyRoute 设置目标服务就近路由规则
391+
func (c *CommonInstancesRequest) SetDstNearbyRoute(rule model.ServiceRule) {
392+
c.RouteInfo.DestNearbyRouteRule = rule
393+
}
394+
385395
// SetDstRateLimit 设置目标服务限流规则
386396
func (c *CommonInstancesRequest) SetDstRateLimit(rule model.ServiceRule) {
387397
// do nothing
@@ -659,6 +669,11 @@ func (cl *CommonRateLimitRequest) SetDstRoute(rule model.ServiceRule) {
659669
// do nothing
660670
}
661671

672+
// SetDstNearbyRoute 设置目标服务就近路由规则
673+
func (cl *CommonRateLimitRequest) SetDstNearbyRoute(rule model.ServiceRule) {
674+
// do nothing
675+
}
676+
662677
// SetDstRateLimit 设置目标服务限流规则
663678
func (cl *CommonRateLimitRequest) SetDstRateLimit(rule model.ServiceRule) {
664679
cl.RateLimitRule = rule

pkg/model/engine.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ type CacheValueQuery interface {
5858
SetDstInstances(instances ServiceInstances)
5959
// SetDstRoute 设置目标服务路由规则
6060
SetDstRoute(rule ServiceRule)
61+
// SetDstNearbyRoute 设置目标服务就近路由规则
62+
SetDstNearbyRoute(rule ServiceRule)
6163
// SetDstRateLimit 设置目标服务限流规则
6264
SetDstRateLimit(rule ServiceRule)
6365
// SetSrcRoute 设置源服务路由规则

pkg/model/pb/nearby.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/**
2+
* Tencent is pleased to support the open source community by making polaris-go available.
3+
*
4+
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package pb
19+
20+
import (
21+
"github.com/golang/protobuf/proto"
22+
"github.com/modern-go/reflect2"
23+
apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage"
24+
apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
25+
"google.golang.org/protobuf/types/known/wrapperspb"
26+
27+
"github.com/polarismesh/polaris-go/pkg/log"
28+
"github.com/polarismesh/polaris-go/pkg/model"
29+
)
30+
31+
// NearbyRoutingAssistant 就近路由规则解析助手
32+
type NearbyRoutingAssistant struct {
33+
}
34+
35+
// ParseRuleValue 解析出具体的规则值
36+
func (n *NearbyRoutingAssistant) ParseRuleValue(resp *apiservice.DiscoverResponse) (proto.Message, string) {
37+
var revision string
38+
serviceKey := ""
39+
if resp.Service != nil {
40+
serviceKey = resp.Service.GetNamespace().GetValue() + "/" + resp.Service.GetName().GetValue()
41+
}
42+
43+
if resp.NearbyRouteRules == nil || len(resp.NearbyRouteRules) == 0 {
44+
// 当没有就近路由规则时,使用 service.revision
45+
revision = resp.GetService().GetRevision().GetValue()
46+
log.GetBaseLogger().Debugf("NearbyRoutingAssistant.ParseRuleValue: service=%s, no nearby route rules found, using service revision=%s",
47+
serviceKey, revision)
48+
return nil, revision
49+
}
50+
51+
// 遍历规则列表,优先从开启的规则中获取revision
52+
var selectedRule *apitraffic.RouteRule
53+
var minPriorityRule *apitraffic.RouteRule
54+
55+
for _, rule := range resp.NearbyRouteRules {
56+
// 如果找到开启的规则,直接使用
57+
if rule.GetEnable() {
58+
selectedRule = rule
59+
break
60+
}
61+
62+
// 记录 priority 值最小的规则
63+
if minPriorityRule == nil || rule.GetPriority() < minPriorityRule.GetPriority() {
64+
minPriorityRule = rule
65+
}
66+
}
67+
68+
// 如果没有开启的规则,使用 priority 值最小的规则
69+
if selectedRule == nil {
70+
selectedRule = minPriorityRule
71+
}
72+
73+
// 使用服务级别的 revision,而不是单个规则的 revision
74+
// 这样可以避免因 revision 不匹配导致的循环刷新问题
75+
revision = resp.GetService().GetRevision().GetValue()
76+
routing := &apitraffic.Routing{
77+
Namespace: resp.Service.Namespace,
78+
Service: resp.Service.Name,
79+
Rules: resp.NearbyRouteRules,
80+
Revision: wrapperspb.String(revision),
81+
}
82+
83+
log.GetBaseLogger().Debugf("NearbyRoutingAssistant.ParseRuleValue: service=%s, rules=%d, revision=%s, enabled=%v, priority=%d",
84+
serviceKey, len(resp.NearbyRouteRules), revision, selectedRule.GetEnable(), selectedRule.GetPriority())
85+
86+
return routing, revision
87+
}
88+
89+
// Validate 规则校验
90+
func (n *NearbyRoutingAssistant) Validate(message proto.Message, ruleCache model.RuleCache) error {
91+
if reflect2.IsNil(message) {
92+
return nil
93+
}
94+
// 就近路由规则不需要特殊校验,复用RoutingAssistant的校验逻辑
95+
routingValue := message.(*apitraffic.Routing)
96+
assistant := &RoutingAssistant{}
97+
return assistant.Validate(routingValue, ruleCache)
98+
}
99+
100+
// SetDefault 设置默认值
101+
func (n *NearbyRoutingAssistant) SetDefault(message proto.Message) {
102+
// do nothing
103+
}

pkg/model/pb/routing.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ import (
2323
apimodel "github.com/polarismesh/specification/source/go/api/v1/model"
2424
apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage"
2525
apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
26-
wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"
2726

27+
"github.com/polarismesh/polaris-go/pkg/log"
2828
"github.com/polarismesh/polaris-go/pkg/model"
2929
)
3030

@@ -40,22 +40,24 @@ type RoutingAssistant struct {
4040
// ParseRuleValue 解析出具体的规则值
4141
func (r *RoutingAssistant) ParseRuleValue(resp *apiservice.DiscoverResponse) (proto.Message, string) {
4242
var revision string
43+
serviceKey := ""
44+
if resp.Service != nil {
45+
serviceKey = resp.Service.GetNamespace().GetValue() + "/" + resp.Service.GetName().GetValue()
46+
}
47+
4348
routingValue := resp.Routing
4449
if nil != routingValue {
4550
revision = routingValue.GetRevision().GetValue()
46-
}
51+
inboundCount := len(routingValue.GetInbounds())
52+
outboundCount := len(routingValue.GetOutbounds())
4753

48-
if resp.Routing == nil && len(resp.NearbyRouteRules) >= 1 {
49-
rule := resp.NearbyRouteRules[0]
50-
routing := &apitraffic.Routing{
51-
Namespace: wrapperspb.String(rule.GetNamespace()),
52-
Service: resp.Service.Name,
53-
Rules: resp.NearbyRouteRules,
54-
}
54+
log.GetBaseLogger().Debugf("RoutingAssistant.ParseRuleValue: service=%s, routing rule found, revision=%s, inbounds=%d, outbounds=%d",
55+
serviceKey, revision, inboundCount, outboundCount)
5556

56-
return routing, rule.GetRevision()
57+
return routingValue, revision
5758
}
5859

60+
log.GetBaseLogger().Debugf("RoutingAssistant.ParseRuleValue: service=%s, no routing rule found", serviceKey)
5961
return routingValue, revision
6062
}
6163

pkg/model/pb/rule.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ var eventTypeToAssistant = map[model.EventType]ServiceRuleAssistant{
4343
model.EventRateLimiting: &RateLimitingAssistant{},
4444
model.EventCircuitBreaker: &CircuitBreakAssistant{},
4545
model.EventFaultDetect: &FaultDetectAssistant{},
46-
model.EventNearbyRouteRule: &RoutingAssistant{},
46+
model.EventNearbyRouteRule: &NearbyRoutingAssistant{},
4747
}
4848

4949
// ServiceRuleInProto 路由规则配置对象.

pkg/plugin/servicerouter/servicerouter.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ type RouteInfo struct {
4545
DestService model.ServiceMetadata
4646
// 目标路由规则
4747
DestRouteRule model.ServiceRule
48+
// 目标就近路由规则
49+
DestNearbyRouteRule model.ServiceRule
4850
// 在路由匹配过程中使用到的环境变量
4951
EnvironmentVariables map[string]string
5052
// 全死全活路由插件,用于做路由兜底
@@ -87,6 +89,7 @@ func (r *RouteInfo) ClearValue() {
8789
r.DestService = nil
8890
r.SourceService = nil
8991
r.DestRouteRule = nil
92+
r.DestNearbyRouteRule = nil
9093
r.SourceService = nil
9194
r.FilterOnlyRouter = nil
9295
r.MatchRuleType = UnknownRule

pkg/plugin/servicerouter/util.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package servicerouter
2020
import (
2121
"sync"
2222

23+
"github.com/polarismesh/polaris-go/pkg/log"
2324
"github.com/polarismesh/polaris-go/pkg/model"
2425
"github.com/polarismesh/polaris-go/pkg/plugin"
2526
"github.com/polarismesh/polaris-go/pkg/plugin/common"
@@ -59,10 +60,20 @@ func processServiceRouters(ctx model.ValueContext, routers []ServiceRouter, rout
5960
svcClusters model.ServiceClusters, cluster *model.Cluster) (*RouteResult, model.SDKError) {
6061
var result *RouteResult
6162
var err error
63+
log.GetBaseLogger().Debugf("processServiceRouters: start, source=%s, dest=%s, routers=%d, instances=%d",
64+
routeInfo.SourceService, routeInfo.DestService, len(routers), cluster.GetClusterValue().GetInstancesSet(false, false).Count())
65+
6266
for _, router := range routers {
63-
if !routeInfo.IsRouterEnable(router.ID()) || !router.Enable(routeInfo, svcClusters) {
67+
routerName := router.Name()
68+
isRouterEnabled := routeInfo.IsRouterEnable(router.ID())
69+
isEnabled := router.Enable(routeInfo, svcClusters)
70+
71+
if !isRouterEnabled || !isEnabled {
72+
log.GetBaseLogger().Debugf("processServiceRouters: router=%v skipped (routerEnabled=%v, enabled=%v)",
73+
routerName, isRouterEnabled, isEnabled)
6474
continue
6575
}
76+
6677
if nil != result {
6778
// 回收,下一步即将被新值替换
6879
GetRouteResultPool().Put(result)
@@ -73,13 +84,18 @@ func processServiceRouters(ctx model.ValueContext, routers []ServiceRouter, rout
7384
cluster.PoolPut()
7485
}
7586
if err != nil {
87+
log.GetBaseLogger().Errorf("processServiceRouters: router=%v failed, error=%v", routerName, err)
7688
return nil, err.(model.SDKError)
7789
}
7890
if nil != result.RedirectDestService {
7991
// 转发规则
92+
log.GetBaseLogger().Debugf("processServiceRouters: router=%v redirect to %s",
93+
routerName, result.RedirectDestService)
8094
return result, nil
8195
}
8296
cluster = result.OutputCluster
97+
log.GetBaseLogger().Debugf("processServiceRouters: router=%v done, instances=%d, status=%s",
98+
routerName, cluster.GetClusterValue().GetInstancesSet(false, false).Count(), result.Status.String())
8399
}
84100
if !routeInfo.ignoreFilterOnlyOnEndChain {
85101
// 需要执行一遍全死全活
@@ -92,9 +108,12 @@ func processServiceRouters(ctx model.ValueContext, routers []ServiceRouter, rout
92108
cluster.PoolPut()
93109
}
94110
if err != nil {
111+
log.GetBaseLogger().Errorf("processServiceRouters: FilterOnlyRouter failed, error=%v", err)
95112
return nil, err.(model.SDKError)
96113
}
97114
cluster = result.OutputCluster
115+
log.GetBaseLogger().Debugf("processServiceRouters: FilterOnlyRouter done, instances=%d",
116+
cluster.GetClusterValue().GetInstancesSet(false, false).Count())
98117
}
99118
return result, nil
100119
}

plugin/localregistry/inmemory/model.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,8 @@ func (s *CacheObject) OnServiceUpdate(event *serverconnector.ServiceEvent) {
291291
atomic.StoreInt32(&cachedValue.(*pb.ServiceInstancesInProto).CacheLoaded, 0)
292292
case model.EventRouting:
293293
atomic.StoreInt32(&cachedValue.(*pb.ServiceRuleInProto).CacheLoaded, 0)
294+
case model.EventNearbyRouteRule:
295+
atomic.StoreInt32(&cachedValue.(*pb.ServiceRuleInProto).CacheLoaded, 0)
294296
}
295297
}
296298
}

plugin/servicerouter/nearbybase/model.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ func (n *nearbyConfig) SetDefault() {
100100
if n.MatchLevel == "" {
101101
n.MatchLevel = config.DefaultMatchLevel
102102
}
103+
// 明确设置MaxMatchLevel的默认值为all,表示可以降级到所有级别
104+
if n.MaxMatchLevel == "" {
105+
n.MaxMatchLevel = config.AllLevel
106+
}
103107
if n.UnhealthyPercentToDegrade == 0 {
104108
n.UnhealthyPercentToDegrade = 100
105109
}

0 commit comments

Comments
 (0)