Skip to content

Commit 068f096

Browse files
committed
fix(pynsq): no current event loop in thread
1 parent 3946faa commit 068f096

2 files changed

Lines changed: 24 additions & 17 deletions

File tree

rootfs/tasks/task.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,27 @@
11
import os
22
import json
33
import logging
4-
import threading
4+
import asyncio
5+
import concurrent
56
import nsq
6-
from functools import wraps
7-
7+
from functools import wraps, partial
88

99
logger = logging.getLogger(__name__)
1010

1111

1212
def _message_handler(message):
1313

14-
def _method():
14+
async def _async_task(loop):
1515
data = json.loads(message.body)
16-
fun = TASKS[data["target_id"]]
17-
fun(*data["args"], **data["kwargs"])
18-
message.finish()
16+
function = TASKS[data["target_id"]]
17+
_sync_task = partial(function, *data["args"], **data["kwargs"])
18+
with concurrent.futures.ThreadPoolExecutor() as executor:
19+
await loop.run_in_executor(executor, _sync_task)
20+
message.finish()
21+
1922
message.enable_async()
20-
threading.Thread(target=_method).start()
23+
asyncio.set_event_loop(asyncio.new_event_loop())
24+
asyncio.ensure_future(_async_task(asyncio.get_event_loop()))
2125

2226

2327
TASKS = {}
@@ -41,6 +45,7 @@ def task(func):
4145
@wraps(func)
4246
def register_task(*args, **kwargs):
4347
return func(*args, **kwargs)
48+
4449
return register_task
4550

4651

@@ -53,6 +58,8 @@ def apply_async(target, delay=0, callback=None, args=(), kwargs=None):
5358
"args": args,
5459
"kwargs": {} if kwargs is None else kwargs
5560
}).encode("utf-8")
61+
62+
asyncio.set_event_loop(asyncio.new_event_loop())
5663
if delay <= 0:
5764
NSQD_WRITER.pub(NSQ_TOPIC, message, callback=callback)
5865
else:

rootfs/tasks/tests/test_task.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,21 @@ def t():
1818
def test_apply_async(self):
1919

2020
@task
21-
def t1(name, value):
22-
self.assertEqual(name == "hello", True)
21+
def t1(name, value, t):
22+
self.assertEqual(name == "hi", True)
2323
self.assertEqual(value == "word", True)
2424

2525
@task
2626
def t2(t):
2727
self.assertEqual(time.time() - t > 3, True)
2828

29-
def callback(addr, msg):
29+
def callback(_, msg):
3030
self.assertEqual(msg == b'OK', True)
3131

32-
threading.Thread(
33-
target=tornado.ioloop.IOLoop.current().start).start()
34-
time.sleep(3)
35-
apply_async(t1, callback=callback, args=("hello", "word"))
32+
loop = tornado.ioloop.IOLoop.current()
33+
threading.Thread(target=loop.start).start()
34+
time.sleep(9)
35+
apply_async(t1, callback=callback, args=("hi", "word", time.time()))
3636
apply_async(t2, callback=callback, delay=3000, args=(time.time(), ))
37-
time.sleep(12)
38-
tornado.ioloop.IOLoop.current().stop()
37+
time.sleep(9)
38+
loop.stop()

0 commit comments

Comments
 (0)