Skip to content

Commit e7ae1d8

Browse files
author
Gabriel Monroy
committed
refactor(controller): the great celery purge
1 parent 4400979 commit e7ae1d8

20 files changed

Lines changed: 139 additions & 238 deletions

controller/api/models.py

Lines changed: 130 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
import re
1313
import subprocess
1414
import time
15+
import threading
1516

16-
from celery.canvas import group
1717
from django.conf import settings
1818
from django.contrib.auth.models import User
1919
from django.core.exceptions import ValidationError
@@ -24,9 +24,11 @@
2424
from django.utils.encoding import python_2_unicode_compatible
2525
from django_fsm import FSMField, transition
2626
from django_fsm.signals import post_transition
27+
from docker.utils import utils
2728
from json_field.fields import JSONField
29+
import requests
2830

29-
from api import fields, tasks
31+
from api import fields
3032
from registry import publish_release
3133
from utils import dict_diff, fingerprint
3234

@@ -119,13 +121,13 @@ def create(self):
119121
"""
120122
Initialize a cluster's router and log aggregator
121123
"""
122-
return tasks.create_cluster.delay(self).get()
124+
return self._scheduler.setUp()
123125

124126
def destroy(self):
125127
"""
126128
Destroy a cluster's router and log aggregator
127129
"""
128-
return tasks.destroy_cluster.delay(self).get()
130+
return self._scheduler.tearDown()
129131

130132

131133
@python_2_unicode_compatible
@@ -162,63 +164,87 @@ def log(self, message):
162164
f.write(msg.encode('utf-8'))
163165

164166
def create(self, *args, **kwargs):
    """Create a new application with an initial release.

    Creates a Config and a Build (using ``settings.DEFAULT_BUILD`` as the
    image) owned by this app's owner, then records them as Release
    version 1. The positional and keyword arguments are accepted but not
    used here.
    """
    owner = self.owner
    initial_config = Config.objects.create(owner=owner, app=self)
    initial_build = Build.objects.create(
        owner=owner, app=self, image=settings.DEFAULT_BUILD)
    Release.objects.create(
        version=1,
        owner=owner,
        app=self,
        config=initial_config,
        build=initial_build)
168171

169172
def delete(self, *args, **kwargs):
173+
"""Delete this application including all containers"""
170174
for c in self.container_set.all():
171175
c.destroy()
172-
# delete application logs stored by deis/logger
176+
self._clean_app_logs()
177+
return super(App, self).delete(*args, **kwargs)
178+
179+
def _clean_app_logs(self):
    """Delete application logs stored by the logger component.

    The log file is ``<settings.DEIS_LOG_DIR>/<self.id>.log``. A file that
    does not exist is silently ignored.

    Raises:
        OSError: if the file exists but cannot be removed.
    """
    import errno  # function-scope import: only this method needs it

    path = os.path.join(settings.DEIS_LOG_DIR, self.id + '.log')
    # Remove first and tolerate "no such file" rather than testing
    # os.path.exists() beforehand: the check-then-remove sequence can fail
    # if the file disappears between the two calls.
    try:
        os.remove(path)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
176-
return super(App, self).delete(*args, **kwargs)
177184

