Skip to content

Commit 7b6e17b

Browse files
committed
chore(utils): use threads replace asyncio
1 parent c4320cd commit 7b6e17b

2 files changed

Lines changed: 7 additions & 29 deletions

File tree

rootfs/api/models/app.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from api.models.tls import TLS
2727
from api.models.appsettings import AppSettings
2828
from api.models.volume import Volume
29-
from api.utils import generate_app_name, async_run
29+
from api.utils import generate_app_name, apply_tasks
3030
from scheduler import KubeHTTPException, KubeException
3131

3232
logger = logging.getLogger(__name__)
@@ -383,7 +383,7 @@ def restart(self, **kwargs): # noqa
383383
) for pod in self.list_pods(**kwargs)
384384
]
385385

386-
async_run(tasks)
386+
apply_tasks(tasks)
387387
except Exception as e:
388388
err = "warning, some pods failed to stop:\n{}".format(str(e))
389389
self.log(err, logging.WARNING)
@@ -528,7 +528,7 @@ def _scale_pods(self, scale_types):
528528
# create the application config in k8s (secret in this case) for all deploy objects
529529
self.set_application_config(release)
530530

531-
async_run(tasks)
531+
apply_tasks(tasks)
532532
except Exception as e:
533533
err = '(scale): {}'.format(e)
534534
self.log(err, logging.ERROR)
@@ -619,7 +619,7 @@ def deploy(self, release, force_deploy=False, rollback_on_failure=True): # noqa
619619
]
620620

621621
try:
622-
async_run(tasks)
622+
apply_tasks(tasks)
623623
except KubeException as e:
624624
# Don't rollback if the previous release doesn't have a build which means
625625
# this is the first build and all the previous releases are just config changes.

rootfs/api/utils.py

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
"""
22
Helper functions used by the Drycc server.
33
"""
4-
import asyncio
54
import base64
65
import concurrent
76
import hashlib
@@ -145,43 +144,22 @@ def dict_merge(origin, merge):
145144
return result
146145

147146

148-
def async_run(tasks):
147+
def apply_tasks(tasks):
149148
"""
150149
run a group of tasks async
151150
Requires the tasks arg to be a list of functools.partial()
152151
"""
153152
if not tasks:
154153
return
155154

156-
# start a new async event loop
157-
loop = asyncio.get_event_loop()
158-
# https://github.com/python/asyncio/issues/258
159155
executor = concurrent.futures.ThreadPoolExecutor(5)
160-
loop.set_default_executor(executor)
161-
162-
async_tasks = [asyncio.ensure_future(async_task(task, loop)) for task in tasks]
163-
# run tasks in parallel
164-
loop.run_until_complete(asyncio.wait(async_tasks))
165-
# deal with errors (exceptions, etc)
166-
for task in async_tasks:
167-
error = task.exception()
156+
for future in [executor.submit(task) for task in tasks]:
157+
error = future.exception()
168158
if error is not None:
169159
raise error
170-
171160
executor.shutdown(wait=True)
172161

173162

174-
async def async_task(params, loop):
175-
"""
176-
Perform a task asynchronously.
177-
"""
178-
# get the calling function
179-
logger.debug('Running {}'.format(params))
180-
# This executes a task in its own thread (in parallel)
181-
await loop.run_in_executor(None, params)
182-
logger.debug('Finished running {}'.format(params))
183-
184-
185163
if __name__ == "__main__":
186164
import doctest
187165
doctest.testmod()

0 commit comments

Comments
 (0)