Skip to content

Commit 6add94a

Browse files
committed
Merge pull request #359 from opdemand/fix-celery-deadlock
Fix Celery deadlocks by removing all blocking tasks from parent tasks
2 parents 252c127 + 48ac3ee commit 6add94a

3 files changed

Lines changed: 23 additions & 52 deletions

File tree

api/models.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,19 +185,33 @@ def flat(self):
185185
'nodes': self.nodes}
186186

187187
def build(self):
188-
tasks.build_formation.delay(self).wait()
188+
return
189189

190190
def destroy(self, *args, **kwargs):
191-
tasks.destroy_formation.delay(self).wait()
191+
app_tasks = [tasks.destroy_app.si(a) for a in self.app_set.all()]
192+
node_tasks = [tasks.destroy_node.si(n) for n in self.node_set.all()]
193+
layer_tasks = [tasks.destroy_layer.si(l) for l in self.layer_set.all()]
194+
group(app_tasks + node_tasks).apply_async().join()
195+
group(layer_tasks).apply_async().join()
196+
CM.purge_formation(self.flat())
197+
self.delete()
198+
tasks.converge_controller.apply_async().wait()
192199

193200
def publish(self):
194201
data = self.calculate()
195202
CM.publish_formation(self.flat(), data)
196203
return data
197204

198-
def converge(self, **kwargs):
205+
def converge(self, controller=False, **kwargs):
199206
databag = self.publish()
200-
tasks.converge_formation.delay(self).wait()
207+
nodes = self.node_set.all()
208+
subtasks = []
209+
for n in nodes:
210+
subtask = tasks.converge_node.si(n)
211+
subtasks.append(subtask)
212+
if controller is True:
213+
subtasks.append(tasks.converge_controller.si())
214+
group(*subtasks).apply_async().join()
201215
return databag
202216

203217
def calculate(self):
@@ -459,10 +473,11 @@ def build(self):
459473
Release.objects.create(
460474
version=1, owner=self.owner, app=self, config=config, build=build)
461475
self.formation.publish()
462-
tasks.build_app.delay(self).wait()
463476

464477
def destroy(self):
465-
tasks.destroy_app.delay(self).wait()
478+
CM.purge_app(self.flat())
479+
self.delete()
480+
self.formation.publish()
466481

467482
def publish(self):
468483
"""Publish the application to configuration management"""

api/tasks.py

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import importlib
44

55
from celery import task
6-
from celery.canvas import group
76

87
from deis import settings
98
from provider import import_provider_module
@@ -58,44 +57,6 @@ def run_node(node, command):
5857
return output, rc
5958

6059

61-
@task
62-
def build_formation(formation):
63-
return
64-
65-
66-
@task
67-
def destroy_formation(formation):
68-
app_tasks = [destroy_app.si(a) for a in formation.app_set.all()]
69-
node_tasks = [destroy_node.si(n) for n in formation.node_set.all()]
70-
layer_tasks = [destroy_layer.si(l) for l in formation.layer_set.all()]
71-
group(app_tasks + node_tasks).apply_async().join()
72-
group(layer_tasks).apply_async().join()
73-
CM.purge_formation(formation.flat())
74-
formation.delete()
75-
76-
77-
@task
78-
def converge_formation(formation):
79-
nodes = formation.node_set.all()
80-
subtasks = []
81-
for n in nodes:
82-
subtask = converge_node.si(n)
83-
subtasks.append(subtask)
84-
group(*subtasks).apply_async().join()
85-
86-
87-
@task
88-
def build_app(app):
89-
return
90-
91-
92-
@task
93-
def destroy_app(app):
94-
CM.purge_app(app.flat())
95-
app.delete()
96-
app.formation.publish()
97-
98-
9960
@task
10061
def converge_controller():
10162
CM.converge_controller()

api/views.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import json
88

99
from Crypto.PublicKey import RSA
10-
from celery.canvas import group
1110
from django.contrib.auth.models import AnonymousUser, User
1211
from django.utils import timezone
1312
from rest_framework import permissions, status, viewsets
@@ -18,7 +17,6 @@
1817

1918
from api import models
2019
from api import serializers
21-
from api import tasks
2220

2321

2422
class AnonymousAuthentication(BaseAuthentication):
@@ -200,7 +198,6 @@ def converge(self, request, **kwargs):
200198
def destroy(self, request, **kwargs):
201199
formation = self.get_object()
202200
formation.destroy()
203-
tasks.converge_controller.delay().wait()
204201
return Response(status=status.HTTP_204_NO_CONTENT)
205202

206203

@@ -299,8 +296,7 @@ class AppViewSet(OwnerViewSet):
299296
def post_save(self, app, created=False, **kwargs):
300297
if created:
301298
app.build()
302-
group(*[tasks.converge_formation.si(app.formation), # @UndefinedVariable
303-
tasks.converge_controller.si()]).apply_async().join() # @UndefinedVariable
299+
app.formation.converge(controller=True)
304300

305301
def pre_save(self, app, created=False, **kwargs):
306302
if not app.pk and not app.formation.domain and app.formation.app_set.count() > 0:
@@ -372,8 +368,7 @@ def run(self, request, **kwargs):
372368
def destroy(self, request, **kwargs):
373369
app = self.get_object()
374370
app.destroy()
375-
group(*[tasks.converge_formation.si(app.formation), # @UndefinedVariable
376-
tasks.converge_controller.si()]).apply_async().join() # @UndefinedVariable
371+
app.formation.converge(controller=True)
377372
return Response(status=status.HTTP_204_NO_CONTENT)
378373

379374

0 commit comments

Comments
 (0)