Skip to content

Commit 39eef5d

Browse files
committed
Merge pull request #488 from helgi/remove_events
fix(scheduler): stop relying on namespace events in k8s for scale information
2 parents a675233 + 4990a46 commit 39eef5d

1 file changed

Lines changed: 16 additions & 97 deletions

File tree

rootfs/scheduler/__init__.py

Lines changed: 16 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ def deploy(self, namespace, name, image, command, **kwargs):
361361
))
362362

363363
if old_rc:
364-
logger.debug('scaling old release {} from {} to {}'.format(
364+
logger.debug('scaling old release {} from original {} to {}'.format(
365365
old_rc["metadata"]["name"], desired, (desired-count))
366366
)
367367
self._scale_rc(namespace, old_rc["metadata"]["name"], (desired-count))
@@ -737,100 +737,25 @@ def _get_rcs(self, namespace, **kwargs):
737737

738738
return response
739739

740-
def _get_schedule_status(self, namespace, name, current, desired, resource_version): # noqa
741-
if int(desired) > int(current):
742-
# new pods are going to be scheduled
743-
component = 'scheduler'
744-
reason = 'Scheduled'
745-
# events endpoints will return *all* scheduled pods
746-
state_count = desired
747-
else:
748-
# pods will be deleted
749-
component = 'kubelet'
750-
reason = 'Killing'
751-
# only the delta should be found in a particular state
752-
state_count = current - desired
753-
754-
# always get the highest value so all pods are processed
755-
ready_desired = desired if int(desired) > int(current) else current
756-
logger.debug("waiting for {} pods to be processed in {} namespace to be known by the k8s api (120s timeout)".format(ready_desired, namespace)) # noqa
757-
758-
pods = []
759-
for waited in range(120):
760-
count = 0
761-
pods = []
762-
data = self._get_pods(namespace).json()
763-
for pod in data['items']:
764-
if pod['metadata']['generateName'] == name+'-':
765-
count += 1
766-
pods.append(pod['metadata']['name'])
767-
768-
if count == ready_desired:
769-
break
770-
771-
if waited > 0 and (waited % 10) == 0:
772-
logger.debug("waited {}s and {} pods are found ".format(waited, count))
773-
774-
time.sleep(1)
775-
776-
logger.debug("{} out of {} pods to be processed in namespace {} were found".format(count, ready_desired, namespace)) # noqa
777-
778-
logger.debug("waiting for {} pods to get to state {} in {} namespace (120s timeout)".format(state_count, reason, namespace)) # noqa
740+
def _wait_until_pods_terminate(self, namespace, labels, current, desired):
741+
delta = current - desired
742+
logger.debug("waiting for {} pods in {} namespace to be terminated (120s timeout)".format(delta, namespace)) # noqa
779743
for waited in range(120):
780-
waiting_pods = []
781-
# TODO Too many objects returned for this... look for alternative
782-
events = self._get_namespace_events(namespace, resourceVersion=resource_version).json()
783-
for event in events['items']:
784-
if (
785-
event['involvedObject']['name'] in pods and
786-
event['source']['component'] == component and
787-
event['reason'] == reason and
788-
event['involvedObject']['name'] not in waiting_pods
789-
):
790-
# certain reasons, like Killing, can happen many times per pod
791-
waiting_pods.append(event['involvedObject']['name'])
792-
793-
if len(waiting_pods) == state_count:
794-
break
795-
796-
if waited > 0 and (waited % 10) == 0:
797-
logger.debug("waited {}s and {} pods are in state {}".format(waited, len(waiting_pods), reason)) # noqa
798-
799-
time.sleep(1)
800-
801-
logger.debug("{} out of {} pods in namespace {} are in state {}".format(len(waiting_pods), state_count, namespace, reason)) # noqa
802-
803-
# if it was a scale down operation, wait until terminating pods are done
804-
if reason == 'Killing':
805-
self._wait_until_pods_terminate(namespace, name, state_count)
806-
807-
def _wait_until_pods_terminate(self, namespace, name, desired):
808-
logger.debug("waiting for {} pods in {} namespace to be terminated (120s timeout)".format(desired, namespace)) # noqa
809-
for waited in range(120):
810-
count = 0
811-
pods = self._get_pods(namespace).json()
812-
for pod in pods['items']:
813-
# now that state is running time to see if probes are passing
814-
if (
815-
pod['metadata']['generateName'] == name+'-' and
816-
pod['status']['phase'] == 'Running' and
817-
# is the readiness probe passing?
818-
self._pod_readiness_status(pod) == 'Terminating'
819-
):
820-
count += 1
744+
pods = self._get_pods(namespace, labels=labels).json()
745+
count = len(pods['items'])
821746

822747
# stop when all pods are terminated as expected
823-
if count == 0:
748+
if count == desired:
824749
break
825750

826751
if waited > 0 and (waited % 10) == 0:
827-
logger.debug("waited {}s and {} pods out of {} are fully terminated".format(waited, (desired - count), desired)) # noqa
752+
logger.debug("waited {}s and {} pods out of {} are fully terminated".format(waited, (delta - count), delta)) # noqa
828753

829754
time.sleep(1)
830755

831-
logger.debug("{} pods in namespace {} are terminated".format(desired, namespace)) # noqa
756+
logger.debug("{} pods in namespace {} are terminated".format(delta, namespace)) # noqa
832757

833-
def _get_pod_ready_status(self, namespace, name, desired):
758+
def _get_pod_ready_status(self, namespace, labels, desired):
834759
# If desired is 0 then there is no ready state to check on
835760
if desired == 0:
836761
return
@@ -839,11 +764,10 @@ def _get_pod_ready_status(self, namespace, name, desired):
839764
logger.debug("waiting for {} pods in {} namespace to be in services (120s timeout)".format(desired, namespace)) # noqa
840765
for waited in range(120):
841766
count = 0
842-
pods = self._get_pods(namespace).json()
767+
pods = self._get_pods(namespace, labels=labels).json()
843768
for pod in pods['items']:
844769
# now that state is running time to see if probes are passing
845770
if (
846-
pod['metadata']['generateName'] == name+'-' and
847771
pod['status']['phase'] == 'Running' and
848772
# is the readiness probe passing?
849773
self._pod_readiness_status(pod) == 'Running' and
@@ -897,17 +821,12 @@ def _scale_rc(self, namespace, name, desired):
897821

898822
logger.debug("RC {} has a new resource version {}".format(name, js_template["metadata"]["resourceVersion"])) # noqa
899823

900-
# figure out the schedule state
901-
self._get_schedule_status(
902-
namespace,
903-
name,
904-
current,
905-
desired,
906-
js_template['metadata']['resourceVersion']
907-
)
908-
909824
# Double check enough pods are in the required state to service the application
910-
self._get_pod_ready_status(namespace, name, desired)
825+
self._get_pod_ready_status(namespace, labels, desired)
826+
827+
# if it was a scale down operation, wait until terminating pods are done
828+
if int(desired) < int(current):
829+
self._wait_until_pods_terminate(namespace, labels, current, desired)
911830

912831
def _create_rc(self, namespace, name, image, command, **kwargs): # noqa
913832
app_type = kwargs.get('app_type')

0 commit comments

Comments
 (0)