Skip to content

Commit d377483

Browse files
committed
Merge pull request #89 from helgi/domains_k8s
ref(models): add domains to k8s data for router to use
2 parents c2a3de7 + a796bb6 commit d377483

6 files changed

Lines changed: 300 additions & 35 deletions

File tree

rootfs/api/models.py

Lines changed: 74 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import logging
1313
import re
1414
import time
15+
import json
1516
from threading import Thread
1617

1718
from django.conf import settings
@@ -30,7 +31,7 @@
3031

3132
from api import fields, utils, exceptions
3233
from registry import publish_release
33-
from utils import dict_diff, fingerprint
34+
from utils import dict_diff, dict_merge, fingerprint
3435

3536

3637
logger = logging.getLogger(__name__)
@@ -325,7 +326,13 @@ def _scale_containers(self, scale_types, to_remove):
325326
name=job_id,
326327
image=image,
327328
command=command,
328-
**kwargs)
329+
**kwargs
330+
)
331+
332+
# Attach the platform specific application sub domain
333+
# scheduler.scale creates the required service on apps:create
334+
if not Domain.objects.filter(owner=self.owner, app=self, domain=self).exists():
335+
Domain(owner=self.owner, app=self, domain=str(self)).save()
329336
except Exception as e:
330337
err = '{} (scale): {}'.format(job_id, e)
331338
log_event(self, err, logging.ERROR)
@@ -614,7 +621,8 @@ def create(self):
614621
name=self.job_id,
615622
image=image,
616623
command=self._command,
617-
**kwargs)
624+
**kwargs
625+
)
618626
except Exception as e:
619627
err = '{} (create): {}'.format(self.job_id, e)
620628
log_event(self.app, err, logging.ERROR)
@@ -965,6 +973,69 @@ class Domain(AuditedModel):
965973
app = models.ForeignKey('App')
966974
domain = models.TextField(blank=False, null=False, unique=True)
967975

976+
@property
977+
def _scheduler(self):
978+
mod = importlib.import_module(settings.SCHEDULER_MODULE)
979+
return mod.SchedulerClient(settings.SCHEDULER_URL,
980+
settings.SCHEDULER_AUTH,
981+
settings.SCHEDULER_OPTIONS)
982+
983+
def _load_service_config(self, app):
984+
# Get the service from k8s to attach the domain correctly
985+
svc = self._scheduler._get_service(app, app).json()
986+
# Get minimum structure going if it is missing on the service
987+
if 'metadata' not in svc or 'annotations' not in svc['metadata']:
988+
default = {'metadata': {'annotations': {}}}
989+
svc = dict_merge(svc, default)
990+
991+
# Check if any config has been set
992+
if 'deis.io/routerConfig' not in svc['metadata']['annotations']:
993+
config = {}
994+
else:
995+
config = json.loads(svc['metadata']['annotations']['deis.io/routerConfig'])
996+
997+
# See if domains are available
998+
if 'domains' not in config:
999+
config['domains'] = []
1000+
1001+
return svc, config
1002+
1003+
def save(self, *args, **kwargs):
1004+
app = str(self.app)
1005+
domain = str(self.domain)
1006+
1007+
# setup the service and config dict
1008+
svc, config = self._load_service_config(app)
1009+
if domain not in config['domains']:
1010+
config['domains'].append(domain)
1011+
1012+
# save as a JSON string since annotations don't take a structure on its keys
1013+
svc['metadata']['annotations']['deis.io/routerConfig'] = json.dumps(config)
1014+
1015+
# Update the k8s service for the application with new domain information
1016+
self._scheduler._update_service(app, app, svc)
1017+
1018+
# Save to DB
1019+
return super(Domain, self).save(*args, **kwargs)
1020+
1021+
def delete(self, *args, **kwargs):
1022+
app = str(self.app)
1023+
domain = str(self.domain)
1024+
1025+
# setup the service and config dict
1026+
svc, config = self._load_service_config(app)
1027+
if domain in config['domains']:
1028+
config['domains'].remove(domain)
1029+
1030+
# save as a JSON string since annotations don't take a structure on its keys
1031+
svc['metadata']['annotations']['deis.io/routerConfig'] = json.dumps(config)
1032+
1033+
# Update the k8s service for the application with new domain information
1034+
self._scheduler._update_service(app, app, svc)
1035+
1036+
# Delete from DB
1037+
return super(Domain, self).delete(*args, **kwargs)
1038+
9681039
def __str__(self):
9691040
return self.domain
9701041

@@ -1154,21 +1225,6 @@ def _etcd_purge_config(**kwargs):
11541225
except KeyError:
11551226
pass
11561227

