Skip to content

Commit 48ac3ee

Browse files
author
Gabriel Monroy
committed
Fix Celery deadlocks by removing all blocking task calls from inside other (parent) tasks and moving that logic into inline model methods.
I was able to confirm this works by setting an autoscale pool of 10,1 and watching it scale up/down when handling a large number of tasks.
1 parent 252c127 commit 48ac3ee

3 files changed

Lines changed: 23 additions & 52 deletions

File tree

api/models.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,19 +185,33 @@ def flat(self):
185185
'nodes': self.nodes}
186186

187187
def build(self):
188-
tasks.build_formation.delay(self).wait()
188+
return
189189

190190
def destroy(self, *args, **kwargs):
191-
tasks.destroy_formation.delay(self).wait()
191+
app_tasks = [tasks.destroy_app.si(a) for a in self.app_set.all()]
192+
node_tasks = [tasks.destroy_node.si(n) for n in self.node_set.all()]
193+
layer_tasks = [tasks.destroy_layer.si(l) for l in self.layer_set.all()]
194+
group(app_tasks + node_tasks).apply_async().join()
195+
group(layer_tasks).apply_async().join()
196+
CM.purge_formation(self.flat())
197+
self.delete()
198+
tasks.converge_controller.apply_async().wait()
192199

193200
def publish(self):
194201
data = self.calculate()
195202
CM.publish_formation(self.flat(), data)
196203
return data
197204

198-
def converge(self, **kwargs):
205+
def converge(self, controller=False, **kwargs):
199206
databag = self.publish()
200-
tasks.converge_formation.delay(self).wait()
207+
nodes = self.node_set.all()
208+
subtasks = []
209+
for n in nodes:
210+
subtask = tasks.converge_node.si(n)
211+
subtasks.append(subtask)
212+
if controller is True:
213+
subtasks.append(tasks.converge_controller.si())
214+
group(*subtasks).apply_async().join()
201215
return databag
202216

203217
def calculate(self):
@@ -459,10 +473,11 @@ def build(self):
459473
Release.objects.create(
460474
version=1, owner=self.owner, app=self, config=config, build=build)
461475
self.formation.publish()
462-
tasks.build_app.delay(self).wait()
463476

464477
def destroy(self):
465-
tasks.destroy_app.delay(self).wait()
478+
CM.purge_app(self.flat())
479+
self.delete()
480+
self.formation.publish()
466481

467482
def publish(self):
468483
"""Publish the application to configuration management"""

api/tasks.py

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import importlib
44

55
from celery import task
6-
from celery.canvas import group
76

87
from deis import settings
98
from provider import import_provider_module
@@ -58,44 +57,6 @@ def run_node(node, command):
5857
return output, rc
5958

6059

61-
@task
62-
def build_formation(formation):
63-
return
64-
65-
66-
@task
67-
def destroy_formation(formation):
68-
app_tasks = [destroy_app.si(a) for a in formation.app_set.all()]
69-
node_tasks = [destroy_node.si(n) for n in formation.node_set.all()]
70-
layer_tasks = [destroy_layer.si(l) for l in formation.layer_set.all()]
71-
group(app_tasks + node_tasks).apply_async().join()
72-
group(layer_tasks).apply_async().join()
73-
CM.purge_formation(formation.flat())
74-
formation.delete()
75-
76-
77-
@task
78-
def converge_formation(formation):
79-
nodes = formation.node_set.all()
80-
subtasks = []
81-
for n in nodes:
82-
subtask = converge_node.si(n)
83-
subtasks.append(subtask)
84-
group(*subtasks).apply_async().join()
85-
86-
87-
@task
88-
def build_app(app):
89-
return
90-
91-
92-
@task
93-
def destroy_app(app):
94-
CM.purge_app(app.flat())
95-
app.delete()
96-
app.formation.publish()
97-
98-
9960
@task
10061
def converge_controller():
10162
CM.converge_controller()

api/views.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import json
88

99
from Crypto.PublicKey import RSA
10-
from celery.canvas import group
1110
from django.contrib.auth.models import AnonymousUser, User
1211
from django.utils import timezone
1312
from rest_framework import permissions, status, viewsets
@@ -18,7 +17,6 @@
1817

1918
from api import models
2019
from api import serializers
21-
from api import tasks
2220

2321

2422
class AnonymousAuthentication(BaseAuthentication):
@@ -200,7 +198,6 @@ def converge(self, request, **kwargs):
200198
def destroy(self, request, **kwargs):
201199
formation = self.get_object()
202200
formation.destroy()
203-
tasks.converge_controller.delay().wait()
204201
return Response(status=status.HTTP_204_NO_CONTENT)
205202

206203

@@ -299,8 +296,7 @@ class AppViewSet(OwnerViewSet):
299296
def post_save(self, app, created=False, **kwargs):
300297
if created:
301298
app.build()
302-
group(*[tasks.converge_formation.si(app.formation), # @UndefinedVariable
303-
tasks.converge_controller.si()]).apply_async().join() # @UndefinedVariable
299+
app.formation.converge(controller=True)
304300

305301
def pre_save(self, app, created=False, **kwargs):
306302
if not app.pk and not app.formation.domain and app.formation.app_set.count() > 0:
@@ -372,8 +368,7 @@ def run(self, request, **kwargs):
372368
def destroy(self, request, **kwargs):
373369
app = self.get_object()
374370
app.destroy()
375-
group(*[tasks.converge_formation.si(app.formation), # @UndefinedVariable
376-
tasks.converge_controller.si()]).apply_async().join() # @UndefinedVariable
371+
app.formation.converge(controller=True)
377372
return Response(status=status.HTTP_204_NO_CONTENT)
378373

379374

0 commit comments

Comments
 (0)