Skip to content

Commit 996be37

Browse files
committed
fix(mock): mock scheduler could not gracefully handle the concurrent deploy / scale
This adds a simple locking mechanism to ensure cache items do not overwrite one another when handling some of the bigger "global" lists.
1 parent 0e2ebd7 commit 996be37

1 file changed

Lines changed: 64 additions & 1 deletion

File tree

rootfs/scheduler/mock.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,61 @@
1818
logger = logging.getLogger(__name__)
1919

2020

21+
class LockNotAcquiredError(Exception):
    """Raised by CacheLock when the lock could not be acquired."""
23+
24+
25+
class CacheLock(object):
    """
    Simple lock backed by a key in the Django cache.

    Can be used as a context manager (``with CacheLock(key='x'):``) or as a
    decorator (``@CacheLock()``). When used as a decorator without an
    explicit key, the key defaults to ``"<module>:<function name>"`` of the
    decorated function.
    """

    def __init__(self, key=None, timeout=60, block=True):
        """
        :param key: unique key for lock (unique through Django cache)
        :param timeout: timeout of lock, in seconds
        :param block: if a lock is blocking
        """
        self.key = key
        self.timeout = timeout
        self.block = block

    # for use with decorator
    def __call__(self, f):
        """Wrap ``f`` so that every call runs while holding this lock."""
        # local import keeps this block self-contained
        import functools

        if not self.key:
            self.key = "%s:%s" % (f.__module__, f.__name__)

        # preserve the name / docstring of the decorated function
        @functools.wraps(f)
        def wrapped(*args, **kargs):
            with self:
                return f(*args, **kargs)

        return wrapped

    def __enter__(self):
        """
        Acquire the lock.

        :raises RuntimeError: if the key is missing, empty or not a string
        :raises LockNotAcquiredError: if the lock could not be acquired
        """
        # reject anything that is not a non-empty string before touching the cache
        if not isinstance(self.key, str) or self.key == '':
            raise RuntimeError("Key not specified!")

        if not self.acquire(self.block):
            raise LockNotAcquiredError()

        logger.debug("locking with key %s", self.key)
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Release the lock; exceptions from the guarded body still propagate."""
        logger.debug('releasing lock %s', self.key)
        self.release()

    def acquire(self, block=True):
        """
        Try to take the lock.

        :param block: when true, retry every 0.1 seconds until acquired;
                      when false, give up after the first failed attempt
        :returns: True if the lock was acquired, False otherwise
        """
        while not self._acquire():
            if not block:
                return False

            time.sleep(0.1)

        return True

    def release(self):
        """Release the lock by deleting its cache key."""
        # NOTE(review): the key is deleted unconditionally; if the holder
        # outlives ``timeout`` this may remove a lock taken by someone
        # else in the meantime - confirm this is acceptable for the mock
        cache.delete(self.key)

    def _acquire(self):
        """Single attempt to take the lock; returns the result of ``cache.add``."""
        return cache.add(self.key, 'true', self.timeout)
74+
75+
2176
resources = [
2277
'namespaces', 'nodes', 'pods', 'replicationcontrollers',
2378
'secrets', 'services', 'events', 'deployments', 'replicasets',
@@ -53,6 +108,7 @@ def get_type(key, pos=-1):
53108
return 'unknown'
54109

55110

111+
@CacheLock()
56112
def pod_state_transitions(pod_url=None):
57113
"""
58114
Move pods through the various states while maintaining
@@ -110,6 +166,7 @@ def pod_state_transitions(pod_url=None):
110166
cache.set('pods_states', pods)
111167

112168

169+
@CacheLock()
113170
def cleanup_pods():
114171
"""Can be called during any sort of access, it will cleanup pods as needed"""
115172
pods = cache.get('cleanup_pods', {})
@@ -119,10 +176,10 @@ def cleanup_pods():
119176

120177
del pods[pod]
121178
remove_cache_item(pod, 'pods')
122-
cache.delete(pod + '_log')
123179
cache.set('cleanup_pods', pods)
124180

125181

182+
@CacheLock()
126183
def add_cleanup_pod(url):
127184
"""populate the cleanup pod list"""
128185
# variance allows a pod to stay alive past grace period
@@ -278,6 +335,7 @@ def upsert_pods(controller, url):
278335
create_pods(url, data['metadata']['labels'], data, delta)
279336

280337

338+
@CacheLock()
281339
def manage_replicasets(deployment, url):
282340
"""
283341
Creates a new ReplicaSet, scales up the pods and
@@ -618,6 +676,7 @@ def delete(request, context):
618676
return {}
619677

620678

679+
@CacheLock()
621680
def add_cache_item(url, resource_type, data):
622681
cache.set(url, data, None)
623682

@@ -636,9 +695,13 @@ def add_cache_item(url, resource_type, data):
636695
cache.set(cache_url, items, None)
637696

638697

698+
@CacheLock()
639699
def remove_cache_item(url, resource_type):
640700
# remove data object from individual cache
641701
cache.delete(url)
702+
# get rid of log element as well for pods
703+
if resource_type == 'pod':
704+
cache.delete(url + '_log')
642705

643706
# remove from the resource type global scope
644707
items = cache.get(resource_type, [])

0 commit comments

Comments
 (0)