178-
def deploy(self, release, initial=False):
179-
tasks.deploy_release.delay(self, release).get()
185+
def deploy(self, user, release, initial=False):
186+
"""Deploy a new release to this application"""
187+
containers = self.container_set.all()
188+
self._deploy_containers(containers, release)
189+
# update release in database
190+
for c in containers:
191+
c.release = release
192+
c.save()
193+
self.release = release
194+
self.save()
195+
# perform default scaling if necessary
180196
if initial:
181-
# if there is no SHA, assume a docker image is being promoted
182-
if not release.build.sha:
183-
self.structure = {'cmd': 1}
184-
# if a dockerfile exists without a procfile, assume docker workflow
185-
elif release.build.dockerfile and not release.build.procfile:
186-
self.structure = {'cmd': 1}
187-
# if a procfile exists without a web entry, assume docker workflow
188-
elif release.build.procfile and 'web' not in release.build.procfile:
189-
self.structure = {'cmd': 1}
190-
# default to heroku workflow
191-
else:
192-
self.structure = {'web': 1}
193-
self.save()
194-
self.scale()
195-
196-
def destroy(self, *args, **kwargs):
197-
return self.delete(*args, **kwargs)
198-
199-
def scale(self, **kwargs): # noqa
200-
"""Scale containers up or down to match requested."""
201-
requested_containers = self.structure.copy()
197+
self._default_scale(user, release)
198+
199+
def _default_scale(self, user, release):
200+
"""Scale to default structure based on release type"""
201+
# if there is no SHA, assume a docker image is being promoted
202+
if not release.build.sha:
203+
structure = {'cmd': 1}
204+
# if a dockerfile exists without a procfile, assume docker workflow
205+
elif release.build.dockerfile and not release.build.procfile:
206+
structure = {'cmd': 1}
207+
# if a procfile exists without a web entry, assume docker workflow
208+
elif release.build.procfile and 'web' not in release.build.procfile:
209+
structure = {'cmd': 1}
210+
# default to heroku workflow
211+
else:
212+
structure = {'web': 1}
213+
self.scale(user, structure)
214+
215+
def _deploy_containers(self, to_deploy, release, **kwargs):
216+
"""Deploys containers via the scheduler"""
217+
threads = []
218+
for c in to_deploy:
219+
threads.append(threading.Thread(target=c.deploy, args=(release,)))
220+
[t.start() for t in threads]
221+
[t.join() for t in threads]
222+
223+
def scale(self, user, structure): # noqa
224+
"""Scale containers up or down to match requested structure."""
225+
requested_structure = structure.copy()
202226
release = self.release_set.latest()
203227
# test for available process types
204228
available_process_types = release.build.procfile or {}
205-
for container_type in requested_containers.keys():
229+
for container_type in requested_structure.keys():
206230
if container_type == 'cmd':
207231
continue # allow docker cmd types in case we don't have the image source
208232
if container_type not in available_process_types:
209233
raise EnvironmentError(
210234
'Container type {} does not exist in application'.format(container_type))
211-
msg = 'containers scaled ' + ' '.join(
212-
"{}={}".format(k, v) for k, v in requested_containers.items())
235+
msg = '{} scaled containers '.format(user.username) + ' '.join(
236+
"{}={}".format(k, v) for k, v in requested_structure.items())
237+
log_event(self, msg)
238+
self.log(msg)
213239
# iterate and scale by container type (web, worker, etc)
214240
changed = False
215241
to_add, to_remove = [], []
216-
for container_type in requested_containers.keys():
242+
for container_type in requested_structure.keys():
217243
containers = list(self.container_set.filter(type=container_type).order_by('created'))
218244
# increment new container nums off the most recent container
219245
results = self.container_set.filter(type=container_type).aggregate(Max('num'))
220246
container_num = (results.get('num__max') or 0) + 1
221-
requested = requested_containers.pop(container_type)
247+
requested = requested_structure.pop(container_type)
222248
diff = requested - len(containers)
223249
if diff == 0:
224250
continue
@@ -228,6 +254,7 @@ def scale(self, **kwargs): # noqa
228254
to_remove.append(c)
229255
diff += 1
230256
while diff > 0:
257+
# create a database record
231258
c = Container.objects.create(owner=self.owner,
232259
app=self,
233260
release=release,
@@ -237,16 +264,38 @@ def scale(self, **kwargs): # noqa
237264
container_num += 1
238265
diff -= 1
239266
if changed:
240-
subtasks = []
241267
if to_add:
242-
subtasks.append(tasks.start_containers.s(to_add))
268+
self._start_containers(to_add)
243269
if to_remove:
244-
subtasks.append(tasks.stop_containers.s(to_remove))
245-
group(*subtasks).apply_async().join()
246-
log_event(self, msg)
247-
self.log(msg)
270+
self._destroy_containers(to_remove)
271+
# remove the database record
272+
for c in to_remove:
273+
c.delete()
274+
# save new structure to the database
275+
self.structure = structure
276+
self.save()
248277
return changed
249278

279+
def _start_containers(self, to_add):
280+
"""Creates and starts containers via the scheduler"""
281+
create_threads = []
282+
start_threads = []
283+
for c in to_add:
284+
create_threads.append(threading.Thread(target=c.create))
285+
start_threads.append(threading.Thread(target=c.start))
286+
[t.start() for t in create_threads]
287+
[t.join() for t in create_threads]
288+
[t.start() for t in start_threads]
289+
[t.join() for t in start_threads]
290+
291+
def _destroy_containers(self, to_destroy):
292+
"""Destroys containers via the scheduler"""
293+
destroy_threads = []
294+
for c in to_destroy:
295+
destroy_threads.append(threading.Thread(target=c.destroy))
296+
[t.start() for t in destroy_threads]
297+
[t.join() for t in destroy_threads]
298+
250299
def logs(self):
251300
"""Return aggregated log data for this application."""
252301
path = os.path.join(settings.DEIS_LOG_DIR, self.id + '.log')
@@ -255,20 +304,37 @@ def logs(self):
255304
data = subprocess.check_output(['tail', '-n', str(settings.LOG_LINES), path])
256305
return data
257306

258-
def run(self, command):
307+
def run(self, user, command):
    """Run a one-off command in an ephemeral app container.

    Logs the request, creates an ``admin`` container record numbered one
    above the highest existing admin container, runs the shell-escaped
    command in it, and always deletes the record afterwards.

    Args:
        user: the requesting user; only ``user.username`` is read, for
            the log message.
        command: the command string to run; single quotes are escaped
            before it is handed to the container.

    Returns:
        Whatever ``c.run(...)`` returns for the created container.
    """
    # TODO: add support for interactive shell
    msg = "{} runs '{}'".format(user.username, command)
    log_event(self, msg)
    self.log(msg)
    c_num = max([c.num for c in self.container_set.filter(type='admin')] or [0]) + 1
    # create database record for admin process; this happens before the
    # try block so that a failed create propagates its own error instead
    # of the cleanup below tripping over an unbound `c`
    c = Container.objects.create(owner=self.owner,
                                 app=self,
                                 release=self.release_set.latest(),
                                 type='admin',
                                 num=c_num)
    try:
        image = c.release.image + ':v' + str(c.release.version)

        # check for backwards compatibility
        def _has_hostname(image):
            repo, tag = utils.parse_repository_tag(image)
            return '/' in repo and '.' in repo.split('/')[0]

        if not _has_hostname(image):
            image = '{}:{}/{}'.format(settings.REGISTRY_HOST,
                                      settings.REGISTRY_PORT,
                                      image)
        # NOTE(review): `image` is computed above but not passed on within
        # this method -- confirm whether c.run() is meant to receive it.
        # SECURITY: shell-escape user input
        escaped_command = command.replace("'", "'\\''")
        return c.run(escaped_command)
    finally:
        # always cleanup admin containers
        c.delete()
272338

273339

274340
@python_2_unicode_compatible
@@ -354,13 +420,11 @@ def start(self):
354420

355421
@transition(field=state,
356422
source=[INITIALIZED, CREATED, UP, DOWN],
357-
target=UP,
358-
crashed=DOWN)
359-
def deploy(self, release):
423+
target=UP, crashed=DOWN)
424+
def deploy(self, new_release):
360425
old_job_id = self._job_id
361426
# update release
362-
self.release = release
363-
self.save()
427+
self.release = new_release
364428
# deploy new container
365429
new_job_id = self._job_id
366430
image = self.release.image
@@ -385,12 +449,8 @@ def stop(self):
385449
source=[INITIALIZED, CREATED, UP, DOWN],
386450
target=DESTROYED)
387451
def destroy(self):
388-
# TODO: add check for active connections before killing
389452
self._scheduler.destroy(self._job_id, self._command_announceable())
390453

