Skip to content

Commit 11c9cec

Browse files
author
Gabriel Monroy
committed
refactor(controller): chaos scheduler tests and cleanup
1 parent 72546d7 commit 11c9cec

11 files changed

Lines changed: 591 additions & 226 deletions

File tree

controller/api/models.py

Lines changed: 142 additions & 91 deletions
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,8 @@
3838

3939
def log_event(app, msg, level=logging.INFO):
4040
msg = "{}: {}".format(app.id, msg)
41-
logger.log(level, msg)
41+
logger.log(level, msg) # django logger
42+
app.log(msg) # local filesystem
4243

4344

4445
def validate_app_structure(value):
@@ -95,7 +96,7 @@ class Cluster(UuidAuditedModel):
9596

9697
CLUSTER_TYPES = (('mock', 'Mock Cluster'),
9798
('coreos', 'CoreOS Cluster'),
98-
('faulty', 'Faulty Cluster'))
99+
('chaos', 'Chaos Cluster'))
99100

100101
owner = models.ForeignKey(settings.AUTH_USER_MODEL)
101102
id = models.CharField(max_length=128, unique=True)
@@ -182,44 +183,6 @@ def _clean_app_logs(self):
182183
if os.path.exists(path):
183184
os.remove(path)
184185

185-
def deploy(self, user, release, initial=False):
186-
"""Deploy a new release to this application"""
187-
containers = self.container_set.all()
188-
self._deploy_containers(containers, release)
189-
# update release in database
190-
for c in containers:
191-
c.release = release
192-
c.save()
193-
self.release = release
194-
self.save()
195-
# perform default scaling if necessary
196-
if initial:
197-
self._default_scale(user, release)
198-
199-
def _default_scale(self, user, release):
200-
"""Scale to default structure based on release type"""
201-
# if there is no SHA, assume a docker image is being promoted
202-
if not release.build.sha:
203-
structure = {'cmd': 1}
204-
# if a dockerfile exists without a procfile, assume docker workflow
205-
elif release.build.dockerfile and not release.build.procfile:
206-
structure = {'cmd': 1}
207-
# if a procfile exists without a web entry, assume docker workflow
208-
elif release.build.procfile and 'web' not in release.build.procfile:
209-
structure = {'cmd': 1}
210-
# default to heroku workflow
211-
else:
212-
structure = {'web': 1}
213-
self.scale(user, structure)
214-
215-
def _deploy_containers(self, to_deploy, release, **kwargs):
216-
"""Deploys containers via the scheduler"""
217-
threads = []
218-
for c in to_deploy:
219-
threads.append(threading.Thread(target=c.deploy, args=(release,)))
220-
[t.start() for t in threads]
221-
[t.join() for t in threads]
222-
223186
def scale(self, user, structure): # noqa
224187
"""Scale containers up or down to match requested structure."""
225188
requested_structure = structure.copy()
@@ -235,7 +198,6 @@ def scale(self, user, structure): # noqa
235198
msg = '{} scaled containers '.format(user.username) + ' '.join(
236199
"{}={}".format(k, v) for k, v in requested_structure.items())
237200
log_event(self, msg)
238-
self.log(msg)
239201
# iterate and scale by container type (web, worker, etc)
240202
changed = False
241203
to_add, to_remove = [], []
@@ -268,9 +230,6 @@ def scale(self, user, structure): # noqa
268230
self._start_containers(to_add)
269231
if to_remove:
270232
self._destroy_containers(to_remove)
271-
# remove the database record
272-
for c in to_remove:
273-
c.delete()
274233
# save new structure to the database
275234
self.structure = structure
276235
self.save()
@@ -285,8 +244,15 @@ def _start_containers(self, to_add):
285244
start_threads.append(threading.Thread(target=c.start))
286245
[t.start() for t in create_threads]
287246
[t.join() for t in create_threads]
247+
if set([c.state for c in to_add]) != set([Container.CREATED]):
248+
err = 'aborting, failed to create some containers'
249+
log_event(self, err, logging.ERROR)
250+
raise RuntimeError(err)
288251
[t.start() for t in start_threads]
289252
[t.join() for t in start_threads]
253+
if set([c.state for c in to_add]) != set([Container.UP]):
254+
err = 'warning, some containers failed to start'
255+
log_event(self, err, logging.WARNING)
290256

