
Commit 8b8a827

Merge remote-tracking branch 'origin/terraform' into devel
2 parents: c9af369 + 593877a

8 files changed: +352 -70 lines changed

configs/config.yaml

Lines changed: 4 additions & 0 deletions
@@ -11,6 +11,10 @@ occopus_endpoint: 'http://localhost:5000'
 occopus_infra_name: 'micado_worker_infra'
 occopus_worker_name: 'worker'
 
+# Terraform options
+terraform_path: '/var/lib/micado/terraform/submitter'
+terraform_container_name: 'terraform'
+
 logging:
   version: 1
   root:
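
The two new keys can be consumed like any other entry in this YAML file. A minimal sketch of reading them, assuming PyYAML is available; load_config is a hypothetical helper, not part of this commit:

import yaml

def load_config(path='configs/config.yaml'):
    # Parse the Policy Keeper configuration file (hypothetical helper).
    with open(path) as f:
        return yaml.safe_load(f)

config = load_config()
terraform_path = config.get('terraform_path')            # '/var/lib/micado/terraform/submitter'
container_name = config.get('terraform_container_name')  # 'terraform'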

handle_k8s.py

Lines changed: 42 additions & 27 deletions
@@ -1,12 +1,16 @@
-import kubernetes.client
-import kubernetes.config
 import logging
 import pk_config
 import time
 
+import pykube
+
 dryrun_id='k8s'
 MASTER = 'node-role.kubernetes.io/master'
-NOTREADY = 'node.kubernetes.io/unreachable'
+
+kube = pykube.HTTPClient(pykube.KubeConfig.from_file("/root/.kube/config"))
+
+class Deploymentv1(pykube.Deployment):
+  version = "apps/v1"
 
 def query_list_of_nodes(endpoint,worker_name='micado-worker',status='ready'):
   log=logging.getLogger('pk_k8s')
@@ -18,22 +22,30 @@ def query_list_of_nodes(endpoint,worker_name='micado-worker',status='ready'):
     a['Addr']='127.0.0.1'
     list_of_nodes.append(a.copy())
     return list_of_nodes
-  kubernetes.config.load_kube_config()
-  client = kubernetes.client.CoreV1Api()
+
   try:
-    nodes = [x for x in client.list_node().items if MASTER not in x.metadata.labels]
     if status=='ready':
-      nodes = [x for x in nodes if NOTREADY not in [y.key for y in x.spec.taints or []]]
-      nodes = [x for x in nodes if x.metadata.labels.get('micado.eu/node_type') == worker_name]
+      query = pykube.Node.objects(kube).filter(selector={"micado.eu/node_type__in": {worker_name}})
+      nodes = [x for x in query if "taints" not in x.obj["spec"]]
     elif status=='down':
-      nodes = [x for x in nodes if NOTREADY in [y.key for y in x.spec.taints or []]]
+      nodes = []
+      worker_nodes = [x for x in pykube.Node.objects(kube) if MASTER not in x.labels]
+      for node in worker_nodes:
+        ready_condition = [
+          x.iteritems()
+          for x in node.obj["status"]["conditions"]
+          if x.get("type") == "Ready"
+        ][0]
+        if ("status", "Unknown") in ready_condition:
+          nodes.append(node)
     for n in nodes:
       a = {}
-      a['ID']=n.metadata.name
-      a['Addr']=n.status.addresses[0].address
+      n.reload()
+      a['ID']=n.metadata["name"]
+      a['Addr']=n.obj["status"]["addresses"][0]["address"]
       list_of_nodes.append(a.copy())
     return list_of_nodes
-  except Exception as e:
+  except Exception:
     log.exception('(Q) Query of k8s nodes failed.')
     return dict()
 
@@ -44,12 +56,12 @@ def scale_k8s_deploy(endpoint,service_name,replicas):
   if pk_config.dryrun_get(dryrun_id):
     log.info('(S) DRYRUN enabled. Skipping...')
     return
-  kubernetes.config.load_kube_config()
-  client = kubernetes.client.ExtensionsV1beta1Api()
+
   try:
-    dep = client.read_namespaced_deployment(service_name, "default")
-    dep.spec.replicas = replicas
-    client.patch_namespaced_deployment_scale(service_name, "default", dep)
+    query = Deploymentv1.objects(kube).filter(field_selector={"metadata.name": service_name})
+    deployment = [x for x in query][0]
+    deployment.reload()
+    deployment.scale(replicas)
   except Exception as e:
     log.warning('(S) Scaling of k8s service "{0}" failed: {1}'.format(service_name,str(e)))
   return
@@ -61,12 +73,13 @@ def query_k8s_replicas(endpoint,service_name):
   if pk_config.dryrun_get(dryrun_id):
     log.info('(I) DRYRUN enabled. Skipping...')
     return instance
-  kubernetes.config.load_kube_config()
-  client = kubernetes.client.ExtensionsV1beta1Api()
+
   try:
-    dep = client.read_namespaced_deployment(service_name, "default")
-    replicas = dep.spec.replicas
-    log.debug('(I) => m_container_count for {0}: {1}'.format(service_name,replicas))
+    query = Deploymentv1.objects(kube).filter(field_selector={"metadata.name": service_name})
+    deployment = [x for x in query][0]
+    deployment.reload()
+    instance = deployment.replicas
+    log.debug('(I) => m_container_count for {0}: {1}'.format(service_name,instance))
   except Exception as e:
     log.warning('(Q) Querying k8s service "{0}" replicas failed: {1}'.format(service_name,str(e)))
   return instance
@@ -78,16 +91,18 @@ def remove_node(endpoint,id):
   if pk_config.dryrun_get(dryrun_id):
     log.info('(M) DRYRUN enabled. Skipping...')
     return
-  kubernetes.config.load_kube_config()
-  client = kubernetes.client.CoreV1Api()
+
   try:
-    client.delete_node(id)
+    query = pykube.Node.objects(kube).filter(field_selector={"metadata.name": id})
+    node = [x for x in query][0]
+    node.reload()
+    node.delete()
   except Exception:
     log.error('(M) => Removing k8s node failed.')
   return
 
 def down_nodes_cleanup_by_list(stored, actual):
-  setStored = { v['ID'] for k,v in stored.items() }
+  setStored = { v['ID'] for k,v in stored.iteritems() }
   setActual = { x['ID'] for x in actual }
   missing = { x for x in setStored if x not in setActual }
   for x in missing:
@@ -102,7 +117,7 @@ def down_nodes_add_from_list(stored, actual):
 def down_nodes_cleanup_by_timeout(endpoint, stored, timeout):
   log=logging.getLogger('pk_k8s')
   current_time = int(time.time())
-  for id, node in stored.items():
+  for id, node in stored.iteritems():
     if node['micado_timestamp']+timeout < current_time:
       log.info('(M) => Node {0} is down for more than {1} seconds, removing.'.format(id,timeout))
       remove_node(endpoint,id)

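This diff swaps the kubernetes-client calls for pykube throughout. Below is a minimal standalone sketch of the pykube idioms it adopts (listing nodes by label selector, then scaling a Deployment). The kubeconfig path and label key mirror the diff; "worker", "my-app", and the replica count are illustrative. The Deploymentv1 subclass in the diff pins version = "apps/v1", presumably because the bundled pykube release defaults to an older API group:

import pykube

# Connect the same way the module-level kube client does in the diff.
api = pykube.HTTPClient(pykube.KubeConfig.from_file("/root/.kube/config"))

# List worker nodes carrying the micado.eu/node_type label.
workers = pykube.Node.objects(api).filter(
    selector={"micado.eu/node_type__in": {"worker"}})
for node in workers:
    print(node.name, node.obj["status"]["addresses"][0]["address"])

# Scale a Deployment to a requested replica count ("my-app" is illustrative).
deployment = pykube.Deployment.objects(api).get_by_name("my-app")
deployment.scale(3)  # sets spec.replicas and sends the update to the API server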
handle_occopus.py

Lines changed: 24 additions & 13 deletions
@@ -4,40 +4,51 @@
 
 dryrun_id = 'occopus'
 
-def scale_worker_node(endpoint,infra_name,worker_name,replicas):
+CONFIG_ENDPOINT = 'occopus_endpoint'
+CONFIG_INFRA_NAME = 'occopus_infra_name'
+
+def scale_worker_node(config,scaling_info_list):
   log=logging.getLogger('pk_occopus')
   if pk_config.dryrun_get(dryrun_id):
     log.info('(S) DRYRUN enabled. Skipping...')
     return
-  log.info('(S) => m_node_count: {0}'.format(replicas))
-  wscall = '{0}/infrastructures/{1}/scaleto/{2}/{3}'.format(endpoint,infra_name,worker_name,replicas)
-  log.debug('-->curl -X POST {0}'.format(wscall))
-  response = requests.post(wscall).json()
-  log.debug('-->response: {0}'.format(response))
+  endpoint, infra_name = config[CONFIG_ENDPOINT], config[CONFIG_INFRA_NAME]
+  for info in scaling_info_list:
+    worker_name, replicas = info.get('node_name'), info.get('replicas')
+    log.info('(S) {0} => m_node_count: {1}'.format(worker_name, replicas))
+    wscall = '{0}/infrastructures/{1}/scaleto/{2}/{3}'.format(endpoint,infra_name,worker_name,replicas)
+    log.debug('-->curl -X POST {0}'.format(wscall))
+    response = requests.post(wscall).json()
+    log.debug('-->response: {0}'.format(response))
   return
 
-def query_number_of_worker_nodes(endpoint,infra_name,worker_name):
+def query_number_of_worker_nodes(config,worker_name):
   log=logging.getLogger('pk_occopus')
   instances=1
   if pk_config.dryrun_get(dryrun_id):
     log.info('(C) DRYRUN enabled. Skipping...')
     return instances
+  endpoint, infra_name = config[CONFIG_ENDPOINT], config[CONFIG_INFRA_NAME]
   wscall = '{0}/infrastructures/{1}'.format(endpoint,infra_name)
   log.debug('-->curl -X GET {0}'.format(wscall))
   response = requests.get(wscall).json()
   instances = response.get(worker_name,dict()).get('scaling',dict()).get('target',0)
   log.debug('-->instances: {0}, response: {1}'.format(instances,response))
   return instances
 
-def drop_worker_node(endpoint,infra_name,worker_name,replica):
+def drop_worker_node(config,scaling_info_list):
   log=logging.getLogger('pk_occopus')
   if pk_config.dryrun_get(dryrun_id):
     log.info('(S) DRYRUN enabled. Skipping...')
     return
-  log.info('(S) => node drop: {0}'.format(replica))
-  wscall = '{0}/infrastructures/{1}/scaledown/{2}/{3}'.format(endpoint,infra_name,worker_name,replica)
-  log.debug('-->curl -X POST {0}'.format(wscall))
-  response = requests.post(wscall).json()
-  log.debug('-->response: {0}'.format(response))
+  endpoint, infra_name = config[CONFIG_ENDPOINT], config[CONFIG_INFRA_NAME]
+  for info in scaling_info_list:
+    worker_name, replicas = info.get('node_name'), info.get('replicas')
+    for replica in replicas:
+      log.info('(S) {0} => node drop: {1}'.format(worker_name, replica))
+      wscall = '{0}/infrastructures/{1}/scaledown/{2}/{3}'.format(endpoint,infra_name,worker_name,replica)
+      log.debug('-->curl -X POST {0}'.format(wscall))
+      response = requests.post(wscall).json()
+      log.debug('-->response: {0}'.format(response))
   return

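The rewritten functions drive scaling through the same Occopus REST endpoints as before, now once per entry of scaling_info_list. A standalone sketch of the equivalent raw calls; the endpoint and infrastructure name mirror configs/config.yaml, while the worker name and replica id are purely illustrative:

import requests

endpoint = 'http://localhost:5000'
infra_name = 'micado_worker_infra'

# scale_worker_node: one scaleto POST per (node_name, replicas) entry.
scaling_info_list = [{'node_name': 'worker', 'replicas': 3}]
for info in scaling_info_list:
    url = '{0}/infrastructures/{1}/scaleto/{2}/{3}'.format(
        endpoint, infra_name, info['node_name'], info['replicas'])
    print(requests.post(url).json())

# drop_worker_node: one scaledown POST per replica id in the entry's list.
for replica in [2]:
    url = '{0}/infrastructures/{1}/scaledown/{2}/{3}'.format(
        endpoint, infra_name, 'worker', replica)
    print(requests.post(url).json())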