Skip to content

Commit ea3e8c6

Browse files
helgimboersma
authored and committed
ref(scheduler): split up k8s.py and make it more modular
Splits each resource type into its own class, creates a KubeHTTPClient that handles HTTP interactions, and moves the k8s code up into __init__. Separation of responsibility is stronger: the scheduler code that the Deis API interfaces with no longer touches HTTP, and the individual resource classes deal with it very little as well. Many HTTP interactions have been consolidated; by default they raise a RuntimeError if a non-2xx status code is returned, unless the check=False flag is passed.
1 parent 9fb9960 commit ea3e8c6

12 files changed

Lines changed: 789 additions & 653 deletions

File tree

rootfs/api/models.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,7 @@ class Meta:
164164
@property
def _scheduler(self):
    """Return a new client from the configured scheduler module.

    The module named by ``settings.SCHEDULER_MODULE`` is imported and its
    ``SchedulerClient`` is instantiated without arguments.
    """
    scheduler_module = importlib.import_module(settings.SCHEDULER_MODULE)
    return scheduler_module.SchedulerClient()
170168

171169
def __str__(self):
    """Return this object's ``id`` as its string representation."""
    return self.id

rootfs/scheduler/__init__.py

Lines changed: 137 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,149 @@
1+
import logging
12

2-
class AbstractSchedulerClient(object):
3-
"""
4-
A generic interface to a scheduler backend.
5-
"""
3+
from .states import JobState
4+
from .abstract import AbstractSchedulerClient
5+
from .client import unhealthy
6+
from .replicationcontroller import ReplicationController
7+
from .pod import Pod
8+
from .service import Service
9+
from .namespace import Namespace
610

7-
def __init__(self, target, auth, options):
8-
self.target = target
9-
self.auth = auth
10-
self.options = options
11+
log = logging.getLogger(__name__)
12+
13+
14+
class KubeSchedulerClient(AbstractSchedulerClient):
    """Scheduler client that drives Kubernetes through the resource classes.

    HTTP is not touched here; each operation delegates to
    ReplicationController, Pod, Service or Namespace.
    """

    def deploy(self, name, image, command, **kwargs):
        """Roll out a new ReplicationController, replacing the old one if any.

        The new controller is scaled up one replica at a time while the old
        controller is scaled down in step. If anything fails, the new
        controller is removed, the old one is scaled back to its original
        size and a RuntimeError is raised.
        """
        log.debug('deploy %s, img %s, params %s, cmd "%s"', name, image, kwargs, command)
        app_name = kwargs.get('aname', {})
        app_type = name.split('.')[1]

        controller = ReplicationController()
        old_rc = controller.old(app_name, app_type)
        new_rc = controller.create(name, image, command, **kwargs)

        # Capture names and the target size up front so the rollback below
        # does not depend on whatever scale() happens to return.
        new_rc_name = new_rc['metadata']['name']
        if old_rc:
            desired = int(old_rc['spec']['replicas'])
            old_rc_name = old_rc['metadata']['name']
        else:
            desired = 1
            old_rc_name = None

        try:
            for count in range(1, desired + 1):
                controller.scale(new_rc_name, count, app_name)
                if old_rc_name:
                    controller.scale(old_rc_name, desired - count, app_name)
        except Exception as e:
            # Roll back: drop the new controller and restore the old one.
            controller.scale(new_rc_name, 0, app_name)
            controller.delete(new_rc_name, app_name)
            if old_rc_name:
                controller.scale(old_rc_name, desired, app_name)

            raise RuntimeError('{} (deploy): {}'.format(name, e))

        if old_rc_name:
            controller.delete(old_rc_name, app_name)

    def scale(self, name, image, command, **kwargs):
        """Scale the ReplicationController for ``name`` to ``kwargs['num']``.

        If the controller's status is unhealthy it is created instead. When
        scaling fails the previous replica count is restored and a
        RuntimeError is raised.
        """
        log.debug('scale %s, img %s, params %s, cmd "%s"', name, image, kwargs, command)
        app_name = kwargs.get('aname', {})
        rc_name = name.replace('.', '-').replace('_', '-')

        controller = ReplicationController()
        # Create if ReplicationController doesn't exist
        if unhealthy(controller.status(rc_name, app_name)):
            self.create(name, image, command, **kwargs)
            return

        num = kwargs.get('num', {})
        old_replicas = controller.get(rc_name, app_name).json()['spec']['replicas']
        try:
            controller.scale(rc_name, num, app_name)
        except Exception as e:
            controller.scale(rc_name, old_replicas, app_name)
            raise RuntimeError('{} (Scale): {}'.format(rc_name, e))

    def create(self, name, image, command, **kwargs):
        """Create a ReplicationController and its Service.

        If creating the Service fails, the freshly created controller is
        scaled to zero and deleted before the error propagates.
        """
        log.debug('create %s, img %s, params %s, cmd "%s"', name, image, kwargs, command)
        controller = ReplicationController()
        controller.create(name, image, command, **kwargs)
        app_type = name.split('.')[1]
        rc_name = name.replace('.', '-').replace('_', '-')
        app_name = kwargs.get('aname', {})
        try:
            Service().create(rc_name, app_name, app_type)
        except:  # noqa - cleanup on any failure, the error is re-raised
            controller.scale(rc_name, 0, app_name)
            controller.delete(rc_name, app_name)
            raise

    def start(self, name):
        """Start a container (no-op for this scheduler)."""
        pass

    def stop(self, name):
        """Stop a container (no-op for this scheduler)."""
        pass

    def destroy(self, name):
        """Destroy an application by deleting its namespace."""
        log.debug('destroy %s', name)
        namespace = Namespace()
        # check=False: a missing namespace is expected and only logged.
        response = namespace.get(name, check=False)
        if response.status_code == 404:
            log.warning('delete Namespace "%s": not found', name)
        else:
            namespace.delete(name)

    def logs(self, name):
        """Aggregate logs from all running pods of ``name`` in its namespace."""
        log.debug("logs %s", name)
        app_name = name.split('_')[0]
        name = name.replace('.', '-').replace('_', '-')

        pod = Pod()
        chunks = []
        for data in pod.all(app_name).json()['items']:
            # generateName is read with .get so a pod lacking it is skipped
            # instead of aborting the whole aggregation.
            generate_name = data['metadata'].get('generateName', '')
            if name in generate_name and data['status']['phase'] == 'Running':
                # Look up the log of the pod being iterated over (``data``),
                # not the Pod helper object itself.
                chunks.append(pod.log(data['metadata']['name'], app_name).text)

        return ''.join(chunks)

    def run(self, name, image, entrypoint, command):
        """Run a one-off command."""
        log.debug('run %s, img %s, entypoint %s, cmd "%s"', name, image, entrypoint, command)
        return Pod().run(name, image, entrypoint, command)

    def state(self, name):
        """Return the JobState of the pod belonging to ``name``.

        Returns JobState.destroyed when no matching pod exists and
        JobState.error when the lookup itself fails.
        """
        # See "Pod Phase" at http://kubernetes.io/v1.1/docs/user-guide/pod-states.html
        phase_states = {
            'Pending': JobState.initialized,
            'Running': JobState.up,
            'Succeeded': JobState.down,
            'Failed': JobState.crashed,
            'Unknown': JobState.error,
        }

        try:
            app_name = name.split('_')[0]
            parts = name.split('.')
            prefix = (parts[0] + '-' + parts[1]).replace('_', '-') + '-'

            for data in Pod().all(app_name).json()['items']:
                if data['metadata'].get('generateName') == prefix:
                    return phase_states[data['status']['phase']]

            return JobState.destroyed
        except Exception as err:
            log.warning(err)
            return JobState.error
