-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathpod.py
More file actions
121 lines (96 loc) · 3.61 KB
/
pod.py
File metadata and controls
121 lines (96 loc) · 3.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import string
import json
import time
from .base import Base
from .client import error, unhealthy
class Pod(Base):
TEMPLATE = """\
{
"kind": "Pod",
"apiVersion": "$version",
"metadata": {
"name": "$id"
},
"spec": {
"containers": [
{
"name": "$id",
"image": "$image"
}
],
"restartPolicy": "Never"
}
}
"""
def create(self, name, image, template=None):
app_name = name.split('_')[0]
name = name.replace('.', '-').replace('_', '-')
# In case a whole template object is passed down
if not template:
template = json.loads(string.Template(self.TEMPLATE).substitute({
'id': name,
'version': self.api.version,
'image': self.registry + '/' + image,
}))
create = self.api.post("/namespaces/{}/pods", app_name, json=template)
# Wait for 5 seconds until pod is ready
for _ in xrange(5):
response = self.get(name, app_name, check=False)
if unhealthy(response.status_code):
time.sleep(1)
continue
break
if unhealthy(response.status_code):
error(create, 'create Pod in Namespace "{}"', app_name)
return response
def run(self, name, image, entrypoint, command):
"""run a one off command that creates a pod and then tears it down"""
if command.startswith('-c '):
args = command.split(' ', 1)
args[1] = args[1][1:-1]
else:
args = [command[1:-1]]
template = json.loads(string.Template(self.TEMPLATE).substitute({
'id': name,
'version': self.api.version,
'image': self.registry + '/' + image,
}))
template['spec']['containers'][0]['command'] = [entrypoint]
template['spec']['containers'][0]['args'] = args
response = self.create(name, image, template)
# Wait until pod is in the correct state
app_name = name.split('_')[0]
item = response.json()
while(1):
if item['status']['phase'] == 'Succeeded':
response = self.log(name, app_name)
self.delete(name, app_name)
return 0, response.text
elif item['status']['phase'] == 'Failed':
pod_state = item['status']['containerStatuses'][0]['state']
err_code = pod_state['terminated']['exitCode']
self.delete(name, app_name)
return err_code, response.text
time.sleep(1)
return 0, response.text
def get(self, name, namespace, check=True):
return self.api.get('/namespaces/{}/pods/{}', namespace, name, check=check)
def status(self, name, namespace):
"""Returns the status code of a pod"""
return self.get(name, namespace, check=False).status_code
def all(self, namespace):
return self.api.get('/namespaces/{}/pods', namespace)
def delete(self, name, namespace):
delete = self.api.delete('/namespaces/{}/pods/{}', namespace, name)
# Verify the pod has been deleted. Give it 5 seconds.
for _ in xrange(5):
# Fetching without health check to check for 404
status = self.status(name, namespace)
if status == 404:
break
time.sleep(1)
# Pod was not deleted within the grace period.
if status != 404:
error(delete, 'delete Pod "{}" in Namespace "{}"', name, namespace)
def log(self, name, namespace):
return self.api.get("/namespaces/{}/pods/{}/log", namespace, name)