Skip to content

Commit 0591f87

Browse files
committed
feat(scheduler): add in better pod cleanup procedure in mock and add in deletionTimestamp handling
1 parent ca32938 commit 0591f87

2 files changed

Lines changed: 78 additions & 36 deletions

File tree

rootfs/deis/testing.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,6 @@
2222
'KEY_PREFIX': DATABASES['default']['NAME'],
2323
}
2424
}
25+
26+
# How long k8s waits for a pod to finish work after a SIGTERM before sending SIGKILL
27+
KUBERNETES_POD_TERMINATION_GRACE_PERIOD_SECONDS = int(os.environ.get('KUBERNETES_POD_TERMINATION_GRACE_PERIOD_SECONDS', 2)) # noqa

rootfs/scheduler/mock.py

Lines changed: 75 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from datetime import datetime
1+
from datetime import datetime, timedelta
22
import requests
33
import requests_mock
44
from urllib.parse import urlparse, parse_qs
@@ -47,30 +47,66 @@ def get_type(key, pos=-1):
4747
return 'unknown'
4848

4949

50+
def cleanup_pods():
51+
"""Can be called during any sort of access, it will cleanup pods as needed"""
52+
pods = cache.get('cleanup_pods', {})
53+
for pod, timestamp in pods.copy().items():
54+
if timestamp > datetime.utcnow():
55+
continue
56+
57+
del pods[pod]
58+
remove_cache_item(pod, 'pods')
59+
cache.set('cleanup_pods', pods)
60+
61+
62+
def add_cleanup_pod(url):
63+
"""populate the cleanup pod list"""
64+
# variance allows a pod to stay alive past grace period
65+
variance = random.uniform(0.1, 1.5)
66+
grace = round(settings.KUBERNETES_POD_TERMINATION_GRACE_PERIOD_SECONDS * variance)
67+
68+
# save
69+
pods = cache.get('cleanup_pods', {})
70+
pods[url] = (datetime.utcnow() + timedelta(seconds=grace))
71+
cache.set('cleanup_pods', pods)
72+
73+
# add grace period timestamp
74+
pod = cache.get(url)
75+
pd = datetime.utcnow() + timedelta(seconds=settings.KUBERNETES_POD_TERMINATION_GRACE_PERIOD_SECONDS) # noqa
76+
timestamp = str(pd.strftime(settings.DEIS_DATETIME_FORMAT))
77+
pod['metadata']['deletionTimestamp'] = timestamp
78+
cache.set(url, pod)
79+
80+
81+
def delete_pod(url, data, request):
82+
# Try to determine the connected RC to readjust pod count
83+
# One way is to look at annotations:kubernetes.io/created-by and read
84+
# the serialized reference but that looks clunky right now
85+
controllers = filter_data({'labels': data['metadata']['labels']}, 'replicationcontrollers')
86+
controller = controllers.pop()
87+
upsert_pods(controller, cache_key(request.path))
88+
89+
5090
def delete_pods(url, pods, current, desired):
51-
# remove from namespace scope
91+
if not pods:
92+
return
93+
5294
delta = current - desired
5395

5496
removed = []
55-
items = cache.get(url, [])
56-
for _ in range(delta):
97+
while True:
98+
if len(removed) == delta:
99+
break
100+
57101
item = pods.pop()
58-
items.remove(item)
102+
pod = cache.get(item)
103+
if 'deletionTimestamp' in pod['metadata']:
104+
continue
105+
59106
removed.append(item)
60-
# Remove individual item
61-
cache.delete(item)
62-
# remove from namespace
63-
cache.set(url, items, None)
64107

65-
# remove from global scope
66-
items = cache.get('pods', []) # global scope
67108
for item in removed:
68-
if item in items:
69-
items.remove(item)
70-
cache.set('pods', items, None)
71-
72-
# Remove operation is done. No additions
73-
return
109+
add_cleanup_pod(item)
74110

75111

76112
def create_pods(url, labels, base, new_pods):
@@ -98,7 +134,7 @@ def create_pods(url, labels, base, new_pods):
98134
'name': '{}-{}'.format(labels['app'], labels['type']),
99135
# TODO ready can be True / False (boolean)
100136
'ready': True,
101-
# TODO can be running / terminated or nothing if in pending mode
137+
# TODO can be running / terminated / waiting
102138
'state': {
103139
'running': {
104140
'startedAt': timestamp
@@ -139,8 +175,13 @@ def upsert_pods(controller, url):
139175
# fetch a list of all the pods given the labels
140176
items = []
141177
for item in filter_data({'labels': data['metadata']['labels']}, url):
178+
# skip pods being deleted
179+
if 'deletionTimestamp' in data['metadata']:
180+
continue
181+
142182
# Translate to a cache key since a full object gets passed
143183
items.append(cache_key(url + '_' + item['metadata']['name']))
184+
144185
current = len(items)
145186
desired = controller['spec']['replicas']
146187

@@ -320,16 +361,7 @@ def delete(request, context):
320361
context.reason = 'Not Found'
321362
return {}
322363

323-
# remove data object from individual cache
324-
cache.delete(url)
325-
326-
# remove from the resource type global scope
327364
resource_type = get_type(request.url, -2)
328-
items = cache.get(resource_type, [])
329-
if url in items:
330-
items.remove(url)
331-
cache.set(resource_type, items, None)
332-
333365
# clean everything from a namespace
334366
if resource_type == 'namespaces':
335367
for resource in resources:
@@ -346,14 +378,8 @@ def delete(request, context):
346378
# If a pod belongs to an RC and DELETE operation makes it fall below the
347379
# minimum replicas count then a new pod comes into service
348380
elif resource_type == 'pods':
349-
remove_cache_item(url, resource_type)
350-
351-
# Try to determine the connected RC to readjust pod count
352-
# One way is to look at annotations:kubernetes.io/created-by and read
353-
# the serialized reference but that looks clunky right now
354-
controllers = filter_data({'labels': data['metadata']['labels']}, 'replicationcontrollers')
355-
controller = controllers.pop()
356-
upsert_pods(controller, cache_key(request.path))
381+
# pods have a graceful termination period, handle pods different
382+
delete_pod(url, data, request)
357383
else:
358384
remove_cache_item(url, resource_type)
359385

@@ -364,16 +390,29 @@ def delete(request, context):
364390

365391

366392
def remove_cache_item(url, resource_type):
393+
# remove data object from individual cache
394+
cache.delete(url)
395+
367396
# remove from namespace specific scope
368397
namespace, item = url.split('_{}_'.format(resource_type))
369398
cache_url = '{}_{}'.format(namespace, resource_type)
370399
items = cache.get(cache_url, [])
371400
if url in items:
372401
items.remove(url)
373-
cache.set(cache_url, items, None)
402+
403+
cache.set(cache_url, items, None)
404+
405+
# remove from the resource type global scope
406+
items = cache.get(resource_type, [])
407+
if url in items:
408+
items.remove(url)
409+
cache.set(resource_type, items, None)
374410

375411

376412
def mock(request, context):
413+
# always cleanup pods
414+
cleanup_pods()
415+
377416
# What to do about context
378417
if request.method == 'POST':
379418
return post(request, context)

0 commit comments

Comments
 (0)