Skip to content

Commit 7306202

Browse files
authored
ref(scheduler): split up the scheduler code into individual resources and to be modular (#1016)
This adds a resources package which contains all Kubernetes resources as their own individual classes. Each resource is self registering via the ResourceRegistry metaclass, this allows the base Resource class to know what resources are available and expose that up into KubeHTTPClient Each resource can specify their own api prefix and version, which allows us to support resources moving between API endpoints (see HPA, a new resource). To make it easy for resource to interact with one another each resource and the KubeHTTPClient can access other resources via `self.hpa.create()` or `self.pod.get()`, the short form (if a resource supports that) works, singular and plural also work via some intelligent mapping `version()` was added to let Resources find out what Kubernetes version they are dealing with. Closes #875
1 parent 899e008 commit 7306202

32 files changed

Lines changed: 2320 additions & 1786 deletions

rootfs/api/models/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ class Meta:
4747
@property
4848
def _scheduler(self):
4949
mod = importlib.import_module(settings.SCHEDULER_MODULE)
50-
return mod.SchedulerClient()
50+
return mod.SchedulerClient(settings.SCHEDULER_URL)
5151

5252
def _fetch_service_config(self, app):
5353
try:
5454
# Get the service from k8s to attach the domain correctly
55-
svc = self._scheduler.get_service(app, app).json()
55+
svc = self._scheduler.svc.get(app, app).json()
5656
except KubeException as e:
5757
raise ServiceUnavailable('Could not fetch Kubernetes Service {}'.format(app)) from e
5858

@@ -93,7 +93,7 @@ def _save_service_config(self, app, component, data):
9393

9494
# Update the k8s service for the application with new service information
9595
try:
96-
self._scheduler.update_service(app, app, svc)
96+
self._scheduler.svc.update(app, app, svc)
9797
except KubeException as e:
9898
raise ServiceUnavailable('Could not update Kubernetes Service {}'.format(app)) from e
9999

rootfs/api/models/app.py

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def save(self, *args, **kwargs):
104104
self.release_set.latest()
105105
except Release.DoesNotExist:
106106
try:
107-
if self._scheduler.get_namespace(self.id).status_code == 200:
107+
if self._scheduler.ns.get(self.id).status_code == 200:
108108
# Namespace already exists
109109
err = "{} already exists as a namespace in this kuberenetes setup".format(self.id) # noqa
110110
self.log(err, logging.INFO)
@@ -203,18 +203,18 @@ def create(self, *args, **kwargs): # noqa
203203
self.log('creating Namespace {} and services'.format(namespace), level=logging.DEBUG)
204204
# Create essential resources
205205
try:
206-
self._scheduler.get_namespace(namespace)
206+
self._scheduler.ns.get(namespace)
207207
except KubeException:
208-
self._scheduler.create_namespace(namespace)
208+
self._scheduler.ns.create(namespace)
209209

210210
try:
211-
self._scheduler.get_service(namespace, service)
211+
self._scheduler.svc.get(namespace, service)
212212
except KubeException:
213-
self._scheduler.create_service(namespace, service)
213+
self._scheduler.svc.create(namespace, service)
214214
except KubeException as e:
215215
# Blow it all away only if something horrible happens
216216
try:
217-
self._scheduler.delete_namespace(namespace)
217+
self._scheduler.ns.delete(namespace)
218218
except KubeException as e:
219219
# Just feed into the item below
220220
raise ServiceUnavailable('Could not delete the Namespace in Kubernetes') from e
@@ -234,12 +234,12 @@ def delete(self, *args, **kwargs):
234234
"""Delete this application including all containers"""
235235
self.log("deleting environment")
236236
try:
237-
self._scheduler.delete_namespace(self.id)
237+
self._scheduler.ns.delete(self.id)
238238

239239
# wait 30 seconds for termination
240240
for _ in range(30):
241241
try:
242-
self._scheduler.get_namespace(self.id)
242+
self._scheduler.ns.get(self.id)
243243
except KubeException:
244244
break
245245
except KubeException as e:
@@ -264,7 +264,7 @@ def restart(self, **kwargs): # noqa
264264
desired = 0
265265
labels = self._scheduler_filter(**kwargs)
266266
# fetch RS (which represent Deployments)
267-
controllers = self._scheduler.get_replicasets(kwargs['id'], labels=labels)
267+
controllers = self._scheduler.rs.get(kwargs['id'], labels=labels)
268268

269269
for controller in controllers.json()['items']:
270270
desired += controller['spec']['replicas']
@@ -275,7 +275,7 @@ def restart(self, **kwargs): # noqa
275275
try:
276276
tasks = [
277277
functools.partial(
278-
self._scheduler.delete_pod,
278+
self._scheduler.pod.delete,
279279
self.id,
280280
pod['name']
281281
) for pod in self.list_pods(**kwargs)
@@ -577,7 +577,7 @@ def _check_deployment_in_progress(self, deploys, force_deploy=False):
577577
for scale_type, kwargs in deploys.items():
578578
# Is there an existing deployment in progress?
579579
name = self._get_job_id(scale_type)
580-
in_progress, deploy_okay = self._scheduler.deployment_in_progress(
580+
in_progress, deploy_okay = self._scheduler.deployment.in_progress(
581581
self.id, name, kwargs.get("deploy_timeout"), kwargs.get("deploy_batches"),
582582
kwargs.get("replicas"), kwargs.get("tags")
583583
)
@@ -768,9 +768,9 @@ def list_pods(self, *args, **kwargs):
768768

769769
# in case a singular pod is requested
770770
if 'name' in kwargs:
771-
pods = [self._scheduler.get_pod(self.id, kwargs['name']).json()]
771+
pods = [self._scheduler.pod.get(self.id, kwargs['name']).json()]
772772
else:
773-
pods = self._scheduler.get_pods(self.id, labels=labels).json()['items']
773+
pods = self._scheduler.pod.get(self.id, labels=labels).json()['items']
774774

775775
data = []
776776
for p in pods:
@@ -779,14 +779,14 @@ def list_pods(self, *args, **kwargs):
779779
if labels['type'] == 'run':
780780
continue
781781

782-
state = str(self._scheduler.pod_state(p))
782+
state = str(self._scheduler.pod.state(p))
783783

784784
# follows kubelete convention - these are hidden unless show-all is set
785785
if state in ['down', 'crashed']:
786786
continue
787787

788788
# hide pod if it is passed the graceful termination period
789-
if self._scheduler.pod_deleted(p):
789+
if self._scheduler.pod.deleted(p):
790790
continue
791791

792792
item = Pod()
@@ -862,9 +862,9 @@ def maintenance_mode(self, mode):
862862

863863
try:
864864
service['metadata']['annotations']['router.deis.io/maintenance'] = str(mode).lower()
865-
self._scheduler.update_service(self.id, self.id, data=service)
865+
self._scheduler.svc.update(self.id, self.id, data=service)
866866
except KubeException as e:
867-
self._scheduler.update_service(self.id, self.id, data=old_service)
867+
self._scheduler.svc.update(self.id, self.id, data=old_service)
868868
raise ServiceUnavailable(str(e)) from e
869869

870870
def routable(self, routable):
@@ -876,9 +876,9 @@ def routable(self, routable):
876876

877877
try:
878878
service['metadata']['labels']['router.deis.io/routable'] = str(routable).lower()
879-
self._scheduler.update_service(self.id, self.id, data=service)
879+
self._scheduler.svc.update(self.id, self.id, data=service)
880880
except KubeException as e:
881-
self._scheduler.update_service(self.id, self.id, data=old_service)
881+
self._scheduler.svc.update(self.id, self.id, data=old_service)
882882
raise ServiceUnavailable(str(e)) from e
883883

884884
def _update_application_service(self, namespace, app_type, port, routable=False, annotations={}): # noqa
@@ -907,10 +907,10 @@ def _update_application_service(self, namespace, app_type, port, routable=False,
907907
# port 80 is the only one we care about right now
908908
service['spec']['ports'][pos]['targetPort'] = int(port)
909909

910-
self._scheduler.update_service(namespace, namespace, data=service)
910+
self._scheduler.svc.update(namespace, namespace, data=service)
911911
except Exception as e:
912912
# Fix service to old port and app type
913-
self._scheduler.update_service(namespace, namespace, data=old_service)
913+
self._scheduler.svc.update(namespace, namespace, data=old_service)
914914
raise KubeException(str(e)) from e
915915

916916
def whitelist(self, whitelist):
@@ -922,6 +922,6 @@ def whitelist(self, whitelist):
922922
try:
923923
addresses = ",".join(address for address in whitelist)
924924
service['metadata']['annotations']['router.deis.io/whitelist'] = addresses
925-
self._scheduler.update_service(self.id, self.id, data=service)
925+
self._scheduler.svc.update(self.id, self.id, data=service)
926926
except KubeException as e:
927927
raise ServiceUnavailable(str(e)) from e

rootfs/api/models/certificate.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,14 +183,14 @@ def attach_in_kubernetes(self, domain):
183183
'tls.key': self.key
184184
}
185185

186-
secret = self._scheduler.get_secret(namespace, name).json()['data']
186+
secret = self._scheduler.secret.get(namespace, name).json()['data']
187187
except KubeException:
188-
self._scheduler.create_secret(namespace, name, data)
188+
self._scheduler.secret.create(namespace, name, data)
189189
else:
190190
# update cert secret to the TLS Ingress format if required
191191
if secret != data:
192192
try:
193-
self._scheduler.update_secret(namespace, name, data)
193+
self._scheduler.secret.update(namespace, name, data)
194194
except KubeException as e:
195195
msg = 'There was a problem updating the certificate secret ' \
196196
'{} for {}'.format(name, namespace)
@@ -225,8 +225,8 @@ def detach(self, *args, **kwargs):
225225
if len(self.domains) == 0:
226226
try:
227227
# We raise an exception when a secret doesn't exist
228-
self._scheduler.get_secret(namespace, name)
229-
self._scheduler.delete_secret(namespace, name)
228+
self._scheduler.secret.get(namespace, name)
229+
self._scheduler.secret.delete(namespace, name)
230230
except KubeException as e:
231231
raise ServiceUnavailable("Could not delete certificate secret {} for application {}".format(name, namespace)) from e # noqa
232232

rootfs/api/models/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def set_tags(self, previous_config):
9494
return
9595

9696
# Get all nodes with label selectors
97-
nodes = self._scheduler.get_nodes(labels=self.tags).json()
97+
nodes = self._scheduler.node.get(labels=self.tags).json()
9898
if nodes['items']:
9999
return
100100

rootfs/api/models/release.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ def cleanup_old(self): # noqa
259259
# Cleanup controllers
260260
labels = {'heritage': 'deis'}
261261
controller_removal = []
262-
controllers = self._scheduler.get_rcs(self.app.id, labels=labels).json()
262+
controllers = self._scheduler.rc.get(self.app.id, labels=labels).json()
263263
for controller in controllers['items']:
264264
current_version = controller['metadata']['labels']['version']
265265
# skip the latest release
@@ -286,20 +286,20 @@ def cleanup_old(self): # noqa
286286
'app': self.app.id,
287287
'type': 'env',
288288
}
289-
secrets = self._scheduler.get_secrets(self.app.id, labels=labels).json()
289+
secrets = self._scheduler.secret.get(self.app.id, labels=labels).json()
290290
for secret in secrets['items']:
291291
current_version = secret['metadata']['labels']['version']
292292
# skip the latest release
293293
if current_version == latest_version:
294294
continue
295295

296-
self._scheduler.delete_secret(self.app.id, secret['metadata']['name'])
296+
self._scheduler.secret.delete(self.app.id, secret['metadata']['name'])
297297

298298
# Remove stray pods
299299
labels = {'heritage': 'deis'}
300-
pods = self._scheduler.get_pods(self.app.id, labels=labels).json()
300+
pods = self._scheduler.pod.get(self.app.id, labels=labels).json()
301301
for pod in pods['items']:
302-
if self._scheduler.pod_deleted(pod):
302+
if self._scheduler.pod.deleted(pod):
303303
continue
304304

305305
current_version = pod['metadata']['labels']['version']
@@ -308,7 +308,7 @@ def cleanup_old(self): # noqa
308308
continue
309309

310310
try:
311-
self._scheduler.delete_pod(self.app.id, pod['metadata']['name'])
311+
self._scheduler.pod.delete(self.app.id, pod['metadata']['name'])
312312
except KubeHTTPException as e:
313313
# Sometimes k8s will manage to remove the pod from under us
314314
if e.response.status_code == 404:
@@ -329,7 +329,7 @@ def _cleanup_deployment_secrets_and_configs(self, namespace):
329329
# Find all ReplicaSets
330330
versions = []
331331
labels = {'heritage': 'deis', 'app': namespace}
332-
replicasets = self._scheduler.get_replicasets(namespace, labels=labels).json()
332+
replicasets = self._scheduler.rs.get(namespace, labels=labels).json()
333333
for replicaset in replicasets['items']:
334334
if (
335335
'version' not in replicaset['metadata']['labels'] or
@@ -348,9 +348,9 @@ def _cleanup_deployment_secrets_and_configs(self, namespace):
348348
'version__notin': versions
349349
}
350350
self.app.log('Cleaning up orphaned env var secrets for application {}'.format(namespace), level=logging.DEBUG) # noqa
351-
secrets = self._scheduler.get_secrets(namespace, labels=labels).json()
351+
secrets = self._scheduler.secret.get(namespace, labels=labels).json()
352352
for secret in secrets['items']:
353-
self._scheduler.delete_secret(namespace, secret['metadata']['name'])
353+
self._scheduler.secret.delete(namespace, secret['metadata']['name'])
354354

355355
def _delete_release_in_scheduler(self, namespace, version):
356356
"""
@@ -368,14 +368,14 @@ def _delete_release_in_scheduler(self, namespace, version):
368368
# see if the app config has deploy timeout preference, otherwise use global
369369
deploy_timeout = self.config.values.get('DEIS_DEPLOY_TIMEOUT', settings.DEIS_DEPLOY_TIMEOUT) # noqa
370370

371-
controllers = self._scheduler.get_rcs(namespace, labels=labels).json()
371+
controllers = self._scheduler.rc.get(namespace, labels=labels).json()
372372
for controller in controllers['items']:
373373
self._scheduler.cleanup_release(namespace, controller, deploy_timeout)
374374

375375
# remove secret that contains env vars for the release
376376
try:
377377
secret_name = "{}-{}-env".format(namespace, version)
378-
self._scheduler.delete_secret(namespace, secret_name)
378+
self._scheduler.secret.delete(namespace, secret_name)
379379
except KubeHTTPException:
380380
pass
381381

rootfs/api/tests/test_app.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -373,22 +373,22 @@ def test_app_exists_in_kubernetes(self, mock_requests):
373373

374374
def test_app_create_failure_kubernetes_create(self, mock_requests):
375375
"""
376-
Create an app but have scheduler.create_service fail with an exception
376+
Create an app but have scheduler.svc.create fail with an exception
377377
"""
378-
with mock.patch('scheduler.KubeHTTPClient.create_service') as mock_kube:
378+
with mock.patch('scheduler.resources.service.Service.create') as mock_kube:
379379
mock_kube.side_effect = KubeException('Boom!')
380380
response = self.client.post('/v2/apps')
381381
self.assertEqual(response.status_code, 503, response.data)
382382

383383
def test_app_delete_failure_kubernetes_destroy(self, mock_requests):
384384
"""
385-
Create an app and then delete but have scheduler.delete_namespace
385+
Create an app and then delete but have scheduler.ns.delete
386386
fail with an exception
387387
"""
388388
# create
389389
app_id = self.create_app()
390390

391-
with mock.patch('scheduler.KubeHTTPClient.delete_namespace') as mock_kube:
391+
with mock.patch('scheduler.resources.namespace.Namespace.delete') as mock_kube:
392392
# delete
393393
mock_kube.side_effect = KubeException('Boom!')
394394
response = self.client.delete('/v2/apps/{}'.format(app_id))

rootfs/api/tests/test_app_settings.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,8 @@ def test_settings_routable(self, mock_requests):
7272
Create an application with the routable flag turned on or off
7373
"""
7474
# create app, expecting routable to be true
75-
body = {'id': 'myid'}
76-
response = self.client.post('/v2/apps', body)
77-
self.assertEqual(response.status_code, 201, response.data)
78-
app = App.objects.get(id='myid')
75+
app_id = self.create_app()
76+
app = App.objects.get(id=app_id)
7977
self.assertTrue(app.appsettings_set.latest().routable)
8078
# Set routable to false
8179
response = self.client.post(
@@ -163,8 +161,8 @@ def test_kubernetes_service_failure(self, mock_requests):
163161
"""
164162
app_id = self.create_app()
165163

166-
# scheduler.update_service exception
167-
with mock.patch('scheduler.KubeHTTPClient.update_service') as mock_kube:
164+
# scheduler.svc.update exception
165+
with mock.patch('scheduler.resources.service.Service.update') as mock_kube:
168166
mock_kube.side_effect = KubeException('Boom!')
169167
addresses = ["2.3.4.5"]
170168
url = '/v2/apps/{}/whitelist'.format(app_id)

rootfs/api/tests/test_domain.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -325,16 +325,16 @@ def test_kubernetes_service_failure(self):
325325
"""
326326
app_id = self.create_app()
327327

328-
# scheduler.get_service exception
329-
with mock.patch('scheduler.KubeHTTPClient.get_service') as mock_kube:
328+
# scheduler.svc.get exception
329+
with mock.patch('scheduler.resources.service.Service.get') as mock_kube:
330330
mock_kube.side_effect = KubeException('Boom!')
331331
domain = 'foo.com'
332332
url = '/v2/apps/{}/domains'.format(app_id)
333333
response = self.client.post(url, {'domain': domain})
334334
self.assertEqual(response.status_code, 503, response.data)
335335

336-
# scheduler.update_service exception
337-
with mock.patch('scheduler.KubeHTTPClient.update_service') as mock_kube:
336+
# scheduler.svc.update exception
337+
with mock.patch('scheduler.resources.service.Service.update') as mock_kube:
338338
domain = 'foo.com'
339339
url = '/v2/apps/{}/domains'.format(app_id)
340340
response = self.client.post(url, {'domain': domain})

rootfs/api/tests/test_pods.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -792,8 +792,8 @@ def test_list_pods_failure(self, mock_requests):
792792

793793
app_id = self.create_app()
794794

795-
with mock.patch('scheduler.KubeHTTPClient.get_pod') as kube_pod:
796-
with mock.patch('scheduler.KubeHTTPClient.get_pods') as kube_pods:
795+
with mock.patch('scheduler.resources.pod.Pod.get') as kube_pod:
796+
with mock.patch('scheduler.resources.pod.Pod.get') as kube_pods:
797797
kube_pod.side_effect = KubeException('boom!')
798798
kube_pods.side_effect = KubeException('boom!')
799799
url = "/v2/apps/{app_id}/pods".format(**locals())

0 commit comments

Comments
 (0)