Skip to content

Commit 0a9fb6f

Browse files
committed
feat(deploy): finer grained ptype locks
1 parent 241765b commit 0a9fb6f

7 files changed

Lines changed: 92 additions & 109 deletions

File tree

rootfs/api/models/app.py

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -249,10 +249,14 @@ def restart(self, **kwargs): # noqa
249249
deployments.append(self._get_job_id(kwargs['type']))
250250
try:
251251
tasks = [
252-
functools.partial(
253-
self.scheduler().deployment.restart,
254-
self.id,
255-
deployment
252+
(
253+
functools.partial(
254+
self.scheduler().deployment.restart,
255+
self.id,
256+
deployment
257+
),
258+
lambda future: self.log(
259+
f'restart {kwargs['type']} callback: {future.result()}'),
256260
) for deployment in deployments
257261
]
258262
apply_tasks(tasks)
@@ -273,7 +277,7 @@ def scale(self, user, structure):
273277
app_settings = self.appsettings_set.latest()
274278
self._scale(user, structure, release, app_settings)
275279

276-
def pipeline(self, release, ptypes=None, force_deploy=False):
280+
def pipeline(self, release, ptypes, force_deploy=False):
277281
prefix = f"[pipeline] release {release.version_name}"
278282
try:
279283
if release.build is not None:
@@ -306,7 +310,7 @@ def pipeline(self, release, ptypes=None, force_deploy=False):
306310
state="crashed", action="pipeline", ptypes=ptypes, exception=str(e))
307311
self.log(f"{prefix} pipeline runtime error: {release.exception}", logging.ERROR)
308312
finally:
309-
DeployLock(self.pk).release(ptypes)
313+
DeployLock(self.pk).release(ptypes) # release all locks
310314
release.save(update_fields=["state", "failed"]) # avoid overwriting other fields
311315
self.log(f"{prefix} run completed...")
312316

