Skip to content

Commit 680469b

Browse files
authored
Merge pull request #948 from helgi/fix_race
fix(app): adjust async for better usage and to account for asyncio bug
2 parents 780eb92 + 54f0f75 commit 680469b

1 file changed

Lines changed: 35 additions & 13 deletions

File tree

rootfs/api/utils.py

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
"""
44
import asyncio
55
import base64
6+
import concurrent
67
import hashlib
8+
import logging
79
import random
810
from copy import deepcopy
911

12+
logger = logging.getLogger(__name__)
13+
1014

1115
def generate_app_name():
1216
"""Return a randomly-generated memorable name."""
@@ -144,21 +148,39 @@ def dict_merge(origin, merge):
144148
def async_run(tasks):
145149
"""
146150
run a group of tasks async
147-
Requires the tasks arg to be a list of function.partial
151+
Requires the tasks arg to be a list of functools.partial()
148152
"""
153+
if not tasks:
154+
return
155+
149156
# start a new async event loop
150-
loop = asyncio.new_event_loop()
151-
asyncio.set_event_loop(loop)
152-
153-
async_tasks = [loop.run_in_executor(None, task) for task in tasks]
154-
if async_tasks:
155-
# run deploys in parallel
156-
loop.run_until_complete(asyncio.wait(async_tasks))
157-
# deal with errors (exceptions, etc)
158-
for task in async_tasks:
159-
error = task.exception()
160-
if error is not None:
161-
raise error
157+
loop = asyncio.get_event_loop()
158+
# https://github.com/python/asyncio/issues/258
159+
executor = concurrent.futures.ThreadPoolExecutor(5)
160+
loop.set_default_executor(executor)
161+
162+
async_tasks = [asyncio.ensure_future(async_task(task, loop)) for task in tasks]
163+
# run tasks in parallel
164+
loop.run_until_complete(asyncio.wait(async_tasks))
165+
# deal with errors (exceptions, etc)
166+
for task in async_tasks:
167+
error = task.exception()
168+
if error is not None:
169+
raise error
170+
171+
executor.shutdown(wait=True)
172+
173+
174+
@asyncio.coroutine
175+
def async_task(params, loop):
176+
"""
177+
performs an async task
178+
"""
179+
# get the calling function
180+
logger.debug('Running {}'.format(params))
181+
# This executes a task in its own thread (in parallel)
182+
yield from loop.run_in_executor(None, params)
183+
logger.debug('Finished running {}'.format(params))
162184

163185

164186
if __name__ == "__main__":

0 commit comments

Comments
 (0)