Skip to content

Commit dded21e

Browse files
committed
Revert "ref(scheduler): split up k8s.py and make it more modular"
This reverts commit 218bfe25cbb6c1d80324a1be7919cf82e9c2e54c.
1 parent a3bcaab commit dded21e

12 files changed

Lines changed: 653 additions & 789 deletions

File tree

rootfs/api/models.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,9 @@ class Meta:
163163
@property
164164
def _scheduler(self):
165165
mod = importlib.import_module(settings.SCHEDULER_MODULE)
166-
return mod.SchedulerClient()
166+
return mod.SchedulerClient(settings.SCHEDULER_URL,
167+
settings.SCHEDULER_AUTH,
168+
settings.SCHEDULER_OPTIONS)
167169

168170
def __str__(self):
169171
return self.id

rootfs/scheduler/__init__.py

Lines changed: 22 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -1,149 +1,34 @@
1-
import logging
21

3-
from .states import JobState
4-
from .abstract import AbstractSchedulerClient
5-
from .client import unhealthy
6-
from .replicationcontroller import ReplicationController
7-
from .pod import Pod
8-
from .service import Service
9-
from .namespace import Namespace
2+
class AbstractSchedulerClient(object):
3+
"""
4+
A generic interface to a scheduler backend.
5+
"""
106

11-
log = logging.getLogger(__name__)
12-
13-
14-
class KubeSchedulerClient(AbstractSchedulerClient):
15-
def deploy(self, name, image, command, **kwargs):
16-
log.debug('deploy %s, img %s, params %s, cmd "%s"', name, image, kwargs, command)
17-
app_name = kwargs.get('aname', {})
18-
app_type = name.split('.')[1]
19-
20-
controller = ReplicationController()
21-
old_rc = controller.old(app_name, app_type)
22-
new_rc = controller.create(name, image, command, **kwargs)
23-
if old_rc:
24-
desired = int(old_rc['spec']['replicas'])
25-
old_rc_name = old_rc['metadata']['name']
26-
else:
27-
desired = 1
28-
29-
new_rc_name = new_rc['metadata']['name']
30-
try:
31-
count = 1
32-
while desired >= count:
33-
new_rc = controller.scale(new_rc_name, count, app_name)
34-
if old_rc:
35-
old_rc = controller.scale(old_rc_name, desired-count, app_name)
36-
count += 1
37-
except Exception as e:
38-
controller.scale(new_rc['metadata']['name'], 0, app_name)
39-
controller.delete(new_rc['metadata']['name'], app_name)
40-
if old_rc:
41-
controller.scale(old_rc['metadata']['name'], desired, app_name)
42-
43-
raise RuntimeError('{} (deploy): {}'.format(name, e))
44-
45-
if old_rc:
46-
controller.delete(old_rc_name, app_name)
47-
48-
def scale(self, name, image, command, **kwargs):
49-
log.debug('scale %s, img %s, params %s, cmd "%s"', name, image, kwargs, command)
50-
app_name = kwargs.get('aname', {})
51-
rc_name = name.replace('.', '-').replace('_', '-')
52-
53-
controller = ReplicationController()
54-
# Create if ReplicationController doesn't exist
55-
if unhealthy(controller.status(rc_name, app_name)):
56-
self.create(name, image, command, **kwargs)
57-
return
58-
59-
name = name.replace('.', '-').replace('_', '-')
60-
num = kwargs.get('num', {})
61-
old_replicas = controller.get(name, app_name).json()['spec']['replicas']
62-
try:
63-
controller.scale(name, num, app_name)
64-
except Exception as e:
65-
controller.scale(name, old_replicas, app_name)
66-
raise RuntimeError('{} (Scale): {}'.format(name, e))
7+
def __init__(self, target, auth, options):
8+
self.target = target
9+
self.auth = auth
10+
self.options = options
6711

6812
def create(self, name, image, command, **kwargs):
69-
"""Create a container."""
70-
log.debug('create %s, img %s, params %s, cmd "%s"', name, image, kwargs, command)
71-
controller = ReplicationController()
72-
controller.create(name, image, command, **kwargs)
73-
app_type = name.split('.')[1]
74-
name = name.replace('.', '-').replace('_', '-')
75-
app_name = kwargs.get('aname', {})
76-
try:
77-
Service().create(name, app_name, app_type)
78-
except:
79-
controller.scale(name, 0, app_name)
80-
controller.delete(name, app_name)
81-
raise
82-
83-
def start(self, name):
84-
"""Start a container."""
85-
pass
86-
87-
def stop(self, name):
88-
"""Stop a container."""
89-
pass
13+
"""Create a new container."""
14+
raise NotImplementedError
9015

9116
def destroy(self, name):
92-
"""Destroy a application by deleting its namespace."""
93-
log.debug('destroy %s', name)
94-
namespace = Namespace()
95-
response = namespace.get(name, check=False)
96-
if response.status_code == 404:
97-
log.warn('delete Namespace "%s": not found', name)
98-
else:
99-
namespace.delete(name)
100-
101-
def logs(self, name):
102-
"""Aggregate logs from all pods in the namespace"""
103-
log.debug("logs %s", name)
104-
app_name = name.split('_')[0]
105-
name = name.replace('.', '-').replace('_', '-')
106-
107-
pod = Pod()
108-
items = pod.all(app_name).json()['items']
109-
log_data = ''
110-
for data in items:
111-
if name in data['metadata']['generateName'] and data['status']['phase'] == 'Running':
112-
log_data += pod.log(pod['metadata']['name'], app_name).text
113-
114-
return log_data
17+
"""Destroy a container."""
18+
raise NotImplementedError
11519

11620
def run(self, name, image, entrypoint, command):
11721
"""Run a one-off command."""
118-
log.debug('run %s, img %s, entypoint %s, cmd "%s"', name, image, entrypoint, command)
119-
return Pod().run(name, image, entrypoint, command)
120-
121-
def state(self, name):
122-
"""Display the state of a container."""
123-
# See "Pod Phase" at http://kubernetes.io/v1.1/docs/user-guide/pod-states.html
124-
phase_states = {
125-
'Pending': JobState.initialized,
126-
'Running': JobState.up,
127-
'Succeeded': JobState.down,
128-
'Failed': JobState.crashed,
129-
'Unknown': JobState.error,
130-
}
22+
raise NotImplementedError
13123

132-
try:
133-
app_name = name.split('_')[0]
134-
name = name.split('.')
135-
name = name[0] + '-' + name[1]
136-
name = name.replace('_', '-')
137-
138-
items = Pod().all(app_name).json()['items']
139-
for data in items:
140-
if data['metadata']['generateName'] == name + '-':
141-
phase = data['status']['phase']
142-
return phase_states[phase]
24+
def start(self, name):
25+
"""Start a container."""
26+
raise NotImplementedError
14327

144-
return JobState.destroyed
145-
except Exception as err:
146-
log.warn(err)
147-
return JobState.error
28+
def state(self, name):
29+
"""Display the given job's running state."""
30+
raise NotImplementedError
14831

149-
SchedulerClient = KubeSchedulerClient
32+
def stop(self, name):
33+
"""Stop a container."""
34+
raise NotImplementedError

rootfs/scheduler/abstract.py

Lines changed: 0 additions & 29 deletions
This file was deleted.

rootfs/scheduler/base.py

Lines changed: 0 additions & 8 deletions
This file was deleted.

rootfs/scheduler/client.py

Lines changed: 0 additions & 130 deletions
This file was deleted.

rootfs/scheduler/events.py

Lines changed: 0 additions & 6 deletions
This file was deleted.

0 commit comments

Comments
 (0)