Skip to content

Commit 4990a46

Browse files
committed
fix(scheduler): stop relying on namespace events in k8s for scale information
Changed things around so that we first ensure there are enough pods running to service the desired state, and then verify that all surplus pods are terminated. The reason to move away from events is that the API server (the k8s one) has a watch set up on etcd to see events fly by, but it also utilises a fairly constrained cache of that information to speed things up. On more active k8s servers the information can go stale within seconds or drop off the radar very quickly. More info at https://github.com/kubernetes/kubernetes/blob/master/docs/proposals/apiserver-watch.md#high-level-design Fixes 484
1 parent a675233 commit 4990a46

1 file changed

Lines changed: 16 additions & 97 deletions

File tree

rootfs/scheduler/__init__.py

Lines changed: 16 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ def deploy(self, namespace, name, image, command, **kwargs):
361361
))
362362

363363
if old_rc:
364-
logger.debug('scaling old release {} from {} to {}'.format(
364+
logger.debug('scaling old release {} from original {} to {}'.format(
365365
old_rc["metadata"]["name"], desired, (desired-count))
366366
)
367367
self._scale_rc(namespace, old_rc["metadata"]["name"], (desired-count))
@@ -737,100 +737,25 @@ def _get_rcs(self, namespace, **kwargs):
737737

738738
return response
739739

740-
def _get_schedule_status(self, namespace, name, current, desired, resource_version): # noqa
741-
if int(desired) > int(current):
742-
# new pods are going to be scheduled
743-
component = 'scheduler'
744-
reason = 'Scheduled'
745-
# events endpoints will return *all* scheduled pods
746-
state_count = desired
747-
else:
748-
# pods will be deleted
749-
component = 'kubelet'
750-
reason = 'Killing'
751-
# only the delta should be found in a particular state
752-
state_count = current - desired
753-
754-
# always get the highest value so all pods are processed
755-
ready_desired = desired if int(desired) > int(current) else current
756-
logger.debug("waiting for {} pods to be processed in {} namespace to be known by the k8s api (120s timeout)".format(ready_desired, namespace)) # noqa
757-
758-
pods = []
759-
for waited in range(120):
760-
count = 0
761-
pods = []
762-
data = self._get_pods(namespace).json()
763-
for pod in data['items']:
764-
if pod['metadata']['generateName'] == name+'-':
765-
count += 1
766-
pods.append(pod['metadata']['name'])
767-
768-
if count == ready_desired:
769-
break
770-
771-
if waited > 0 and (waited % 10) == 0:
772-
logger.debug("waited {}s and {} pods are found ".format(waited, count))
773-
774-
time.sleep(1)
775-
776-
logger.debug("{} out of {} pods to be processed in namespace {} were found".format(count, ready_desired, namespace)) # noqa
777-
778-
logger.debug("waiting for {} pods to get to state {} in {} namespace (120s timeout)".format(state_count, reason, namespace)) # noqa
740+
def _wait_until_pods_terminate(self, namespace, labels, current, desired):
741+
delta = current - desired
742+
logger.debug("waiting for {} pods in {} namespace to be terminated (120s timeout)".format(delta, namespace)) # noqa
779743
for waited in range(120):
780-
waiting_pods = []
781-
# TODO Too many objects returned for this... look for alternative
782-
events = self._get_namespace_events(namespace, resourceVersion=resource_version).json()
783-
for event in events['items']:
784-
if (
785-
event['involvedObject']['name'] in pods and
786-
event['source']['component'] == component and
787-
event['reason'] == reason and
788-
event['involvedObject']['name'] not in waiting_pods
789-
):
790-
# certain reasons, like Killing, can happen many times per pod
791-
waiting_pods.append(event['involvedObject']['name'])
792-
793-
if len(waiting_pods) == state_count:
794-
break
795-
796-
if waited > 0 and (waited % 10) == 0:
797-
logger.debug("waited {}s and {} pods are in state {}".format(waited, len(waiting_pods), reason)) # noqa
798-
799-
time.sleep(1)
800-
801-
logger.debug("{} out of {} pods in namespace {} are in state {}".format(len(waiting_pods), state_count, namespace, reason)) # noqa
802-
803-
# if it was a scale down operation, wait until terminating pods are done
804-
if reason == 'Killing':
805-
self._wait_until_pods_terminate(namespace, name, state_count)
806-
807-
def _wait_until_pods_terminate(self, namespace, name, desired):
808-
logger.debug("waiting for {} pods in {} namespace to be terminated (120s timeout)".format(desired, namespace)) # noqa
809-
for waited in range(120):
810-
count = 0
811-
pods = self._get_pods(namespace).json()
812-
for pod in pods['items']:
813-
# now that state is running time to see if probes are passing
814-
if (
815-
pod['metadata']['generateName'] == name+'-' and
816-
pod['status']['phase'] == 'Running' and
817-
# is the readiness probe passing?
818-
self._pod_readiness_status(pod) == 'Terminating'
819-
):
820-
count += 1
744+
pods = self._get_pods(namespace, labels=labels).json()
745+
count = len(pods['items'])
821746

822747
# stop when all pods are terminated as expected
823-
if count == 0:
748+
if count == desired:
824749
break
825750

826751
if waited > 0 and (waited % 10) == 0:
827-
logger.debug("waited {}s and {} pods out of {} are fully terminated".format(waited, (desired - count), desired)) # noqa
752+
logger.debug("waited {}s and {} pods out of {} are fully terminated".format(waited, (delta - count), delta)) # noqa
828753

829754
time.sleep(1)
830755

831-
logger.debug("{} pods in namespace {} are terminated".format(desired, namespace)) # noqa
756+
logger.debug("{} pods in namespace {} are terminated".format(delta, namespace)) # noqa
832757

833-
def _get_pod_ready_status(self, namespace, name, desired):
758+
def _get_pod_ready_status(self, namespace, labels, desired):
834759
# If desired is 0 then there is no ready state to check on
835760
if desired == 0:
836761
return
@@ -839,11 +764,10 @@ def _get_pod_ready_status(self, namespace, name, desired):
839764
logger.debug("waiting for {} pods in {} namespace to be in services (120s timeout)".format(desired, namespace)) # noqa
840765
for waited in range(120):
841766
count = 0
842-
pods = self._get_pods(namespace).json()
767+
pods = self._get_pods(namespace, labels=labels).json()
843768
for pod in pods['items']:
844769
# now that state is running time to see if probes are passing
845770
if (
846-
pod['metadata']['generateName'] == name+'-' and
847771
pod['status']['phase'] == 'Running' and
848772
# is the readiness probe passing?
849773
self._pod_readiness_status(pod) == 'Running' and
@@ -897,17 +821,12 @@ def _scale_rc(self, namespace, name, desired):
897821

898822
logger.debug("RC {} has a new resource version {}".format(name, js_template["metadata"]["resourceVersion"])) # noqa
899823

900-
# figure out the schedule state
901-
self._get_schedule_status(
902-
namespace,
903-
name,
904-
current,
905-
desired,
906-
js_template['metadata']['resourceVersion']
907-
)
908-
909824
# Double check enough pods are in the required state to service the application
910-
self._get_pod_ready_status(namespace, name, desired)
825+
self._get_pod_ready_status(namespace, labels, desired)
826+
827+
# if it was a scale down operation, wait until terminating pods are done
828+
if int(desired) < int(current):
829+
self._wait_until_pods_terminate(namespace, labels, current, desired)
911830

912831
def _create_rc(self, namespace, name, image, command, **kwargs): # noqa
913832
app_type = kwargs.get('app_type')

0 commit comments

Comments
 (0)