Skip to content

Commit 22feb44

Browse files
committed
ref(scheduler): improve scale operation logging and performance
1 parent e14502a commit 22feb44

1 file changed

Lines changed: 155 additions & 62 deletions

File tree

rootfs/scheduler/__init__.py

Lines changed: 155 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import logging
3+
import os
34
import re
45
import string
56
import time
@@ -320,14 +321,21 @@ def deploy(self, name, image, command, **kwargs):
320321
count = 1
321322
while desired >= count:
322323
logger.debug('scaling release {} to {} out of final {}'.format(
323-
new_rc["metadata"]["name"], count, desired)
324-
)
324+
new_rc["metadata"]["name"], count, desired
325+
))
325326
self._scale_rc(new_rc["metadata"]["name"], app_name, count)
327+
logger.debug('scaled up pod number {} for {}'.format(
328+
count, new_rc["metadata"]["name"]
329+
))
330+
326331
if old_rc:
327332
logger.debug('scaling old release {} from {} to {}'.format(
328333
old_rc["metadata"]["name"], desired, (desired-count))
329334
)
330335
self._scale_rc(old_rc["metadata"]["name"], app_name, (desired-count))
336+
logger.debug('scaled down pod number {} for {}'.format(
337+
count, old_rc["metadata"]["name"]
338+
))
331339

332340
count += 1
333341
except Exception as e:
@@ -359,6 +367,7 @@ def scale(self, name, image, command, **kwargs):
359367
try:
360368
self._scale_rc(name, app_name, num)
361369
except Exception as e:
370+
logger.debug("Scaling failed because of: {}".format(str(e)))
362371
self._scale_rc(name, app_name, old_replicas)
363372
raise RuntimeError('{} (Scale): {}'.format(name, e))
364373

@@ -524,18 +533,43 @@ def _api(self, tmpl, *args):
524533
url = "/api/{}".format(self.apiversion) + tmpl.format(*args)
525534
return urljoin(self.url, url)
526535

527-
# EVENTS #
536+
def _selectors(self, **kwargs):
    """Build the query-string parameters for a GET against the API.

    Recognised keyword arguments:
      labels          -- dict, joined into ``labelSelector`` as ``k=v,k=v``
      fields          -- dict, joined into ``fieldSelector`` as ``k=v,k=v``
      resourceVersion -- passed through unchanged as ``resourceVersion``
      pretty          -- any truthy value adds ``pretty=True``

    Returns a dict; options that were not supplied (or were empty) are
    left out of it entirely.
    """
    query = {}

    # labels and fields are encoded slightly differently than python-requests can do
    # http://kubernetes.io/v1.1/docs/user-guide/labels.html#list-and-watch-filtering
    for option, param in (('labels', 'labelSelector'), ('fields', 'fieldSelector')):
        pairs = kwargs.get(option) or {}
        if pairs:
            query[param] = ','.join(
                '{}={}'.format(key, value) for key, value in pairs.items()
            )

    # Which resource version to start from. Compare against None / '' explicitly:
    # a plain truthiness test would silently drop a caller-supplied version of 0.
    resource_version = kwargs.get('resourceVersion', None)
    if resource_version is not None and resource_version != '':
        query['resourceVersion'] = resource_version

    # If output should pretty print, only True / False allowed
    if bool(kwargs.get('pretty', False)):
        query['pretty'] = True

    return query
536562

537563
# NAMESPACE #
538564

565+
def _get_namespace_events(self, namespace, **kwargs):
    """Fetch the events recorded in ``namespace``.

    Keyword arguments are converted into query parameters by
    ``self._selectors``. The response object from the session is returned
    as-is; an unhealthy status code is first reported through ``error``.
    """
    endpoint = self._api("/namespaces/{}/events", namespace)
    params = self._selectors(**kwargs)
    result = self.session.get(endpoint, params=params)

    if unhealthy(result.status_code):
        error(result, "get Events in Namespace {}", namespace)

    return result
