Skip to content

Commit 09476aa

Browse files
committed
ref(scheduler): add Deployments resource understanding to test mocks
1 parent c3c2494 commit 09476aa

1 file changed

Lines changed: 179 additions & 53 deletions

File tree

rootfs/scheduler/mock.py

Lines changed: 179 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
from datetime import datetime, timedelta
2+
import json
3+
import random
4+
import re
25
import requests
36
import requests_mock
4-
from urllib.parse import urlparse, parse_qs
57
import string
6-
import random
7-
import re
88
import time
9+
from urllib.parse import urlparse, parse_qs
10+
from zlib import adler32
911

1012
from . import KubeHTTPClient, KubeHTTPException
1113

@@ -18,7 +20,7 @@
1820

1921
resources = [
2022
'namespaces', 'nodes', 'pods', 'replicationcontrollers',
21-
'secrets', 'services', 'events'
23+
'secrets', 'services', 'events', 'deployments', 'replicasets',
2224
]
2325

2426

@@ -73,7 +75,7 @@ def pod_state_transitions(pod_url=None):
7375
# Loops through all the pods to see if next phase needs to be done
7476
for pod_url, state_time in pods.items():
7577
if datetime.utcnow() < state_time:
76-
# it is now time yet!
78+
# it is not time yet!
7779
continue
7880

7981
# Look at the current phase
@@ -85,10 +87,15 @@ def pod_state_transitions(pod_url=None):
8587

8688
# Is this Pod part of an RC or not
8789
if pod['status']['phase'] == 'Running':
88-
# Try to determine the connected RC to readjust pod count
90+
# Try to determine the connected RC / RS to readjust pod count
8991
# One way is to look at annotations:kubernetes.io/created-by and read
9092
# the serialized reference but that looks clunky right now
91-
controllers = filter_data({'labels': pod['metadata']['labels']}, 'replicationcontrollers') # noqa
93+
94+
if 'pod-template-hash' in pod['metadata']['labels']:
95+
controllers = filter_data({'labels': pod['metadata']['labels']}, 'replicasets') # noqa
96+
else:
97+
controllers = filter_data({'labels': pod['metadata']['labels']}, 'replicationcontrollers') # noqa
98+
9299
# If Pod is in an RC then do nothing
93100
if not controllers:
94101
# If Pod is not in an RC then it needs to move forward
@@ -137,10 +144,16 @@ def add_cleanup_pod(url):
137144

138145

139146
def delete_pod(url, data, request):
140-
# Try to determine the connected RC to readjust pod count
147+
# Try to determine the connected RC / RS to readjust pod count
141148
# One way is to look at annotations:kubernetes.io/created-by and read
142149
# the serialized reference but that looks clunky right now
143-
controllers = filter_data({'labels': data['metadata']['labels']}, 'replicationcontrollers')
150+
151+
# Try RC first and then RS
152+
if 'pod-template-hash' in data['metadata']['labels']:
153+
controllers = filter_data({'labels': data['metadata']['labels']}, 'replicasets')
154+
else:
155+
controllers = filter_data({'labels': data['metadata']['labels']}, 'replicationcontrollers')
156+
144157
if controllers:
145158
controller = controllers.pop()
146159
upsert_pods(controller, cache_key(request.path))
@@ -213,20 +226,8 @@ def create_pods(url, labels, base, new_pods):
213226
pod_key = url
214227
if cache_key(data['metadata']['name']) not in url:
215228
pod_key = cache_key(url + '_' + data['metadata']['name'])
216-
cache.set(pod_key, data, None)
217-
218-
# Keep track of what resources are in a given resource type
219-
items = cache.get('pods', []) # global scope
220-
if pod_key not in items:
221-
items.append(pod_key)
222-
cache.set('pods', items, None)
223229

224-
# make sure url only has up to "_pods"
225-
namespaced_url = url[0:(url.find("_pods") + 5)]
226-
items = cache.get(namespaced_url, []) # pods within the namespace
227-
if pod_key not in items:
228-
items.append(pod_key)
229-
cache.set(namespaced_url, items, None)
230+
add_cache_item(pod_key, 'pods', data)
230231

231232
# set up a fake log for the pod
232233
log = "I did stuff today"
@@ -238,9 +239,12 @@ def create_pods(url, labels, base, new_pods):
238239

239240

240241
def upsert_pods(controller, url):
241-
# turn RC url into pods one
242+
# turn RC / RS (which a Deployment creates) url into pods one
242243
url = url.replace(cache_key(controller['metadata']['name']), '')
243-
url = url.replace('_replicationcontrollers_', '_pods')
244+
if '_replicasets_' in url:
245+
url = url.replace('_replicasets_', '_pods').replace('apis_extensions_v1beta1', 'api_v1') # noqa
246+
else:
247+
url = url.replace('_replicationcontrollers_', '_pods')
244248
# make sure url only has up to "_pods"
245249
url = url[0:(url.find("_pods") + 5)]
246250