149+
# Generic name under which the client is instantiated by callers
# (rootfs/api/models.py calls ``mod.SchedulerClient()``).
SchedulerClient = KubeSchedulerClient

rootfs/scheduler/abstract.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
2+
class AbstractSchedulerClient(object):
    """Interface every scheduler backend is expected to implement.

    Each method raises NotImplementedError until a subclass overrides it.
    """

    def create(self, name, image, command, **kwargs):
        """Create a new container from ``image`` running ``command``."""
        raise NotImplementedError

    def destroy(self, name):
        """Destroy the container called ``name``."""
        raise NotImplementedError

    def run(self, name, image, entrypoint, command):
        """Run a one-off command in a container built from ``image``."""
        raise NotImplementedError

    def start(self, name):
        """Start the container called ``name``."""
        raise NotImplementedError

    def state(self, name):
        """Report the running state of the job called ``name``."""
        raise NotImplementedError

    def stop(self, name):
        """Stop the container called ``name``."""
        raise NotImplementedError

rootfs/scheduler/base.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from django.conf import settings
2+
from .client import KubeHTTPClient
3+
4+
5+
class Base(object):
    """Common base for resource classes.

    Gives every instance the registry URL and a KubeHTTPClient pointed at
    the scheduler URL, both taken from Django settings.
    """

    def __init__(self):
        registry_url = settings.REGISTRY_URL
        self.registry = registry_url
        scheduler_url = settings.SCHEDULER_URL
        self.api = KubeHTTPClient(scheduler_url)

rootfs/scheduler/client.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import requests
2+
import urlparse
3+
4+
5+
def error(response, errmsg, *args):
    """Raise a RuntimeError describing a failed API call.

    ``errmsg`` is a ``str.format`` template whose placeholders are filled
    with ``args`` in reverse order. The message also carries the response's
    status code, reason and body.

    Raises:
        RuntimeError: always.
    """
    # reversing since URL vs Messages are constructed the opposite way
    errmsg = errmsg.format(*reversed(args))

    try:
        details = response.json()
    except ValueError:
        # Empty or non-JSON body: fall back to the raw text so the caller
        # still gets the intended RuntimeError rather than a decode error.
        details = response.text

    raise RuntimeError("failed to {}: {} {}\n{}".format(
        errmsg,
        response.status_code,
        response.reason,
        details
    ))
