Skip to content

Commit fca588c

Browse files
committed
feat(tasks): change nsq reader to async
1 parent af653b4 commit fca588c

2 files changed

Lines changed: 20 additions & 10 deletions

File tree

rootfs/tasks/task.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,29 @@
11
import os
22
import json
3+
import logging
34
import threading
45
import nsq
56
from functools import wraps
67

78

9+
logger = logging.getLogger(__name__)
10+
11+
812
def _message_handler(message):
9-
data = json.loads(message.body)
10-
method = TASKS[data["target_id"]]
11-
threading.Thread(
12-
target=method, args=data["args"], kwargs=data["kwargs"]).start()
13-
return True
13+
14+
def _method():
15+
data = json.loads(message.body)
16+
fun = TASKS[data["target_id"]]
17+
fun(*data["args"], **data["kwargs"])
18+
message.finish()
19+
message.enable_async()
20+
threading.Thread(target=_method).start()
1421

1522

1623
TASKS = {}
1724
NSQD_ADDRS = os.environ.get('DRYCC_NSQD_ADDRS', '127.0.0.1:4150').split(",")
18-
NSQ_TOPIC = os.environ.get('DRYCC_NSQ_TASKS_TOPIC', 'tasks:topic')
19-
NSQ_CHANNEL = os.environ.get('DRYCC_NSQ_TASKS_CHANNEL', 'tasks:channel')
25+
NSQ_TOPIC = os.environ.get('DRYCC_NSQ_TASKS_TOPIC', 'drycc-tasks-topic')
26+
NSQ_CHANNEL = os.environ.get('DRYCC_NSQ_TASKS_CHANNEL', 'drycc-tasks-channel')
2027
NSQD_WRITER = nsq.Writer(NSQD_ADDRS)
2128
NSQD_READER = nsq.Reader(
2229
message_handler=_message_handler,

rootfs/tasks/tests/test_task.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,13 @@ def t1(name, value):
2626
def t2(t):
2727
self.assertEqual(time.time() - t > 3, True)
2828

29+
def callback(addr, msg):
30+
self.assertEqual(msg==b'OK', True)
31+
2932
threading.Thread(
3033
target=tornado.ioloop.IOLoop.current().start).start()
3134
time.sleep(3)
32-
apply_async(t1, name="hello", value="word")
33-
apply_async(t2, delay=3000, t=time.time())
34-
time.sleep(6)
35+
apply_async(t1, callback=callback, name="hello", value="word")
36+
apply_async(t2, callback=callback, delay=3000, t=time.time())
37+
time.sleep(12)
3538
tornado.ioloop.IOLoop.current().stop()

0 commit comments

Comments
 (0)