291257
def _destroy_containers(self, to_destroy):
292258
"""Destroys containers via the scheduler"""
@@ -295,6 +261,75 @@ def _destroy_containers(self, to_destroy):
295261
destroy_threads.append(threading.Thread(target=c.destroy))
296262
[t.start() for t in destroy_threads]
297263
[t.join() for t in destroy_threads]
264+
[c.delete() for c in to_destroy if c.state == Container.DESTROYED]
265+
if set([c.state for c in to_destroy]) != set([Container.DESTROYED]):
266+
err = 'aborting, failed to destroy some containers'
267+
log_event(self, err, logging.ERROR)
268+
raise RuntimeError(err)
269+
270+
def deploy(self, user, release, initial=False):
271+
"""Deploy a new release to this application"""
272+
existing = self.container_set.all()
273+
new = []
274+
for e in existing:
275+
n = e.clone(release)
276+
n.save()
277+
new.append(n)
278+
279+
# create new containers
280+
threads = []
281+
for c in new:
282+
threads.append(threading.Thread(target=c.create))
283+
[t.start() for t in threads]
284+
[t.join() for t in threads]
285+
286+
# check for containers that failed to create
287+
if len(new) > 0 and set([c.state for c in new]) != set([Container.CREATED]):
288+
err = 'aborting, failed to create some containers'
289+
log_event(self, err, logging.ERROR)
290+
self._destroy_containers(new)
291+
raise RuntimeError(err)
292+
293+
# start new containers
294+
threads = []
295+
for c in new:
296+
threads.append(threading.Thread(target=c.start))
297+
[t.start() for t in threads]
298+
[t.join() for t in threads]
299+
300+
# check for containers that didn't come up correctly
301+
if len(new) > 0 and set([c.state for c in new]) != set([Container.UP]):
302+
# report the deploy error
303+
err = 'warning, some containers failed to start'
304+
log_event(self, err, logging.WARNING)
305+
306+
# destroy old containers
307+
if existing:
308+
self._destroy_containers(existing)
309+
310+
# perform default scaling if necessary
311+
if initial:
312+
self._default_scale(user, release)
313+
314+
def _default_scale(self, user, release):
315+
"""Scale to default structure based on release type"""
316+
# if there is no SHA, assume a docker image is being promoted
317+
if not release.build.sha:
318+
structure = {'cmd': 1}
319+
320+
# if a dockerfile exists without a procfile, assume docker workflow
321+
elif release.build.dockerfile and not release.build.procfile:
322+
structure = {'cmd': 1}
323+
324+
# if a procfile exists without a web entry, assume docker workflow
325+
elif release.build.procfile and 'web' not in release.build.procfile:
326+
structure = {'cmd': 1}
327+
328+
# default to heroku workflow
329+
else:
330+
structure = {'web': 1}
331+
332+
self.scale(user, structure)
298333

299334
def logs(self):
300335
"""Return aggregated log data for this application."""
@@ -309,7 +344,6 @@ def run(self, user, command):
309344
# TODO: add support for interactive shell
310345
msg = "{} runs '{}'".format(user.username, command)
311346
log_event(self, msg)
312-
self.log(msg)
313347
c_num = max([c.num for c in self.container_set.filter(type='admin')] or [0]) + 1
314348
try:
315349
# create database record for admin process
@@ -347,20 +381,25 @@ class Container(UuidAuditedModel):
347381
UP = 'up'
348382
DOWN = 'down'
349383
DESTROYED = 'destroyed'
384+
CRASHED = 'crashed'
385+
ERROR = 'error'
350386
STATE_CHOICES = (
351387
(INITIALIZED, 'initialized'),
352388
(CREATED, 'created'),
353389
(UP, 'up'),
354390
(DOWN, 'down'),
355-
(DESTROYED, 'destroyed')
391+
(DESTROYED, 'destroyed'),
392+
(CRASHED, 'crashed'),
393+
(ERROR, 'error'),
356394
)
357395