572+
539573
def _create_namespace(self, app_name):
540574
url = self._api("/namespaces")
541575
data = {
@@ -589,59 +623,72 @@ def _get_rc(self, name, namespace):
589623

590624
return resp.json()
591625

592-
def _get_schedule_status(self, name, num, namespace):
626+
def _get_schedule_status(self, namespace, name, current, desired, resource_version):  # noqa
    """Wait for the pods of ``name`` to show up and reach their scheduling state.

    Runs two polling phases, each at most 120 iterations with a one second
    sleep in between:

    1. Poll ``self._get_pods`` until the number of pods whose
       ``generateName`` equals ``name + '-'`` matches the larger of
       ``current`` and ``desired``.
    2. Poll ``self._get_namespace_events`` (starting from
       ``resource_version``) until enough of those pods have an event with
       the expected component and reason: ``scheduler`` / ``Scheduled``
       when scaling up, ``kubelet`` / ``Killing`` otherwise.

    Neither phase raises when its loop runs out; the final counts are only
    logged at debug level.
    """
    # Normalise once. The comparisons used to cast with int() but the
    # arithmetic and equality checks did not, so string inputs could never
    # satisfy ``count == ready_desired`` and broke ``current - desired``.
    current = int(current)
    desired = int(desired)

    if desired > current:
        # new pods are going to be scheduled
        component = 'scheduler'
        reason = 'Scheduled'
        # events endpoints will return *all* scheduled pods
        state_count = desired
        # always wait on the highest value so all pods are processed
        ready_desired = desired
    else:
        # pods will be deleted
        component = 'kubelet'
        reason = 'Killing'
        # only the delta should be found in a particular state
        state_count = current - desired
        ready_desired = current

    logger.debug("waiting for {} pods to be processed in {} namespace to be known by the k8s api (120s timeout)".format(ready_desired, namespace))  # noqa
    prefix = name + '-'
    pods = set()
    for waited in range(120):
        # .get(): pods that were not created from a template carry no generateName
        pods = {
            pod['metadata']['name']
            for pod in self._get_pods(namespace).json()['items']
            if pod['metadata'].get('generateName') == prefix
        }
        count = len(pods)

        if count == ready_desired:
            break

        if waited > 0 and (waited % 10) == 0:
            logger.debug("waited {}s and {} pods are found ".format(waited, count))

        time.sleep(1)

    logger.debug("{} out of {} pods to be processed in namespace {} were found".format(count, ready_desired, namespace))  # noqa

    logger.debug("waiting for {} pods to get to state {} in {} namespace (120s timeout)".format(state_count, reason, namespace))  # noqa
    for waited in range(120):
        # a set, because certain reasons, like Killing, can happen many times per pod
        waiting_pods = set()
        # TODO Too many objects returned for this... look for alternative
        events = self._get_namespace_events(namespace, resourceVersion=resource_version).json()
        for event in events['items']:
            pod_name = event['involvedObject']['name']
            if (
                pod_name in pods and
                event['source']['component'] == component and
                event['reason'] == reason
            ):
                waiting_pods.add(pod_name)

        if len(waiting_pods) == state_count:
            break

        if waited > 0 and (waited % 10) == 0:
            logger.debug("waited {}s and {} pods are in state {}".format(waited, len(waiting_pods), reason))  # noqa

        time.sleep(1)

    logger.debug("{} out of {} pods in namespace {} are in state {}".format(len(waiting_pods), state_count, namespace, reason))  # noqa
687+
688+
def _get_pod_ready_status(self, namespace, name, num):
689+
# Ensure the minimum desired number of pods are available
690+
logger.debug("waiting for {} pods in {} namespace to be in services (120s timeout)".format(num, namespace)) # noqa
691+
for waited in range(120):
645692
count = 0
646693
pods = self._get_pods(namespace).json()
647694
for pod in pods['items']:
@@ -659,8 +706,59 @@ def _scale_rc(self, name, namespace, num):
659706
if count == num:
660707
break
661708

709+
if waited > 0 and (waited % 10) == 0:
710+
logger.debug("waited {}s and {} pods are in service".format(waited, count))
711+
712+
time.sleep(1)
713+
714+
logger.debug("{} out of {} pods in namespace {} are in service".format(count, num, namespace)) # noqa
715+
716+
def _scale_rc(self, name, namespace, desired):
    """Set the replica count of RC ``name`` to ``desired`` and wait for it to settle.

    Steps, in order: read the RC, count the pods currently matching its
    app/type/version selector, PUT the RC back with the new replica count,
    poll (at most 30 times, one second apart) for the RC's resourceVersion
    to change, then hand over to ``_get_schedule_status`` and
    ``_get_pod_ready_status``.
    """
    rc = self._get_rc(name, namespace)

    # get the current replica count by querying for pods instead of introspecting RC
    selector = rc['spec']['selector']
    pod_labels = {key: selector[key] for key in ('app', 'type', 'version')}
    matching = self._get_pods(namespace, labels=pod_labels).json()
    current = len(matching['items'])

    # Set the new desired replica count
    rc['spec']['replicas'] = desired

    logger.debug(
        "scaling RC {} in namespace {} from {} to {} replicas".format(
            name, namespace, current, desired
        )
    )

    endpoint = self._api("/namespaces/{}/replicationcontrollers/{}", namespace, name)
    put_response = self.session.put(endpoint, json=rc)
    if unhealthy(put_response.status_code):
        error(put_response, 'scale ReplicationController "{}"', name)

    old_version = rc['metadata']['resourceVersion']
    logger.debug(
        "waiting for RC {} to get a newer resource version than {} (30s timeout)".format(
            name, old_version
        )
    )
    for elapsed in range(30):
        latest_rc = self._get_rc(name, namespace)
        if latest_rc["metadata"]["resourceVersion"] != old_version:
            break

        if elapsed > 0 and (elapsed % 10) == 0:
            logger.debug("waited {}s so far for a new resource version".format(elapsed))

        time.sleep(1)

    # whatever the last poll returned, whether or not the version actually changed
    logger.debug(
        "RC {} has a new resource version {}".format(
            name, latest_rc["metadata"]["resourceVersion"]
        )
    )

    # figure out the schedule state
    self._get_schedule_status(
        namespace,
        name,
        current,
        desired,
        latest_rc['metadata']['resourceVersion']
    )

    self._get_pod_ready_status(namespace, name, desired)
761+
664762
def _create_rc(self, name, image, command, **kwargs): # noqa
665763
container_fullname = name
666764
app_name = kwargs.get('aname', {})
@@ -703,8 +801,18 @@ def _create_rc(self, name, image, command, **kwargs): # noqa
703801
env = kwargs.get('envs', {})
704802

705803
if env:
706-
for k, v in env.items():
707-
containers[0]["env"].append({"name": k, "value": str(v)})
804+
for key, value in env.items():
805+
containers[0]["env"].append({
806+
"name": key,
807+
"value": str(value)
808+
})
809+
810+
# Inject debugging if workflow is in debug mode
811+
if os.environ.get("DEBUG", False):
812+
containers[0]["env"].append({
813+
"name": "DEBUG",
814+
"value": "1"
815+
})
708816

709817
if mem or cpu:
710818
containers[0]["resources"] = {"limits": {}}
@@ -917,23 +1025,8 @@ def _get_pod(self, name, namespace, return_response=False):
9171025
return resp.status_code, resp.reason, resp.text
9181026

9191027
def _get_pods(self, namespace, **kwargs):
920-
path = "/namespaces/{}/pods"
921-
query = {}
922-
923-
# labels and fields are encoded slightly differently than python-requests can do
924-
labels = kwargs.get('labels', {})
925-
if labels:
926-
# http://kubernetes.io/v1.1/docs/user-guide/labels.html#list-and-watch-filtering
927-
labels = ['{}={}'.format(key, value) for key, value in labels.items()]
928-
query['labelSelector'] = ','.join(labels)
929-
930-
fields = kwargs.get('fields', {})
931-
if fields:
932-
fields = ['{}={}'.format(key, value) for key, value in fields.items()]
933-
query['fieldSelector'] = ','.join(fields)
934-
935-
url = self._api(path, namespace)
936-
response = self.session.get(url, params=query)
1028+
url = self._api("/namespaces/{}/pods", namespace)
1029+
response = self.session.get(url, params=self._selectors(**kwargs))
9371030
if unhealthy(response.status_code):
9381031
error(response, 'get Pods in Namespace "{}"', namespace)
9391032

0 commit comments

Comments
 (0)