diff --git a/pyproject.toml b/pyproject.toml index 7f9f425..19df11f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,6 +70,9 @@ Issues = "https://github.com/apache/dubbo/issues" zookeeper = [ "kazoo>=2.10.0", ] +nacos = [ + "nacos-sdk-python>=2.0.6", +] ### Hatch settings ### [tool.hatch.version] diff --git a/src/dubbo/__init__.py b/src/dubbo/__init__.py index ded7e25..12de695 100644 --- a/src/dubbo/__init__.py +++ b/src/dubbo/__init__.py @@ -17,5 +17,6 @@ from .bootstrap import Dubbo from .client import Client from .server import Server +from .__about__ import __version__ -__all__ = ["Dubbo", "Client", "Server"] +__all__ = ["Dubbo", "Client", "Server", "__version__"] diff --git a/src/dubbo/configs.py b/src/dubbo/configs.py index f7a85d7..6258140 100644 --- a/src/dubbo/configs.py +++ b/src/dubbo/configs.py @@ -491,6 +491,7 @@ class RegistryConfig(AbstractConfig): "_load_balance", "_group", "_version", + "_namespace", ] def __init__( @@ -503,6 +504,7 @@ def __init__( load_balance: Optional[str] = None, group: Optional[str] = None, version: Optional[str] = None, + namespace: Optional[str] = None, ): """ Initialize the registry configuration. @@ -522,6 +524,8 @@ def __init__( :type group: Optional[str] :param version: The version of the registry. :type version: Optional[str] + :param namespace: The namespace of the registry. + :type namespace: Optional[str] """ super().__init__() @@ -533,6 +537,7 @@ def __init__( self._load_balance = load_balance self._group = group self._version = version + self._namespace = namespace @property def protocol(self) -> str: @@ -677,6 +682,24 @@ def version(self, version: str) -> None: :type version: str """ self._version = version + + @property + def namespace(self) -> Optional[str]: + """ + Get the namespace of the registry. + :return: The namespace of the registry. + :rtype: Optional[str] + """ + return self._namespace + + @namespace.setter + def namespace(self, namespace: str) -> None: + """ + Set the namespace of the registry. + :param namespace: The namespace of the registry. + :type namespace: str + """ + self._namespace = namespace def to_url(self) -> URL: """ @@ -687,10 +710,13 @@ def to_url(self) -> URL: parameters = {} if self.load_balance: parameters[registry_constants.LOAD_BALANCE_KEY] = self.load_balance + if self.namespace: + parameters[registry_constants.NAMESPACE_KEY] = self.namespace if self.group: parameters[config_constants.GROUP] = self.group if self.version: parameters[config_constants.VERSION] = self.version + return URL( scheme=self.protocol, @@ -721,6 +747,7 @@ def from_url(cls, url: Union[str, URL]) -> "RegistryConfig": load_balance=url.parameters.get(registry_constants.LOAD_BALANCE_KEY), group=url.parameters.get(config_constants.GROUP), version=url.parameters.get(config_constants.VERSION), + namespace=url.parameters.get(registry_constants.NAMESPACE_KEY), ) diff --git a/src/dubbo/constants/registry_constants.py b/src/dubbo/constants/registry_constants.py index 6ac69a4..14e17d1 100644 --- a/src/dubbo/constants/registry_constants.py +++ b/src/dubbo/constants/registry_constants.py @@ -22,3 +22,4 @@ LOAD_BALANCE_KEY = "loadbalance" +NAMESPACE_KEY = "namespace" diff --git a/src/dubbo/extension/registries.py b/src/dubbo/extension/registries.py index cf23ae7..d834a0d 100644 --- a/src/dubbo/extension/registries.py +++ b/src/dubbo/extension/registries.py @@ -54,6 +54,7 @@ class ExtendedRegistry: interface=RegistryFactory, impls={ "zookeeper": "dubbo.registry.zookeeper.zk_registry.ZookeeperRegistryFactory", + "nacos": "dubbo.registry.nacos.nacos_registry.NacosRegistryFactory", }, ) diff --git a/src/dubbo/registry/nacos/__init__.py b/src/dubbo/registry/nacos/__init__.py new file mode 100644 index 0000000..bcba37a --- /dev/null +++ b/src/dubbo/registry/nacos/__init__.py @@ -0,0 +1,15 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/dubbo/registry/nacos/nacos_registry.py b/src/dubbo/registry/nacos/nacos_registry.py new file mode 100644 index 0000000..64e8d22 --- /dev/null +++ b/src/dubbo/registry/nacos/nacos_registry.py @@ -0,0 +1,183 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import dubbo + +from nacos import NacosClient +from nacos.timer import NacosTimer, NacosTimerManager + +from dubbo.constants import common_constants, registry_constants +from dubbo.registry import NotifyListener, Registry, RegistryFactory +from dubbo.url import URL +from dubbo.loggers import loggerFactory + +_LOGGER = loggerFactory.get_logger() + +DEFAULT_APPLICATION = "DEFAULT" + + +class NacosSubscriber: + """ + Nacos instance subscriber + """ + + def __init__( + self, nacos_client: NacosClient, service_name: str, listener: NotifyListener + ): + self._nacos_client = nacos_client + self._service_name = service_name + self._listener = listener + self._timer_manager = NacosTimerManager() + self._subscribed = False + + def refresh_instances(self): + """ + Refresh nacos instances + """ + if not self._subscribed: + return + + try: + instances = self._nacos_client.list_naming_instance(self._service_name) + hosts = instances["hosts"] + urls = [ + URL(scheme="tri", host=h["ip"], port=h["port"]) + for h in hosts + if h["enabled"] + ] + self._listener.notify(urls=urls) + except Exception as e: + _LOGGER.error("nacos subscriber refresh_instance failed: %s", e) + + def subscribe(self): + """ + Start timer to watch instances + """ + if not self._timer_manager.all_timers().get("refresh_instances"): + self._timer_manager.add_timer( + NacosTimer("refresh_instances", self.refresh_instances, interval=7) + ) + self._timer_manager.execute() + self._subscribed = True + self.refresh_instances() + + def unsubscribe(self): + self._subscribed = False + + +def _init_nacos_client(url: URL) -> NacosClient: + server_address = f"{url.host}:{url.port if url.port else 8848}" + parameters = url.parameters + + endpoint = parameters.get("endpoint") + namespace = parameters.get(registry_constants.NAMESPACE_KEY) + username = url.username + password = url.password + + return NacosClient( + server_addresses=server_address, + endpoint=endpoint, + namespace=namespace, + username=username, + password=password, + ) + + +def _build_nacos_service_name(url: URL): + service_name = url.parameters.get(common_constants.SERVICE_KEY) + return f"providers:{service_name}::" + + +class NacosRegistry(Registry): + + def __init__(self, url: URL): + self._url = url + self._nacos_client: NacosClient = _init_nacos_client(url) + self._service_subscriber_mapping = {} + + def _service_subscriber( + self, service_name: str, listener: NotifyListener + ) -> NacosSubscriber: + if service_name not in self._service_subscriber_mapping: + self._service_subscriber_mapping[service_name] = NacosSubscriber( + self._nacos_client, service_name=service_name, listener=listener + ) + + return self._service_subscriber_mapping[service_name] + + def register(self, url: URL) -> None: + ip = url.host + port = url.port + + nacos_service_name = _build_nacos_service_name(url) + + metadata = { + "side": "provider", + "release": f"{dubbo.__version__}_py", + "protocol": "tri", + "application": DEFAULT_APPLICATION, + "category": "providers", + "enabled": "true", + "disabled": "false", + } + self._nacos_client.add_naming_instance( + nacos_service_name, + ip, + port, + DEFAULT_APPLICATION, + metadata=metadata, + heartbeat_interval=1, + ) + + def unregister(self, url: URL) -> None: + ip = url.host + port = url.port + nacos_service_name = _build_nacos_service_name(url) + + self._nacos_client.remove_naming_instance( + nacos_service_name, ip=ip, port=port, cluster_name=DEFAULT_APPLICATION + ) + + def subscribe(self, url: URL, listener: NotifyListener) -> None: + nacos_service_name = _build_nacos_service_name(url) + + subscriber = self._service_subscriber(nacos_service_name, listener) + subscriber.subscribe() + + def unsubscribe(self, url: URL, listener: NotifyListener) -> None: + nacos_service_name = _build_nacos_service_name(url) + + subscriber = self._service_subscriber(nacos_service_name, listener) + subscriber.unsubscribe() + listener.notify([]) + + def lookup(self, url: URL): + pass + + def get_url(self) -> URL: + return self._url + + def is_available(self) -> bool: + return self._nacos_client is not None + + def destroy(self) -> None: + pass + + +class NacosRegistryFactory(RegistryFactory): + + def get_registry(self, url: URL) -> Registry: + return NacosRegistry(url)