Skip to content

Commit fbf05a1

Browse files
authored
Merge pull request #882 from helgi/scheduler_logging
feat(scheduler) prepend [namespace] to Scheduler log message for better traceability
2 parents 2bb7724 + edb0383 commit fbf05a1

1 file changed

Lines changed: 40 additions & 30 deletions

File tree

rootfs/scheduler/__init__.py

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,16 @@ def __init__(self):
9797
self.url = settings.SCHEDULER_URL
9898
self.session = get_session()
9999

100+
def log(self, namespace, message, level=logging.INFO):
    """Emit a log record tagged with the namespace it concerns.

    The text handed to the module logger is ``"[<namespace>]: <message>"``.
    According to the original author's note, this tag is what lets an
    application event (a release, a scale, ...) be treated as belonging
    to the application rather than to the controller; that handling
    happens outside this method.

    :param namespace: value placed between the square brackets of the tag
    :param message: text of the event being logged
    :param level: logging level for the record, ``logging.INFO`` by default
    """
    tagged = "[{}]: {}".format(namespace, message)
    logger.log(level, tagged)
109+
100110
def deploy(self, namespace, name, image, command, **kwargs): # noqa
101111
"""Scale RC or Deployment depending on what's requested"""
102112
self.deploy_timeout = kwargs.get('deploy_timeout', 120)
@@ -124,7 +134,7 @@ def deploy_deployment(self, namespace, name, image, command, **kwargs): # noqa
124134
}
125135
# this depends on the deployment object having the latest information
126136
if deployment['spec']['template']['metadata']['labels'] == labels:
127-
logger.info('Deployment {} with release {} already exists under Namespace {}. Stopping deploy'.format(name, version, namespace)) # noqa
137+
self.log(namespace, 'Deployment {} with release {} already exists. Stopping deploy'.format(name, version)) # noqa
128138
return
129139
except KubeException:
130140
# create the initial deployment object (and the first revision)
@@ -160,7 +170,7 @@ def deploy_rc(self, namespace, name, image, command, **kwargs): # noqa
160170
# If an RC already exists then stop processing of the deploy
161171
try:
162172
self.get_rc(namespace, name)
163-
logger.info('RC {} already exists under Namespace {}. Stopping deploy'.format(name, namespace)) # noqa
173+
self.log(namespace, 'RC {} already exists. Stopping deploy'.format(name)) # noqa
164174
return
165175
except KubeHTTPException:
166176
# make replicas 0 so scaling handles the work
@@ -173,7 +183,7 @@ def deploy_rc(self, namespace, name, image, command, **kwargs): # noqa
173183
desired = int(old_rc["spec"]["replicas"])
174184
else:
175185
desired = kwargs['replicas']
176-
logger.info('No prior RC could be found for {}-{}'.format(namespace, app_type))
186+
self.log(namespace, 'No prior RC could be found for {}'.format(app_type))
177187