1157-
1158-
def _etcd_publish_domains(**kwargs):
1159-
domain = kwargs['instance']
1160-
_etcd_client.write('/deis/domains/{}'.format(domain), domain.app)
1161-
1162-
1163-
def _etcd_purge_domains(**kwargs):
1164-
domain = kwargs['instance']
1165-
try:
1166-
_etcd_client.delete('/deis/domains/{}'.format(domain),
1167-
prevExist=True, dir=True, recursive=True)
1168-
except KeyError:
1169-
pass
1170-
1171-
11721228
# Log significant app-related events
11731229
post_save.connect(_log_build_created, sender=Build, dispatch_uid='api.models.log')
11741230
post_save.connect(_log_release_created, sender=Release, dispatch_uid='api.models.log')
@@ -1193,8 +1249,6 @@ def create_auth_token(sender, instance=None, created=False, **kwargs):
11931249
post_save.connect(_etcd_publish_key, sender=Key, dispatch_uid='api.models')
11941250
post_delete.connect(_etcd_purge_key, sender=Key, dispatch_uid='api.models')
11951251
post_delete.connect(_etcd_purge_user, sender=get_user_model(), dispatch_uid='api.models')
1196-
post_save.connect(_etcd_publish_domains, sender=Domain, dispatch_uid='api.models')
1197-
post_delete.connect(_etcd_purge_domains, sender=Domain, dispatch_uid='api.models')
11981252
post_save.connect(_etcd_publish_app, sender=App, dispatch_uid='api.models')
11991253
post_delete.connect(_etcd_purge_app, sender=App, dispatch_uid='api.models')
12001254
post_save.connect(_etcd_publish_cert, sender=Certificate, dispatch_uid='api.models')

rootfs/api/tests/test_utils.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import unittest
2+
from api import utils
3+
4+
5+
class TestUtils(unittest.TestCase):
6+
"""Test utils functions"""
7+
8+
def test_dict_merge_simple(self):
9+
a = {'key': 'value'}
10+
b = {'key': 'value'}
11+
12+
c = utils.dict_merge(a, b)
13+
assert c == {'key': 'value'}
14+
15+
a = {'key': 'value'}
16+
b = {'key2': 'value'}
17+
18+
c = utils.dict_merge(a, b)
19+
assert c == {'key': 'value', 'key2': 'value'}
20+
21+
def test_dict_merge_deeper(self):
22+
a = {'key': 'value', 'here': {'without': 'you'}}
23+
b = {'this': 'that', 'here': {'with': 'me'}, 'other': {'magic', 'unicorn'}}
24+
25+
c = utils.dict_merge(a, b)
26+
assert c == {
27+
'key': 'value',
28+
'this': 'that',
29+
'here': {
30+
'with': 'me',
31+
'without': 'you'
32+
},
33+
'other': {'magic', 'unicorn'}
34+
}
35+
36+
def test_dict_merge_even_deeper(self):
37+
a = {
38+
'key': 'value',
39+
'here': {'without': 'you'},
40+
'other': {'scrubs': {'char3': 'Cox'}}
41+
42+
}
43+
44+
b = {
45+
'this': 'that',
46+
'here': {'with': 'me'},
47+
'other': {'magic': 'unicorn', 'scrubs': {'char1': 'JD', 'char2': 'Turk'}}
48+
}
49+
50+
c = utils.dict_merge(a, b)
51+
assert c == {
52+
'key': 'value',
53+
'this': 'that',
54+
'here': {'with': 'me', 'without': 'you'},
55+
'other': {
56+
'magic': 'unicorn',
57+
'scrubs': {
58+
'char1': 'JD',
59+
'char2': 'Turk',
60+
'char3': 'Cox'
61+
}
62+
}
63+
}
64+
65+
def test_dict_merge_with_list(self):
66+
a = {'key': 'value', 'names': ['bob', 'kyle', 'kenny', 'jimbo']}
67+
b = {'key': 'value', 'names': ['kenny', 'cartman', 'stan']}
68+
69+
c = utils.dict_merge(a, b)
70+
assert c == {'key': 'value', 'names': ['bob', 'kyle', 'kenny', 'jimbo', 'cartman', 'stan']}
71+
72+
def test_dict_merge_bad_merge(self):
73+
"""Returns b because it isn't a dict"""
74+
a = {'key': 'value'}
75+
b = 'duh'
76+
77+
c = utils.dict_merge(a, b)
78+
assert c == b

rootfs/api/utils.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
"""
22
Helper functions used by the Deis server.
33
"""
4+
from __future__ import unicode_literals
45
import base64
56
import hashlib
67
import random
8+
from copy import deepcopy
79

810

911
def generate_app_name():
@@ -116,6 +118,57 @@ def encode(obj):
116118
return obj
117119

118120

