-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathsvcat.py
More file actions
150 lines (139 loc) · 5.26 KB
/
svcat.py
File metadata and controls
150 lines (139 loc) · 5.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from scheduler.resources import Resource
from scheduler.exceptions import KubeHTTPException
class ServiceCatalog(Resource):
    """Client for Service Catalog ServiceInstance and ServiceBinding resources.

    Every method targets the namespaced ``serviceinstances`` or
    ``servicebindings`` collection. URL construction (``self.api``), the HTTP
    verbs (``self.http_*``) and the generic status check (``self.unhealthy``)
    are not defined in this class; presumably they are provided by
    ``Resource`` -- confirm against the base class.
    """

    # API group/version; also written into every manifest built below.
    api_version = 'servicecatalog.k8s.io/v1beta1'
    # NOTE(review): api_prefix and short_name are not read in this class;
    # presumably the Resource machinery consumes them -- confirm.
    api_prefix = 'apis'
    short_name = 'svcat'

    @staticmethod
    def _svcat_manifest(kind, namespace, name, version, spec):
        """Build the envelope shared by instance and binding manifests.

        Args:
            kind: value for the manifest's ``kind`` field.
            namespace: value for ``metadata.namespace``.
            name: value for ``metadata.name``.
            version: when truthy, stored as ``metadata.resourceVersion``.
            spec: dict used as-is for the manifest's ``spec`` field.

        Returns:
            A dict with ``apiVersion``, ``kind``, ``metadata`` and ``spec``.
        """
        metadata = {
            "name": name,
            "namespace": namespace,
            'labels': {
                'heritage': 'drycc',
            },
        }
        if version:
            metadata["resourceVersion"] = version
        return {
            "apiVersion": ServiceCatalog.api_version,
            "kind": kind,
            "metadata": metadata,
            "spec": spec,
        }

    @staticmethod
    def service_instance_manifest(namespace, name, version=None, **kwargs):
        """Return the ServiceInstance manifest for ``name`` in ``namespace``.

        Args:
            namespace: namespace written into the metadata.
            name: name of the service instance.
            version: optional resourceVersion; included only when truthy.
            **kwargs: ``instance_class`` and ``instance_plan`` fill the
                cluster service class / plan external names (``None`` when
                absent). ``parameters`` and ``external_id`` are added to the
                spec only when truthy. Other keys are ignored.

        Returns:
            The manifest as a plain dict.
        """
        spec = {
            "clusterServiceClassExternalName": kwargs.get('instance_class'),
            "clusterServicePlanExternalName": kwargs.get('instance_plan'),
        }
        parameters = kwargs.get('parameters')
        if parameters:
            spec["parameters"] = parameters
        external_id = kwargs.get('external_id')
        if external_id:
            spec["externalID"] = external_id
        return ServiceCatalog._svcat_manifest(
            "ServiceInstance", namespace, name, version, spec)

    def get_instance(self, namespace, name=None):
        """
        Fetch a single serviceinstance or a list of serviceinstances

        With ``name`` the request targets that one object; with ``name`` left
        as ``None`` it targets the namespace's whole collection.

        Returns the response from ``self.http_get``. Raises
        ``KubeHTTPException`` when ``self.unhealthy`` flags the status code.
        """
        if name is not None:
            url = self.api('/namespaces/{}/serviceinstances/{}',
                           namespace, name)
            message = 'get serviceinstances ' + name
        else:
            url = self.api('/namespaces/{}/serviceinstances', namespace)
            message = 'get serviceinstances'
        response = self.http_get(url)
        if self.unhealthy(response.status_code):
            raise KubeHTTPException(response, message)
        return response

    def create_instance(self, namespace, name, **kwargs):
        """
        Create serviceinstances

        ``kwargs`` are forwarded to ``service_instance_manifest``.

        Returns the response from ``self.http_post``. Raises
        ``KubeHTTPException`` when the status code is anything but 201.
        """
        url = self.api('/namespaces/{}/serviceinstances', namespace)
        data = self.service_instance_manifest(namespace, name, **kwargs)
        response = self.http_post(url, json=data)
        # Only 201 counts as success here, unlike the get/delete paths,
        # which defer to self.unhealthy.
        if response.status_code != 201:
            # Name the resource as well as the namespace so the failure
            # can be traced to a specific instance.
            raise KubeHTTPException(
                response,
                "create serviceinstances {} in namespace {}".format(
                    name, namespace))
        return response

    def put_instance(self, namespace, name, version, **kwargs):
        """
        update serviceinstances

        ``version`` is written into the manifest as
        ``metadata.resourceVersion`` (when truthy); ``kwargs`` are forwarded
        to ``service_instance_manifest``.

        Returns the response from ``self.http_put``. Raises
        ``KubeHTTPException`` when the status code is anything but 200.
        """
        url = self.api('/namespaces/{}/serviceinstances/{}', namespace, name)
        data = self.service_instance_manifest(
            namespace, name, version, **kwargs)
        response = self.http_put(url, json=data)
        if response.status_code != 200:
            raise KubeHTTPException(
                response,
                "update serviceinstances {} in namespace {}".format(
                    name, namespace))
        return response

    def delete_instance(self, namespace, name):
        """
        Delete serviceinstances

        Returns the response from ``self.http_delete``. Raises
        ``KubeHTTPException`` when ``self.unhealthy`` flags the status code.
        """
        url = self.api('/namespaces/{}/serviceinstances/{}', namespace,
                       name)
        response = self.http_delete(url)
        if self.unhealthy(response.status_code):
            raise KubeHTTPException(response, 'delete serviceinstance ' + name)
        return response

    @staticmethod
    def service_binding_manifest(namespace, name, version=None, **kwargs):
        """Return the ServiceBinding manifest for ``name`` in ``namespace``.

        The binding references a service instance of the same ``name``
        (``spec.instanceRef.name``).

        Args:
            namespace: namespace written into the metadata.
            name: name of the binding and of the referenced instance.
            version: optional resourceVersion; included only when truthy.
            **kwargs: accepted for signature parity with
                ``service_instance_manifest``; currently unused.

        Returns:
            The manifest as a plain dict.
        """
        spec = {
            "instanceRef": {
                "name": name
            }
        }
        return ServiceCatalog._svcat_manifest(
            "ServiceBinding", namespace, name, version, spec)

    def get_binding(self, namespace, name=None):
        """
        Fetch a single servicebinding or a list of servicebindings

        With ``name`` the request targets that one object; with ``name`` left
        as ``None`` it targets the namespace's whole collection.

        Returns the response from ``self.http_get``. Raises
        ``KubeHTTPException`` when ``self.unhealthy`` flags the status code.
        """
        if name is not None:
            url = self.api('/namespaces/{}/servicebindings/{}',
                           namespace, name)
            message = 'get servicebindings ' + name
        else:
            url = self.api('/namespaces/{}/servicebindings', namespace)
            message = 'get servicebindings'
        response = self.http_get(url)
        if self.unhealthy(response.status_code):
            raise KubeHTTPException(response, message)
        return response

    def create_binding(self, namespace, name, **kwargs):
        """
        Create servicebindings

        ``kwargs`` are forwarded to ``service_binding_manifest``.

        Returns the response from ``self.http_post``. Raises
        ``KubeHTTPException`` when the status code is anything but 201.
        """
        url = self.api('/namespaces/{}/servicebindings', namespace)
        data = self.service_binding_manifest(namespace, name, **kwargs)
        response = self.http_post(url, json=data)
        if response.status_code != 201:
            raise KubeHTTPException(
                response,
                "create servicebindings {} in namespace {}".format(
                    name, namespace))
        return response

    def delete_binding(self, namespace, name):
        """
        Delete servicebindings

        Returns the response from ``self.http_delete``. Raises
        ``KubeHTTPException`` when ``self.unhealthy`` flags the status code.
        """
        url = self.api('/namespaces/{}/servicebindings/{}', namespace, name)
        response = self.http_delete(url)
        if self.unhealthy(response.status_code):
            raise KubeHTTPException(response,
                                    'delete servicebindings ' + name)
        return response