1212import re
1313import subprocess
1414import time
15+ import threading
1516
16- from celery .canvas import group
1717from django .conf import settings
1818from django .contrib .auth .models import User
1919from django .core .exceptions import ValidationError
2424from django .utils .encoding import python_2_unicode_compatible
2525from django_fsm import FSMField , transition
2626from django_fsm .signals import post_transition
27+ from docker .utils import utils
2728from json_field .fields import JSONField
29+ import requests
2830
29- from api import fields , tasks
31+ from api import fields
3032from registry import publish_release
3133from utils import dict_diff , fingerprint
3234
@@ -119,13 +121,13 @@ def create(self):
119121 """
120122 Initialize a cluster's router and log aggregator
121123 """
122- return tasks . create_cluster . delay ( self ). get ()
124+ return self . _scheduler . setUp ()
123125
124126 def destroy (self ):
125127 """
126128 Destroy a cluster's router and log aggregator
127129 """
128- return tasks . destroy_cluster . delay ( self ). get ()
130+ return self . _scheduler . tearDown ()
129131
130132
131133@python_2_unicode_compatible
@@ -162,63 +164,87 @@ def log(self, message):
162164 f .write (msg .encode ('utf-8' ))
163165
164166 def create (self , * args , ** kwargs ):
167+ """Create a new application with an initial release"""
165168 config = Config .objects .create (owner = self .owner , app = self )
166169 build = Build .objects .create (owner = self .owner , app = self , image = settings .DEFAULT_BUILD )
167170 Release .objects .create (version = 1 , owner = self .owner , app = self , config = config , build = build )
168171
169172 def delete (self , * args , ** kwargs ):
173+ """Delete this application including all containers"""
170174 for c in self .container_set .all ():
171175 c .destroy ()
172- # delete application logs stored by deis/logger
176+ self ._clean_app_logs ()
177+ return super (App , self ).delete (* args , ** kwargs )
178+
179+ def _clean_app_logs (self ):
180+ """Delete application logs stored by the logger component"""
173181 path = os .path .join (settings .DEIS_LOG_DIR , self .id + '.log' )
174182 if os .path .exists (path ):
175183 os .remove (path )
176- return super (App , self ).delete (* args , ** kwargs )
177184
178- def deploy (self , release , initial = False ):
179- tasks .deploy_release .delay (self , release ).get ()
185+ def deploy (self , user , release , initial = False ):
186+ """Deploy a new release to this application"""
187+ containers = self .container_set .all ()
188+ self ._deploy_containers (containers , release )
189+ # update release in database
190+ for c in containers :
191+ c .release = release
192+ c .save ()
193+ self .release = release
194+ self .save ()
195+ # perform default scaling if necessary
180196 if initial :
181- # if there is no SHA, assume a docker image is being promoted
182- if not release .build .sha :
183- self .structure = {'cmd' : 1 }
184- # if a dockerfile exists without a procfile, assume docker workflow
185- elif release .build .dockerfile and not release .build .procfile :
186- self .structure = {'cmd' : 1 }
187- # if a procfile exists without a web entry, assume docker workflow
188- elif release .build .procfile and 'web' not in release .build .procfile :
189- self .structure = {'cmd' : 1 }
190- # default to heroku workflow
191- else :
192- self .structure = {'web' : 1 }
193- self .save ()
194- self .scale ()
195-
196- def destroy (self , * args , ** kwargs ):
197- return self .delete (* args , ** kwargs )
198-
199- def scale (self , ** kwargs ): # noqa
200- """Scale containers up or down to match requested."""
201- requested_containers = self .structure .copy ()
197+ self ._default_scale (user , release )
198+
199+ def _default_scale (self , user , release ):
200+ """Scale to default structure based on release type"""
201+ # if there is no SHA, assume a docker image is being promoted
202+ if not release .build .sha :
203+ structure = {'cmd' : 1 }
204+ # if a dockerfile exists without a procfile, assume docker workflow
205+ elif release .build .dockerfile and not release .build .procfile :
206+ structure = {'cmd' : 1 }
207+ # if a procfile exists without a web entry, assume docker workflow
208+ elif release .build .procfile and 'web' not in release .build .procfile :
209+ structure = {'cmd' : 1 }
210+ # default to heroku workflow
211+ else :
212+ structure = {'web' : 1 }
213+ self .scale (user , structure )
214+
215+ def _deploy_containers (self , to_deploy , release , ** kwargs ):
216+ """Deploys containers via the scheduler"""
217+ threads = []
218+ for c in to_deploy :
219+ threads .append (threading .Thread (target = c .deploy , args = (release ,)))
220+ [t .start () for t in threads ]
221+ [t .join () for t in threads ]
222+
223+ def scale (self , user , structure ): # noqa
224+ """Scale containers up or down to match requested structure."""
225+ requested_structure = structure .copy ()
202226 release = self .release_set .latest ()
203227 # test for available process types
204228 available_process_types = release .build .procfile or {}
205- for container_type in requested_containers .keys ():
229+ for container_type in requested_structure .keys ():
206230 if container_type == 'cmd' :
207231 continue # allow docker cmd types in case we don't have the image source
208232 if container_type not in available_process_types :
209233 raise EnvironmentError (
210234 'Container type {} does not exist in application' .format (container_type ))
211- msg = 'containers scaled ' + ' ' .join (
212- "{}={}" .format (k , v ) for k , v in requested_containers .items ())
235+ msg = '{} scaled containers ' .format (user .username ) + ' ' .join (
236+ "{}={}" .format (k , v ) for k , v in requested_structure .items ())
237+ log_event (self , msg )
238+ self .log (msg )
213239 # iterate and scale by container type (web, worker, etc)
214240 changed = False
215241 to_add , to_remove = [], []
216- for container_type in requested_containers .keys ():
242+ for container_type in requested_structure .keys ():
217243 containers = list (self .container_set .filter (type = container_type ).order_by ('created' ))
218244 # increment new container nums off the most recent container
219245 results = self .container_set .filter (type = container_type ).aggregate (Max ('num' ))
220246 container_num = (results .get ('num__max' ) or 0 ) + 1
221- requested = requested_containers .pop (container_type )
247+ requested = requested_structure .pop (container_type )
222248 diff = requested - len (containers )
223249 if diff == 0 :
224250 continue
@@ -228,6 +254,7 @@ def scale(self, **kwargs): # noqa
228254 to_remove .append (c )
229255 diff += 1
230256 while diff > 0 :
257+ # create a database record
231258 c = Container .objects .create (owner = self .owner ,
232259 app = self ,
233260 release = release ,
@@ -237,16 +264,38 @@ def scale(self, **kwargs): # noqa
237264 container_num += 1
238265 diff -= 1
239266 if changed :
240- subtasks = []
241267 if to_add :
242- subtasks . append ( tasks . start_containers . s ( to_add ) )
268+ self . _start_containers ( to_add )
243269 if to_remove :
244- subtasks .append (tasks .stop_containers .s (to_remove ))
245- group (* subtasks ).apply_async ().join ()
246- log_event (self , msg )
247- self .log (msg )
270+ self ._destroy_containers (to_remove )
271+ # remove the database record
272+ for c in to_remove :
273+ c .delete ()
274+ # save new structure to the database
275+ self .structure = structure
276+ self .save ()
248277 return changed
249278
279+ def _start_containers (self , to_add ):
280+ """Creates and starts containers via the scheduler"""
281+ create_threads = []
282+ start_threads = []
283+ for c in to_add :
284+ create_threads .append (threading .Thread (target = c .create ))
285+ start_threads .append (threading .Thread (target = c .start ))
286+ [t .start () for t in create_threads ]
287+ [t .join () for t in create_threads ]
288+ [t .start () for t in start_threads ]
289+ [t .join () for t in start_threads ]
290+
291+ def _destroy_containers (self , to_destroy ):
292+ """Destroys containers via the scheduler"""
293+ destroy_threads = []
294+ for c in to_destroy :
295+ destroy_threads .append (threading .Thread (target = c .destroy ))
296+ [t .start () for t in destroy_threads ]
297+ [t .join () for t in destroy_threads ]
298+
250299 def logs (self ):
251300 """Return aggregated log data for this application."""
252301 path = os .path .join (settings .DEIS_LOG_DIR , self .id + '.log' )
@@ -255,20 +304,37 @@ def logs(self):
255304 data = subprocess .check_output (['tail' , '-n' , str (settings .LOG_LINES ), path ])
256305 return data
257306
258- def run (self , command ):
307+ def run (self , user , command ):
259308 """Run a one-off command in an ephemeral app container."""
260309 # TODO: add support for interactive shell
261- msg = "deis run '{}'" .format (command )
310+ msg = "{} runs '{}'" .format (user . username , command )
262311 log_event (self , msg )
263312 self .log (msg )
264313 c_num = max ([c .num for c in self .container_set .filter (type = 'admin' )] or [0 ]) + 1
265- c = Container .objects .create (owner = self .owner ,
266- app = self ,
267- release = self .release_set .latest (),
268- type = 'admin' ,
269- num = c_num )
270- rc , output = tasks .run_command .delay (c , command ).get ()
271- return rc , output
314+ try :
315+ # create database record for admin process
316+ c = Container .objects .create (owner = self .owner ,
317+ app = self ,
318+ release = self .release_set .latest (),
319+ type = 'admin' ,
320+ num = c_num )
321+ image = c .release .image + ':v' + str (c .release .version )
322+
323+ # check for backwards compatibility
324+ def _has_hostname (image ):
325+ repo , tag = utils .parse_repository_tag (image )
326+ return True if '/' in repo and '.' in repo .split ('/' )[0 ] else False
327+
328+ if not _has_hostname (image ):
329+ image = '{}:{}/{}' .format (settings .REGISTRY_HOST ,
330+ settings .REGISTRY_PORT ,
331+ image )
332+ # SECURITY: shell-escape user input
333+ escaped_command = command .replace ("'" , "'\\ ''" )
334+ return c .run (escaped_command )
335+ # always cleanup admin containers
336+ finally :
337+ c .delete ()
272338
273339
274340@python_2_unicode_compatible
@@ -354,13 +420,11 @@ def start(self):
354420
355421 @transition (field = state ,
356422 source = [INITIALIZED , CREATED , UP , DOWN ],
357- target = UP ,
358- crashed = DOWN )
359- def deploy (self , release ):
423+ target = UP , crashed = DOWN )
424+ def deploy (self , new_release ):
360425 old_job_id = self ._job_id
361426 # update release
362- self .release = release
363- self .save ()
427+ self .release = new_release
364428 # deploy new container
365429 new_job_id = self ._job_id
366430 image = self .release .image
@@ -385,12 +449,8 @@ def stop(self):
385449 source = [INITIALIZED , CREATED , UP , DOWN ],
386450 target = DESTROYED )
387451 def destroy (self ):
388- # TODO: add check for active connections before killing
389452 self ._scheduler .destroy (self ._job_id , self ._command_announceable ())
390453
391- @transition (field = state ,
392- source = [INITIALIZED , CREATED , DESTROYED ],
393- target = DESTROYED )
394454 def run (self , command ):
395455 """Run a one-off command"""
396456 rc , output = self ._scheduler .run (self ._job_id , self .release .image , command )
@@ -521,7 +581,14 @@ def new(self, user, config=None, build=None, summary=None, source_version='lates
521581 if not build .sha :
522582 # we assume that the image is not present on our registry,
523583 # so shell out a task to pull in the repository
524- tasks .import_repository .delay (build .image , self .app .id ).get ()
584+ data = {
585+ 'src' : build .image
586+ }
587+ requests .post (
588+ '{}/v1/repositories/{}/tags' .format (settings .REGISTRY_URL ,
589+ self .app .id ),
590+ data = data ,
591+ )
525592 # update the source image to the repository we just imported
526593 source_image = self .app .id
527594 # if the image imported had a tag specified, use that tag as the source
@@ -737,6 +804,10 @@ def _etcd_publish_domains(**kwargs):
737804# save FSM transitions as they happen
738805def _save_transition (** kwargs ):
739806 kwargs ['instance' ].save ()
807+ # close database connections after transition
808+ # to avoid leaking connections inside threads
809+ from django .db import connection
810+ connection .close ()
740811
741812post_transition .connect (_save_transition )
742813
0 commit comments