-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathtask.py
More file actions
59 lines (47 loc) · 1.59 KB
/
task.py
File metadata and controls
59 lines (47 loc) · 1.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import os
import json
import logging
import threading
import nsq
from functools import wraps
logger = logging.getLogger(__name__)
def _message_handler(message):
def _method():
data = json.loads(message.body)
fun = TASKS[data["target_id"]]
fun(*data["args"], **data["kwargs"])
message.finish()
message.enable_async()
threading.Thread(target=_method).start()
TASKS = {}
NSQD_ADDRS = os.environ.get('DRYCC_NSQD_ADDRS', '127.0.0.1:4150').split(",")
NSQ_TOPIC = os.environ.get('DRYCC_NSQ_TASKS_TOPIC', 'drycc-tasks-topic')
NSQ_CHANNEL = os.environ.get('DRYCC_NSQ_TASKS_CHANNEL', 'drycc-tasks-channel')
NSQD_WRITER = nsq.Writer(NSQD_ADDRS)
NSQD_READER = nsq.Reader(
message_handler=_message_handler,
nsqd_tcp_addresses=NSQD_ADDRS,
topic=NSQ_TOPIC,
channel=NSQ_CHANNEL,
lookupd_poll_interval=15,
)
def task(func):
target_id = "%s.%s" % (func.__module__, func.__name__)
TASKS[target_id] = func
@wraps(func)
def register_task(*args, **kwargs):
return func(*args, **kwargs)
return register_task
def apply_async(target, delay=0, callback=None, args=(), kwargs=None):
target_id = "%s.%s" % (target.__module__, target.__name__)
if target_id not in TASKS:
raise NotImplemented("This task is not registered.")
message = json.dumps({
"target_id": target_id,
"args": args,
"kwargs": {} if kwargs is None else kwargs
}).encode("utf-8")
if delay <= 0:
NSQD_WRITER.pub(NSQ_TOPIC, message, callback=callback)
else:
NSQD_WRITER.dpub(NSQ_TOPIC, delay, message, callback=callback)