Skip to content

feat: Add nacos registry support. #49

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ Issues = "https://github.com/apache/dubbo/issues"
zookeeper = [
"kazoo>=2.10.0",
]
nacos = [
"nacos-sdk-python>=2.0.6",
]

### Hatch settings ###
[tool.hatch.version]
Expand Down
3 changes: 2 additions & 1 deletion src/dubbo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
from .bootstrap import Dubbo
from .client import Client
from .server import Server
from .__about__ import __version__

__all__ = ["Dubbo", "Client", "Server"]
__all__ = ["Dubbo", "Client", "Server", "__version__"]
27 changes: 27 additions & 0 deletions src/dubbo/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ class RegistryConfig(AbstractConfig):
"_load_balance",
"_group",
"_version",
"_namespace",
]

def __init__(
Expand All @@ -503,6 +504,7 @@ def __init__(
load_balance: Optional[str] = None,
group: Optional[str] = None,
version: Optional[str] = None,
namespace: Optional[str] = None,
):
"""
Initialize the registry configuration.
Expand All @@ -522,6 +524,8 @@ def __init__(
:type group: Optional[str]
:param version: The version of the registry.
:type version: Optional[str]
:param namespace: The namespace of the registry.
:type namespace: Optional[str]
"""
super().__init__()

Expand All @@ -533,6 +537,7 @@ def __init__(
self._load_balance = load_balance
self._group = group
self._version = version
self._namespace = namespace

@property
def protocol(self) -> str:
Expand Down Expand Up @@ -677,6 +682,24 @@ def version(self, version: str) -> None:
:type version: str
"""
self._version = version

@property
def namespace(self) -> Optional[str]:
"""
Get the namespace of the registry.
:return: The namespace of the registry.
:rtype: Optional[str]
"""
return self._namespace

@namespace.setter
def namespace(self, namespace: str) -> None:
"""
Set the namespace of the registry.
:param namespace: The namespace of the registry.
:type namespace: str
"""
self._namespace = namespace

def to_url(self) -> URL:
"""
Expand All @@ -687,10 +710,13 @@ def to_url(self) -> URL:
parameters = {}
if self.load_balance:
parameters[registry_constants.LOAD_BALANCE_KEY] = self.load_balance
if self.namespace:
parameters[registry_constants.NAMESPACE_KEY] = self.namespace
if self.group:
parameters[config_constants.GROUP] = self.group
if self.version:
parameters[config_constants.VERSION] = self.version


return URL(
scheme=self.protocol,
Expand Down Expand Up @@ -721,6 +747,7 @@ def from_url(cls, url: Union[str, URL]) -> "RegistryConfig":
load_balance=url.parameters.get(registry_constants.LOAD_BALANCE_KEY),
group=url.parameters.get(config_constants.GROUP),
version=url.parameters.get(config_constants.VERSION),
namespace=url.parameters.get(registry_constants.NAMESPACE_KEY),
)


Expand Down
1 change: 1 addition & 0 deletions src/dubbo/constants/registry_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@


LOAD_BALANCE_KEY = "loadbalance"
NAMESPACE_KEY = "namespace"
1 change: 1 addition & 0 deletions src/dubbo/extension/registries.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class ExtendedRegistry:
interface=RegistryFactory,
impls={
"zookeeper": "dubbo.registry.zookeeper.zk_registry.ZookeeperRegistryFactory",
"nacos": "dubbo.registry.nacos.nacos_registry.NacosRegistryFactory",
},
)

Expand Down
15 changes: 15 additions & 0 deletions src/dubbo/registry/nacos/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
183 changes: 183 additions & 0 deletions src/dubbo/registry/nacos/nacos_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import dubbo

from nacos import NacosClient
from nacos.timer import NacosTimer, NacosTimerManager

from dubbo.constants import common_constants, registry_constants
from dubbo.registry import NotifyListener, Registry, RegistryFactory
from dubbo.url import URL
from dubbo.loggers import loggerFactory

_LOGGER = loggerFactory.get_logger()

DEFAULT_APPLICATION = "DEFAULT"


class NacosSubscriber:
"""
Nacos instance subscriber
"""

def __init__(
self, nacos_client: NacosClient, service_name: str, listener: NotifyListener
):
self._nacos_client = nacos_client
self._service_name = service_name
self._listener = listener
self._timer_manager = NacosTimerManager()
self._subscribed = False

def refresh_instances(self):
"""
Refresh nacos instances
"""
if not self._subscribed:
return

try:
instances = self._nacos_client.list_naming_instance(self._service_name)
hosts = instances["hosts"]
urls = [
URL(scheme="tri", host=h["ip"], port=h["port"])
for h in hosts
if h["enabled"]
]
self._listener.notify(urls=urls)
except Exception as e:
_LOGGER.error("nacos subscriber refresh_instance failed: %s", e)

def subscribe(self):
"""
Start timer to watch instances
"""
if not self._timer_manager.all_timers().get("refresh_instances"):
self._timer_manager.add_timer(
NacosTimer("refresh_instances", self.refresh_instances, interval=7)
)
self._timer_manager.execute()
self._subscribed = True
self.refresh_instances()

def unsubscribe(self):
self._subscribed = False


def _init_nacos_client(url: URL) -> NacosClient:
server_address = f"{url.host}:{url.port if url.port else 8848}"
parameters = url.parameters

endpoint = parameters.get("endpoint")
namespace = parameters.get(registry_constants.NAMESPACE_KEY)
username = url.username
password = url.password

return NacosClient(
server_addresses=server_address,
endpoint=endpoint,
namespace=namespace,
username=username,
password=password,
)


def _build_nacos_service_name(url: URL):
service_name = url.parameters.get(common_constants.SERVICE_KEY)
return f"providers:{service_name}::"


class NacosRegistry(Registry):

def __init__(self, url: URL):
self._url = url
self._nacos_client: NacosClient = _init_nacos_client(url)
self._service_subscriber_mapping = {}

def _service_subscriber(
self, service_name: str, listener: NotifyListener
) -> NacosSubscriber:
if service_name not in self._service_subscriber_mapping:
self._service_subscriber_mapping[service_name] = NacosSubscriber(
self._nacos_client, service_name=service_name, listener=listener
)

return self._service_subscriber_mapping[service_name]

def register(self, url: URL) -> None:
ip = url.host
port = url.port

nacos_service_name = _build_nacos_service_name(url)

metadata = {
"side": "provider",
"release": f"{dubbo.__version__}_py",
"protocol": "tri",
"application": DEFAULT_APPLICATION,
"category": "providers",
"enabled": "true",
"disabled": "false",
}
self._nacos_client.add_naming_instance(
nacos_service_name,
ip,
port,
DEFAULT_APPLICATION,
metadata=metadata,
heartbeat_interval=1,
)

def unregister(self, url: URL) -> None:
ip = url.host
port = url.port
nacos_service_name = _build_nacos_service_name(url)

self._nacos_client.remove_naming_instance(
nacos_service_name, ip=ip, port=port, cluster_name=DEFAULT_APPLICATION
)

def subscribe(self, url: URL, listener: NotifyListener) -> None:
nacos_service_name = _build_nacos_service_name(url)

subscriber = self._service_subscriber(nacos_service_name, listener)
subscriber.subscribe()

def unsubscribe(self, url: URL, listener: NotifyListener) -> None:
nacos_service_name = _build_nacos_service_name(url)

subscriber = self._service_subscriber(nacos_service_name, listener)
subscriber.unsubscribe()
listener.notify([])

def lookup(self, url: URL):
pass

def get_url(self) -> URL:
return self._url

def is_available(self) -> bool:
return self._nacos_client is not None

def destroy(self) -> None:
pass


class NacosRegistryFactory(RegistryFactory):

def get_registry(self, url: URL) -> Registry:
return NacosRegistry(url)