121+
def dict_merge(origin, merge):
122+
"""
123+
Recursively merges dict's. not just simple a["key"] = b["key"], if
124+
both a and b have a key who's value is a dict then dict_merge is called
125+
on both values and the result stored in the returned dictionary.
126+
Also handles merging lists if they occur within the dict
127+
"""
128+
if not isinstance(merge, dict):
129+
return merge
130+
131+
result = deepcopy(origin)
132+
for key, value in merge.iteritems():
133+
if key in result and isinstance(result[key], dict):
134+
result[key] = dict_merge(result[key], value)
135+
else:
136+
if isinstance(value, list):
137+
if key not in result:
138+
result[key] = value
139+
else:
140+
# merge lists without leaving potential duplicates
141+
# result[key] = list(set(result[key] + value)) # changes the order as well
142+
for item in value:
143+
if item in result[key]:
144+
continue
145+
146+
result[key].append(item)
147+
else:
148+
result[key] = deepcopy(value)
149+
return result
150+
151+
152+
def flatten(collection):
153+
"""
154+
Flatten an arbitrarily deep structure of dicts, lists and tuples and
155+
extract the strings at the leaves of the structures.
156+
"""
157+
vals = []
158+
if isinstance(collection, basestring):
159+
vals.append(collection)
160+
elif isinstance(collection, (list, tuple)):
161+
for item in collection:
162+
vals.extend(flatten(item))
163+
elif isinstance(collection, dict):
164+
vals.extend(flatten(collection.values()))
165+
else:
166+
raise Exception(
167+
"Can't extract values from a %s" % collection.__class__
168+
)
169+
return vals
170+
171+
119172
if __name__ == "__main__":
120173
import doctest
121174
doctest.testmod()

rootfs/scheduler/k8s.py

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from __future__ import unicode_literals
12
import json
23
import logging
34
import re
@@ -10,6 +11,8 @@
1011
from .states import JobState
1112
from . import AbstractSchedulerClient
1213
import requests
14+
from .utils import dict_merge
15+
1316

1417
logger = logging.getLogger(__name__)
1518

@@ -82,7 +85,8 @@
8285
"name": "$name",
8386
"labels": {
8487
"app": "$name"
85-
}
88+
},
89+
"annotations": {}
8690
},
8791
"spec": {
8892
"ports": [
@@ -392,25 +396,32 @@ def create(self, name, image, command, **kwargs):
392396
name = name.replace('.', '-').replace('_', '-')
393397
app_name = kwargs.get('aname', {})
394398
try:
395-
self._create_service(name, app_name, app_type)
399+
# Make sure the router knows what to do with this
400+
data = {}
401+
# TODO this should potentially be higher up in the flow
402+
# see http://docs.deis.io/en/latest/using_deis/process-types/#web-vs-cmd-process-types
403+
if app_type in ['web', 'cmd']:
404+
data = {'metadata': {'labels': {'routable': 'true'}}}
405+
406+
self._create_service(name, app_name, app_type, data)
396407
except:
397408
self._scale_app(name, 0, app_name)
398409
self._delete_rc(name, app_name)
399410
raise
400411

401412
def _get_service(self, name, namespace):
402413
url = self._api("/namespaces/{}/services/{}", namespace, name)
403-
resp = self.session.get(url)
404-
if unhealthy(resp.status_code):
405-
error(resp, 'get Service "{}" in Namespace "{}"', name, namespace)
414+
response = self.session.get(url)
415+
if unhealthy(response.status_code):
416+
error(response, 'get Service "{}" in Namespace "{}"', name, namespace)
406417

407-
return resp.status_code, resp.text, resp.reason
418+
return response
408419

409-
def _create_service(self, name, app_name, app_type):
420+
def _create_service(self, name, app_name, app_type, data={}):
410421
actual_pod = {}
411422
for _ in xrange(300):
412-
status, data, reason = self._get_pods(app_name)
413-
parsed_json = json.loads(data)
423+
status, json_data, reason = self._get_pods(app_name)
424+
parsed_json = json.loads(json_data)
414425
for pod in parsed_json['items']:
415426
if('generateName' in pod['metadata'] and
416427
pod['metadata']['generateName'] == name + '-'):
@@ -441,24 +452,29 @@ def _create_service(self, name, app_name, app_type):
441452
"name": app_name,
442453
}
443454

444-
template = string.Template(SERVICE_TEMPLATE).substitute(l)
455+
# Merge external data on to the prefined template
456+
template = json.loads(string.Template(SERVICE_TEMPLATE).substitute(l))
457+
data = dict_merge(template, data)
458+
445459
url = self._api("/namespaces/{}/services", app_name)
446-
resp = self.session.post(url, json=json.loads(template))
460+
resp = self.session.post(url, json=data)
447461
if resp.status_code == 409:
448-
status, data, reason = self._get_service(app_name, app_name)
449-
srv = json.loads(data)
462+
srv = self._get_service(app_name, app_name).json()
450463
if srv['spec']['selector']['type'] == 'web':
451464
return
452465

453466
srv['spec']['selector']['type'] = app_type
454467
srv['spec']['ports'][0]['targetPort'] = port
455-
url = self._api("/namespaces/{}/services/{}", app_name, app_name)
456-
resp2 = self.session.put(url, json=srv)
468+
resp2 = self._update_service(app_name, app_name, srv)
457469
if unhealthy(resp2.status_code):
458470
error(resp, 'update Service "{}" in Namespace "{}"', app_name, app_name)
459471
elif unhealthy(resp.status_code):
460472
error(resp, 'create Service "{}" in Namespace "{}"', app_name, app_name)
461473

474+
def _update_service(self, namespace, app, data):
475+
url = self._api("/namespaces/{}/services/{}", namespace, app)
476+
return self.session.put(url, json=data)
477+
462478
def start(self, name):
463479
"""Start a container."""
464480
pass

0 commit comments

Comments
 (0)