Skip to content

Commit e1ab616

Browse files
sak0haozhicui
andauthored
feat: nearbyRouteRule v2 (#230)
Signed-off-by: haozhicui <[email protected]> Co-authored-by: haozhicui <[email protected]>
1 parent 8bf19b6 commit e1ab616

File tree

9 files changed

+125
-35
lines changed

9 files changed

+125
-35
lines changed

pkg/flow/base_flow.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"fmt"
2222

2323
"github.com/hashicorp/go-multierror"
24-
2524
"github.com/polarismesh/polaris-go/pkg/flow/data"
2625
"github.com/polarismesh/polaris-go/pkg/log"
2726
"github.com/polarismesh/polaris-go/pkg/model"
@@ -163,6 +162,11 @@ func getAndLoadCacheValues(registry localregistry.LocalRegistry,
163162
request.SetDstRoute(routeRule)
164163
trigger.EnableDstRoute = false
165164
}
165+
nearbyRouteRule := registry.GetServiceNearByRouteRule(dstService, false)
166+
if nearbyRouteRule.IsInitialized() {
167+
request.SetDstRoute(nearbyRouteRule)
168+
trigger.EnableDstRoute = false
169+
}
166170
if load && (routeRule.IsCacheLoaded() || !routeRule.IsInitialized()) {
167171
dstRouterKey := &ContextKey{ServiceKey: dstService, Operation: keyDstRoute}
168172
log.GetBaseLogger().Debugf("value not initialized, scheduled context %s", dstRouterKey)
@@ -171,6 +175,14 @@ func getAndLoadCacheValues(registry localregistry.LocalRegistry,
171175
return nil, err.(model.SDKError)
172176
}
173177
notifiers = append(notifiers, NewSingleNotifyContext(dstRouterKey, notifier))
178+
179+
dstRouterKey = &ContextKey{ServiceKey: dstService, Operation: keyDstNearByRouteRule}
180+
log.GetBaseLogger().Infof("value not initialized, scheduled context %s", dstRouterKey)
181+
notifier, err = registry.LoadServiceNearByRouteRule(dstService)
182+
if err != nil {
183+
return nil, err.(model.SDKError)
184+
}
185+
notifiers = append(notifiers, NewSingleNotifyContext(dstRouterKey, notifier))
174186
}
175187
}
176188
if trigger.EnableDstRateLimit {

pkg/flow/notify.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ import (
3030
)
3131

3232
const (
33-
keySourceRoute = "sourceRoute"
34-
keyDstRoute = "destinationRoute"
35-
keyDstRateLimit = "destinationRateLimit"
36-
keyDstInstances = "destinationInstances"
37-
keyDstServices = "destinationServices"
33+
keySourceRoute = "sourceRoute"
34+
keyDstRoute = "destinationRoute"
35+
keyDstRateLimit = "destinationRateLimit"
36+
keyDstInstances = "destinationInstances"
37+
keyDstServices = "destinationServices"
38+
keyDstNearByRouteRule = "destinationNearByRouteRule"
3839
)
3940

4041
// ContextKey 上下文标识

pkg/model/core.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ const (
3939
EventCircuitBreaker EventType = 0x2006
4040
// EventFaultDetect 探测规则
4141
EventFaultDetect EventType = 0x2007
42+
//EventNearbyRouteRule 就近路由事件
43+
EventNearbyRouteRule EventType = 0x2008
4244
)
4345

4446
// RegistryValue 存储于sdk缓存中的对象,包括服务实例和服务路由
@@ -66,21 +68,23 @@ func (e EventType) String() string {
6668
var (
6769
// 路由规则到日志回显
6870
eventTypeToPresent = map[EventType]string{
69-
EventInstances: "instance",
70-
EventRouting: "routing",
71-
EventRateLimiting: "rate_limiting",
72-
EventServices: "services",
73-
EventCircuitBreaker: "circuit_breaker",
74-
EventFaultDetect: "fault_detect",
71+
EventInstances: "instance",
72+
EventRouting: "routing",
73+
EventRateLimiting: "rate_limiting",
74+
EventServices: "services",
75+
EventCircuitBreaker: "circuit_breaker",
76+
EventFaultDetect: "fault_detect",
77+
EventNearbyRouteRule: "nearby_route_rule",
7578
}
7679

7780
presentToEventType = map[string]EventType{
78-
"instance": EventInstances,
79-
"routing": EventRouting,
80-
"rate_limiting": EventRateLimiting,
81-
"services": EventServices,
82-
"circuit_breaker": EventCircuitBreaker,
83-
"fault_detect": EventFaultDetect,
81+
"instance": EventInstances,
82+
"routing": EventRouting,
83+
"rate_limiting": EventRateLimiting,
84+
"services": EventServices,
85+
"circuit_breaker": EventCircuitBreaker,
86+
"fault_detect": EventFaultDetect,
87+
"nearby_route_rule": EventNearbyRouteRule,
8488
}
8589
)
8690

pkg/model/pb/routing.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
apimodel "github.com/polarismesh/specification/source/go/api/v1/model"
2424
apiservice "github.com/polarismesh/specification/source/go/api/v1/service_manage"
2525
apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
26+
wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"
2627

2728
"github.com/polarismesh/polaris-go/pkg/model"
2829
)
@@ -43,6 +44,18 @@ func (r *RoutingAssistant) ParseRuleValue(resp *apiservice.DiscoverResponse) (pr
4344
if nil != routingValue {
4445
revision = routingValue.GetRevision().GetValue()
4546
}
47+
48+
if resp.Routing == nil && len(resp.NearbyRouteRules) >= 1 {
49+
rule := resp.NearbyRouteRules[0]
50+
routing := &apitraffic.Routing{
51+
Namespace: wrapperspb.String(rule.GetNamespace()),
52+
Service: resp.Service.Name,
53+
Rules: resp.NearbyRouteRules,
54+
}
55+
56+
return routing, rule.GetRevision()
57+
}
58+
4659
return routingValue, revision
4760
}
4861

pkg/model/pb/rule.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,11 @@ type ServiceRuleAssistant interface {
3939
}
4040

4141
var eventTypeToAssistant = map[model.EventType]ServiceRuleAssistant{
42-
model.EventRouting: &RoutingAssistant{},
43-
model.EventRateLimiting: &RateLimitingAssistant{},
44-
model.EventCircuitBreaker: &CircuitBreakAssistant{},
45-
model.EventFaultDetect: &FaultDetectAssistant{},
42+
model.EventRouting: &RoutingAssistant{},
43+
model.EventRateLimiting: &RateLimitingAssistant{},
44+
model.EventCircuitBreaker: &CircuitBreakAssistant{},
45+
model.EventFaultDetect: &FaultDetectAssistant{},
46+
model.EventNearbyRouteRule: &RoutingAssistant{},
4647
}
4748

4849
// ServiceRuleInProto 路由规则配置对象.

pkg/model/pb/validate.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,23 @@ import (
2929

3030
var (
3131
eventTypeToProtoRequestType = map[model.EventType]apiservice.DiscoverRequest_DiscoverRequestType{
32-
model.EventInstances: apiservice.DiscoverRequest_INSTANCE,
33-
model.EventRouting: apiservice.DiscoverRequest_ROUTING,
34-
model.EventRateLimiting: apiservice.DiscoverRequest_RATE_LIMIT,
35-
model.EventServices: apiservice.DiscoverRequest_SERVICES,
36-
model.EventCircuitBreaker: apiservice.DiscoverRequest_CIRCUIT_BREAKER,
37-
model.EventFaultDetect: apiservice.DiscoverRequest_FAULT_DETECTOR,
32+
model.EventInstances: apiservice.DiscoverRequest_INSTANCE,
33+
model.EventRouting: apiservice.DiscoverRequest_ROUTING,
34+
model.EventRateLimiting: apiservice.DiscoverRequest_RATE_LIMIT,
35+
model.EventServices: apiservice.DiscoverRequest_SERVICES,
36+
model.EventCircuitBreaker: apiservice.DiscoverRequest_CIRCUIT_BREAKER,
37+
model.EventFaultDetect: apiservice.DiscoverRequest_FAULT_DETECTOR,
38+
model.EventNearbyRouteRule: apiservice.DiscoverRequest_NEARBY_ROUTE_RULE,
3839
}
3940

4041
protoRespTypeToEventType = map[apiservice.DiscoverResponse_DiscoverResponseType]model.EventType{
41-
apiservice.DiscoverResponse_INSTANCE: model.EventInstances,
42-
apiservice.DiscoverResponse_ROUTING: model.EventRouting,
43-
apiservice.DiscoverResponse_RATE_LIMIT: model.EventRateLimiting,
44-
apiservice.DiscoverResponse_SERVICES: model.EventServices,
45-
apiservice.DiscoverResponse_CIRCUIT_BREAKER: model.EventCircuitBreaker,
46-
apiservice.DiscoverResponse_FAULT_DETECTOR: model.EventFaultDetect,
42+
apiservice.DiscoverResponse_INSTANCE: model.EventInstances,
43+
apiservice.DiscoverResponse_ROUTING: model.EventRouting,
44+
apiservice.DiscoverResponse_RATE_LIMIT: model.EventRateLimiting,
45+
apiservice.DiscoverResponse_SERVICES: model.EventServices,
46+
apiservice.DiscoverResponse_CIRCUIT_BREAKER: model.EventCircuitBreaker,
47+
apiservice.DiscoverResponse_FAULT_DETECTOR: model.EventFaultDetect,
48+
apiservice.DiscoverResponse_NEARBY_ROUTE_RULE: model.EventNearbyRouteRule,
4749
}
4850
)
4951

pkg/plugin/localregistry/localregistry.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ type RuleRegistry interface {
126126
GetServiceRouteRule(key *model.ServiceKey, includeCache bool) model.ServiceRule
127127
// LoadServiceRouteRule 非阻塞发起配置加载
128128
LoadServiceRouteRule(key *model.ServiceKey) (*common.Notifier, error)
129+
// GetServiceNearByRouteRule 非阻塞获取就近路由信息
130+
GetServiceNearByRouteRule(key *model.ServiceKey, includeCache bool) model.ServiceRule
131+
// LoadServiceNearByRouteRule 非阻塞发起就近路由加载
132+
LoadServiceNearByRouteRule(key *model.ServiceKey) (*common.Notifier, error)
129133
// GetServiceRateLimitRule 非阻塞获取限流规则
130134
GetServiceRateLimitRule(key *model.ServiceKey, includeCache bool) model.ServiceRule
131135
// LoadServiceRateLimitRule 非阻塞发起限流规则加载

plugin/localregistry/inmemory/inmemory.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ func (g *LocalCache) Init(ctx *plugin.InitContext) error {
161161
g.eventToCacheHandlers = make(map[model.EventType]CacheHandlers, 0)
162162
g.eventToCacheHandlers[model.EventInstances] = g.newServiceCacheHandler()
163163
g.eventToCacheHandlers[model.EventRouting] = g.newRuleCacheHandler()
164+
g.eventToCacheHandlers[model.EventNearbyRouteRule] = g.newNearByRouteRuleCacheHandler()
164165
g.eventToCacheHandlers[model.EventRateLimiting] = g.newRateLimitCacheHandler()
165166
g.eventToCacheHandlers[model.EventCircuitBreaker] = g.newCircuitBreakerCacheHandler()
166167
g.eventToCacheHandlers[model.EventFaultDetect] = g.newFaultDetectCacheHandler()
@@ -654,6 +655,14 @@ func (g *LocalCache) GetServiceRouteRule(key *model.ServiceKey, includeCache boo
654655
return svcRule
655656
}
656657

658+
// GetServiceNearByRouteRule 非阻塞获取就近路由信息
659+
func (g *LocalCache) GetServiceNearByRouteRule(key *model.ServiceKey, includeCache bool) model.ServiceRule {
660+
svcEventKey := poolGetSvcEventKey(key, model.EventNearbyRouteRule)
661+
svcRule := g.GetServiceRule(svcEventKey, includeCache)
662+
poolPutSvcEventKey(svcEventKey)
663+
return svcRule
664+
}
665+
657666
// GetServicesByMeta 非阻塞获取服务列表
658667
func (g *LocalCache) GetServicesByMeta(key *model.ServiceKey, includeCache bool) model.Services {
659668
svcEventKey := poolGetSvcEventKey(key, model.EventServices)
@@ -725,6 +734,15 @@ func (g *LocalCache) newRuleCacheHandler() CacheHandlers {
725734
}
726735
}
727736

737+
// 创建就近路由规则缓存操作回调集合
738+
func (g *LocalCache) newNearByRouteRuleCacheHandler() CacheHandlers {
739+
return CacheHandlers{
740+
CompareMessage: compareResource,
741+
MessageToCacheValue: messageToServiceRule,
742+
OnEventDeleted: g.deleteRule,
743+
}
744+
}
745+
728746
// 创建限流规则缓存操作回调集合
729747
func (g *LocalCache) newRateLimitCacheHandler() CacheHandlers {
730748
return CacheHandlers{
@@ -1011,6 +1029,17 @@ func (g *LocalCache) LoadServices(key *model.ServiceKey) (*common.Notifier, erro
10111029
})
10121030
}
10131031

1032+
// LoadServiceNearByRouteRule 非阻塞发起配置加载
1033+
func (g *LocalCache) LoadServiceNearByRouteRule(key *model.ServiceKey) (*common.Notifier, error) {
1034+
return g.LoadServiceRule(&model.ServiceEventKey{
1035+
ServiceKey: model.ServiceKey{
1036+
Namespace: key.Namespace,
1037+
Service: key.Service,
1038+
},
1039+
Type: model.EventNearbyRouteRule,
1040+
})
1041+
}
1042+
10141043
// LoadServiceRateLimitRule 非阻塞发起限流规则加载
10151044
func (g *LocalCache) LoadServiceRateLimitRule(key *model.ServiceKey) (*common.Notifier, error) {
10161045
return g.LoadServiceRule(&model.ServiceEventKey{

plugin/servicerouter/nearbybase/nearby.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ package nearbybase
2020
import (
2121
"context"
2222
"fmt"
23+
"github.com/modern-go/reflect2"
24+
apitraffic "github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
2325
"strings"
2426
"time"
2527

@@ -94,7 +96,29 @@ const (
9496
// Enable 当前是否需要启动该服务路由插件
9597
func (g *NearbyBasedInstancesFilter) Enable(routeInfo *servicerouter.RouteInfo, clusters model.ServiceClusters) bool {
9698
location := g.valueCtx.GetCurrentLocation().GetLocation()
97-
return nil != location && clusters.IsNearbyEnabled()
99+
return nil != location && (clusters.IsNearbyEnabled() || g.enableNearByRouteRules(routeInfo))
100+
}
101+
102+
func ruleEmpty(svcRule model.ServiceRule) bool {
103+
return reflect2.IsNil(svcRule) || reflect2.IsNil(svcRule.GetValue()) || svcRule.GetValidateError() != nil
104+
}
105+
106+
func (g *NearbyBasedInstancesFilter) enableNearByRouteRules(routeInfo *servicerouter.RouteInfo) bool {
107+
if ruleEmpty(routeInfo.DestRouteRule) {
108+
return false
109+
}
110+
111+
rt, ok := routeInfo.DestRouteRule.GetValue().(*apitraffic.Routing)
112+
if !ok {
113+
return false
114+
}
115+
for _, rule := range rt.Rules {
116+
if rule.Enable {
117+
return true
118+
}
119+
}
120+
121+
return false
98122
}
99123

100124
// 一个匹配级别的cluster的健康和全部实例数量

0 commit comments

Comments
 (0)