Skip to content

Commit ccf988b

Browse files
committed
feat(apps): scale / deploy run all process types in parallel instead of sequantially
This also moves the restart function to use api.utils.async_run - it is a slightly modified version of the original to be exception aware and not require an internal function
1 parent f7df29c commit ccf988b

2 files changed

Lines changed: 63 additions & 38 deletions

File tree

rootfs/api/models/app.py

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import asyncio
21
import backoff
32
from collections import OrderedDict
43
from datetime import datetime
4+
import functools
55
import logging
66
import random
77
import re
@@ -19,7 +19,7 @@
1919
from api import __version__ as deis_version
2020
from api.models import UuidAuditedModel, AlreadyExists, DeisException, ServiceUnavailable
2121

22-
from api.utils import generate_app_name
22+
from api.utils import generate_app_name, async_run
2323
from api.models.release import Release
2424
from api.models.config import Config
2525
from api.models.domain import Domain
@@ -295,24 +295,15 @@ def restart(self, **kwargs): # noqa
295295
return []
296296

297297
try:
298-
@asyncio.coroutine
299-
def delete_pod(namespace, name, loop):
300-
"""
301-
A synchronous function that deletes a pod
302-
"""
303-
logger.debug('Deleting pod {} as part of a pod restart call'.format(name))
304-
# Gives a pod however long the termination grace period is to terminate
305-
# This executes a delete in its own thread (in parallel)
306-
yield from loop.run_in_executor(None, self._scheduler.delete_pod, namespace, name)
307-
logger.debug('Finished deleting pod {}'.format(name))
308-
309-
# gather all pods to be deleted
310-
loop = asyncio.get_event_loop()
311-
tasks = [delete_pod(self.id, pod['name'], loop) for pod in self.list_pods(**kwargs)]
312-
if tasks:
313-
# run deletes in parallel
314-
loop.run_until_complete(asyncio.wait(tasks))
315-
298+
tasks = [
299+
functools.partial(
300+
self._scheduler.delete_pod,
301+
self.id,
302+
pod['name']
303+
) for pod in self.list_pods(**kwargs)
304+
]
305+
306+
async_run(tasks)
316307
except Exception as e:
317308
err = "warning, some pods failed to stop:\n{}".format(str(e))
318309
self.log(err, logging.WARNING)
@@ -436,6 +427,7 @@ def _scale_pods(self, scale_types):
436427
# see if there is a global or app specific setting to specify Deployments usage
437428
deployments = bool(envs.get('DEIS_KUBERNETES_DEPLOYMENTS', settings.DEIS_KUBERNETES_DEPLOYMENTS)) # noqa
438429

430+
tasks = []
439431
for scale_type, replicas in scale_types.items():
440432
# only web / cmd are routable
441433
# http://docs.deis.io/en/latest/using_deis/process-types/#web-vs-cmd-process-types
@@ -462,19 +454,25 @@ def _scale_pods(self, scale_types):
462454
'deploy_timeout': deploy_timeout,
463455
}
464456

465-
try:
466-
self._scheduler.scale(
457+
# gather all proc types to be deployed
458+
tasks.append(
459+
functools.partial(
460+
self._scheduler.scale,
467461
namespace=self.id,
468462
name=self._get_job_id(scale_type),
469463
image=release.image,
470464
entrypoint=self._get_entrypoint(scale_type),
471465
command=self._get_command(scale_type),
472466
**kwargs
473467
)
474-
except Exception as e:
475-
err = '{} (scale): {}'.format(self._get_job_id(scale_type), e)
476-
self.log(err, logging.ERROR)
477-
raise ServiceUnavailable(err) from e
468+
)
469+
470+
try:
471+
async_run(tasks)
472+
except Exception as e:
473+
err = '(scale): {}'.format(e)
474+
self.log(err, logging.ERROR)
475+
raise ServiceUnavailable(err) from e
478476

479477
def deploy(self, release, force_deploy=False):
480478
"""
@@ -538,32 +536,38 @@ def deploy(self, release, force_deploy=False):
538536
# Sort deploys so routable comes first
539537
deploys = OrderedDict(sorted(deploys.items(), key=lambda d: d[1].get('routable')))
540538

539+
# Check if any proc type has a Deployment in progress
541540
for scale_type, kwargs in deploys.items():
542541
# Is there an existing deployment in progress?
543542
name = self._get_job_id(scale_type)
544543
if not force_deploy and release.deployment_in_progress(self.id, name):
545544
raise AlreadyExists('Deployment for {} is already in progress'.format(name))
546545

547-
try:
548-
self._scheduler.deploy(
546+
try:
547+
# gather all proc types to be deployed
548+
tasks = [
549+
functools.partial(
550+
self._scheduler.deploy,
549551
namespace=self.id,
550552
name=self._get_job_id(scale_type),
551553
image=release.image,
552554
entrypoint=self._get_entrypoint(scale_type),
553555
command=self._get_command(scale_type),
554556
**kwargs
555-
)
557+
) for scale_type, kwargs in deploys.items()
558+
]
556559

557-
# Wait until application is available in the router
558-
# Only run when there is no previous build / release
559-
old = release.previous()
560-
if old is None or old.build is None:
561-
self.verify_application_health(**kwargs)
560+
async_run(tasks)
561+
except Exception as e:
562+
err = '(app::deploy): {}'.format(e)
563+
self.log(err, logging.ERROR)
564+
raise ServiceUnavailable(err) from e
562565

563-
except Exception as e:
564-
err = '{} (app::deploy): {}'.format(self._get_job_id(scale_type), e)
565-
self.log(err, logging.ERROR)
566-
raise ServiceUnavailable(err) from e
566+
# Wait until application is available in the router
567+
# Only run when there is no previous build / release
568+
old = release.previous()
569+
if old is None or old.build is None:
570+
self.verify_application_health(**kwargs)
567571

568572
# cleanup old release objects from kubernetes
569573
release.cleanup_old(deployments)

rootfs/api/utils.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""
22
Helper functions used by the Deis server.
33
"""
4+
import asyncio
45
import base64
56
import hashlib
67
import random
@@ -140,6 +141,26 @@ def dict_merge(origin, merge):
140141
return result
141142

142143

144+
def async_run(tasks):
145+
"""
146+
run a group of tasks async
147+
Requires the tasks arg to be a list of function.partial
148+
"""
149+
# start a new async event loop
150+
loop = asyncio.new_event_loop()
151+
asyncio.set_event_loop(loop)
152+
153+
async_tasks = [loop.run_in_executor(None, task) for task in tasks]
154+
if async_tasks:
155+
# run deploys in parallel
156+
loop.run_until_complete(asyncio.wait(async_tasks))
157+
# deal with errors (exceptions, etc)
158+
for task in async_tasks:
159+
error = task.exception()
160+
if error is not None:
161+
raise error
162+
163+
143164
if __name__ == "__main__":
144165
import doctest
145166
doctest.testmod()

0 commit comments

Comments
 (0)