Skip to content

Commit 25a9550

Browse files
段洪义lijianguo
authored andcommitted
feat(tasks): use celery replace nsqd
1 parent 08440c1 commit 25a9550

14 files changed

Lines changed: 119 additions & 162 deletions

File tree

rootfs/Dockerfile.test

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ RUN echo https://dl-cdn.alpinelinux.org/alpine/edge/testing >>/etc/apk/repositor
2929
shadow \
3030
nsq \
3131
postgresql \
32+
redis \
3233
&& mkdir -p /run/postgresql $PGDATA \
3334
&& chown -R postgres:postgres /run/postgresql $PGDATA \
3435
&& apk del .build-deps \

rootfs/api/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""
22
The **api** Django app presents a RESTful web API for interacting with the **drycc** system.
33
"""
4+
from .settings.celery import app as celery_app
45

56
__version__ = '2.3.0'
7+
__all__ = ('celery_app',)

rootfs/api/models/__init__.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,25 @@
1111
import re
1212
import urllib.parse
1313
import uuid
14+
import requests
1415

16+
from datetime import timedelta
1517
from django.conf import settings
1618
from django.db import models
1719
from django.db.models.signals import post_delete, post_save
18-
from django.dispatch import receiver
20+
from django.utils.timezone import now
21+
from django.dispatch import receiver, Signal
1922
from rest_framework.exceptions import ValidationError
2023
from rest_framework.authtoken.models import Token
21-
import requests
2224
from requests_toolbelt import user_agent
23-
24-
from api import __version__ as drycc_version
25-
from api.exceptions import DryccException, AlreadyExists, ServiceUnavailable, UnprocessableEntity # noqa
26-
from api.utils import dict_merge
27-
from scheduler import KubeException
25+
from scheduler.exceptions import KubeException
26+
from .. import __version__ as drycc_version
27+
from ..exceptions import DryccException, AlreadyExists, ServiceUnavailable, UnprocessableEntity # noqa
2828

2929

3030
logger = logging.getLogger(__name__)
31-
3231
session = None
32+
resource_changed = Signal(providing_args=["resource_id"])
3333

3434

3535
def get_session():
@@ -153,6 +153,9 @@ class Meta:
153153
from .volume import Volume # noqa
154154
from .resource import Resource # noqa
155155

156+
from ..tasks import retrieve_resource # noqa
157+
from ..utils import dict_merge # noqa
158+
156159
# define update/delete callbacks for synchronizing
157160
# models with the configuration management backend
158161

@@ -257,3 +260,15 @@ def _hook_release_created(**kwargs):
257260
def create_auth_token(sender, instance=None, created=False, **kwargs):
258261
if created:
259262
Token.objects.create(user=instance)
263+
264+
265+
@receiver(resource_changed)
266+
def resource_changed_handle(sender, **kwargs):
267+
data = {
268+
"task_id": uuid.uuid4().hex,
269+
"resource_id": kwargs.get("resource_id"),
270+
}
271+
retrieve_resource.apply_async(
272+
args=(data, ),
273+
eta=now() + timedelta(seconds=30)
274+
)

rootfs/api/models/resource.py

Lines changed: 4 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
import logging
2-
import uuid
3-
import time
4-
from django.core import signals
52
from django.conf import settings
63
from django.db import models, transaction
74
from jsonfield import JSONField
85
from api.exceptions import DryccException, AlreadyExists, ServiceUnavailable
96
from api.models import UuidAuditedModel, validate_label
107
from scheduler import KubeException
11-
from tasks import task, apply_async
8+
from . import resource_changed
129

1310
logger = logging.getLogger(__name__)
1411

@@ -58,11 +55,7 @@ def attach(self, *args, **kwargs):
5855
self._scheduler.svcat.create_instance(
5956
self.app.id, self.name, **kwargs
6057
)
61-
data = {
62-
"task_id": uuid.uuid4().hex,
63-
"resource_id": str(self.uuid),
64-
}
65-
apply_async(retrieve_task, delay=30000, args=(data, ))
58+
resource_changed.send(sender=Resource, resource_id=str(self.uuid))
6659
except KubeException as e:
6760
msg = 'There was a problem creating the resource ' \
6861
'{} for {}'.format(self.name, self.app_id)
@@ -111,11 +104,7 @@ def bind(self, *args, **kwargs):
111104
try:
112105
self._scheduler.svcat.create_binding(
113106
self.app.id, self.name, **kwargs)
114-
data = {
115-
"task_id": uuid.uuid4().hex,
116-
"resource_id": str(self.uuid),
117-
}
118-
apply_async(retrieve_task, delay=30000, args=(data, ))
107+
resource_changed.send(sender=Resource, resource_id=str(self.uuid))
119108
except KubeException as e:
120109
msg = 'There was a problem binding the resource ' \
121110
'{} for {}'.format(self.name, self.app_id)
@@ -153,11 +142,7 @@ def attach_update(self, *args, **kwargs):
153142
self._scheduler.svcat.put_instance(
154143
self.app.id, self.name, version, **kwargs
155144
)
156-
data = {
157-
"task_id": uuid.uuid4().hex,
158-
"resource_id": str(self.uuid),
159-
}
160-
apply_async(retrieve_task, delay=30000, args=(data, ))
145+
resource_changed.send(sender=Resource, resource_id=str(self.uuid))
161146
except KubeException as e:
162147
msg = 'There was a problem update the resource ' \
163148
'{} for {}'.format(self.name, self.app_id)
@@ -214,27 +199,3 @@ def detach_resource(self, *args, **kwargs):
214199

215200
if (self.status != "Ready") or (not self.binding):
216201
self.delete()
217-
218-
219-
@task
220-
def retrieve_task(data):
221-
try:
222-
signals.request_started.send(sender=data['task_id'])
223-
try:
224-
resource = Resource.objects.get(uuid=data['resource_id'])
225-
except Resource.DoesNotExist:
226-
logger.info("retrieve task not found resource: {}".format(data['resource_id'])) # noqa
227-
return True
228-
_ = resource.retrieve()
229-
if _:
230-
return True
231-
else:
232-
t = time.time() - resource.created.timestamp()
233-
if t < 3600:
234-
apply_async(retrieve_task, delay=30000, args=(data, ))
235-
elif t < 3600 * 12:
236-
apply_async(retrieve_task, delay=1800000, args=(data, ))
237-
else:
238-
resource.detach_resource()
239-
finally:
240-
signals.request_finished.send(sender=data['task_id'])

rootfs/api/settings/celery.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import os
2+
from celery import Celery
3+
4+
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'api.settings.production')
5+
app = Celery('drycc')
6+
app.config_from_object('django.conf:settings', namespace='CELERY')
7+
app.conf.update(
8+
task_routes={
9+
'api.tasks.retrieve_resource': {'queue': 'priority.high'},
10+
},
11+
)
12+
app.autodiscover_tasks()

rootfs/api/settings/production.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,18 @@
443443
}
444444
}
445445

446+
REDIS_ADDRS = os.environ.get('REDIS_ADDRS', 'redis://127.0.0.1:6379/0').split(",")
447+
448+
CACHES = {
449+
"default": {
450+
"BACKEND": "django_redis.cache.RedisCache",
451+
"LOCATION": REDIS_ADDRS,
452+
"OPTIONS": {
453+
"CLIENT_CLASS": "django_redis.client.ShardClient",
454+
}
455+
}
456+
}
457+
446458
APP_URL_REGEX = '[a-z0-9-]+'
447459

448460
# LDAP settings taken from environment variables.
@@ -499,3 +511,12 @@
499511
AUTH_LDAP_MIRROR_GROUPS = True
500512
AUTH_LDAP_FIND_GROUP_PERMS = True
501513
AUTH_LDAP_CACHE_GROUPS = False
514+
515+
# Celery Configuration Options
516+
CELERY_TIMEZONE = "Asia/Shanghai"
517+
CELERY_TASK_TRACK_STARTED = True
518+
CELERY_TASK_TIME_LIMIT = 30 * 60
519+
CELERY_BROKER_URL = REDIS_ADDRS[0]
520+
CELERY_RESULT_BACKEND = REDIS_ADDRS[0]
521+
CELERY_CACHE_BACKEND = 'django-cache'
522+
CELERY_DEFAULT_QUEUE = 'priority.middle'

rootfs/api/tasks.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Create your tasks here
2+
import time
3+
import logging
4+
from django.core import signals
5+
from datetime import timedelta
6+
from django.utils.timezone import now
7+
from celery import shared_task
8+
9+
from .models.resource import Resource
10+
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
@shared_task
16+
def retrieve_resource(data):
17+
signals.request_started.send(sender=data['task_id'])
18+
try:
19+
resource = Resource.objects.get(uuid=data['resource_id'])
20+
if not resource.retrieve():
21+
t = time.time() - resource.created.timestamp()
22+
if t < 3600:
23+
retrieve_resource.apply_async(
24+
args=(data, ),
25+
eta=now() + timedelta(seconds=30))
26+
elif t < 3600 * 12:
27+
retrieve_resource.apply_async(
28+
args=(data, ),
29+
eta=now() + timedelta(seconds=1800))
30+
else:
31+
resource.detach_resource()
32+
except Resource.DoesNotExist:
33+
logger.info("retrieve task not found resource: {}".format(data['resource_id'])) # noqa
34+
finally:
35+
signals.request_finished.send(sender=data['task_id'])

rootfs/bin/boot

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ echo "Loading database information to Kubernetes in the background"
3737
echo "Log of the run can be found in /app/data/logs/load_db_state_to_k8s.log"
3838
# python -u avoids output buffering
3939
nohup python -u /app/manage.py load_db_state_to_k8s > /app/data/logs/load_db_state_to_k8s.log &
40-
40+
# celery
41+
nohup su-exec nobody celery -A api worker -Q priority.low --loglevel=WARNING > /app/data/logs/celery.log 2>&1 &
42+
nohup su-exec nobody celery -A api worker -Q priority.middle --loglevel=WARNING > /app/data/logs/celery.log 2>&1 &
43+
nohup su-exec nobody celery -A api worker -Q priority.high --loglevel=WARNING > /app/data/logs/celery.log 2>&1 &
4144
# smart shutdown on SIGTERM (SIGINT is handled by gunicorn)
4245
function on_exit() {
4346
GUNICORN_PID=$(cat /tmp/gunicorn.pid)

rootfs/bin/test-unit

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,22 @@ function start_nsq() {
1212
cd -
1313
}
1414

15+
function start_redis() {
16+
cd /tmp
17+
nohup redis-server > /var/log/redis.log 2>&1 &
18+
cd -
19+
}
20+
21+
function start_celery() {
22+
cd /tmp
23+
nohup su-exec nobody celery -A api worker -Q priority.low,priority.middle,priority.high --loglevel=info > /var/log/celery.log 2>&1 &
24+
cd -
25+
}
26+
1527
su-exec postgres pg_ctl -D "$PGDATA" start
1628
start_nsq
29+
start_redis
30+
start_celery
1731
python3 manage.py check
18-
coverage run manage.py test --settings=api.settings.testing --noinput tasks.tests api scheduler.tests
32+
coverage run manage.py test --settings=api.settings.testing --noinput api scheduler.tests
1933
coverage report -m

rootfs/requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,5 @@ pyOpenSSL==19.1.0
2121
pytz==2020.1
2222
requests==2.24.0
2323
requests-toolbelt==0.9.1
24+
celery==5.0.2
25+
django_redis==4.12.1

0 commit comments

Comments
 (0)