Skip to content
This repository was archived by the owner on Aug 17, 2023. It is now read-only.

Commit e0d44e8

Browse files
authored
add funtions to apply kubernetes objects (#483)
1 parent 33b8ef2 commit e0d44e8

File tree

4 files changed

+203
-2
lines changed

4 files changed

+203
-2
lines changed

kubeflow/fairing/kubernetes/manager.py

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
import logging
22
import retrying
3+
import yaml
34

45
from kubernetes import client, config, watch
56
from kfserving import KFServingClient
67

78
from kubeflow.tfjob import TFJobClient
89
from kubeflow.pytorchjob import PyTorchJobClient
910

10-
from kubeflow.fairing.utils import is_running_in_k8s
11+
from kubeflow.fairing.utils import is_running_in_k8s, camel_to_snake
1112
from kubeflow.fairing.constants import constants
13+
from kubeflow.fairing import utils
1214

1315
logger = logging.getLogger(__name__)
1416

@@ -331,3 +333,89 @@ def log(self, name, namespace, selectors=None, container='', follow=True):
331333
print(chunk.rstrip().decode('utf8'))
332334
finally:
333335
tail.release_conn()
336+
337+
338+
def apply_namespaced_object(self, spec, mode='create'): #pylint:disable=too-many-branches
339+
"""Run apply on the provided Kubernetes specs.
340+
341+
:param specs: The YAML specs to apply.
342+
:param mode: 4 valid modes: create, patch, replace and delete.
343+
:returns: applied resources.
344+
"""
345+
346+
if mode not in ['create', 'patch', 'replace', 'delete']:
347+
raise ValueError("Unknown mode %s, "
348+
"valid modes: create, patch, replace and delete." % mode)
349+
350+
if not isinstance(spec, dict):
351+
spec = yaml.load(spec)
352+
353+
try:
354+
namespace = spec["metadata"]["namespace"]
355+
except KeyError:
356+
namespace = utils.get_default_target_namespace()
357+
358+
kind = spec["kind"]
359+
kind_snake = camel_to_snake(kind)
360+
plural = spec["kind"].lower() + "s"
361+
362+
if mode in ['patch', 'replace', 'delete']:
363+
try:
364+
name = spec["metadata"]["name"]
365+
except Exception:
366+
raise RuntimeError("Cannot get the name in the spec for the operation %s." % mode)
367+
368+
if not "/" in spec["apiVersion"]:
369+
group = None
370+
else:
371+
group, version = spec["apiVersion"].split("/", 1)
372+
373+
if group is None or group.lower() == "apps":
374+
if group is None:
375+
api = client.CoreV1Api()
376+
else:
377+
api = client.AppsV1Api()
378+
method_name = mode + "_namespaced_" + kind_snake
379+
if mode == 'create':
380+
method_args = [namespace, spec]
381+
elif mode == 'delete':
382+
method_args = [name, namespace]
383+
else:
384+
method_args = [name, namespace, spec]
385+
else:
386+
api = client.CustomObjectsApi()
387+
method_name = mode + "_namespaced_custom_object"
388+
389+
if mode == 'create':
390+
method_args = [group, version, namespace, plural, spec]
391+
elif mode == 'delete':
392+
method_args = [group, version, namespace, plural, name, client.V1DeleteOptions()]
393+
else:
394+
method_args = [group, version, namespace, plural, name, spec]
395+
396+
apply_method = getattr(api, method_name)
397+
398+
try:
399+
result = apply_method(*method_args)
400+
except client.rest.ApiException as e:
401+
raise RuntimeError(
402+
"Exception when calling %s->%s: %s\n" % api, apply_method, e)
403+
404+
return result
405+
406+
407+
def apply_namespaced_objects(self, specs, mode='create'):
408+
"""Run apply on the provided Kubernetes specs.
409+
410+
:param specs: A list of strings or dicts providing the YAML specs to apply.
411+
:param mode: 4 valid modes: create, patch, replace and delete.
412+
:returns:
413+
414+
"""
415+
results = []
416+
417+
for spec in specs:
418+
result = self.apply_namespaced_object(spec, mode=mode)
419+
results.append(result)
420+
421+
return results

kubeflow/fairing/utils.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import os
22
import zlib
33
import uuid
4-
4+
import re
55

66
def get_image(repository, name):
77
"""Get the full image name by integrating repository and image name.
@@ -47,3 +47,10 @@ def crc(file_name):
4747
def random_tag():
4848
"""Get a random tag."""
4949
return str(uuid.uuid4()).split('-')[0]
50+
51+
def camel_to_snake(name):
52+
"""
53+
Converts a string that is camelCase into snake_case
54+
"""
55+
name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
56+
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()

tests/integration/kubernetes/__init__.py

Whitespace-only changes.
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
from kubeflow.fairing.kubernetes.manager import KubeManager
2+
3+
core_api_test = '''
4+
apiVersion: v1
5+
kind: Pod
6+
metadata:
7+
name: core-api-test
8+
labels:
9+
name: nginx
10+
spec:
11+
containers:
12+
- name: nginx
13+
image: nginx
14+
ports:
15+
- containerPort: 80
16+
'''
17+
18+
apps_api_test = '''
19+
apiVersion: apps/v1
20+
kind: Deployment
21+
metadata:
22+
name: apps-api-test
23+
labels:
24+
app: nginx
25+
spec:
26+
replicas: 1
27+
selector:
28+
matchLabels:
29+
app: nginx
30+
template:
31+
metadata:
32+
labels:
33+
app: nginx
34+
spec:
35+
containers:
36+
- name: nginx
37+
image: nginx:1.14.2
38+
ports:
39+
- containerPort: 80
40+
'''
41+
42+
custom_resource_test = '''
43+
apiVersion: "kubeflow.org/v1"
44+
kind: "TFJob"
45+
metadata:
46+
name: "custom-resource-test"
47+
spec:
48+
tfReplicaSpecs:
49+
PS:
50+
replicas: 1
51+
restartPolicy: Never
52+
template:
53+
spec:
54+
containers:
55+
- name: tensorflow
56+
image: kubeflow/tf-dist-mnist-test:1.0
57+
Worker:
58+
replicas: 1
59+
restartPolicy: Never
60+
template:
61+
spec:
62+
containers:
63+
- name: tensorflow
64+
image: kubeflow/tf-dist-mnist-test:1.0
65+
'''
66+
67+
kubeflow_client = KubeManager()
68+
69+
def test_apply_namespaced_object_core_v1_api():
70+
'''
71+
Test apply_namespaced_object API for CoreV1Api
72+
'''
73+
kubeflow_client.apply_namespaced_object(core_api_test)
74+
kubeflow_client.apply_namespaced_object(core_api_test, mode='patch')
75+
kubeflow_client.apply_namespaced_object(core_api_test, mode='delete')
76+
77+
78+
def test_apply_namespaced_object_apps_v1_api():
79+
'''
80+
Test apply_namespaced_object API for AppV1Api
81+
'''
82+
kubeflow_client.apply_namespaced_object(apps_api_test)
83+
kubeflow_client.apply_namespaced_object(apps_api_test, mode='patch')
84+
kubeflow_client.apply_namespaced_object(apps_api_test, mode='delete')
85+
86+
def test_apply_namespaced_object_custom_resource_api():
87+
'''
88+
Test apply_namespaced_object API for CRD API
89+
'''
90+
kubeflow_client.apply_namespaced_object(custom_resource_test)
91+
kubeflow_client.apply_namespaced_object(custom_resource_test, mode='patch')
92+
kubeflow_client.apply_namespaced_object(custom_resource_test, mode='delete')
93+
94+
def test_apply_namespaced_objects():
95+
'''
96+
Test apply_namespaced_objects for buck applying
97+
'''
98+
# To avoid error about the object already exists, rename the resource.
99+
bulk_resources = [
100+
core_api_test.replace('core-api-test', 'core-api-test-buck'),
101+
apps_api_test.replace('apps-api-test', 'apps-api-test-buck'),
102+
custom_resource_test.replace('custom-resource-test', 'custom-resource-test-buck')
103+
]
104+
kubeflow_client.apply_namespaced_objects(bulk_resources)
105+
kubeflow_client.apply_namespaced_objects(bulk_resources, mode='patch')
106+
kubeflow_client.apply_namespaced_objects(bulk_resources, mode='delete')

0 commit comments

Comments
 (0)