358396
owner = models.ForeignKey(settings.AUTH_USER_MODEL)
359397
app = models.ForeignKey('App')
360398
release = models.ForeignKey('Release')
361399
type = models.CharField(max_length=128, blank=False)
362400
num = models.PositiveIntegerField()
363-
state = FSMField(default=INITIALIZED, choices=STATE_CHOICES, protected=True)
401+
state = FSMField(default=INITIALIZED, choices=STATE_CHOICES,
402+
protected=True, propagate=False)
364403

365404
def short_name(self):
366405
return "{}.{}.{}".format(self.release.app.id, self.type, self.num)
@@ -400,62 +439,73 @@ def _get_command(self):
400439
def _command_announceable(self):
401440
return self._command.lower() in ['start web', '']
402441

403-
@transition(field=state, source=INITIALIZED, target=CREATED)
442+
def clone(self, release):
443+
c = Container.objects.create(owner=self.owner,
444+
app=self.app,
445+
release=release,
446+
type=self.type,
447+
num=self.num)
448+
return c
449+
450+
@transition(field=state, source=INITIALIZED, target=CREATED, on_error=ERROR)
404451
def create(self):
405452
image = self.release.image + ':v' + str(self.release.version)
406453
kwargs = {'memory': self.release.config.memory,
407454
'cpu': self.release.config.cpu,
408455
'tags': self.release.config.tags}
409-
self._scheduler.create(name=self._job_id,
410-
image=image,
411-
command=self._command,
412-
use_announcer=self._command_announceable(),
413-
**kwargs)
414-
415-
@transition(field=state,
416-
source=[CREATED, UP, DOWN],
417-
target=UP, crashed=DOWN)
456+
job_id = self._job_id
457+
try:
458+
self._scheduler.create(
459+
name=job_id,
460+
image=image,
461+
command=self._command,
462+
use_announcer=self._command_announceable(), **kwargs)
463+
except Exception as e:
464+
err = '{} (create): {}'.format(job_id, e)
465+
log_event(self.app, err, logging.ERROR)
466+
raise
467+
468+
@transition(field=state, source=[CREATED, UP, DOWN], target=UP, on_error=CRASHED)
418469
def start(self):
419-
self._scheduler.start(self._job_id, self._command_announceable())
420-
421-
@transition(field=state,
422-
source=[INITIALIZED, CREATED, UP, DOWN],
423-
target=UP, crashed=DOWN)
424-
def deploy(self, new_release):
425-
old_job_id = self._job_id
426-
# update release
427-
self.release = new_release
428-
# deploy new container
429-
new_job_id = self._job_id
430-
image = self.release.image + ':v' + str(self.release.version)
431-
c_type = self.type
432-
kwargs = {'memory': self.release.config.memory,
433-
'cpu': self.release.config.cpu,
434-
'tags': self.release.config.tags}
435-
self._scheduler.create(name=new_job_id,
436-
image=image,
437-
command=self._command.format(**locals()),
438-
use_announcer=self._command_announceable(),
439-
**kwargs)
440-
self._scheduler.start(new_job_id, self._command_announceable())
441-
# destroy old container
442-
self._scheduler.destroy(old_job_id, self._command_announceable())
443-
444-
@transition(field=state, source=UP, target=DOWN)
470+
job_id = self._job_id
471+
try:
472+
self._scheduler.start(job_id, self._command_announceable())
473+
except Exception as e:
474+
err = '{} (start): {}'.format(job_id, e)
475+
log_event(self.app, err, logging.WARNING)
476+
raise
477+
478+
@transition(field=state, source=UP, target=DOWN, on_error=ERROR)
445479
def stop(self):
446-
self._scheduler.stop(self._job_id, self._command_announceable())
480+
job_id = self._job_id
481+
try:
482+
self._scheduler.stop(job_id, self._command_announceable())
483+
except Exception as e:
484+
err = '{} (stop): {}'.format(job_id, e)
485+
log_event(self.app, err, logging.ERROR)
486+
raise
447487

448-
@transition(field=state,
449-
source=[INITIALIZED, CREATED, UP, DOWN],
450-
target=DESTROYED)
488+
@transition(field=state, source='*', target=DESTROYED, on_error=ERROR)
451489
def destroy(self):
452-
self._scheduler.destroy(self._job_id, self._command_announceable())
490+
job_id = self._job_id
491+
try:
492+
self._scheduler.destroy(job_id, self._command_announceable())
493+
except Exception as e:
494+
err = '{} (destroy): {}'.format(job_id, e)
495+
log_event(self.app, err, logging.ERROR)
496+
raise
453497

