Skip to content

Commit 6676a4f

Browse files
committed
Merge pull request #4101 from mboersma/scheduler-superclass
ref(controller): share common scheduler code
2 parents b24dd75 + 92d898b commit 6676a4f

7 files changed

Lines changed: 131 additions & 183 deletions

File tree

controller/scheduler/__init__.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
2+
class AbstractSchedulerClient(object):
3+
"""
4+
A generic interface to a scheduler backend.
5+
"""
6+
7+
def __init__(self, target, auth, options, pkey):
8+
self.target = target
9+
self.auth = auth
10+
self.options = options
11+
self.pkey = pkey
12+
13+
def create(self, name, image, command, **kwargs):
14+
"""Create a new container."""
15+
raise NotImplementedError
16+
17+
def destroy(self, name):
18+
"""Destroy a container."""
19+
raise NotImplementedError
20+
21+
def run(self, name, image, entrypoint, command):
22+
"""Run a one-off command."""
23+
raise NotImplementedError
24+
25+
def start(self, name):
26+
"""Start a container."""
27+
raise NotImplementedError
28+
29+
def state(self, name):
30+
"""Display the given job's running state."""
31+
raise NotImplementedError
32+
33+
def stop(self, name):
34+
"""Stop a container."""
35+
raise NotImplementedError

controller/scheduler/chaos.py

Lines changed: 20 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import random
2+
23
from .mock import MockSchedulerClient, jobs
34
from .states import JobState
45

@@ -12,52 +13,38 @@
1213
class ChaosSchedulerClient(MockSchedulerClient):
1314

1415
def create(self, name, image, command, **kwargs):
16+
"""Create a new container."""
1517
if random.random() < CREATE_ERROR_RATE:
16-
job = jobs.get(name, {})
17-
job.update({'state': JobState.error})
18-
jobs[name] = job
19-
return
20-
return super(ChaosSchedulerClient, self).create(name, image, command, **kwargs)
18+
jobs.setdefault(name, {})['state'] = JobState.error
19+
else:
20+
super(ChaosSchedulerClient, self).create(name, image, command, **kwargs)
2121

2222
def destroy(self, name):
23-
"""
24-
Destroy an existing job
25-
"""
23+
"""Destroy a container."""
2624
if random.random() < DESTROY_ERROR_RATE:
27-
job = jobs.get(name, {})
28-
job.update({'state': JobState.error})
29-
jobs[name] = job
30-
return
31-
return super(ChaosSchedulerClient, self).destroy(name)
25+
jobs.setdefault(name, {})['state'] = JobState.error
26+
else:
27+
super(ChaosSchedulerClient, self).destroy(name)
3228

3329
def run(self, name, image, entrypoint, command):
34-
"""
35-
Run a one-off command
36-
"""
30+
"""Run a one-off command."""
3731
if random.random() < CREATE_ERROR_RATE:
3832
raise RuntimeError('exit code 1')
39-
return super(ChaosSchedulerClient, self).run(name, image, entrypoint, command)
33+
else:
34+
super(ChaosSchedulerClient, self).run(name, image, entrypoint, command)
4035

4136
def start(self, name):
42-
"""
43-
Start an idle job
44-
"""
37+
"""Start a container."""
4538
if random.random() < START_ERROR_RATE:
46-
job = jobs.get(name, {})
47-
job.update({'state': JobState.crashed})
48-
jobs[name] = job
49-
return
50-
return super(ChaosSchedulerClient, self).start(name)
39+
jobs.setdefault(name, {})['state'] = JobState.crashed
40+
else:
41+
super(ChaosSchedulerClient, self).start(name)
5142

5243
def stop(self, name):
53-
"""
54-
Stop a running job
55-
"""
44+
"""Stop a container."""
5645
if random.random() < STOP_ERROR_RATE:
57-
job = jobs.get(name, {})
58-
job.update({'state': JobState.error})
59-
jobs[name] = job
60-
return
61-
return super(ChaosSchedulerClient, self).stop(name)
46+
jobs.setdefault(name, {})['state'] = JobState.crashed
47+
else:
48+
super(ChaosSchedulerClient, self).stop(name)
6249

