Skip to content

Commit be09088

Browse files
Keerthan Malamboersma
authored and committed
feat(k8s): integrate kubernetes scheduler with deis.
1 parent 09d053b commit be09088

14 files changed

Lines changed: 512 additions & 229 deletions

File tree

contrib/coreos/user-data.example

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,9 @@ write_files:
159159
- path: /etc/deis-release
160160
content: |
161161
DEIS_RELEASE=v1.8.0
162+
- path: /etc/k8environment
163+
content: |
164+
K8S_VERSION=v0.21.0
162165
- path: /etc/motd
163166
content: " \e[31m* * \e[34m* \e[32m***** \e[39mddddd eeeeeee iiiiiii ssss\n\e[31m* * \e[34m* * \e[32m* * \e[39md d e e i s s\n \e[31m* * \e[34m***** \e[32m***** \e[39md d e i s\n\e[32m***** \e[31m* * \e[34m* \e[39md d e i s\n\e[32m* * \e[31m* * \e[34m* * \e[39md d eee i sss\n\e[32m***** \e[31m* * \e[34m***** \e[39md d e i s\n \e[34m* \e[32m***** \e[31m* * \e[39md d e i s\n \e[34m* * \e[32m* * \e[31m* * \e[39md d e e i s s\n\e[34m***** \e[32m***** \e[31m* * \e[39mddddd eeeeeee iiiiiii ssss\n\n\e[39mWelcome to Deis\t\t\tPowered by Core\e[38;5;45mO\e[38;5;206mS\e[39m\n"
164167
- path: /etc/profile.d/nse-function.sh
@@ -316,4 +319,38 @@ write_files:
316319
# removing the node from etcd
317320
NODE=$($ETCDCTL member list | grep `cat /etc/machine-id` | cut -d ':' -f 1)
318321
$ETCDCTL member remove $NODE
322+
- path: /opt/bin/download-k8s-binary
  permissions: '0755'
  content: |
    #!/bin/bash
    # Download a Kubernetes release binary into /opt/bin.
    #
    # Usage: download-k8s-binary <binary-name>
    #
    # The binary is fetched when it is missing, or when the output of
    # "<binary> --version" does not mention K8S_VERSION (read from
    # /etc/k8environment).

    . /etc/k8environment

    FILE=$1
    if [ -z "$FILE" ]; then
      echo "usage: $0 <binary-name>" >&2
      exit 1
    fi

    mkdir -p /opt/bin

    TARGET="/opt/bin/$FILE"
    URL="https://storage.googleapis.com/kubernetes-release/release/${K8S_VERSION}/bin/linux/amd64/$FILE"

    # Nothing to do when the installed binary already reports the wanted version.
    if [ -f "$TARGET" ] && "$TARGET" --version 2>/dev/null | grep -q "${K8S_VERSION}"; then
      exit 0
    fi

    # -f makes curl fail on HTTP errors instead of saving the error page as
    # the binary. Download to a temporary name so that a failed transfer
    # never replaces a working binary.
    if ! curl -fsSL -o "$TARGET.tmp" "$URL"; then
      rm -f "$TARGET.tmp"
      echo "failed to download $URL" >&2
      exit 1
    fi
    chmod +x "$TARGET.tmp"
    mv -f "$TARGET.tmp" "$TARGET"
345+
- path: /opt/bin/kube-serviceaccount
  permissions: '0755'
  content: |
    #!/bin/bash
    # Materialise the service-account key at /opt/bin/kube-serviceaccount.key.
    #
    # If the etcd key /kube-serviceaccount cannot be read, generate a new
    # 2048-bit RSA key locally and store it in etcd; otherwise write the
    # value held in etcd to the local key file.

    if ! etcdctl get /kube-serviceaccount >/dev/null 2>&1; then
      /bin/openssl genrsa -out /opt/bin/kube-serviceaccount.key 2048 2>/dev/null
      etcdctl set /kube-serviceaccount < /opt/bin/kube-serviceaccount.key
    else
      etcdctl get /kube-serviceaccount > /opt/bin/kube-serviceaccount.key
    fi
319356
manage_etc_hosts: localhost

controller/api/models.py

Lines changed: 97 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,28 @@ def __str__(self):
179179
def url(self):
180180
return self.id + '.' + settings.DEIS_DOMAIN
181181