454498
def run(self, command):
455499
"""Run a one-off command"""
456500
image = self.release.image + ':v' + str(self.release.version)
457-
rc, output = self._scheduler.run(self._job_id, image, command)
458-
return rc, output
501+
job_id = self._job_id
502+
try:
503+
rc, output = self._scheduler.run(job_id, image, command)
504+
return rc, output
505+
except Exception as e:
506+
err = '{} (run): {}'.format(job_id, e)
507+
log_event(self.app, err, logging.ERROR)
508+
raise
459509

460510

461511
@python_2_unicode_compatible
@@ -579,7 +629,8 @@ def new(self, user, config=None, build=None, summary=None, source_version='lates
579629
owner=user, app=self.app, config=config,
580630
build=build, version=new_version, image=target_image, summary=summary)
581631
# IOW, this image did not come from the builder
582-
if not build.sha:
632+
# FIXME: remove check for mock registry module
633+
if not build.sha and 'mock' not in settings.REGISTRY_MODULE:
583634
# we assume that the image is not present on our registry,
584635
# so shell out a task to pull in the repository
585636
data = {

controller/api/tests/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -58,3 +58,4 @@ def run_tests(self, test_labels, extra_tests=None, **kwargs):
5858
from .test_key import * # noqa
5959
from .test_perm import * # noqa
6060
from .test_release import * # noqa
61+
from .test_scheduler import * # noqa

controller/api/tests/test_container.py

Lines changed: 0 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -38,12 +38,6 @@ def setUp(self):
3838
response = self.client.post('/api/clusters', json.dumps(body),
3939
content_type='application/json')
4040
self.assertEqual(response.status_code, 201)
41-
# create a malicious scheduler as well
42-
body['id'] = 'autotest2'
43-
body['type'] = 'faulty'
44-
response = self.client.post('/api/clusters', json.dumps(body),
45-
content_type='application/json')
46-
self.assertEqual(response.status_code, 201)
4741

4842
def test_container_state_good(self):
4943
"""Test that the finite state machine transitions with a good scheduler"""
@@ -65,42 +59,9 @@ def test_container_state_good(self):
6559
self.assertEqual(c.state, 'created')
6660
c.start()
6761
self.assertEqual(c.state, 'up')
68-
c.deploy(App.objects.get(id=app_id).release_set.latest())
69-
self.assertEqual(c.state, 'up')
7062
c.destroy()
7163
self.assertEqual(c.state, 'destroyed')
7264

73-
def test_container_state_bad(self):
74-
"""Test that the finite state machine transitions with a faulty scheduler"""
75-
url = '/api/apps'
76-
body = {'cluster': 'autotest2'}
77-
response = self.client.post(url, json.dumps(body), content_type='application/json')
78-
self.assertEqual(response.status_code, 201)
79-
app_id = response.data['id']
80-
# create a container
81-
c = Container.objects.create(owner=User.objects.get(username='autotest'),
82-
app=App.objects.get(id=app_id),
83-
release=App.objects.get(id=app_id).release_set.latest(),
84-
type='web',
85-
num=1)
86-
self.assertEqual(c.state, 'initialized')
87-
self.assertRaises(Exception, lambda: c.create())
88-
self.assertEqual(c.state, 'initialized')
89-
# test an illegal transition
90-
self.assertRaises(TransitionNotAllowed, lambda: c.start())
91-
self.assertEqual(c.state, 'initialized')
92-
self.assertRaises(
93-
Exception,
94-
lambda: c.deploy(
95-
App.objects.get(id=app_id).release_set.latest()
96-
)
97-
)
98-
self.assertEqual(c.state, 'down')
99-
self.assertRaises(Exception, lambda: c.destroy())
100-
self.assertEqual(c.state, 'down')
101-
self.assertRaises(Exception, lambda: c.run('echo hello world'))
102-
self.assertEqual(c.state, 'down')
103-
10465
def test_container_state_protected(self):
10566
"""Test that you cannot directly modify the state"""
10667
url = '/api/apps'

0 commit comments

Comments
 (0)