@@ -274,6 +278,104 @@ def upsert_pods(controller, url):
274278
create_pods(url, data['metadata']['labels'], data, delta)
275279

276280

281+
def manage_replicasets(deployment, url):
282+
"""
283+
Creates a new ReplicaSet, scales up the pods and
284+
scales down the old ReplicaSet (if applicable) to 0
285+
and terminates all Pods
286+
287+
The input data is going to be a Deployment object
288+
"""
289+
# reset Deployments status
290+
deployment['status']['replicas'] = deployment['spec']['replicas']
291+
deployment['status']['unavailableReplicas'] = deployment['spec']['replicas']
292+
if 'updatedReplicas' in deployment['status']:
293+
del deployment['status']['updatedReplicas']
294+
if 'availableReplicas' in deployment['status']:
295+
del deployment['status']['availableReplicas']
296+
cache.set(url, deployment, None)
297+
298+
# hash deployment.spec.template with adler32 to get pod hash
299+
pod_hash = str(adler32(bytes(json.dumps(deployment['spec']['template'], sort_keys=True), 'UTF-8'))) # noqa
300+
301+
# fix up url
302+
rs_url = url.replace('_deployments_', '_replicasets_')
303+
304+
# create new RS
305+
rs = deployment.copy()
306+
rs['kind'] = 'ReplicaSet'
307+
# fix up the name
308+
rs['metadata']['name'] = rs['metadata']['name'] + '-' + pod_hash
309+
rs_url += '_' + pod_hash
310+
311+
# add the pod-template-hash label
312+
rs['metadata']['labels'] = rs['spec']['template']['metadata']['labels'].copy()
313+
rs['metadata']['labels']['pod-template-hash'] = pod_hash
314+
rs['spec']['template']['metadata']['labels']['pod-template-hash'] = pod_hash
315+
316+
# deployment only
317+
del rs['spec']['strategy']
318+
319+
# save new ReplicaSet to cache
320+
add_cache_item(rs_url, 'replicasets', rs)
321+
322+
namespaced_url = rs_url[0:(rs_url.find("_replicasets") + 12)]
323+
data = cache.get(namespaced_url, [])
324+
325+
# spin up/down pods for RS
326+
upsert_pods(rs, rs_url)
327+
328+
# Scale down older ReplicaSets
329+
for item in data:
330+
# skip latest
331+
if item == rs_url:
332+
continue
333+
334+
old_rs = cache.get(item)
335+
336+
# lame way of seeing if the RSs have the same Deployment parent
337+
# have to prune of hash as well
338+
deployment_url = item.replace('_replicasets_', '_deployments_').replace('_' + old_rs['metadata']['labels']['pod-template-hash'], '') # noqa
339+
if url != deployment_url:
340+
continue
341+
342+
if old_rs['spec']['replicas'] == 0:
343+
continue
344+
345+
old_rs['spec']['replicas'] = 0
346+
347+
upsert_pods(old_rs, item)
348+
349+
# Fill out deployment.status for success as pods transition to running state
350+
pod_url = namespaced_url.replace('_replicasets', '_pods').replace('apis_extensions_v1beta1', 'api_v1') # noqa
351+
while True:
352+
# The below needs to be done to emulate Deployment handling things
353+
# always cleanup pods
354+
cleanup_pods()
355+
# always transition pods
356+
pod_state_transitions()
357+
358+
current = 0
359+
for pod in filter_data({'labels': rs['metadata']['labels']}, pod_url):
360+
# when this is in the Mock class this can be _pod_ready call
361+
if pod['status']['phase'] == 'Running':
362+
current += 1
363+
364+
deployment['status']['updatedReplicas'] = current
365+
deployment['status']['availableReplicas'] = current
366+
deployment['status']['unavailableReplicas'] = deployment['spec']['replicas'] - current
367+
cache.set(url, deployment, None)
368+
369+
# all ready and matching the intent
370+
if current == deployment['spec']['replicas']:
371+
break
372+
373+
time.sleep(0.5)
374+
375+
del deployment['status']['unavailableReplicas']
376+
cache.set(url, deployment, None)
377+
378+
277379
def filter_data(filters, path):
278380
data = []
279381
rows = cache.get(path, [])
@@ -408,35 +510,27 @@ def post(request, context):
408510
# don't bother adding it to those two resources since they live outside namespace
409511
if resource_type not in ['nodes', 'namespaces']:
410512
namespace = request.url.replace(settings.SCHEDULER_URL + '/api/v1/namespaces/', '')
513+
namespace = request.url.replace(settings.SCHEDULER_URL + '/apis/extensions/v1beta1/namespaces/', '') # noqa
411514
namespace = namespace.split('/')[0]
412515
data['metadata']['namespace'] = namespace
413516

414-
if resource_type == 'replicationcontrollers':
517+
# Handle RC / RS / Deployments
518+
if resource_type in ['replicationcontrollers', 'replicasets', 'deployments']:
415519
data['status'] = {
416520
'observedGeneration': 1
417521
}
418522
data['metadata']['generation'] = 1
419523

