-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy path__init__.py
More file actions
340 lines (292 loc) · 13.4 KB
/
__init__.py
File metadata and controls
340 lines (292 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
from collections import OrderedDict
from datetime import datetime, timezone
import logging
from packaging.version import Version, parse
import requests
import requests.exceptions
from requests_toolbelt import user_agent
import re
from urllib.parse import urljoin
from api import __version__ as drycc_version
from scheduler.exceptions import KubeException, KubeHTTPException
logger = logging.getLogger(__name__)
session = None
resource_mapping = OrderedDict()
def get_k8s_session(k8s_api_verify_tls):
global session
if session is None:
with open('/var/run/secrets/kubernetes.io/serviceaccount/token') as token_file:
token = token_file.read()
session = requests.Session()
session.headers = {
'Authorization': 'Bearer ' + token,
'Content-Type': 'application/json',
'User-Agent': user_agent('Drycc Controller', drycc_version)
}
if k8s_api_verify_tls:
session.verify = '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'
else:
session.verify = False
return session
class KubeHTTPClient(object):
api_version = 'v1'
api_prefix = 'api'
# ISO-8601 which is used by kubernetes
DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
def __init__(self, url, k8s_api_verify_tls=True):
global resource_mapping
self.url = url
self.k8s_api_verify_tls = k8s_api_verify_tls
self.session = get_k8s_session(self.k8s_api_verify_tls)
# map the various k8s Resources to an internal property
from scheduler.resources import Resource # lazy load
for res in Resource:
name = str(res.__name__).lower() # singular
component = name + 's' # make plural
# check if component has already been processed
if component in resource_mapping:
continue
# get past recursion problems in case of self reference
resource_mapping[component] = ''
resource_mapping[component] = res(self.url, self.k8s_api_verify_tls)
# map singular Resource name to the plural one
resource_mapping[name] = component
if res.short_name is not None:
# map short name to long name so a resource can be named rs
# but have the main object live at replicasets
resource_mapping[str(res.short_name).lower()] = component
def api(self, tmpl, *args):
"""Return a fully-qualified Kubernetes API URL from a string template with args."""
return "/{}/{}".format(self.api_prefix, self.api_version) + tmpl.format(*args)
def __getattr__(self, name):
global resource_mapping
if name in resource_mapping:
# resolve to final name if needed
component = resource_mapping[name]
if type(component) is not str:
# already a component object
return component
return resource_mapping[component]
return object.__getattribute__(self, name)
def version(self):
"""Get Kubernetes version"""
response = self.http_get('/version')
if self.unhealthy(response.status_code):
raise KubeHTTPException(response, 'fetching Kubernetes version')
data = response.json()
parsed_version = parse(
re.sub(r"[^0-9\.]", '', str('{}.{}'.format(data['major'], data['minor']))))
return Version('{}'.format(parsed_version))
@staticmethod
def parse_date(date):
return datetime.strptime(date, KubeHTTPClient.DATETIME_FORMAT).replace(tzinfo=timezone.utc)
@staticmethod
def unhealthy(status_code):
return not 200 <= status_code <= 299
@staticmethod
def query_params(labels=None, fields=None, resource_version=None, pretty=False):
query = {}
# labels and fields are encoded slightly differently than python-requests can do
if labels:
selectors = []
for key, value in labels.items():
# http://kubernetes.io/docs/user-guide/labels/#set-based-requirement
if '__notin' in key:
key = key.replace('__notin', '')
selectors.append('{} notin({})'.format(key, ','.join(value)))
# list is automagically a in()
elif '__in' in key or isinstance(value, list):
key = key.replace('__in', '')
selectors.append('{} in({})'.format(key, ','.join(value)))
elif value is None:
# allowing a check if a label exists (or not) without caring about value
selectors.append(key)
# http://kubernetes.io/docs/user-guide/labels/#equality-based-requirement
elif isinstance(value, str):
selectors.append('{}={}'.format(key, value))
query['labelSelector'] = ','.join(selectors)
if fields:
fields = ['{}={}'.format(key, value) for key, value in fields.items()]
query['fieldSelector'] = ','.join(fields)
# Which resource version to start from. Otherwise starts from the beginning
if resource_version:
query['resourceVersion'] = resource_version
# If output should pretty print, only True / False allowed
if pretty:
query['pretty'] = pretty
return query
@staticmethod
def log(namespace, message, level='INFO'):
"""Logs a message in the context of this application.
This prefixes log messages with a namespace "tag".
When it's seen, the message-- usually an application event of some
sort like releasing or scaling, will be considered as "belonging" to the application
instead of the controller and will be handled accordingly.
"""
lvl = getattr(logging, level.upper()) if hasattr(logging, level.upper()) else logging.INFO
logger.log(lvl, "[{}]: {}".format(namespace, message))
def http_head(self, path, **kwargs):
"""
Make a HEAD request to the k8s server.
"""
try:
url = urljoin(self.url, path)
response = self.session.head(url, **kwargs)
except requests.exceptions.ConnectionError as err:
# reraise as KubeException, but log stacktrace.
message = "There was a problem retrieving headers from " \
"the Kubernetes API server. URL: {}".format(url)
logger.error(message)
raise KubeException(message) from err
return response
def http_get(self, path, params=None, **kwargs):
"""
Make a GET request to the k8s server.
"""
try:
url = urljoin(self.url, path)
response = self.session.get(url, params=params, **kwargs)
except requests.exceptions.ConnectionError as err:
# reraise as KubeException, but log stacktrace.
message = "There was a problem retrieving data from " \
"the Kubernetes API server. URL: {}, params: {}".format(url, params)
logger.error(message)
raise KubeException(message) from err
return response
def http_post(self, path, data=None, json=None, **kwargs):
"""
Make a POST request to the k8s server.
"""
try:
url = urljoin(self.url, path)
response = self.session.post(url, data=data, json=json, **kwargs)
except requests.exceptions.ConnectionError as err:
# reraise as KubeException, but log stacktrace.
message = "There was a problem posting data to " \
"the Kubernetes API server. URL: {}, " \
"data: {}, json: {}".format(url, data, json)
logger.error(message)
raise KubeException(message) from err
return response
def http_put(self, path, data=None, **kwargs):
"""
Make a PUT request to the k8s server.
"""
try:
url = urljoin(self.url, path)
response = self.session.put(url, data=data, **kwargs)
except requests.exceptions.ConnectionError as err:
# reraise as KubeException, but log stacktrace.
message = "There was a problem putting data to " \
"the Kubernetes API server. URL: {}, " \
"data: {}".format(url, data)
logger.error(message)
raise KubeException(message) from err
return response
def http_patch(self, path, data=None, **kwargs):
"""
Make a PATCH request to the k8s server.
"""
try:
url = urljoin(self.url, path)
# accepted media types include:
# application/json-patch+json,
# application/merge-patch+json,
# application/apply-patch+yaml
# self.session.headers["Content-Type"] = "application/json-patch+json"
response = self.session.patch(url, data=data, **kwargs)
except requests.exceptions.ConnectionError as err:
# reraise as KubeException, but log stacktrace.
message = "There was a problem patching data to " \
"the Kubernetes API server. URL: {}, " \
"data: {}".format(url, data)
logger.error(message)
raise KubeException(message) from err
return response
def http_delete(self, path, **kwargs):
"""
Make a DELETE request to the k8s server.
"""
try:
url = urljoin(self.url, path)
response = self.session.delete(url, **kwargs)
except requests.exceptions.ConnectionError as err:
# reraise as KubeException, but log stacktrace.
message = "There was a problem deleting data from " \
"the Kubernetes API server. URL: {}".format(url)
logger.error(message)
raise KubeException(message) from err
return response
def deploy(self, namespace, name, image, command, args, **kwargs):
"""Deploy Deployment depending on what's requested"""
app_type = kwargs.get('app_type')
version = kwargs.get('version')
spec_annotations = {}
# If an RC already exists then stop processing of the deploy
try:
# construct old school RC name
rc_name = '{}-{}-{}'.format(namespace, version, app_type)
self.rc.get(namespace, rc_name)
self.log(namespace, 'RC {} already exists. Stopping deploy'.format(rc_name))
return
except KubeHTTPException:
# if RC doesn't exist then let the app continue
pass
# create a deployment if missing, otherwise update to trigger a release
try:
# labels that represent the pod(s)
labels = {
'app': namespace,
'version': version,
'type': app_type,
'heritage': 'drycc',
}
# this depends on the deployment object having the latest information
deployment = self.deployment.get(namespace, name).json()
# a hack to persist the spec annotations on the deployment object to next release
# instantiate spec_annotations and set to blank to avoid errors
if 'annotations' in deployment['spec']['template']['metadata'].keys():
old_spec_annotations = deployment['spec']['template']['metadata']['annotations']
spec_annotations = old_spec_annotations
if deployment['spec']['template']['metadata']['labels'] == labels:
self.log(namespace, 'Deployment {} with release {} already exists. Stopping deploy'.format(name, version)) # noqa
return
except KubeException:
# create the initial deployment object (and the first revision)
self.deployment.create(
namespace, name, image, command, args, spec_annotations, **kwargs
)
else:
try:
# kick off a new revision of the deployment
self.deployment.update(
namespace, name, image, command, args, spec_annotations, **kwargs
)
except KubeException as e:
raise KubeException(
'There was a problem while deploying {} of {}-{}. '
"Additional information:\n{}".format(version, namespace, app_type, str(e))
) from e
def scale(self, namespace, name, image, command, args, **kwargs):
"""Scale Deployment"""
try:
self.deployment.get(namespace, name)
except KubeHTTPException as e:
if e.response.status_code == 404:
# create missing deployment - deleted if it fails
try:
spec_annotations = {}
self.deployment.create(
namespace, name, image, command, args, spec_annotations, **kwargs
)
except KubeException:
# see if the deployment got created
try:
self.deployment.get(namespace, name)
except KubeHTTPException as e:
if e.response.status_code != 404:
self.deployment.delete(namespace, name)
raise
# let the scale failure bubble up
self.deployment.scale(namespace, name, **kwargs)
SchedulerClient = KubeHTTPClient