6350
SchedulerClient = ChaosSchedulerClient

controller/scheduler/fleet.py

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
1-
import cStringIO
21
import base64
32
import copy
3+
import cStringIO
44
import httplib
55
import json
66
import paramiko
7-
import socket
87
import re
8+
import socket
99
import time
1010

1111
from django.conf import settings
1212

13+
from . import AbstractSchedulerClient
1314
from .states import JobState
1415

1516

@@ -32,13 +33,10 @@ def connect(self):
3233
self.sock = sock
3334

3435

35-
class FleetHTTPClient(object):
36+
class FleetHTTPClient(AbstractSchedulerClient):
3637

3738
def __init__(self, target, auth, options, pkey):
38-
self.target = target
39-
self.auth = auth
40-
self.options = options
41-
self.pkey = pkey
39+
super(FleetHTTPClient, self).__init__(target, auth, options, pkey)
4240
# single global connection
4341
self.conn = UHTTPConnection(self.target)
4442

@@ -119,7 +117,7 @@ def _get_machines(self):
119117
# container api
120118

121119
def create(self, name, image, command='', template=None, **kwargs):
122-
"""Create a container"""
120+
"""Create a container."""
123121
self._create_container(name, image, command,
124122
template or copy.deepcopy(CONTAINER_TEMPLATE), **kwargs)
125123

@@ -173,7 +171,7 @@ def _get_hostname(self, application_name):
173171
raise RuntimeError('Unsupported hostname: ' + hostname)
174172

175173
def start(self, name):
176-
"""Start a container"""
174+
"""Start a container."""
177175
self._put_unit(name, {'desiredState': 'launched'})
178176
self._wait_for_container_running(name)
179177

@@ -212,12 +210,12 @@ def _wait_for_destroy(self, name):
212210
raise RuntimeError('timeout on container destroy')
213211

214212
def stop(self, name):
215-
"""Stop a container"""
213+
"""Stop a container."""
216214
self._put_unit(name, {"desiredState": "loaded"})
217215
self._wait_for_job_state(name, JobState.created)
218216

219217
def destroy(self, name):
220-
"""Destroy a container"""
218+
"""Destroy a container."""
221219
# call all destroy functions, ignoring any errors
222220
try:
223221
self._destroy_container(name)
@@ -235,7 +233,7 @@ def _destroy_container(self, name):
235233
raise
236234

237235
def run(self, name, image, entrypoint, command): # noqa
238-
"""Run a one-off command"""
236+
"""Run a one-off command."""
239237
self._create_container(name, image, command, copy.deepcopy(RUN_TEMPLATE),
240238
entrypoint=entrypoint)
241239
# launch the container
@@ -337,13 +335,14 @@ def _do_ssh(cmd):
337335
return rc, output
338336

339337
def state(self, name):
338+
"""Display the given job's running state."""
340339
systemdActiveStateMap = {
341-
"active": "up",
342-
"reloading": "down",
343-
"inactive": "created",
344-
"failed": "crashed",
345-
"activating": "down",
346-
"deactivating": "down",
340+
'active': 'up',
341+
'reloading': 'down',
342+
'inactive': 'created',
343+
'failed': 'crashed',
344+
'activating': 'down',
345+
'deactivating': 'down',
347346
}
348347
try:
349348
# NOTE (bacongobbler): this call to ._get_unit() acts as a pre-emptive check to
@@ -354,9 +353,8 @@ def state(self, name):
354353
# FIXME (bacongobbler): when fleet loads a job, sometimes it'll automatically start and
355354
# stop the container, which in our case will return as 'failed', even though
356355
# the container is perfectly fine.
357-
if activeState == 'failed':
358-
if state['systemdLoadState'] == 'loaded':
359-
return JobState.created
356+
if activeState == 'failed' and state['systemdLoadState'] == 'loaded':
357+
return JobState.created
360358
return getattr(JobState, systemdActiveStateMap[activeState])
361359
except KeyError:
362360
# failed retrieving a proper response from the fleet API
@@ -366,12 +364,6 @@ def state(self, name):
366364
# which means it does not exist
367365
return JobState.destroyed
368366

