-
Notifications
You must be signed in to change notification settings - Fork 26
Multiple endpoints #55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 21 commits
dc2407e
4b1de04
e3defa3
c744c1a
7a61e92
7b33a71
41492ff
cad609a
4406119
ea4814f
a6c50b7
cebbedb
f4e46dd
2e38ccb
1951e1c
a66edf9
6998a42
78d0a65
16be75c
aead4ea
95c7d24
ba4ea4f
339f4aa
ae94b43
7a07e22
ec49b1a
681d39c
0151244
d47f9ed
709f74e
3aa610e
8206a66
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
|
||
import semantic_version as sem | ||
from six.moves import urllib_parse | ||
import copy | ||
|
||
from .apis import AuthAPI | ||
from .apis import ClusterAPI | ||
|
@@ -17,6 +18,7 @@ | |
from .apis import MaintenanceAPI | ||
from .apis import WatchAPI | ||
from .errors import UnsupportedServerVersion | ||
from .errors import Etcd3Exception | ||
from .stateful import Lease | ||
from .stateful import Lock | ||
from .stateful import Txn | ||
|
@@ -25,6 +27,8 @@ | |
from .swaggerdefs import get_spec | ||
from .utils import Etcd3Warning | ||
from .utils import log | ||
from .utils import check_param | ||
from .utils import EtcdEndpoint | ||
from .version import __version__ | ||
|
||
|
||
|
@@ -48,15 +52,59 @@ def __iter__(self): | |
DEFAULT_VERSION = '3.3.0' | ||
|
||
|
||
def retry_all_hosts(func): | ||
def wrapper(self, *args, **kwargs): | ||
errors = [] | ||
got_result = False | ||
call_endpoints = copy.copy(self.endpoints) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe a lock is better than copy if we have 3 endpoints of which the initial 2 are invalid:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are two problems to consider:
A lock could work, but to prevent It should work correctly if when the while loop operating on the copy exits correctly, I add a block:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need to lock the whole in fact, atomicity seems not so important here or, just lock |
||
retries = len(call_endpoints) | ||
while retries > 0: | ||
retries -= 1 | ||
endpoint = call_endpoints.pop(0) | ||
call_endpoints.append(endpoint) | ||
self.host = endpoint.host | ||
self.port = endpoint.port | ||
try: | ||
ret = func(self, *args, **kwargs) | ||
got_result = True | ||
break | ||
except Exception as e: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. only retry when connection fails There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just catch these errors |
||
errors.append(e) | ||
log.warning('Failed to call %s(args: %s, kwargs: %s) on ' | ||
'endpoint %s (%s)' % | ||
(func.__name__, args, kwargs, endpoint, e)) | ||
if not got_result: | ||
exception_types = [x.__class__ for x in errors] | ||
if len(set(exception_types)) == 1: | ||
log.error('Failed to call %s(args: %s, kwargs: %s) on all ' | ||
'endpoints: %s. Got errors: %s' % | ||
(func.__name__, args, kwargs, call_endpoints, errors)) | ||
raise errors[0] | ||
else: | ||
raise Etcd3Exception( | ||
'Failed to call %s(args: %s, kwargs: %s) on all ' | ||
'endpoints: %s. Got errors: %s' % | ||
(func.__name__, args, kwargs, call_endpoints, errors)) | ||
# elif len(errors) > 0: | ||
# log.warning('Got errors %s, retried successfully') | ||
return ret | ||
return wrapper | ||
|
||
|
||
class BaseClient(AuthAPI, ClusterAPI, KVAPI, LeaseAPI, MaintenanceAPI, | ||
WatchAPI, ExtraAPI, LockAPI): | ||
def __init__(self, host='127.0.0.1', port=2379, protocol='http', | ||
cert=(), verify=None, | ||
timeout=None, headers=None, user_agent=None, pool_size=30, | ||
@check_param(at_most_one_of=['port', 'endpoints'], at_least_one_of=['port', 'endpoints']) | ||
@check_param(at_most_one_of=['host', 'endpoints'], at_least_one_of=['host', 'endpoints']) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right, I'm gonna remove the decorators There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed in ae94b43 |
||
def __init__(self, host=None, port=None, endpoints=None, protocol='http', cert=(), | ||
verify=None, timeout=None, headers=None, user_agent=None, pool_size=30, | ||
username=None, password=None, token=None, | ||
server_version=DEFAULT_VERSION, cluster_version=DEFAULT_VERSION): | ||
self.host = host | ||
self.port = port | ||
if host is not None: | ||
self.endpoints = ([EtcdEndpoint(host, port)]) | ||
else: | ||
self.endpoints = endpoints | ||
self.host = self.endpoints[0].host | ||
self.port =self.endpoints[0].port | ||
self.cert = cert | ||
self.protocol = protocol | ||
if cert: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -382,3 +382,12 @@ def find_executable(executable, path=None): # pragma: no cover | |
f = os.path.join(p, execname) | ||
if os.path.isfile(f): | ||
return f | ||
|
||
|
||
class EtcdEndpoint(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this class contains only host and port but made creating a client less friendly any further design on this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. better put this into |
||
def __init__(self, host='127.0.0.1', port=2379): | ||
self.host = host | ||
self.port = port | ||
|
||
def __repr__(self): | ||
return "EtcdEndpoint(host=%s, port=%s)" % (self.host, self.port) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,3 +19,4 @@ m2r==0.2.1 | |
codecov>=1.4.0 | ||
codacy-coverage==1.3.11 | ||
twine==1.13.0 | ||
docker==3.7.0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
import six | ||
from etcd3.client import Client | ||
import pytest | ||
from .etcd_cluster import EtcdTestCluster | ||
|
||
|
||
@pytest.fixture(scope='session') | ||
def etcd_cluster(request): | ||
# function_name = request.function.__name__ | ||
# function_name = re.sub(r"[^a-zA-Z0-9]+", "", function_name) | ||
cluster = EtcdTestCluster(ident='cleartext', size=3) | ||
|
||
def fin(): | ||
cluster.down() | ||
request.addfinalizer(fin) | ||
cluster.up() | ||
cluster.wait_ready() | ||
|
||
return cluster | ||
|
||
|
||
@pytest.fixture(scope='session') | ||
def etcd_cluster_ssl(request): | ||
# function_name = request.function.__name__ | ||
# function_name = re.sub(r"[^a-zA-Z0-9]+", "", function_name) | ||
cluster = EtcdTestCluster(ident='ssl', size=3, ssl=True) | ||
|
||
def fin(): | ||
cluster.down() | ||
request.addfinalizer(fin) | ||
cluster.up() | ||
cluster.wait_ready() | ||
|
||
return cluster | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def client(etcd_cluster): | ||
""" | ||
init Etcd3Client, close its connection-pool when teardown | ||
""" | ||
# _, p, _ = docker_run_etcd_main() | ||
c = Client(endpoints=etcd_cluster.get_endpoints(), | ||
protocol='https' if etcd_cluster.ssl else 'http') | ||
yield c | ||
c.close() | ||
|
||
|
||
@pytest.fixture | ||
def clear(etcd_cluster): | ||
def _clear(): | ||
etcd_cluster.etcdctl('del --from-key ""') | ||
return _clear | ||
|
||
|
||
def teardown_auth(etcd_cluster): # pragma: no cover | ||
""" | ||
disable auth, delete all users and roles | ||
""" | ||
etcd_cluster.etcdctl('--user root:root auth disable') | ||
etcd_cluster.etcdctl('--user root:changed auth disable') | ||
for i in (etcd_cluster.etcdctl('role list') or '').splitlines(): | ||
if six.PY3: # pragma: no cover | ||
i = six.text_type(i, encoding='utf-8') | ||
etcd_cluster.etcdctl('role delete %s' % i) | ||
for i in (etcd_cluster.etcdctl('user list') or '').splitlines(): | ||
if six.PY3: # pragma: no cover | ||
i = six.text_type(i, encoding='utf-8') | ||
etcd_cluster.etcdctl('user delete %s' % i) | ||
|
||
|
||
def enable_auth(etcd_cluster): # pragma: no cover | ||
etcd_cluster.etcdctl('user add root:root') | ||
etcd_cluster.etcdctl('role add root') | ||
etcd_cluster.etcdctl('user grant root root') | ||
etcd_cluster.etcdctl('auth enable') |
Uh oh!
There was an error while loading. Please reload this page.