11import os
2+ from urllib .parse import urlparse , parse_qs , urlencode
23from kombu import Exchange , Queue
34from celery import Celery
5+ from .config import VALKEY_URL
46
57
68class Config (object ):
79 # Celery Configuration Options
8- timezone = "Asia/Shanghai"
910 enable_utc = True
1011 task_serializer = 'pickle'
1112 accept_content = frozenset ([
12- 'application/data' ,
13- 'application/text' ,
14- 'application/json' ,
15- 'application/x-python-serialize' ,
13+ 'application/data' ,
14+ 'application/text' ,
15+ 'application/json' ,
16+ 'application/x-python-serialize' ,
1617 ])
1718 task_track_started = True
1819 task_time_limit = 30 * 60
1920 worker_max_tasks_per_child = 200
21+ worker_prefetch_multiplier = 1
2022 result_expires = 24 * 60 * 60
21- broker_url = os .environ .get ("HELMBROKER_RABBITMQ_URL" , 'amqp://guest:guest@127.0.0.1:5672/' )
22- broker_connection_retry_on_startup = True
23- task_default_queue = 'helmbroker.low'
23+ cache_backend = 'django-cache'
24+ task_default_queue = 'helmbroker.middle'
2425 task_default_exchange = 'helmbroker.priority'
25- task_default_routing_key = 'helmbroker.priority.low'
26+ task_default_routing_key = 'helmbroker.priority.middle'
27+ broker_transport_options = {"queue_order_strategy" : "sorted" }
28+ task_create_missing_queues = True
29+ task_inherit_parent_priority = True
2630 broker_connection_retry_on_startup = True
2731 worker_cancel_long_running_tasks_on_connection_loss = True
2832
2933
3034app = Celery ('helmbroker' )
3135app .config_from_object (Config ())
3236app .conf .update (
37+ timezone = os .environ .get ('TZ' , 'UTC' ),
3338 task_routes = {
3439 'helmbroker.tasks.provision' : {
3540 'queue' : 'helmbroker.high' ,
@@ -51,22 +56,36 @@ class Config(object):
5156 task_queues = (
5257 Queue (
5358 'helmbroker.low' , exchange = Exchange ('helmbroker.priority' , type = "direct" ),
54- routing_key = 'helmbroker.priority.low' , queue_arguments = { 'x-queue-type' : 'quorum' },
59+ routing_key = 'helmbroker.priority.low' ,
5560 ),
5661 Queue (
5762 'helmbroker.high' , exchange = Exchange ('helmbroker.priority' , type = "direct" ),
58- routing_key = 'helmbroker.priority.high' , queue_arguments = { 'x-queue-type' : 'quorum' },
63+ routing_key = 'helmbroker.priority.high' ,
5964 ),
6065 Queue (
6166 'helmbroker.middle' , exchange = Exchange ('helmbroker.priority' , type = "direct" ),
62- routing_key = 'helmbroker.priority.middle' , queue_arguments = { 'x-queue-type' : 'quorum' },
67+ routing_key = 'helmbroker.priority.middle' ,
6368 ),
6469 ),
6570)
6671app .autodiscover_tasks (("helmbroker.tasks" ,))
67-
68-
69- app .config_from_object (Config ())
72+ url = urlparse (VALKEY_URL )
73+ query = parse_qs (url .query )
74+ broker_transport_options = {"queue_order_strategy" : "sorted" , "visibility_timeout" : 43200 }
75+ result_backend_transport_options = {}
76+ if 'master_set' in query :
77+ master_name = query .pop ('master_set' )[0 ]
78+ password = url .netloc .split ("@" )[0 ].split (":" )[1 ]
79+ kwargs = {'sentinel_kwargs' : {'password' : password }, 'master_name' : master_name }
80+ broker_transport_options .update (kwargs )
81+ result_backend_transport_options .update (kwargs )
82+ VALKEY_URL = f"sentinel://{ url .netloc } { url .path } ?{ urlencode (query )} "
83+ app .conf .update (
84+ broker_url = VALKEY_URL ,
85+ result_backend = VALKEY_URL ,
86+ broker_transport_options = broker_transport_options ,
87+ result_backend_transport_options = result_backend_transport_options ,
88+ )
7089
7190if __name__ == '__main__' :
7291 app .start ()
0 commit comments