3131import requests
3232from rest_framework .authtoken .models import Token
3333
34- from api import fields , utils
34+ from api import fields , utils , exceptions
3535from registry import publish_release
3636from utils import dict_diff , fingerprint
3737
@@ -118,6 +118,20 @@ def validate_certificate(value):
118118 raise ValidationError ('Could not load certificate: {}' .format (e ))
119119
120120
121+ def get_etcd_client ():
122+ if not hasattr (get_etcd_client , "client" ):
123+ # wire up etcd publishing if we can connect
124+ try :
125+ get_etcd_client .client = etcd .Client (
126+ host = settings .ETCD_HOST ,
127+ port = int (settings .ETCD_PORT ))
128+ get_etcd_client .client .get ('/deis' )
129+ except etcd .EtcdException :
130+ logger .log (logging .WARNING , 'Cannot synchronize with etcd cluster' )
131+ get_etcd_client .client = None
132+ return get_etcd_client .client
133+
134+
121135class AuditedModel (models .Model ):
122136 """Add created and updated fields to a model."""
123137
@@ -353,6 +367,58 @@ def _start_containers(self, to_add):
353367 if set ([c .state for c in to_add ]) != set (['up' ]):
354368 err = 'warning, some containers failed to start'
355369 log_event (self , err , logging .WARNING )
370+ # if the user specified a health check, try checking to see if it's running
371+ try :
372+ config = self .config_set .latest ()
373+ if 'HEALTHCHECK_URL' in config .values .keys ():
374+ self ._healthcheck (to_add , config .values )
375+ except Config .DoesNotExist :
376+ pass
377+
378+ def _healthcheck (self , containers , config ):
379+ # if at first it fails, back off and try again at 10%, 50% and 100% of INITIAL_DELAY
380+ intervals = [1.0 , 0.1 , 0.5 , 1.0 ]
381+ # HACK (bacongobbler): we need to wait until publisher has a chance to publish each
382+ # service to etcd, which can take up to 20 seconds.
383+ time .sleep (20 )
384+ for i in range (0 , 4 ):
385+ delay = int (config .get ('HEALTHCHECK_INITIAL_DELAY' , 0 ))
386+ try :
387+ # sleep until the initial timeout is over
388+ if delay > 0 :
389+ time .sleep (delay * intervals [i ])
390+ self ._do_healthcheck (containers , config )
391+ break
392+ except exceptions .HealthcheckException as e :
393+ try :
394+ new_delay = delay * intervals [i + 1 ]
395+ msg = "{}; trying again in {} seconds" .format (e , new_delay )
396+ log_event (self , msg , logging .WARNING )
397+ except IndexError :
398+ log_event (self , e , logging .WARNING )
399+ else :
400+ self ._destroy_containers (containers )
401+ msg = "aborting, app containers failed to respond to health check"
402+ log_event (self , msg , logging .ERROR )
403+ raise RuntimeError (msg )
404+
405+ def _do_healthcheck (self , containers , config ):
406+ path = config .get ('HEALTHCHECK_URL' , '/' )
407+ timeout = int (config .get ('HEALTHCHECK_TIMEOUT' , 1 ))
408+ if not _etcd_client :
409+ raise exceptions .HealthcheckException ('no etcd client available' )
410+ for container in containers :
411+ try :
412+ key = "/deis/services/{self}/{container.job_id}" .format (** locals ())
413+ url = "http://{}{}" .format (_etcd_client .get (key ).value , path )
414+ response = requests .get (url , timeout = timeout )
415+ if response .status_code != requests .codes .OK :
416+ raise exceptions .HealthcheckException (
417+ "app failed health check (got '{}', expected: '200')" .format (
418+ response .status_code ))
419+ except (requests .Timeout , requests .ConnectionError , KeyError ) as e :
420+ raise exceptions .HealthcheckException (
421+ 'failed to connect to container ({})' .format (e ))
356422
357423 def _restart_containers (self , to_restart ):
358424 """Restarts containers via the scheduler"""
@@ -452,7 +518,7 @@ def _default_scale(self, user, release):
452518
453519 self .scale (user , structure )
454520
455- def logs (self , log_lines ):
521+ def logs (self , log_lines = str ( settings . LOG_LINES ) ):
456522 """Return aggregated log data for this application."""
457523 path = os .path .join (settings .DEIS_LOG_DIR , self .id + '.log' )
458524 if not os .path .exists (path ):
@@ -513,7 +579,7 @@ def _scheduler(self):
513579
514580 @property
515581 def state (self ):
516- return self ._scheduler .state (self ._job_id ).name
582+ return self ._scheduler .state (self .job_id ).name
517583
518584 def short_name (self ):
519585 return "{}.{}.{}" .format (self .app .id , self .type , self .num )
@@ -526,15 +592,10 @@ class Meta:
526592 get_latest_by = '-created'
527593 ordering = ['created' ]
528594
529- def _get_job_id (self ):
530- app = self .app .id
531- release = self .release
532- version = "v{}" .format (release .version )
533- num = self .num
534- job_id = "{app}_{version}.{self.type}.{num}" .format (** locals ())
535- return job_id
536-
537- _job_id = property (_get_job_id )
595+ @property
596+ def job_id (self ):
597+ version = "v{}" .format (self .release .version )
598+ return "{self.app.id}_{version}.{self.type}.{self.num}" .format (** locals ())
538599
539600 def _get_command (self ):
540601 try :
@@ -566,45 +627,41 @@ def create(self):
566627 kwargs = {'memory' : self .release .config .memory ,
567628 'cpu' : self .release .config .cpu ,
568629 'tags' : self .release .config .tags }
569- job_id = self ._job_id
570630 try :
571631 self ._scheduler .create (
572- name = job_id ,
632+ name = self . job_id ,
573633 image = image ,
574634 command = self ._command ,
575635 ** kwargs )
576636 except Exception as e :
577- err = '{} (create): {}' .format (job_id , e )
637+ err = '{} (create): {}' .format (self . job_id , e )
578638 log_event (self .app , err , logging .ERROR )
579639 raise
580640
581641 @close_db_connections
582642 def start (self ):
583- job_id = self ._job_id
584643 try :
585- self ._scheduler .start (job_id )
644+ self ._scheduler .start (self . job_id )
586645 except Exception as e :
587- err = '{} (start): {}' .format (job_id , e )
646+ err = '{} (start): {}' .format (self . job_id , e )
588647 log_event (self .app , err , logging .WARNING )
589648 raise
590649
591650 @close_db_connections
592651 def stop (self ):
593- job_id = self ._job_id
594652 try :
595- self ._scheduler .stop (job_id )
653+ self ._scheduler .stop (self . job_id )
596654 except Exception as e :
597- err = '{} (stop): {}' .format (job_id , e )
655+ err = '{} (stop): {}' .format (self . job_id , e )
598656 log_event (self .app , err , logging .ERROR )
599657 raise
600658
601659 @close_db_connections
602660 def destroy (self ):
603- job_id = self ._job_id
604661 try :
605- self ._scheduler .destroy (job_id )
662+ self ._scheduler .destroy (self . job_id )
606663 except Exception as e :
607- err = '{} (destroy): {}' .format (job_id , e )
664+ err = '{} (destroy): {}' .format (self . job_id , e )
608665 log_event (self .app , err , logging .ERROR )
609666 raise
610667
@@ -614,7 +671,6 @@ def run(self, command):
614671 raise EnvironmentError ('No build associated with this release '
615672 'to run this command' )
616673 image = self .release .image
617- job_id = self ._job_id
618674 entrypoint = '/bin/bash'
619675 # if this is a procfile-based app, switch the entrypoint to slugrunner's default
620676 # FIXME: remove slugrunner's hardcoded entrypoint
@@ -626,10 +682,10 @@ def run(self, command):
626682 else :
627683 command = "-c '{}'" .format (command )
628684 try :
629- rc , output = self ._scheduler .run (job_id , image , entrypoint , command )
685+ rc , output = self ._scheduler .run (self . job_id , image , entrypoint , command )
630686 return rc , output
631687 except Exception as e :
632- err = '{} (run): {}' .format (job_id , e )
688+ err = '{} (run): {}' .format (self . job_id , e )
633689 log_event (self .app , err , logging .ERROR )
634690 raise
635691
@@ -1103,6 +1159,34 @@ def _etcd_purge_cert(**kwargs):
11031159 pass
11041160
11051161
1162+ def _etcd_publish_config (** kwargs ):
1163+ config = kwargs ['instance' ]
1164+ # we purge all existing config when adding the newest instance. This is because
1165+ # deis config:unset would remove an existing value, but not delete the
1166+ # old config object
1167+ try :
1168+ _etcd_client .delete ('/deis/config/{}' .format (config .app ),
1169+ prevExist = True , dir = True , recursive = True )
1170+ except KeyError :
1171+ pass
1172+ if kwargs ['created' ]:
1173+ for k , v in config .values .iteritems ():
1174+ _etcd_client .write (
1175+ '/deis/config/{}/{}' .format (
1176+ config .app ,
1177+ unicode (k ).encode ('utf-8' ).lower ()),
1178+ unicode (v ).encode ('utf-8' ))
1179+
1180+
1181+ def _etcd_purge_config (** kwargs ):
1182+ config = kwargs ['instance' ]
1183+ try :
1184+ _etcd_client .delete ('/deis/config/{}' .format (config .app ),
1185+ prevExist = True , dir = True , recursive = True )
1186+ except KeyError :
1187+ pass
1188+
1189+
11061190def _etcd_publish_domains (** kwargs ):
11071191 domain = kwargs ['instance' ]
11081192 if kwargs ['created' ]:
@@ -1134,13 +1218,9 @@ def create_auth_token(sender, instance=None, created=False, **kwargs):
11341218 if created :
11351219 Token .objects .create (user = instance )
11361220
1137- # wire up etcd publishing if we can connect
1138- try :
1139- _etcd_client = etcd .Client (host = settings .ETCD_HOST , port = int (settings .ETCD_PORT ))
1140- _etcd_client .get ('/deis' )
1141- except etcd .EtcdException :
1142- logger .log (logging .WARNING , 'Cannot synchronize with etcd cluster' )
1143- _etcd_client = None
1221+
1222+ _etcd_client = get_etcd_client ()
1223+
11441224
11451225if _etcd_client :
11461226 post_save .connect (_etcd_publish_key , sender = Key , dispatch_uid = 'api.models' )
@@ -1152,3 +1232,5 @@ def create_auth_token(sender, instance=None, created=False, **kwargs):
11521232 post_delete .connect (_etcd_purge_app , sender = App , dispatch_uid = 'api.models' )
11531233 post_save .connect (_etcd_publish_cert , sender = Certificate , dispatch_uid = 'api.models' )
11541234 post_delete .connect (_etcd_purge_cert , sender = Certificate , dispatch_uid = 'api.models' )
1235+ post_save .connect (_etcd_publish_config , sender = Config , dispatch_uid = 'api.models' )
1236+ post_delete .connect (_etcd_purge_config , sender = Config , dispatch_uid = 'api.models' )
0 commit comments