182+
def _get_job_id(self,container_type):
183+
app = self.id
184+
release = self.release_set.latest()
185+
version = "v{}".format(release.version)
186+
job_id = "{app}_{version}.{container_type}".format(**locals())
187+
return job_id
188+
189+
def _get_command(self,container_type):
190+
try:
191+
# if this is not procfile-based app, ensure they cannot break out
192+
# and run arbitrary commands on the host
193+
# FIXME: remove slugrunner's hardcoded entrypoint
194+
release = self.release_set.latest()
195+
if release.build.dockerfile or not release.build.sha:
196+
return "bash -c '{}'".format(release.build.procfile[container_type])
197+
else:
198+
return 'start {}'.format(container_type)
199+
# if the key is not present or if a parent attribute is None
200+
except (KeyError, TypeError, AttributeError):
201+
# handle special case for Dockerfile deployments
202+
return '' if container_type == 'cmd' else 'start {}'.format(container_type)
203+
182204
def log(self, message):
183205
"""Logs a message to the application's log file.
184206
@@ -242,6 +264,8 @@ def scale(self, user, structure): # noqa
242264
# iterate and scale by container type (web, worker, etc)
243265
changed = False
244266
to_add, to_remove = [], []
267+
scale_types = {}
268+
245269
# iterate on a copy of the container_type keys
246270
for container_type in requested_structure.keys():
247271
containers = list(self.container_set.filter(type=container_type).order_by('created'))
@@ -253,20 +277,11 @@ def scale(self, user, structure): # noqa
253277
if diff == 0:
254278
continue
255279
changed = True
280+
scale_types[container_type] = requested
256281
while diff < 0:
257282
c = containers.pop()
258-
if settings.SCHEDULER_MODULE == "scheduler.k8s" :
259-
if len(containers) == 0:
260-
t = Thread(target=c.scale)
261-
t.start()
262-
t.join()
263-
c.delete()
264-
else:
265-
to_remove.append(c)
283+
to_remove.append(c)
266284
diff += 1
267-
if diff == 0 and len(containers) != 0:
268-
c = containers.pop()
269-
self._start_containers([c])
270285
while diff > 0:
271286
# create a database record
272287
c = Container.objects.create(owner=self.owner,
@@ -277,14 +292,15 @@ def scale(self, user, structure): # noqa
277292
to_add.append(c)
278293
container_num += 1
279294
diff -= 1
295+
280296
if changed:
281-
if to_add:
282-
if settings.SCHEDULER_MODULE == "scheduler.k8s" :
283-
self._start_containers([to_add.pop()])
284-
else:
297+
if "scale" in dir(self._scheduler) :
298+
self._scale_containers(scale_types,to_remove)
299+
else :
300+
if to_add:
285301
self._start_containers(to_add)
286-
if to_remove:
287-
self._destroy_containers(to_remove)
302+
if to_remove:
303+
self._destroy_containers(to_remove)
288304
# save new structure to the database
289305
vals = self.container_set.exclude(type='run').values(
290306
'type').annotate(Count('pk')).order_by()
@@ -294,6 +310,31 @@ def scale(self, user, structure): # noqa
294310
self.save()
295311
return changed
296312

313+
def _scale_containers(self,scale_types,to_remove) :
314+
release = self.release_set.latest()
315+
for scale_type in scale_types :
316+
image = release.image
317+
version = "v{}".format(release.version)
318+
kwargs = {'memory': release.config.memory,
319+
'cpu': release.config.cpu,
320+
'tags': release.config.tags,
321+
'version': version,
322+
'aname': self.id,
323+
'num': scale_types[scale_type]}
324+
job_id = self._get_job_id(scale_type)
325+
command = self._get_command(scale_type)
326+
try:
327+
self._scheduler.scale(
328+
name=job_id,
329+
image=image,
330+
command=command,
331+
**kwargs)
332+
except Exception as e:
333+
err = '{} (scale): {}'.format(job_id, e)
334+
log_event(self, err, logging.ERROR)
335+
raise
336+
[c.delete() for c in to_remove]
337+
297338
def _start_containers(self, to_add):
298339
"""Creates and starts containers via the scheduler"""
299340
if not to_add:
@@ -302,15 +343,15 @@ def _start_containers(self, to_add):
302343
start_threads = [Thread(target=c.start) for c in to_add]
303344
[t.start() for t in create_threads]
304345
[t.join() for t in create_threads]
305-
if settings.SCHEDULER_MODULE != "scheduler.k8s" and any(c.state != 'created' for c in to_add):
306-
err = 'aborting, failed to create some containers '+c.state
346+
if any(c.state != 'created' for c in to_add):
347+
err = 'aborting, failed to create some containers'
307348
log_event(self, err, logging.ERROR)
308349
self._destroy_containers(to_add)
309350
raise RuntimeError(err)
310351
[t.start() for t in start_threads]
311352
[t.join() for t in start_threads]
312353
if set([c.state for c in to_add]) != set(['up']):
313-
err = 'warning, some containers failed to start'+c.state
354+
err = 'warning, some containers failed to start'
314355
log_event(self, err, logging.WARNING)
315356

316357
def _restart_containers(self, to_restart):
@@ -334,43 +375,64 @@ def _destroy_containers(self, to_destroy):
334375
"""Destroys containers via the scheduler"""
335376
if not to_destroy:
336377
return
337-
if settings.SCHEDULER_MODULE == "scheduler.k8s" :
338-
destroy_threads = [Thread(target=to_destroy[0].destroy)]
339-
else:
340-
destroy_threads = [Thread(target=c.destroy) for c in to_destroy]
378+
destroy_threads = [Thread(target=c.destroy) for c in to_destroy]
341379
[t.start() for t in destroy_threads]
342380
[t.join() for t in destroy_threads]
343-
if settings.SCHEDULER_MODULE == "scheduler.k8s" :
344-
[c.delete() for c in to_destroy]
345-
else :
346-
[c.delete() for c in to_destroy if c.state == 'destroyed']
381+
[c.delete() for c in to_destroy if c.state == 'destroyed']
347382
if any(c.state != 'destroyed' for c in to_destroy):
348-
err = 'aborting, failed to destroy some containers'+c.state
383+
err = 'aborting, failed to destroy some containers'
349384
log_event(self, err, logging.ERROR)
350385
raise RuntimeError(err)
351386

352387
def deploy(self, user, release):
353388
"""Deploy a new release to this application"""
354389
existing = self.container_set.exclude(type='run')
355390
new = []
391+
scale_types = set()
392+
old_name = ''
356393
for e in existing:
357394
n = e.clone(release)
358395
n.save()
359396
new.append(n)
397+
scale_types.add(e.type)
360398

361-
if new and settings.SCHEDULER_MODULE == "scheduler.k8s" :
362-
self._start_containers([new.pop()])
399+
if new and "deploy" in dir(self._scheduler) :
400+
self._deploy_app(scale_types,release,existing)
363401
else :
364402
self._start_containers(new)
365403

366-
# destroy old containers
367-
if existing:
368-
self._destroy_containers(existing)
404+
# destroy old containers
405+
if existing:
406+
self._destroy_containers(existing)
369407

370408
# perform default scaling if necessary
371409
if self.structure == {} and release.build is not None:
372410
self._default_scale(user, release)
373411

412+
def _deploy_app(self,scale_types,release,existing) :
413+
for scale_type in scale_types :
414+
image = release.image
415+
version = "v{}".format(release.version)
416+
kwargs = {'memory': release.config.memory,
417+
'cpu': release.config.cpu,
418+
'tags': release.config.tags,
419+
'aname': self.id,
420+
'num': 0,
421+
'version':version}
422+
job_id = self._get_job_id(scale_type)
423+
command = self._get_command(scale_type)
424+
try:
425+
self._scheduler.deploy(
426+
name=job_id,
427+
image=image,
428+
command=command,
429+
**kwargs)
430+
except Exception as e:
431+
err = '{} (deploy): {}'.format(job_id, e)
432+
log_event(self, err, logging.ERROR)
433+
raise
434+
[c.delete() for c in existing]
435+
374436
def _default_scale(self, user, release):
375437
"""Scale to default structure based on release type"""
376438
# if there is no SHA, assume a docker image is being promoted
@@ -504,9 +566,7 @@ def create(self):
504566
image = self.release.image
505567
kwargs = {'memory': self.release.config.memory,
506568
'cpu': self.release.config.cpu,
507-
'tags': self.release.config.tags,
508-
'aname': self.app.id,
509-
'num': self.num}
569+
'tags': self.release.config.tags}
510570
job_id = self._job_id
511571
try:
512572
self._scheduler.create(
@@ -519,20 +579,6 @@ def create(self):
519579
log_event(self.app, err, logging.ERROR)
520580
raise
521581

522-
@close_db_connections
523-
def scale(self):
524-
image = self.release.image
525-
job_id = self._job_id
526-
try:
527-
self._scheduler.scale(
528-
name=job_id,
529-
image=image,
530-
num=0)
531-
except Exception as e:
532-
err = '{} (scale): {}'.format(job_id, e)
533-
log_event(self.app, err, logging.ERROR)
534-
raise
535-
536582
@close_db_connections
537583
def start(self):
538584
job_id = self._job_id

0 commit comments

Comments
 (0)