Skip to content

Commit 64459a6

Browse files
Gabriel Monroy authored and Matthew Fisher committed
refactor(scheduler): coreos scheduler implementation
Note: an SSH wrapper is used to get around the fleetctl ssh-agent requirement. We will ultimately want to transition to an HTTP API for talking to fleet. Until then, we should explore modifying fleetctl to remove the ssh-agent requirement for tunneling.

Squashed changes:
- add cluster setup and teardown
- announce to /deis/services/{container_name} with a sidekick container
  - add pull before start
  - add stop and remove in ExecStop
  - use hardcoded expose port 5000 for deis/slugrunner
- parallelize scheduler tasks, add appropriate blocking
- exclude scheduler from max-line-length
- add sidekick container for logging, unit cleanup
- add unit name regex, use it when templating units
- add domain and options to SchedulerClient
- fix destroy bug
- update readme to reflect new scheduler workflow, with placeholders for not-yet-implemented features
- first pass at systemd units for components
- coreos contrib scripts, units and user-data helpers
1 parent c9db8b7 commit 64459a6

10 files changed

Lines changed: 423 additions & 45 deletions

File tree

Dockerfile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ RUN wget -qO- https://raw.github.com/pypa/pip/1.5.4/contrib/get-pip.py | python
1515
# install requirements before ADD to cache layer and speed build
1616
RUN pip install boto==2.23.0 celery==3.1.8 Django==1.6.2 django-allauth==0.15.0 django-guardian==1.1.1 django-json-field==0.5.5 django-yamlfield==0.5 djangorestframework==2.3.12 dop==0.1.4 gevent==1.0 gunicorn==18.0 paramiko==1.12.1 psycopg2==2.5.2 pycrypto==2.6.1 python-etcd==0.3.0 pyrax==1.6.2 PyYAML==3.10 redis==2.8.0 static==1.0.2 South==0.8.4
1717

18+
# install openssh-client for temporary fleetctl wrapper
19+
RUN apt-get install -yq openssh-client
20+
1821
# clone the project into /app
1922
ADD . /app
2023

