-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmock.py
More file actions
86 lines (72 loc) · 2.17 KB
/
mock.py
File metadata and controls
86 lines (72 loc) · 2.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import json
from cStringIO import StringIO
from .states import JobState, TransitionNotAllowed
# HACK: MockSchedulerClient is not persistent across requests
jobs = {}
class MockSchedulerClient(object):
def __init__(self, target, auth, options, pkey):
self.target = target
self.auth = auth
self.options = options
self.pkey = pkey
# container api
def attach(self, name):
"""
Attach to a job's stdin, stdout and stderr
"""
return StringIO(), StringIO(), StringIO()
def create(self, name, image, command, **kwargs):
"""
Create a new container
"""
job = jobs.get(name, {})
job.update({'state': JobState.created})
jobs[name] = job
return
def destroy(self, name):
"""
Destroy a container
"""
job = jobs.get(name, {})
job.update({'state': JobState.destroyed})
jobs[name] = job
return
def run(self, name, image, entrypoint, command):
"""
Run a one-off command
"""
# dump input into a json object for testing purposes
return 0, json.dumps({'name': name,
'image': image,
'entrypoint': entrypoint,
'command': command})
def start(self, name):
"""
Start a container
"""
if self.state(name) not in [JobState.created, JobState.up, JobState.down]:
raise TransitionNotAllowed
job = jobs.get(name, {})
job.update({'state': JobState.up})
jobs[name] = job
return
def state(self, name):
"""
Display the given job's running state
"""
state = JobState.initialized
job = jobs.get(name)
if job:
state = job.get('state')
return state
def stop(self, name):
"""
Stop a container
"""
job = jobs.get(name, {})
if job.get('state') != JobState.up:
raise TransitionNotAllowed
job.update({'state': JobState.stopped})
jobs[name] = job
return
SchedulerClient = MockSchedulerClient