Skip to content

Commit 1db6146

Browse files
authored
feat(scheduler): use /scale endpoints for RC and Deployments to only update replicas during scale events (#1029)
Prior to this a full resource update was being done during a scale event, this could lead to potential pod template updates that were not desired Fixes #1012
1 parent 5c83d80 commit 1db6146

4 files changed

Lines changed: 112 additions & 31 deletions

File tree

rootfs/scheduler/mock.py

Lines changed: 68 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
from zlib import adler32
1313

1414
import scheduler
15-
from scheduler import KubeHTTPClient, KubeHTTPException
15+
from scheduler import KubeHTTPClient
16+
from scheduler.exceptions import KubeHTTPException
1617

1718
from django.conf import settings
1819
from django.core.cache import cache
@@ -79,7 +80,7 @@ def _acquire(self):
7980
resources = [
8081
'namespaces', 'nodes', 'pods', 'replicationcontrollers',
8182
'secrets', 'services', 'events', 'deployments', 'replicasets',
82-
'horizontalpodautoscalers'
83+
'horizontalpodautoscalers', 'scale',
8384
]
8485

8586

@@ -103,15 +104,48 @@ def cache_key(key):
103104
return key
104105

105106

106-
def get_type(key, pos=-1):
107+
def get_type(key):
107108
key = key.strip('/').split('/')
108-
if key[pos] in resources:
109-
return key[pos]
109+
110+
# check if is subresource (last element)
111+
if len(key) and key[-1] in resources:
112+
return key[-1]
113+
114+
# go back to second last
115+
if len(key) > 1 and key[-2] in resources:
116+
return key[-2]
110117

111118
# bad if it gets there
112119
return 'unknown'
113120

114121

122+
def get_namespace(url, resource_type):
123+
"""Inspects the URL and gets namespace from it if there is any"""
124+
# correct back to proper namespace API
125+
url = url.replace('apis_autoscaling_v1', 'api_v1')
126+
url = url.replace('apis_extensions_v1beta1', 'api_v1')
127+
# check if this is a subresource
128+
subresource, resource_type, url = is_subresource(resource_type, url)
129+
# get namespace name
130+
name = url.split('api_v1_namespaces_').pop(1).split("_{}_".format(resource_type), 1).pop(0)
131+
return 'api_v1_namespaces_' + name
132+
133+
134+
def is_subresource(resource_type, url):
135+
# check if this is a subresource
136+
subresource = resource_type
137+
if url.endswith(resource_type):
138+
# find the real resource type to do splitting below
139+
url = url.replace('_' + resource_type, '')
140+
match = list(set(resources) & set(url.split('_')))
141+
match.remove('namespaces') # not needed
142+
# it is only a subresource when another resource can be found
143+
if match:
144+
resource_type = match.pop()
145+
146+
return subresource, resource_type, url
147+
148+
115149
@CacheLock()
116150
def process_hpa():
117151
"""
@@ -570,7 +604,6 @@ def fetch_all(request, context):
570604
url = urlparse(request.url)
571605
filters = prepare_query_filters(url.query)
572606
cache_path = cache_key(request.path)
573-
574607
data = filter_data(filters, cache_path)
575608
return {'items': data}
576609

@@ -615,7 +648,9 @@ def get(request, context):
615648
"""Process a GET request to the kubernetes API"""
616649
# Figure out if it is a GET operation for a single element or a list
617650
url = urlparse(request.url)
618-
if get_type(url.path) in resources:
651+
resource_type = get_type(url.path)
652+
# is the last element a resource type or a name
653+
if url.path.strip('/').split('/')[-1] == resource_type:
619654
return fetch_all(request, context)
620655

621656
# fetch singular item
@@ -629,9 +664,7 @@ def post(request, context):
629664
resource_type = get_type(request.url)
630665
# check if the namespace being posted to exists
631666
if resource_type != 'namespaces':
632-
namespace, _ = url.split('_{}_'.format(resource_type))
633-
namespace = namespace.replace('apis_autoscaling_v1', 'api_v1')
634-
namespace = namespace.replace('apis_extensions_v1beta1', 'api_v1')
667+
namespace = get_namespace(url, resource_type)
635668
if cache.get(namespace) is None:
636669
context.status_code = 404
637670
context.reason = 'Not Found'
@@ -682,17 +715,21 @@ def put(request, context):
682715
"""Process a PUT request to the kubernetes API"""
683716
url = cache_key(request.url)
684717
# type is the second last element
685-
resource_type = get_type(request.url, -2)
718+
resource_type = get_type(request.url)
686719
# check if the namespace being posted to exists
687720
if resource_type != 'namespaces':
688-
namespace, _ = url.split('_{}_'.format(resource_type))
689-
namespace = namespace.replace('apis_autoscaling_v1', 'api_v1')
690-
namespace = namespace.replace('apis_extensions_v1beta1', 'api_v1')
721+
namespace = get_namespace(url, resource_type)
691722
if cache.get(namespace) is None:
692723
context.status_code = 404
693724
context.reason = 'Not Found'
694725
return {}
695726

727+
# figure out main resource if in subresource
728+
original_url = url
729+
subresource, resource_type, url = is_subresource(resource_type, url)
730+
if subresource != resource_type:
731+
cache.set(original_url, request.json(), None)
732+
696733
item = cache.get(url)
697734
if item is None:
698735
context.status_code = 404
@@ -702,14 +739,21 @@ def put(request, context):
702739
data = request.json()
703740

704741
# merge new data into old but keep labels separate in case they changed
705-
labels = data['metadata'].pop('labels')
706-
item['metadata'].update(data['metadata'])
707-
data['metadata'] = item['metadata']
708-
# make sure only new labels are used
709-
data['metadata']['labels'] = labels
742+
if 'labels' in data['metadata']:
743+
labels = data['metadata'].pop('labels')
744+
item['metadata'].update(data['metadata'])
745+
data['metadata'] = item['metadata']
746+
# make sure only new labels are used
747+
data['metadata']['labels'] = labels
710748

711749
# split out deployments and replicasets? due to upsert_pods
712750
if resource_type in ['replicationcontrollers', 'replicasets', 'deployments']:
751+
if subresource == 'scale':
752+
# has minimal info so need to copy data
753+
replicas = data['spec']['replicas']
754+
data = copy.deepcopy(item) # full copy
755+
data['spec']['replicas'] = replicas
756+
713757
if 'status' not in data:
714758
# just use what was set last time
715759
data['status'] = {'observedGeneration': item['status']['observedGeneration']}
@@ -744,7 +788,7 @@ def delete(request, context):
744788
context.reason = 'Not Found'
745789
return {}
746790

747-
resource_type = get_type(request.url, -2)
791+
resource_type = get_type(request.url)
748792
# clean everything from a namespace
749793
if resource_type == 'namespaces':
750794
for resource in resources:
@@ -783,8 +827,8 @@ def add_cache_item(url, resource_type, data):
783827
cache.set(resource_type, items, None)
784828

785829
# Keep track of what resources exist under other resources (mostly namespace)
786-
namespace, item = url.split('_{}_'.format(resource_type))
787-
cache_url = '{}_{}'.format(namespace, resource_type)
830+
# sneaky way of getting data up to the resource type without too much magic
831+
cache_url = ''.join(url.partition(resource_type)[0:2])
788832
items = cache.get(cache_url, [])
789833
if url not in items:
790834
items.append(url)
@@ -806,8 +850,8 @@ def remove_cache_item(url, resource_type):
806850
cache.set(resource_type, items, None)
807851

808852
# remove from namespace specific scope
809-
namespace, item = url.split('_{}_'.format(resource_type))
810-
cache_url = '{}_{}'.format(namespace, resource_type)
853+
# sneaky way of getting data up to the resource type without too much magic
854+
cache_url = ''.join(url.partition(resource_type)[0:2])
811855
items = cache.get(cache_url, [])
812856
if url in items:
813857
items.remove(url)

rootfs/scheduler/resources/deployment.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,14 @@ def scale(self, namespace, name, image, entrypoint, command, **kwargs):
162162
self.log(namespace, "Not scaling Deployment {} to {} replicas. Already at desired replicas".format(name, desired)) # noqa
163163
return
164164
elif desired != current:
165+
self.log(namespace, "scaling Deployment {} from {} to {} replicas".format(name, current, desired)) # noqa
166+
self.scales.update(namespace, name, desired, deployment)
167+
168+
# wait until scaling is done
169+
self.wait_until_updated(namespace, name)
165170
# set the previous replicas count so the wait logic can deal with terminating pods
166171
kwargs['previous_replicas'] = current
167-
self.log(namespace, "scaling Deployment {} from {} to {} replicas".format(name, current, desired)) # noqa
168-
self.update(namespace, name, image, entrypoint, command, **kwargs)
172+
self.wait_until_ready(namespace, name, **kwargs)
169173

170174
def in_progress(self, namespace, name, deploy_timeout, batches, replicas, tags):
171175
"""

rootfs/scheduler/resources/replicationcontroller.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,8 @@ def scale(self, namespace, name, desired, timeout):
9393
self.log(namespace, "Not scaling RC {} to {} replicas. Already at desired replicas".format(name, desired)) # noqa
9494
return
9595
elif desired != rc['spec']['replicas']: # RC needs new replica count
96-
# Set the new desired replica count
97-
rc['spec']['replicas'] = desired
98-
9996
self.log(namespace, "scaling RC {} from {} to {} replicas".format(name, current, desired)) # noqa
100-
101-
self.update(namespace, name, rc)
97+
self.scales.update(namespace, name, desired, rc)
10298
self.wait_until_updated(namespace, name)
10399

104100
# Double check enough pods are in the required state to service the application
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from scheduler.resources import Resource
2+
from scheduler.exceptions import KubeHTTPException
3+
4+
5+
class Scale(Resource):
6+
def manifest(self, namespace, name, replicas):
7+
manifest = {
8+
'kind': 'Scale',
9+
'apiVersion': self.api_version,
10+
'metadata': {
11+
'namespace': namespace,
12+
'name': name,
13+
},
14+
'spec': {
15+
'replicas': replicas,
16+
}
17+
}
18+
19+
return manifest
20+
21+
def update(self, namespace, name, replicas, target):
22+
# use API version and prefix from target use pick the right endpoint
23+
resource_type = target['kind'].lower() + 's' # make plural for url
24+
self.api_version = getattr(self, resource_type).api_version
25+
self.api_prefix = getattr(self, resource_type).api_prefix
26+
27+
manifest = self.manifest(namespace, name, replicas)
28+
url = self.api("/namespaces/{}/{}/{}/scale", namespace, resource_type, name)
29+
response = self.session.put(url, json=manifest)
30+
if self.unhealthy(response.status_code):
31+
raise KubeHTTPException(
32+
response,
33+
'scale {} "{}" in Namespace "{}"', target['kind'], name, namespace
34+
)
35+
self.log(namespace, 'template used: {}'.format(json.dumps(manifest, indent=4)), 'DEBUG') # noqa
36+
37+
return response

0 commit comments

Comments
 (0)