@@ -255,8 +255,18 @@ def scale(self, user, structure): # noqa
255255 changed = True
256256 while diff < 0 :
257257 c = containers .pop ()
258- to_remove .append (c )
258+ if settings .SCHEDULER_MODULE == "scheduler.k8s" :
259+ if len (containers ) == 0 :
260+ t = Thread (target = c .scale )
261+ t .start ()
262+ t .join ()
263+ c .delete ()
264+ else :
265+ to_remove .append (c )
259266 diff += 1
267+ if diff == 0 and len (containers ) != 0 :
268+ c = containers .pop ()
269+ self ._start_containers ([c ])
260270 while diff > 0 :
261271 # create a database record
262272 c = Container .objects .create (owner = self .owner ,
@@ -269,7 +279,10 @@ def scale(self, user, structure): # noqa
269279 diff -= 1
270280 if changed :
271281 if to_add :
272- self ._start_containers (to_add )
282+ if settings .SCHEDULER_MODULE == "scheduler.k8s" :
283+ self ._start_containers ([to_add .pop ()])
284+ else :
285+ self ._start_containers (to_add )
273286 if to_remove :
274287 self ._destroy_containers (to_remove )
275288 # save new structure to the database
@@ -289,15 +302,15 @@ def _start_containers(self, to_add):
289302 start_threads = [Thread (target = c .start ) for c in to_add ]
290303 [t .start () for t in create_threads ]
291304 [t .join () for t in create_threads ]
292- if any (c .state != 'created' for c in to_add ):
293- err = 'aborting, failed to create some containers'
305+ if settings . SCHEDULER_MODULE != "scheduler.k8s" and any (c .state != 'created' for c in to_add ):
306+ err = 'aborting, failed to create some containers ' + c . state
294307 log_event (self , err , logging .ERROR )
295308 self ._destroy_containers (to_add )
296309 raise RuntimeError (err )
297310 [t .start () for t in start_threads ]
298311 [t .join () for t in start_threads ]
299312 if set ([c .state for c in to_add ]) != set (['up' ]):
300- err = 'warning, some containers failed to start'
313+ err = 'warning, some containers failed to start' + c . state
301314 log_event (self , err , logging .WARNING )
302315
303316 def _restart_containers (self , to_restart ):
@@ -321,12 +334,18 @@ def _destroy_containers(self, to_destroy):
321334 """Destroys containers via the scheduler"""
322335 if not to_destroy :
323336 return
324- destroy_threads = [Thread (target = c .destroy ) for c in to_destroy ]
337+ if settings .SCHEDULER_MODULE == "scheduler.k8s" :
338+ destroy_threads = [Thread (target = to_destroy [0 ].destroy )]
339+ else :
340+ destroy_threads = [Thread (target = c .destroy ) for c in to_destroy ]
325341 [t .start () for t in destroy_threads ]
326342 [t .join () for t in destroy_threads ]
327- [c .delete () for c in to_destroy if c .state == 'destroyed' ]
343+ if settings .SCHEDULER_MODULE == "scheduler.k8s" :
344+ [c .delete () for c in to_destroy ]
345+ else :
346+ [c .delete () for c in to_destroy if c .state == 'destroyed' ]
328347 if any (c .state != 'destroyed' for c in to_destroy ):
329- err = 'aborting, failed to destroy some containers'
348+ err = 'aborting, failed to destroy some containers' + c . state
330349 log_event (self , err , logging .ERROR )
331350 raise RuntimeError (err )
332351
@@ -339,7 +358,10 @@ def deploy(self, user, release):
339358 n .save ()
340359 new .append (n )
341360
342- self ._start_containers (new )
361+ if new and settings .SCHEDULER_MODULE == "scheduler.k8s" :
362+ self ._start_containers ([new .pop ()])
363+ else :
364+ self ._start_containers (new )
343365
344366 # destroy old containers
345367 if existing :
@@ -482,7 +504,9 @@ def create(self):
482504 image = self .release .image
483505 kwargs = {'memory' : self .release .config .memory ,
484506 'cpu' : self .release .config .cpu ,
485- 'tags' : self .release .config .tags }
507+ 'tags' : self .release .config .tags ,
508+ 'aname' : self .app .id ,
509+ 'num' : self .num }
486510 job_id = self ._job_id
487511 try :
488512 self ._scheduler .create (
@@ -495,6 +519,20 @@ def create(self):
495519 log_event (self .app , err , logging .ERROR )
496520 raise
497521
522+ @close_db_connections
523+ def scale (self ):
524+ image = self .release .image
525+ job_id = self ._job_id
526+ try :
527+ self ._scheduler .scale (
528+ name = job_id ,
529+ image = image ,
530+ num = 0 )
531+ except Exception as e :
532+ err = '{} (scale): {}' .format (job_id , e )
533+ log_event (self .app , err , logging .ERROR )
534+ raise
535+
498536 @close_db_connections
499537 def start (self ):
500538 job_id = self ._job_id
0 commit comments