api/models.py

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ class Cluster(UuidAuditedModel):
6060
Cluster used to run jobs
6161
"""
6262

63-
CLUSTER_TYPES = (('mock', 'Mock Cluster'),)
63+
CLUSTER_TYPES = (('mock', 'Mock Cluster'),
64+
('coreos', 'CoreOS Cluster'))
6465

6566
owner = models.ForeignKey(settings.AUTH_USER_MODEL)
6667
id = models.CharField(max_length=128, unique=True)
@@ -74,6 +75,26 @@ class Cluster(UuidAuditedModel):
7475
def __str__(self):
7576
return self.id
7677

78+
def _get_scheduler(self, *args, **kwargs):
79+
module_name = 'scheduler.' + self.type
80+
mod = importlib.import_module(module_name)
81+
return mod.SchedulerClient(self.id, self.hosts, self.auth,
82+
self.domain, self.options)
83+
84+
_scheduler = property(_get_scheduler)
85+
86+
def create(self):
87+
"""
88+
Initialize a cluster's router and log aggregator
89+
"""
90+
return tasks.create_cluster.delay(self).get()
91+
92+
def destroy(self):
93+
"""
94+
Destroy a cluster's router and log aggregator
95+
"""
96+
return tasks.destroy_cluster.delay(self).get()
97+
7798

7899
@python_2_unicode_compatible
79100
class App(UuidAuditedModel):
@@ -92,18 +113,17 @@ class Meta:
92113
def __str__(self):
93114
return self.id
94115

95-
def init(self, *args, **kwargs):
116+
def create(self, *args, **kwargs):
96117
config = Config.objects.create(owner=self.owner, app=self, values={})
97118
build = Build.objects.create(owner=self.owner, app=self, image=settings.DEFAULT_BUILD)
98119
Release.objects.create(version=1, owner=self.owner, app=self, config=config, build=build)
99120

100-
def delete(self, *args, **kwargs):
121+
def destroy(self, *args, **kwargs):
101122
for c in self.container_set.all():
102123
c.destroy()
103-
super(App, self).delete(*args, **kwargs)
104124

105125
def deploy(self, release):
106-
return tasks.deploy_release.delay(self, release)
126+
return tasks.deploy_release.delay(self, release).get()
107127

108128
def scale(self, **kwargs):
109129
"""Scale containers up or down to match requested."""
@@ -200,67 +220,67 @@ class Meta:
200220
get_latest_by = '-created'
201221
ordering = ['created']
202222

203-
def _scheduler(self, *args, **kwargs):
204-
module_name = 'scheduler.' + self.app.cluster.type
205-
mod = importlib.import_module(module_name)
206-
return mod.SchedulerClient(self.app.cluster.id,
207-
self.app.cluster.hosts,
208-
self.app.cluster.auth)
209-
210-
def job_id(self):
223+
def _get_job_id(self):
211224
app = self.app.id
212225
release = self.release
213226
version = "v{}".format(release.version)
214227
num = self.num
215228
c_type = self.type
216229
if not c_type:
217-
job_id = "{app}.{version}.{num}".format(**locals())
230+
job_id = "{app}_{version}.{num}".format(**locals())
218231
else:
219-
job_id = "{app}_{c_type}.{version}.{num}".format(**locals())
232+
job_id = "{app}_{version}.{c_type}.{num}".format(**locals())
220233
return job_id
221234

235+
_job_id = property(_get_job_id)
236+
237+
def _get_scheduler(self):
238+
return self.app.cluster._scheduler
239+
240+
_scheduler = property(_get_scheduler)
241+
222242
def create(self):
223243
image = self.release.build.image
224-
self._scheduler().create(self.job_id(), image, 'docker run {image}'.format(**locals()))
244+
self._scheduler.create(self._job_id, image, 'docker run {image}'.format(**locals()))
225245
self.state = 'created'
226246
self.save()
227247

228248
def start(self):
229249
self.state = 'starting'
230250
self.save()
231-
self._scheduler().start(self.job_id())
251+
self._scheduler.start(self._job_id)
232252
self.state = 'up'
233253
self.save()
234254

235255
def deploy(self, release):
236-
old_job_id = self.job_id()
256+
old_job_id = self._job_id
237257
self.state = 'deploying'
238258
self.save()
239259
# update release
240260
self.release = release
241261
self.save()
242262
# deploy new container
243-
new_job_id = self.job_id()
263+
new_job_id = self._job_id
244264
image = self.release.build.image
245-
self._scheduler().create(new_job_id, image, 'docker run {image}'.format(**locals()))
246-
self._scheduler().start(new_job_id)
265+
self._scheduler.create(new_job_id, image, 'docker run {image}'.format(**locals()))
266+
self._scheduler.start(new_job_id)
247267
# destroy old container
248-
self._scheduler().destroy(old_job_id)
268+
self._scheduler.destroy(old_job_id)
249269
self.state = 'up'
250270
self.save()
251271

252272
def stop(self):
253273
self.state = 'stopping'
254274
self.save()
255-
self._scheduler().stop(self.job_id())
275+
self._scheduler.stop(self._job_id)
256276
self.state = 'stopped'
257277
self.save()
258278

259279
def destroy(self):
260280
self.state = 'destroying'
261281
self.save()
262282
# TODO: add check for active connections before killing
263-
self._scheduler().destroy(self.job_id())
283+
self._scheduler.destroy(self._job_id)
264284
self.state = 'destroyed'
265285
self.save()
266286

api/tasks.py

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,43 +7,72 @@
77

88
from __future__ import unicode_literals
99

10+
import threading
11+
1012
from celery import task
1113

1214

15+
@task
16+
def create_cluster(cluster):
17+
cluster._scheduler.setUp()
18+
19+
20+
@task
21+
def destroy_cluster(cluster):
22+
for app in cluster.app_set.all():
23+
app.destroy()
24+
cluster._scheduler.tearDown()
25+
26+
1327
@task
1428
def deploy_release(app, release):
1529
containers = app.container_set.all()
16-
# TODO: parallelize
30+
threads = []
1731
for c in containers:
18-
try:
19-
c.deploy(release)
20-
except Exception:
32+
threads.append(threading.Thread(target=c.deploy, args=(release,)))
33+
try:
34+
[t.start() for t in threads]
35+
[t.join() for t in threads]
36+
except Exception:
37+
for c in containers:
2138
c.state = 'error'
2239
c.save()
23-
raise
40+
raise
2441

2542

2643
@task
2744
def start_containers(containers):
28-
# TODO: parallelize
45+
create_threads = []
46+
start_threads = []
2947
for c in containers:
30-
try:
31-
c.create()
32-
c.start()
33-
except Exception:
48+
create_threads.append(threading.Thread(target=c.create))
49+
start_threads.append(threading.Thread(target=c.start))
50+
try:
51+
[t.start() for t in create_threads]
52+
[t.join() for t in create_threads]
53+
[t.start() for t in start_threads]
54+
[t.join() for t in start_threads]
55+
except Exception:
56+
for c in containers:
3457
c.state = 'error'
3558
c.save()
3659
raise
3760

3861

3962
@task
4063
def stop_containers(containers):
41-
# TODO: parallelize
64+
destroy_threads = []
65+
delete_threads = []
4266
for c in containers:
43-
try:
44-
c.destroy()
45-
c.delete()
46-
except Exception:
67+
destroy_threads.append(threading.Thread(target=c.destroy))
68+
delete_threads.append(threading.Thread(target=c.delete))
69+
try:
70+
[t.start() for t in destroy_threads]
71+
[t.join() for t in destroy_threads]
72+
[t.start() for t in delete_threads]
73+
[t.join() for t in delete_threads]
74+
except Exception:
75+
for c in containers:
4776
c.state = 'error'
4877
c.save()
4978
raise

api/tests/test_container.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99
import json
1010

1111
from django.contrib.auth.models import User
12-
from django.test import TestCase
12+
from django.test import TransactionTestCase
1313
from django.test.utils import override_settings
1414

1515
from api.models import Container, App
1616

1717

1818
@override_settings(CELERY_ALWAYS_EAGER=True)
19-
class ContainerTest(TestCase):
19+
class ContainerTest(TransactionTestCase):
2020

2121
"""Tests creation of containers on nodes"""
2222

api/views.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,13 @@ def pre_save(self, obj):
194194
if not hasattr(obj, 'owner'):
195195
obj.owner = self.request.user
196196

197+
def post_save(self, cluster, created=False, **kwargs):
198+
if created:
199+
cluster.create()
200+
201+
def pre_delete(self, cluster):
202+
cluster.destroy()
203+
197204

198205
class AppPermsViewSet(viewsets.ViewSet):
199206
"""RESTful views for sharing apps with collaborators."""
@@ -272,7 +279,7 @@ def get_queryset(self, **kwargs):
272279

273280
def post_save(self, app, created=False, **kwargs):
274281
if created:
275-
app.init()
282+
app.create()
276283

277284
def scale(self, request, **kwargs):
278285
new_structure = {}

0 commit comments

Comments
 (0)