Skip to content

Commit ba52f66

Browse files
committed
fix(helmbroker): rabbitmq sharding err
1 parent a9ffe43 commit ba52f66

1 file changed

Lines changed: 25 additions & 8 deletions

File tree

rootfs/helmbroker/celery.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import os
2+
from kombu import Exchange, Queue
23
from celery import Celery
34

45

5-
# The queue and exchange names cannot be the same
6-
# otherwise an error will occur when enabling the sharding plugin
7-
class Config:
6+
class Config(object):
87
# Celery Configuration Options
98
timezone = "Asia/Shanghai"
109
enable_utc = True
@@ -19,20 +18,38 @@ class Config:
1918
task_time_limit = 30 * 60
2019
worker_max_tasks_per_child = 200
2120
result_expires = 24 * 60 * 60
21+
broker_url = os.environ.get("DRYCC_RABBITMQ_URL", 'amqp://guest:guest@127.0.0.1:5672/') # noqa
2222
broker_connection_retry_on_startup = True
2323
task_default_queue = 'low'
2424
task_default_exchange = 'helmbroker.priority'
2525
task_default_routing_key = 'helmbroker.priority.low'
26+
broker_connection_retry_on_startup = True
2627
worker_cancel_long_running_tasks_on_connection_loss = True
2728

2829

29-
app = Celery(
30-
'helmbroker',
31-
broker=os.environ.get("DRYCC_RABBITMQ_URL"),
32-
include=['helmbroker.tasks']
30+
app = Celery('helmbroker')
31+
app.config_from_object(Config())
32+
app.conf.update(
33+
task_routes={
34+
'helmbroker.tasks': {
35+
'queue': 'low',
36+
'exchange': 'helmbroker.priority',
37+
'routing_key': 'helmbroker.priority.high',
38+
},
39+
},
40+
task_queues=(
41+
Queue(
42+
'low',
43+
exchange=Exchange('helmbroker.priority', type="direct"),
44+
routing_key='helmbroker.priority.low',
45+
queue_arguments={'x-max-priority': 16},
46+
),
47+
),
3348
)
49+
app.autodiscover_tasks()
50+
3451

35-
app.config_from_object(Config)
52+
app.config_from_object(Config())
3653

3754
if __name__ == '__main__':
3855
app.start()

0 commit comments

Comments
 (0)