369-
def attach(self, name):
370-
"""
371-
Attach to a job's stdin, stdout and stderr
372-
"""
373-
raise NotImplementedError
374-
375367
SchedulerClient = FleetHTTPClient
376368

377369

controller/scheduler/k8s.py

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
import copy
2-
import json
32
import httplib
4-
import time
3+
import json
4+
import random
55
import re
66
import string
7+
import time
8+
79
from django.conf import settings
8-
from .states import JobState
910
from docker import Client
10-
import random
11+
from .states import JobState
12+
from . import AbstractSchedulerClient
13+
1114

1215
POD_TEMPLATE = '''{
1316
"kind": "Pod",
@@ -95,9 +98,10 @@
9598
r'(?P<app>[a-z0-9-]+)_?(?P<version>v[0-9]+)?\.?(?P<c_type>[a-z-_]+)')
9699

97100

98-
class KubeHTTPClient():
101+
class KubeHTTPClient(AbstractSchedulerClient):
99102

100103
def __init__(self, target, auth, options, pkey):
104+
super(KubeHTTPClient, self).__init__(target, auth, options, pkey)
101105
self.target = settings.K8S_MASTER
102106
self.port = "8080"
103107
self.registry = settings.REGISTRY_HOST+":"+settings.REGISTRY_PORT
@@ -343,6 +347,7 @@ def _create_rc(self, name, image, command, **kwargs):
343347
return json.loads(data)
344348

345349
def create(self, name, image, command, **kwargs):
350+
"""Create a container."""
346351
self._create_rc(name, image, command, **kwargs)
347352
app_type = name.split(".")[1]
348353
name = name.replace(".", "-")
@@ -433,16 +438,12 @@ def _create_service(self, name, app_name, app_type):
433438
raise RuntimeError(errmsg)
434439

435440
def start(self, name):
436-
"""
437-
Start a container
438-
"""
439-
return
441+
"""Start a container."""
442+
pass
440443

441444
def stop(self, name):
442-
"""
443-
Stop a container
444-
"""
445-
return
445+
"""Stop a container."""
446+
pass
446447

447448
def _delete_rc(self, name):
448449
headers = {'Content-Type': 'application/json'}
@@ -460,9 +461,7 @@ def _delete_rc(self, name):
460461
raise RuntimeError(errmsg)
461462

462463
def destroy(self, name):
463-
"""
464-
Destroy a the app
465-
"""
464+
"""Destroy a container."""
466465
name = name.split(".")
467466
name = name[0]+'-'+name[1]
468467
name = name.replace("_", "-")
@@ -586,9 +585,7 @@ def logs(self, name):
586585
return log_data
587586

588587
def run(self, name, image, entrypoint, command):
589-
"""
590-
Run a one-off command
591-
"""
588+
"""Run a one-off command."""
592589
name = name.replace(".", "-")
593590
name = name.replace("_", "-")
594591
l = {}
@@ -666,17 +663,12 @@ def _get_pod_state(self, name):
666663
return JobState.destroyed
667664

668665
def state(self, name):
666+
"""Display the given job's running state."""
669667
try:
670668
return self._get_pod_state(name)
671669
except KeyError:
672670
return JobState.error
673671
except RuntimeError:
674672
return JobState.destroyed
675673

676-
def attach(self, name):
677-
"""
678-
Attach to a job's stdin, stdout and stderr
679-
"""
680-
return NotImplementedError
681-
682674
SchedulerClient = KubeHTTPClient

0 commit comments

Comments
 (0)