420-
upsert_pods(data, url)
524+
if resource_type in ['replicationcontrollers', 'replicasets']:
525+
upsert_pods(data, url)
526+
elif resource_type == 'deployments':
527+
manage_replicasets(data, url)
421528

422529
# deis run is the only thing that creates pods directly
423530
if resource_type == 'pods':
424531
create_pods(url, data['metadata']['labels'], data, 1)
425532
else:
426-
cache.set(url, data, None)
427-
428-
# Keep track of what resources are in a given resource
429-
items = cache.get(resource_type, [])
430-
if url not in items:
431-
items.append(url)
432-
cache.set(resource_type, items, None)
433-
434-
# Keep track of what resources exist under other resources (mostly namespace)
435-
other = cache_key(request.url)
436-
items = cache.get(other, [])
437-
if url not in items:
438-
items.append(url)
439-
cache.set(other, items, None)
533+
add_cache_item(url, resource_type, data)
440534

441535
context.status_code = 201
442536
context.reason = 'Created'
@@ -446,8 +540,8 @@ def post(request, context):
446540
def put(request, context):
447541
"""Process a PUT request to the kubernetes API"""
448542
url = cache_key(request.url)
449-
data = cache.get(url)
450-
if data is None:
543+
item = cache.get(url)
544+
if item is None:
451545
context.status_code = 404
452546
context.reason = 'Not Found'
453547
return {}
@@ -457,11 +551,26 @@ def put(request, context):
457551
# type is the second last element
458552
resource_type = get_type(request.url, -2)
459553

460-
if resource_type == 'replicationcontrollers':
554+
# merge new data into old but keep labels separate in case they changed
555+
labels = data['metadata'].pop('labels')
556+
item['metadata'].update(data['metadata'])
557+
data['metadata'] = item['metadata']
558+
# make sure only new labels are used
559+
data['metadata']['labels'] = labels
560+
561+
# split out deployments and replicasets? due to upsert_pods
562+
if resource_type in ['replicationcontrollers', 'replicasets', 'deployments']:
563+
if 'status' not in data:
564+
# just use what was set last time
565+
data['status'] = {'observedGeneration': item['status']['observedGeneration']}
566+
461567
data['metadata']['resourceVersion'] += 1
462568
data['metadata']['generation'] += 1
463569
data['status']['observedGeneration'] += 1
464-
upsert_pods(data, url)
570+
if resource_type in ['replicationcontrollers', 'replicasets']:
571+
upsert_pods(data, url)
572+
elif resource_type == 'deployments':
573+
manage_replicasets(data, url)
465574

466575
# Update the individual resource
467576
cache.set(url, data, None)
@@ -509,25 +618,42 @@ def delete(request, context):
509618
return {}
510619

511620

512-
def remove_cache_item(url, resource_type):
513-
# remove data object from individual cache
514-
cache.delete(url)
621+
def add_cache_item(url, resource_type, data):
622+
cache.set(url, data, None)
515623

516-
# remove from namespace specific scope
624+
# Keep track of what resources are in a given resource
625+
items = cache.get(resource_type, [])
626+
if url not in items:
627+
items.append(url)
628+
cache.set(resource_type, items, None)
629+
630+
# Keep track of what resources exist under other resources (mostly namespace)
517631
namespace, item = url.split('_{}_'.format(resource_type))
518632
cache_url = '{}_{}'.format(namespace, resource_type)
519633
items = cache.get(cache_url, [])
520-
if url in items:
521-
items.remove(url)
634+
if url not in items:
635+
items.append(url)
636+
cache.set(cache_url, items, None)
522637

523-
cache.set(cache_url, items, None)
638+
639+
def remove_cache_item(url, resource_type):
640+
# remove data object from individual cache
641+
cache.delete(url)
524642

525643
# remove from the resource type global scope
526644
items = cache.get(resource_type, [])
527645
if url in items:
528646
items.remove(url)
529647
cache.set(resource_type, items, None)
530648

649+
# remove from namespace specific scope
650+
namespace, item = url.split('_{}_'.format(resource_type))
651+
cache_url = '{}_{}'.format(namespace, resource_type)
652+
items = cache.get(cache_url, [])
653+
if url in items:
654+
items.remove(url)
655+
cache.set(cache_url, items, None)
656+
531657

532658
def mock(request, context):
533659
# always cleanup pods
@@ -690,5 +816,5 @@ def __init__(self):
690816
# POST | GET /namespaces/{namespace}/pods # noqa
691817
# PATCH (NI) | PUT (NI) | GET | DELETE /namespaces/{namespace}/pods/{pod} # noqa
692818
# GET /namespaces/{namespace}/pods/{pod}/log (needs to be special cased) # noqa
693-
694-
# TODO transitions pod between the various states to emulate real life more
819+
# POST (NI) | GET /namespaces/{namespace}/deployments # noqa
820+
# PATCH (NI) | PUT (NI) | GET | DELETE /namespaces/{namespace}/deployments/{deployment} # noqa

0 commit comments

Comments
 (0)