391-
@transition(field=state,
392-
source=[INITIALIZED, CREATED, DESTROYED],
393-
target=DESTROYED)
394454
def run(self, command):
395455
"""Run a one-off command"""
396456
rc, output = self._scheduler.run(self._job_id, self.release.image, command)
@@ -521,7 +581,14 @@ def new(self, user, config=None, build=None, summary=None, source_version='lates
521581
if not build.sha:
522582
# we assume that the image is not present on our registry,
523583
# so shell out a task to pull in the repository
524-
tasks.import_repository.delay(build.image, self.app.id).get()
584+
data = {
585+
'src': build.image
586+
}
587+
requests.post(
588+
'{}/v1/repositories/{}/tags'.format(settings.REGISTRY_URL,
589+
self.app.id),
590+
data=data,
591+
)
525592
# update the source image to the repository we just imported
526593
source_image = self.app.id
527594
# if the image imported had a tag specified, use that tag as the source
@@ -737,6 +804,10 @@ def _etcd_publish_domains(**kwargs):
737804
# save FSM transitions as they happen
738805
def _save_transition(**kwargs):
    """Save the transitioned instance, then close the database connection.

    Connected below as a ``post_transition`` receiver; reads the model
    object from ``kwargs['instance']``.
    """
    instance = kwargs['instance']
    instance.save()
    # close database connections after transition
    # to avoid leaking connections inside threads
    from django.db import connection as db_connection
    db_connection.close()
740811

741812
post_transition.connect(_save_transition)
742813

0 commit comments

Comments
 (0)