@@ -744,16 +748,20 @@ def _mount(self, user, volume, release, app_settings, structure=None):
744748
'annotations', {})
745749
self.set_application_config(release, scale_type)
746750
# gather volume proc types to be deployed
747-
tasks.append(functools.partial(
748-
self.scheduler().deployment.patch,
749-
namespace=self.id,
750-
name=self._get_job_id(scale_type),
751-
image=release.get_deploy_image(scale_type),
752-
command=release.get_deploy_command(scale_type),
753-
args=release.get_deploy_args(scale_type),
754-
spec_annotations=spec_annotations,
755-
resource_version=deployment["metadata"]["resourceVersion"],
756-
**data
751+
tasks.append((
752+
functools.partial(
753+
self.scheduler().deployment.patch,
754+
namespace=self.id,
755+
name=self._get_job_id(scale_type),
756+
image=release.get_deploy_image(scale_type),
757+
command=release.get_deploy_command(scale_type),
758+
args=release.get_deploy_args(scale_type),
759+
spec_annotations=spec_annotations,
760+
resource_version=deployment["metadata"]["resourceVersion"],
761+
**data
762+
),
763+
lambda future: self.log(
764+
f'mount {volume} for {scale_type} callback: {future.result()}'),
757765
))
758766
try:
759767
apply_tasks(tasks)
@@ -769,19 +777,24 @@ def _deploy(self, deploys, ptypes, prev_release,
769777
deploys = OrderedDict(sorted(deploys.items(), key=lambda d: d[1].get('routable')))
770778
# Check if any proc type has a Deployment in progress
771779
self._check_deployment_in_progress(deploys, force_deploy)
772-
773780
try:
774781
tasks = []
782+
lock = DeployLock(self.pk)
775783
for scale_type, kwargs in deploys.items():
776784
self.set_application_config(release, scale_type)
777-
tasks.append(functools.partial(
778-
self.scheduler().deploy,
779-
namespace=self.id,
780-
name=self._get_job_id(scale_type),
781-
image=release.get_deploy_image(scale_type),
782-
command=release.get_deploy_command(scale_type),
783-
args=release.get_deploy_args(scale_type),
784-
**kwargs
785+
tasks.append((
786+
functools.partial(
787+
self.scheduler().deploy,
788+
namespace=self.id,
789+
name=self._get_job_id(scale_type),
790+
image=release.get_deploy_image(scale_type),
791+
command=release.get_deploy_command(scale_type),
792+
args=release.get_deploy_args(scale_type),
793+
**kwargs
794+
),
795+
lambda future: self.log(
796+
f'deploy and unlock callback: {[
797+
future.result(), lock.release([scale_type])]}'),
785798
))
786799
try:
787800
apply_tasks(tasks)
@@ -797,7 +810,6 @@ def _deploy(self, deploys, ptypes, prev_release,
797810
self.deploy(prev_release, ptypes, True, False)
798811
# let it bubble up
799812
raise DryccException('{}\n{}'.format(err, str(e))) from e
800-
801813
# otherwise just re-raise
802814
raise
803815
except Exception as e:
@@ -876,7 +888,7 @@ def _scale_pods(self, scale_types, release, app_settings):
876888
# create the application config in k8s (secret in this case) for all deploy objects
877889
self.set_application_config(release, scale_type)
878890
# gather all proc types to be deployed
879-
tasks.append(
891+
tasks.append((
880892
functools.partial(
881893
self.scheduler().scale,
882894
namespace=self.id,
@@ -885,8 +897,9 @@ def _scale_pods(self, scale_types, release, app_settings):
885897
command=release.get_deploy_command(scale_type),
886898
args=release.get_deploy_args(scale_type),
887899
**data
888-
)
889-
)
900+
),
901+
lambda future: self.log(f'scale {scale_type} callback: {future.result()}'),
902+
))
890903
try:
891904
apply_tasks(tasks)
892905
except Exception as e:
@@ -1046,10 +1059,9 @@ def _check_deployment_in_progress(self, deploys, force_deploy=False):
10461059
@staticmethod
10471060
def _default_structure(release):
10481061
"""Scale to default structure based on release type"""
1049-
structure = {PTYPE_WEB: 1}
1062+
structure = {}
10501063
for ptype in release.ptypes:
1051-
if ptype != PTYPE_WEB:
1052-
structure[ptype] = 0
1064+
structure[ptype] = 1 if ptype == PTYPE_WEB else 0
10531065
return structure
10541066

10551067
def _scheduler_filter(self, **kwargs):

rootfs/api/models/build.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from django.db import models
44
from django.contrib.auth import get_user_model
55
from api.exceptions import DryccException, Conflict
6-
from .base import UuidAuditedModel
6+
from .base import UuidAuditedModel, PTYPE_WEB
77

88
User = get_user_model()
99
logger = logging.getLogger(__name__)
@@ -45,7 +45,9 @@ def type(self):
4545
def ptypes(self):
4646
if self.dryccfile:
4747
return list(self.dryccfile['deploy'].keys())
48-
return list(self.procfile.keys())
48+
if self.procfile:
49+
return list(self.procfile.keys())
50+
return [PTYPE_WEB]
4951

5052
@property
5153
def source_based(self):

rootfs/api/models/release.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,23 +164,26 @@ def get_port(self, ptype):
164164
'PORT', self.config.values.get('PORT', DEFAULT_CONTAINER_PORT)))
165165

166166
def deploy(self, ptypes=None, force_deploy=False):
167-
msg = 'there is an executing pipeline, please wait or force deploy'
167+
ptypes = set(ptypes).intersection(self.ptypes) if ptypes else self.ptypes
168+
if not ptypes:
169+
self.log(f'skip deploy, ptypes are not within the optional range: {self.ptypes}')
170+
return
168171
# change deployed_ptypes lock
172+
msg = 'there is an executing pipeline, please wait or force deploy'
169173
lock = CacheLock("release:%s" % self.pk)
170174
try:
171175
if lock.acquire() or force_deploy:
172-
if ptypes is None:
173-
deployed_ptypes = self.ptypes
174-
else:
175-
deployed_ptypes = list(set(self.deployed_ptypes).union(ptypes))
176-
type(self).objects.filter(pk=self.pk).update(deployed_ptypes=deployed_ptypes)
176+
deployed_ptypes = list(set(self.deployed_ptypes).union(ptypes))
177+
if deployed_ptypes:
178+
type(self).objects.filter(pk=self.pk).update(deployed_ptypes=deployed_ptypes)
177179
else:
178180
raise DryccException(msg)
179181
finally:
180182
lock.release()
181183
# deploy lock
182-
if not DeployLock(self.app.pk).acquire(ptypes, force=force_deploy):
183-
raise DryccException(msg)
184+
lock = DeployLock(self.app.pk)
185+
if not lock.acquire(ptypes, force=force_deploy):
186+
raise DryccException(f"{msg}: {lock.locked(ptypes)}")
184187
run_pipeline.delay(self, ptypes, force_deploy)
185188

186189
def previous(self):

rootfs/api/tests/test_build.py

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ def test_build_default_containers(self, mock_requests):
172172
url = f"/v2/apps/{app_id}/pods/"
173173
response = self.client.get(url)
174174
self.assertEqual(response.status_code, 200, response.data)
175-
self.assertPodContains(response.data['results'], app_id, 'web', "v2", "up")
175+
self.assertEqual(response.data['results'], [])
176176
# start with a new app
177177
app_id = self.create_app()
178178
# post a new build with procfile
@@ -263,13 +263,7 @@ def test_build_forgotten_procfile(self, mock_requests):
263263
url = f"/v2/apps/{app_id}/pods/"
264264
response = self.client.get(url)
265265
self.assertEqual(response.status_code, 200, response.data)
266-
self.assertEqual(len(response.data['results']), 1)
267-
268-
# verify web is in there
269-
url = f"/v2/apps/{app_id}/pods/"
270-
response = self.client.get(url)
271-
self.assertEqual(response.status_code, 200, response.data)
272-
self.assertEqual(len(response.data['results']), 1)
266+
self.assertEqual([item for item in response.data['results'] if item["type"] != "web"], [])
273267

274268
# look at the app structure
275269
url = f"/v2/apps/{app_id}"
@@ -319,35 +313,18 @@ def test_build_no_remove_process(self, mock_requests):
319313
response = self.client.post(url, body)
320314
self.assertEqual(response.status_code, 201, response.data)
321315

322-
# verify worker is still there
323-
url = f"/v2/apps/{app_id}/pods/"
324-
response = self.client.get(url)
325-
self.assertEqual(response.status_code, 200, response.data)
326-
self.assertPodContains(response.data['results'], app_id, 'worker', "v3", "up")
327-
328-
# verify web is still there
316+
# verify web
329317
url = f"/v2/apps/{app_id}/pods/"
330318
response = self.client.get(url)
331319
self.assertEqual(response.status_code, 200, response.data)
320+
self.assertEqual([item for item in response.data['results'] if item["type"] != "web"], [])
332321
self.assertPodContains(response.data['results'], app_id, 'web', "v3", "up")
333322

334323
# look at the app structure
335324
url = f"/v2/apps/{app_id}"
336325
response = self.client.get(url)
337326
self.assertEqual(response.status_code, 200, response.data)
338-
self.assertEqual(response.json()['structure'], {'web': 1, 'worker': 1})
339-
340-
# scale worker to make sure no info was lost
341-
url = f"/v2/apps/{app_id}/ptypes/scale"
342-
body = {'worker': 2} # bump from 1 to 2
343-
response = self.client.post(url, body)
344-
self.assertEqual(response.status_code, 204, response.data)
345-
346-
# verify worker info
347-
url = f"/v2/apps/{app_id}/pods/"
348-
response = self.client.get(url)
349-
self.assertEqual(response.status_code, 200, response.data)
350-
self.assertPodContains(response.data['results'], app_id, 'worker', "v3", "up")
327+
self.assertEqual(response.json()['structure'], {'web': 1})
351328

352329
@override_settings(DRYCC_DEPLOY_REJECT_IF_PROCFILE_MISSING=True)
353330
def test_build_forgotten_procfile_reject(self, mock_requests):
@@ -389,7 +366,12 @@ def test_build_forgotten_procfile_reject(self, mock_requests):
389366
url = f"/v2/apps/{app_id}/build"
390367
body = {'image': 'autotest/example', 'stack': 'container'}
391368
response = self.client.post(url, body)
392-
self.assertEqual(response.status_code, 409, response.data)
369+
self.assertEqual(response.status_code, 201, response.data)
370+
# verify web
371+
url = f"/v2/apps/{app_id}/pods/"
372+
response = self.client.get(url)
373+
self.assertEqual(response.status_code, 200, response.data)
374+
self.assertPodContains(response.data['results'], app_id, 'web', "v3", "up")
393375

394376
def test_build_str(self, mock_requests):
395377
"""Test the text representation of a build."""

rootfs/api/tests/test_lock.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,19 @@ def test_deploy_lock(self):
2525
app_id = f"test_key_1_{int(time.time())}"
2626
lock1 = DeployLock(app_id)
2727
lock2 = DeployLock(app_id)
28-
self.assertEqual(lock1.acquire(None), True)
29-
self.assertEqual(lock2.acquire(["web", "task"]), False)
30-
lock1.release(None)
3128
self.assertEqual(lock2.acquire(["web", "task"]), True)
32-
self.assertEqual(lock1.acquire(None), False)
3329
self.assertEqual(lock1.acquire(["web"]), False)
3430
self.assertEqual(lock1.acquire(["bing"]), True)
3531
lock2.release(["web", "task"])
3632
self.assertEqual(lock1.acquire(["web"]), True)
3733
self.assertEqual(lock1.acquire(["task"]), True)
3834
self.assertEqual(lock2.acquire(["web", "task"]), False)
39-
self.assertEqual(lock2.acquire(None), False)
4035
lock2.release(["web", "task", "bing"])
41-
self.assertEqual(lock2.acquire(None), True)
42-
lock2.release(None)
4336
self.assertEqual(lock1.acquire(["web", "task"]), True)
4437
self.assertEqual(lock2.acquire(["web", "task"]), False)
4538
self.assertEqual(lock2.acquire(["web", "bing"], True), True)
4639
self.assertEqual(lock1.acquire(["web", "bing", "task"]), False)
4740
self.assertEqual(lock1.acquire(["web", "bing", "task"], True), True)
4841
lock2.release(["web", "bing", "task"])
42+
self.assertEqual(lock1.acquire(["web"], True), True)
43+
lock1.release(["web", "bing", "task"])

rootfs/api/tests/test_pods.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -752,10 +752,12 @@ def test_modified_procfile_from_build_removes_pods(self, mock_requests):
752752
}
753753
response = self.client.post(build_url, body)
754754
self.assertEqual(response.status_code, 201, response.data)
755-
# web is default ptype
755+
# check web pods
756756
application = App.objects.get(id=app_id)
757757
pods = application.list_pods(type='web')
758-
self.assertEqual(len(pods), 4)
758+
self.assertEqual(len(pods), 0)
759+
pods = application.list_pods(type='worker')
760+
self.assertEqual(len(pods), 0)
759761

760762
def test_list_pods_failure(self, mock_requests):
761763
"""

rootfs/api/utils.py

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ def apply_tasks(tasks):
160160
return
161161

162162
executor = concurrent.futures.ThreadPoolExecutor(5)
163-
for future in [executor.submit(task) for task in tasks]:
163+
for future, callback in [(executor.submit(task[0]), task[1]) for task in tasks]:
164+
future.add_done_callback(callback)
164165
error = future.exception()
165166
if error is not None:
166167
raise error
@@ -231,32 +232,20 @@ def __init__(self, app_id):
231232

232233
def locked(self, ptypes):
233234
value = cache.get(self.ptypes_key, [])
234-
# None will locks all
235-
if ptypes is None:
236-
if value is None or len(value) > 0:
237-
return True
238-
elif len(value) == 0:
239-
return False
240-
else:
241-
if value is None:
242-
return True
243-
for ptype in ptypes:
244-
if ptype in value:
245-
return True
246-
return False
235+
locked_ptypes = []
236+
for ptype in ptypes:
237+
if ptype in value:
238+
locked_ptypes.append(ptype)
239+
return locked_ptypes
247240

248241
def acquire(self, ptypes, force=False):
249242
try:
250243
if super(DeployLock, self).acquire():
251244
value = cache.get(self.ptypes_key, [])
252-
# None will locks all
253-
if not force and self.locked(ptypes):
245+
if not force and len(self.locked(ptypes)) > 0:
254246
return False
255-
if ptypes is None:
256-
value = None
257-
else:
258-
value.extend(ptypes)
259-
value = list(set(value))
247+
value.extend(ptypes)
248+
value = list(set(value))
260249
cache.set(self.ptypes_key, value, timeout=3600)
261250
return True
262251
finally:
@@ -266,15 +255,13 @@ def acquire(self, ptypes, force=False):
266255
def release(self, ptypes):
267256
try:
268257
if super(DeployLock, self).acquire():
269-
if ptypes is None:
270-
cache.delete(self.ptypes_key)
271-
else:
272-
value = cache.get(self.ptypes_key, [])
273-
if value is None:
274-
return
275-
for ptype in ptypes:
258+
value = cache.get(self.ptypes_key, [])
259+
if value is None:
260+
return
261+
for ptype in ptypes:
262+
if ptype in value:
276263
value.remove(ptype)
277-
cache.set(self.ptypes_key, value)
264+
cache.set(self.ptypes_key, value)
278265
finally:
279266
super(DeployLock, self).release()
280267

0 commit comments

Comments
 (0)