-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmock.py
More file actions
131 lines (106 loc) · 4.01 KB
/
mock.py
File metadata and controls
131 lines (106 loc) · 4.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import json
import requests
import base64
from .abstract import AbstractSchedulerClient
from .states import JobState, TransitionError
# HACK: MockSchedulerClient is not persistent across requests, so the mock's
# state is kept in these module-level dicts rather than on the instance.
# Job records keyed by job name; each record stores the job's 'state'.
jobs = {}
# Fake secrets keyed by secret name only (the namespace argument is ignored).
secrets = {}
class MockSchedulerClient(AbstractSchedulerClient):
    """In-memory scheduler client for tests.

    Job states and secrets live in the module-level ``jobs`` and ``secrets``
    dicts, so they are shared by every instance of this class. The
    ``_get_*`` helpers return canned ``requests.Response`` objects instead of
    performing any HTTP call.
    """

    def create(self, name, image, command, **kwargs):
        """Create a new container."""
        # Only the resulting state is recorded; image/command are ignored.
        jobs.setdefault(name, {})['state'] = JobState.created

    def destroy(self, name):
        """Destroy a container."""
        # The job record is kept and merely flagged as destroyed.
        jobs.setdefault(name, {})['state'] = JobState.destroyed

    def run(self, name, image, entrypoint, command):
        """Run a one-off command.

        Nothing is executed: returns a ``(0, output)`` tuple where ``output``
        is a JSON string echoing the arguments.
        """
        # dump input into a json object for testing purposes
        return 0, json.dumps({
            'name': name,
            'image': image,
            'entrypoint': entrypoint,
            'command': command,
        })

    def start(self, name):
        """Start a container.

        Raises:
            TransitionError: if the job's current state is not one of
                created, up, down, crashed or error.
        """
        current = self.state(name)
        startable = (
            JobState.created,
            JobState.up,
            JobState.down,
            JobState.crashed,
            JobState.error,
        )
        if current not in startable:
            raise TransitionError(current,
                                  JobState.up,
                                  'the container must be stopped or up to start')
        jobs.setdefault(name, {})['state'] = JobState.up

    def state(self, name):
        """Display the given job's running state.

        Jobs with no recorded state report ``JobState.initialized``.
        """
        return jobs.get(name, {}).get('state', JobState.initialized)

    def stop(self, name):
        """Stop a container.

        Raises:
            TransitionError: if the job's current state is not one of
                up, crashed or error.
        """
        # Use state() so an unknown job reports JobState.initialized (as in
        # start()) rather than None.
        current = self.state(name)
        if current not in (JobState.up, JobState.crashed, JobState.error):
            # NOTE(review): the second argument is JobState.up although this
            # transition targets JobState.down; kept as in the original --
            # confirm against TransitionError's signature.
            raise TransitionError(current,
                                  JobState.up,
                                  'the container must be up to stop')
        # A state in the allowed set can only come from an existing record,
        # so jobs[name] is guaranteed to be present here.
        jobs[name]['state'] = JobState.down

    # Related to k8s services
    def resolve_state(self, pod):
        """Map a pod's ``status.phase`` string to a ``JobState``.

        Raises:
            KeyError: if the phase is not one of the five listed below, or
                if ``pod`` lacks ``status``/``phase``.
        """
        # See "Pod Phase" at http://kubernetes.io/v1.1/docs/user-guide/pod-states.html
        states = {
            "Pending": JobState.initialized,
            "Running": JobState.up,
            "Succeeded": JobState.down,
            "Failed": JobState.crashed,
            "Unknown": JobState.error,
        }
        return states[pod["status"]["phase"]]

    def _get_service(self, namespace, name):
        """Return a canned 200 response whose body is an empty JSON object."""
        resp = requests.Response()
        resp.status_code = 200
        resp._content = b'{}'
        return resp

    def _update_service(self, namespace, name, data):
        """Accept a service update and do nothing."""
        pass

    def _get_nodes(self, **kwargs):
        """Return a canned 200 response listing one node labelled env=prod."""
        resp = requests.Response()
        resp.status_code = 200
        resp._content = b'{"items": [{"metadata": {"labels": {"env": "prod"}}}]}'
        return resp

    def _get_pods(self, namespace, **kwargs):
        """Return a canned 200 response listing a single running pod."""
        data = {
            "items": [
                {
                    "metadata": {
                        "labels": {
                            "app": "foo",
                            "version": "v2",
                            "type": "web"
                        },
                        "name": "this-is-my-pod"
                    },
                    "status": {"phase": "Running"}
                }
            ]
        }
        resp = requests.Response()
        resp.status_code = 200
        # Response bodies are bytes; encode like the other canned responses
        # so resp.json() / resp.text work on Python 3.
        resp._content = json.dumps(data).encode('UTF-8')
        return resp

    def _get_secret(self, namespace, name):
        """Return a 200 response with the stored secret, or ``{}`` if absent."""
        # Fake out the secrets
        resp = requests.Response()
        resp.status_code = 200
        resp._content = json.dumps(secrets.get(name, {})).encode('UTF-8')
        return resp

    def _create_secret(self, namespace, name, data):
        """Store ``data`` under ``name`` with every value base64-encoded.

        Values must be ``str``. An already-stored secret with the same name
        is left untouched.
        """
        encoded = {
            key: base64.b64encode(bytes(item, 'UTF-8')).decode()
            for key, item in data.items()
        }
        # setdefault: the first secret created under a name wins.
        secrets.setdefault(name, {'data': encoded})

    def _delete_secret(self, namespace, name):
        """Remove the named secret; a missing name is silently ignored."""
        secrets.pop(name, None)
# Alias the mock under the generic name `SchedulerClient`.
# NOTE(review): presumably scheduler modules are looked up by this name -- confirm against the loader.
SchedulerClient = MockSchedulerClient