16+
17+
18+
def error_message(method, url):
    """Build an error message template from the method and URL template.

    ``url`` is an unformatted template such as ``/namespaces/{}/pods/{}``.
    The result keeps one ``{}`` placeholder per placeholder in ``url``, to
    be filled in later with the URL arguments in reverse order.
    """
    errmsg = method
    count = url.count('{}')
    segments = url.strip('/').split('/')

    # Format is /namespaces/{}/<resource_type>/{}/<sub_resource>
    # With the leading slash stripped the resource type sits at index 2.
    # Shorter templates (e.g. /namespaces/{}) address the first segment.
    resource_type = segments[2] if len(segments) > 2 else segments[0]
    # A second placeholder names one specific resource: use the singular
    if count > 1:
        resource_type = resource_type[0:-1]

    # Check if it is a sub resource
    if len(segments) > 4:
        errmsg += ' %s for' % segments[-1]

    errmsg += ' %s "{}"' % resource_type.capitalize()

    # Requesting a namespaced resource
    if count > 1:
        errmsg += ' in Namespace "{}"'

    return errmsg
38+
39+
40+
def unhealthy(status_code):
    """Return True when ``status_code`` lies outside the 2xx range."""
    return not (200 <= status_code <= 299)
46+
47+
48+
class KubeHTTPClient(object):
    """Thin HTTP client for the Kubernetes API at ``target``.

    Every verb method takes a URL template plus positional arguments that
    fill its ``{}`` placeholders. Remaining keyword arguments are forwarded
    to the requests session, except ``check`` (default True): when true, a
    non-2xx response raises RuntimeError; with ``check=False`` the response
    is returned as-is.
    """
    # Kubernetes API version used to prefix every URL
    version = 'v1'
    # Class-level default; connect() stores the live session on the instance
    session = None

    def __init__(self, target):
        self.target = target

        # TODO singleton this if it makes sense
        if not self.session:
            self.connect()

    def connect(self):
        """Create a session authenticated with the service account token."""
        with open('/var/run/secrets/kubernetes.io/serviceaccount/token') as token_file:
            token = token_file.read()

        session = requests.Session()
        session.headers = {
            'Authorization': 'Bearer ' + token,
            'Content-Type': 'application/json',
        }
        session.verify = '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'
        self.session = session

    def url(self, tmpl, *args):
        """Return a fully-qualified Kubernetes API URL from a string template with args."""
        url = "/api/{}".format(self.version) + tmpl.format(*args)
        return urlparse.urljoin(self.target, url)

    def _request(self, verb, action, url, args, kwargs):
        """Send one request and optionally fail on a non-2xx response.

        ``verb`` is the session method to call; ``action`` is the word used
        in the error message (e.g. 'create' for POST).
        """
        # ``check`` is our own flag: remove it before forwarding the kwargs,
        # since the requests session does not accept it.
        check = kwargs.pop('check', True)
        response = getattr(self.session, verb)(self.url(url, *args), **kwargs)
        if check and unhealthy(response.status_code):
            error(response, error_message(action, url), *args)

        return response

    def get(self, url, *args, **kwargs):
        """Issue a GET request."""
        return self._request('get', 'get', url, args, kwargs)

    def delete(self, url, *args, **kwargs):
        """Issue a DELETE request."""
        return self._request('delete', 'delete', url, args, kwargs)

    def post(self, url, *args, **kwargs):
        """Issue a POST request (reported as 'create' in error messages)."""
        return self._request('post', 'create', url, args, kwargs)

    def put(self, url, *args, **kwargs):
        """Issue a PUT request (reported as 'update' in error messages)."""
        return self._request('put', 'update', url, args, kwargs)

    def head(self, url, *args, **kwargs):
        """Issue a HEAD request.

        ``check`` is keyword-only like on the other verbs, so the first URL
        argument is no longer swallowed by a positional ``check`` parameter.
        """
        return self._request('head', 'head', url, args, kwargs)

    def options(self, url, *args, **kwargs):
        """Issue an OPTIONS request."""
        return self._request('options', 'options', url, args, kwargs)

    def patch(self, url, *args, **kwargs):
        """Issue a PATCH request."""
        return self._request('patch', 'patch', url, args, kwargs)

rootfs/scheduler/events.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from .base import Base
2+
3+
4+
class Events(Base):
    """Read access to the events of a namespace."""

    def get(self, namespace):
        """Fetch the events of ``namespace`` through the API client."""
        events_url = '/namespaces/{}/events'
        return self.api.get(events_url, namespace)

0 commit comments

Comments
 (0)