178188
# see if application or global deploy batches are defined
179189
batches = kwargs.get('deploy_batches', None)
@@ -186,14 +196,14 @@ def deploy_rc(self, namespace, name, image, command, **kwargs): # noqa
186196
new_name = new_rc["metadata"]["name"]
187197
for batch in batches:
188198
count += batch
189-
logger.info('scaling release {} to {} out of final {}'.format(
199+
self.log(namespace, 'scaling release {} to {} out of final {}'.format(
190200
new_name, count, desired
191201
))
192202
self._scale_rc(namespace, new_name, count)
193203

194204
if old_rc:
195205
old_name = old_rc["metadata"]["name"]
196-
logger.info('scaling old release {} from original {} to {}'.format(
206+
self.log(namespace, 'scaling old release {} from original {} to {}'.format(
197207
old_name, desired, (desired-count))
198208
)
199209
self._scale_rc(namespace, old_name, (desired-count))
@@ -435,7 +445,7 @@ def _build_pod_manifest(self, namespace, name, image, **kwargs):
435445

436446
def run(self, namespace, name, image, entrypoint, command, **kwargs):
437447
"""Run a one-off command."""
438-
logger.info('run {}, img {}, entrypoint {}, cmd "{}"'.format(
448+
self.log(namespace, 'run {}, img {}, entrypoint {}, cmd "{}"'.format(
439449
name, image, entrypoint, command)
440450
)
441451

@@ -840,7 +850,7 @@ def _wait_until_pods_terminate(self, namespace, labels, current, desired):
840850

841851
timeout = settings.KUBERNETES_POD_TERMINATION_GRACE_PERIOD_SECONDS
842852
delta = current - desired
843-
logger.info("waiting for {} pods in {} namespace to be terminated ({}s timeout)".format(delta, namespace, timeout)) # noqa
853+
self.log(namespace, "waiting for {} pods to be terminated ({}s timeout)".format(delta, timeout)) # noqa
844854
for waited in range(timeout):
845855
pods = self.get_pods(namespace, labels=labels).json()
846856
count = len(pods['items'])
@@ -859,11 +869,11 @@ def _wait_until_pods_terminate(self, namespace, labels, current, desired):
859869
break
860870

861871
if waited > 0 and (waited % 10) == 0:
862-
logger.info("waited {}s and {} pods out of {} are fully terminated".format(waited, (delta - count), delta)) # noqa
872+
self.log(namespace, "waited {}s and {} pods out of {} are fully terminated".format(waited, (delta - count), delta)) # noqa
863873

864874
time.sleep(1)
865875

866-
logger.info("{} pods in namespace {} are terminated".format(delta, namespace))
876+
self.log(namespace, "{} pods are terminated".format(delta))
867877

868878
def _wait_until_pods_are_ready(self, namespace, containers, labels, desired): # noqa
869879
# If desired is 0 then there is no ready state to check on
@@ -881,10 +891,10 @@ def _wait_until_pods_are_ready(self, namespace, containers, labels, desired): #
881891
# this is to account for kubernetes having readiness check report as failure until
882892
# the initial delay period is up
883893
delay = int(container['readinessProbe'].get('initialDelaySeconds', 50))
884-
logger.info("adding {}s on to the original {}s timeout to account for the initial delay specified in the readiness probe".format(delay, timeout)) # noqa
894+
self.log(namespace, "adding {}s on to the original {}s timeout to account for the initial delay specified in the readiness probe".format(delay, timeout)) # noqa
885895
timeout += delay
886896

887-
logger.info("waiting for {} pods in {} namespace to be in services ({}s timeout)".format(desired, namespace, timeout)) # noqa
897+
self.log(namespace, "waiting for {} pods to be in services ({}s timeout)".format(desired, timeout)) # noqa
888898

889899
# Ensure the minimum desired number of pods are available
890900
waited = 0
@@ -916,30 +926,30 @@ def _wait_until_pods_are_ready(self, namespace, containers, labels, desired): #
916926
break
917927

918928
if waited > 0 and (waited % 10) == 0:
919-
logger.info("waited {}s and {} pods are in service".format(waited, count))
929+
self.log(namespace, "waited {}s and {} pods are in service".format(waited, count))
920930

921931
# increase wait time without dealing with jitters from above code
922932
waited += 1
923933
time.sleep(1)
924934

925935
# timed out
926936
if waited > timeout:
927-
logger.info('timed out ({}s) waiting for pods to come up in namespace {}'.format(timeout, namespace)) # noqa
937+
self.log(namespace, 'timed out ({}s) waiting for pods to come up in namespace {}'.format(timeout, namespace)) # noqa
928938

929-
logger.info("{} out of {} pods in namespace {} are in service".format(count, desired, namespace)) # noqa
939+
self.log(namespace, "{} out of {} pods are in service".format(count, desired)) # noqa
930940

931941
def _scale_rc(self, namespace, name, desired):
932942
rc = self.get_rc(namespace, name).json()
933943

934944
current = int(rc['spec']['replicas'])
935945
if desired == current:
936-
logger.info("Not scaling RC {} in Namespace {} to {} replicas. Already at desired replicas".format(name, namespace, desired)) # noqa
946+
self.log(namespace, "Not scaling RC {} to {} replicas. Already at desired replicas".format(name, desired)) # noqa
937947
return
938948
elif desired != rc['spec']['replicas']: # RC needs new replica count
939949
# Set the new desired replica count
940950
rc['spec']['replicas'] = desired
941951

942-
logger.info("scaling RC {} in Namespace {} from {} to {} replicas".format(name, namespace, current, desired)) # noqa
952+
self.log(namespace, "scaling RC {} from {} to {} replicas".format(name, current, desired)) # noqa
943953

944954
self.update_rc(namespace, name, rc)
945955
self._wait_until_rc_is_updated(namespace, name)
@@ -994,7 +1004,7 @@ def create_rc(self, namespace, name, image, command, **kwargs):
9941004
resp,
9951005
'create ReplicationController "{}" in Namespace "{}"', name, namespace
9961006
)
997-
logger.debug('manifest used: {}'.format(ruamel.yaml.dump(manifest)))
1007+
self.log(namespace, 'manifest used: {}'.format(ruamel.yaml.dump(manifest)), logging.DEBUG) # noqa
9981008

9991009
self._wait_until_rc_is_updated(namespace, name)
10001010

@@ -1008,15 +1018,15 @@ def _wait_until_rc_is_updated(self, namespace, name):
10081018
More information is also available at:
10091019
https://github.com/kubernetes/kubernetes/blob/master/docs/devel/api-conventions.md#metadata
10101020
"""
1011-
logger.debug("waiting for ReplicationController {} to get a newer generation (30s timeout)".format(name)) # noqa
1021+
self.log(namespace, "waiting for ReplicationController {} to get a newer generation (30s timeout)".format(name), logging.DEBUG) # noqa
10121022
for _ in range(30):
10131023
try:
10141024
rc = self.get_rc(namespace, name).json()
10151025
if (
10161026
"observedGeneration" in rc["status"] and
10171027
rc["status"]["observedGeneration"] >= rc["metadata"]["generation"]
10181028
):
1019-
logger.debug("ReplicationController {} got a newer generation (30s timeout)".format(name)) # noqa
1029+
self.log(namespace, "ReplicationController {} got a newer generation (30s timeout)".format(name), logging.DEBUG) # noqa
10201030
break
10211031

10221032
time.sleep(1)
@@ -1480,8 +1490,8 @@ def _handle_pod_long_image_pulling(self, reason, pod):
14801490
seconds = 60 # time threshold before padding timeout
14811491
if (start + timedelta(seconds=seconds)) < datetime.utcnow():
14821492
# add 10 minutes to timeout to allow a pull image operation to finish
1483-
logger.info('Kubernetes has been pulling the image for {} seconds'.format(seconds)) # noqa
1484-
logger.info('Increasing timeout by 10 minutes to allow a pull image operation to finish for pods in namespace {}'.format(namespace)) # noqa
1493+
self.log(namespace, 'Kubernetes has been pulling the image for {} seconds'.format(seconds)) # noqa
1494+
self.log(namespace, 'Increasing timeout by 10 minutes to allow a pull image operation to finish for pods') # noqa
14851495

14861496
# make it so function doesn't do processing again
14871497
setattr(self, '_handle_pod_long_image_pulling_applied', True)
@@ -1537,15 +1547,15 @@ def _wait_until_deployment_is_updated(self, namespace, name):
15371547
More information is also available at:
15381548
https://github.com/kubernetes/kubernetes/blob/master/docs/devel/api-conventions.md#metadata
15391549
"""
1540-
logger.debug("waiting for Deployment {} to get a newer generation (30s timeout)".format(name)) # noqa
1550+
self.log(namespace, "waiting for Deployment {} to get a newer generation (30s timeout)".format(name), logging.DEBUG) # noqa
15411551
for _ in range(30):
15421552
try:
15431553
deploy = self.get_deployment(namespace, name).json()
15441554
if (
15451555
'observedGeneration' in deploy['status'] and
15461556
deploy['status']['observedGeneration'] >= deploy['metadata']['generation']
15471557
):
1548-
logger.debug("A newer generation was found for Deployment {}".format(name))
1558+
self.log(namespace, "A newer generation was found for Deployment {}".format(name), logging.DEBUG) # noqa
15491559
break
15501560

15511561
time.sleep(1)
@@ -1596,8 +1606,8 @@ def update_deployment(self, namespace, name, image, command, **kwargs):
15961606
url = self._api("/namespaces/{}/deployments/{}", namespace, name)
15971607
response = self.session.put(url, json=manifest)
15981608
if unhealthy(response.status_code):
1609+
self.log(namespace, 'template used: {}'.format(json.dumps(manifest, indent=4)), logging.DEBUG) # noqa
15991610
raise KubeHTTPException(response, 'update Deployment "{}"', name)
1600-
logger.debug('template used: {}'.format(json.dumps(manifest, indent=4)))
16011611

16021612
self._wait_until_deployment_is_updated(namespace, name)
16031613
self._wait_until_deployment_is_ready(namespace, name, **kwargs)
@@ -1614,7 +1624,7 @@ def create_deployment(self, namespace, name, image, command, **kwargs):
16141624
response,
16151625
'create Deployment "{}" in Namespace "{}"', name, namespace
16161626
)
1617-
logger.debug('template used: {}'.format(json.dumps(manifest, indent=4)))
1627+
self.log(namespace, 'template used: {}'.format(json.dumps(manifest, indent=4)), logging.DEBUG) # noqa
16181628

16191629
self._wait_until_deployment_is_updated(namespace, name)
16201630
self._wait_until_deployment_is_ready(namespace, name, **kwargs)
@@ -1652,12 +1662,12 @@ def _wait_until_deployment_is_ready(self, namespace, name, **kwargs):
16521662
# this is to account for kubernetes having readiness check report as failure until
16531663
# the initial delay period is up
16541664
delay = int(container['readinessProbe'].get('initialDelaySeconds', 50))
1655-
logger.info("adding {}s on to the original {}s timeout to account for the initial delay specified in the readiness probe".format(delay, deploy_timeout)) # noqa
1665+
self.log(namespace, "adding {}s on to the original {}s timeout to account for the initial delay specified in the readiness probe".format(delay, deploy_timeout)) # noqa
16561666
deploy_timeout += delay
16571667

16581668
# a rough calculation that figures out an overall timeout
16591669
timeout = len(batches) * deploy_timeout
1660-
logger.info('This deployments overall timeout is {}s - batch timout is {}s and there are {} batches to deploy with a total of {} pods'.format(timeout, deploy_timeout, len(batches), replicas)) # noqa
1670+
self.log(namespace, 'This deployments overall timeout is {}s - batch timout is {}s and there are {} batches to deploy with a total of {} pods'.format(timeout, deploy_timeout, len(batches), replicas)) # noqa
16611671

16621672
waited = 0
16631673
while waited < timeout:
@@ -1679,7 +1689,7 @@ def _wait_until_deployment_is_ready(self, namespace, name, **kwargs):
16791689
# handle errors and bubble up if need be
16801690
self._handle_pod_image_errors(pod, reason, message)
16811691

1682-
logger.info("waited {}s and {} pods are in service".format(waited, availablePods))
1692+
self.log(namespace, "waited {}s and {} pods are in service".format(waited, availablePods)) # noqa
16831693

16841694
waited += 1
16851695
time.sleep(1)
@@ -1764,12 +1774,12 @@ def _scale_deployment(self, namespace, name, image, command, **kwargs):
17641774
desired = int(kwargs.get('replicas'))
17651775
current = int(deployment['spec']['replicas'])
17661776
if desired == current:
1767-
logger.info("Not scaling Deployment {} in Namespace {} to {} replicas. Already at desired replicas".format(name, namespace, desired)) # noqa
1777+
self.log(namespace, "Not scaling Deployment {} to {} replicas. Already at desired replicas".format(name, desired)) # noqa
17681778
return
17691779
elif desired != current:
17701780
# set the previous replicas count so the wait logic can deal with terminating pods
17711781
kwargs['previous_replicas'] = current
1772-
logger.info("scaling Deployment {} in Namespace {} from {} to {} replicas".format(name, namespace, current, desired)) # noqa
1782+
self.log(namespace, "scaling Deployment {} from {} to {} replicas".format(name, current, desired)) # noqa
17731783
self.update_deployment(namespace, name, image, command, **kwargs)
17741784

17751785
def get_replicaset(self, namespace, name):

0 commit comments

Comments
 (0)