-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathswarm.py
More file actions
145 lines (125 loc) · 4.98 KB
/
swarm.py
File metadata and controls
145 lines (125 loc) · 4.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import re
import time
from django.conf import settings
from docker import Client
from .states import JobState
# Parser for job names. The pattern is written one named group per line; the
# adjacent raw-string literals concatenate into a single regular expression.
MATCH = re.compile(
    r'(?P<app>[a-z0-9-]+)'        # application name
    r'_?(?P<version>v[0-9]+)?'    # optional "_" followed by an optional release such as "v2"
    r'\.?(?P<c_type>[a-z-_]+)?'   # optional "." followed by an optional container type
    r'.(?P<c_num>[0-9]+)'         # any single character, then the container number
)
class SwarmClient(object):
    """
    Scheduler client that drives containers through a docker-py ``Client``
    connected to the host named by ``settings.SWARM_HOST``.
    """

    def __init__(self, target, auth, options, pkey):
        """
        Open the connection to the configured host.

        ``target``, ``auth``, ``options`` and ``pkey`` are accepted but not
        used by this implementation; the endpoint always comes from
        ``settings.SWARM_HOST``.
        """
        self.target = settings.SWARM_HOST
        # format() rather than ``+`` so a non-string REGISTRY_PORT also works
        self.registry = '{}:{}'.format(settings.REGISTRY_HOST, settings.REGISTRY_PORT)
        # single global connection
        self.docker_cli = Client("tcp://{}:2395".format(self.target),
                                 timeout=1200, version='1.17')

    def create(self, name, image, command='', template=None, **kwargs):
        """
        Create a container

        :param name: job name; must be parseable by ``MATCH``
        :param image: image path, relative to the configured registry
        :param command: command line handed to the container
        :param template: unused
        :param kwargs: may carry ``memory`` and ``cpu`` dicts keyed by the
            container type parsed out of ``name``
        :raises ValueError: if ``name`` cannot be parsed by ``MATCH``
        """
        parsed = MATCH.match(name)
        if parsed is None:
            raise ValueError('Invalid container name: ' + name)
        c_type = parsed.group('c_type')

        cimage = self.registry + '/' + image
        # affinity expression keyed on the part of the name that precedes the
        # "_v<digit>" version marker; it is passed to Docker as an env entry
        affinity = "affinity:container!=~/{}*/".format(re.split(r'_v\d.', name)[0])

        # ``or {}`` tolerates callers passing memory=None / cpu=None
        mem = (kwargs.get('memory') or {}).get(c_type)
        if mem:
            mem = mem.lower()
            # a two-letter unit suffix such as "mb" is cut down to "m"
            if mem[-2:-1].isalpha() and mem[-1].isalpha():
                mem = mem[:-1]
        cpu = (kwargs.get('cpu') or {}).get(c_type)

        self.docker_cli.create_container(image=cimage, name=name,
                                         command=command.encode('utf-8'),
                                         mem_limit=mem,
                                         cpu_shares=cpu,
                                         environment=[affinity])
        # NOTE(review): stopping a container that was only just created looks
        # redundant; kept because callers may rely on the existing sequence.
        self.docker_cli.stop(name)

    def start(self, name):
        """
        Start a container
        """
        self.docker_cli.start(name, publish_all_ports=True)

    def stop(self, name):
        """
        Stop a container
        """
        self.docker_cli.stop(name)

    def destroy(self, name):
        """
        Destroy a container
        """
        self.stop(name)
        self.docker_cli.remove_container(name)

    def run(self, name, image, entrypoint, command):
        """
        Run a one-off command

        :returns: ``(rc, output)``; ``rc`` is 0 with the container logs when
            they could be fetched, otherwise 1 with an empty string
        """
        cimage = self.registry + '/' + image
        # use affinity for nodes that already have the image
        affinity = "affinity:image==~{}".format(cimage)
        self.docker_cli.create_container(image=cimage, name=name,
                                         command=command.encode('utf-8'),
                                         environment=[affinity],
                                         entrypoint=[entrypoint])
        time.sleep(2)
        self.start(name)
        # poll once per second until the container is reported as not running
        # NOTE(review): if the container vanishes, the state is ``destroyed``
        # and this loop never terminates -- consider a timeout.
        while self._get_container_state(name) != JobState.created:
            time.sleep(1)
        try:
            return 0, self.docker_cli.logs(name)
        except Exception:
            # no logs available: report failure with empty output instead of
            # referencing an unassigned variable
            return 1, ''

    def _get_container_state(self, name):
        """
        Map the container's inspect result to a JobState: ``up`` when it is
        running, ``created`` when it is not, ``destroyed`` when the lookup
        fails for any reason.
        """
        try:
            running = self.docker_cli.inspect_container(name)['State']['Running']
        except Exception:
            return JobState.destroyed
        return JobState.up if running else JobState.created

    def state(self, name):
        """
        Return the JobState of the named container

        Lookup failures are already translated to ``JobState.destroyed`` by
        ``_get_container_state``, so no retry or error handling happens here.
        """
        return self._get_container_state(name)

    def attach(self, name):
        """
        Attach to a job's stdin, stdout and stderr
        """
        raise NotImplementedError

    def _get_hostname(self, application_name):
        """
        Derive the container hostname according to ``settings.UNIT_HOSTNAME``

        :raises NotImplementedError: for the ``server`` mode
        :raises RuntimeError: for any unrecognised mode
        """
        hostname = settings.UNIT_HOSTNAME
        if hostname == 'default':
            return ''
        if hostname == 'application':
            # replace underscore with dots, since underscore is not valid in DNS hostnames
            return application_name.replace('_', '.')
        if hostname == 'server':
            raise NotImplementedError
        raise RuntimeError('Unsupported hostname: ' + hostname)

    def _exposed_ports(self, image):
        """
        Return the image's ``ExposedPorts`` mapping, or an empty dict when the
        entry is missing or null.
        """
        config = self.docker_cli.inspect_image(image)['ContainerConfig']
        return config.get('ExposedPorts') or {}

    def _get_portbindings(self, image):
        """
        Return a dict with one key per exposed port of ``image``, each mapped
        to ``None``
        """
        return dict.fromkeys(self._exposed_ports(image))

    def _get_ports(self, image):
        """
        Return the exposed port numbers of ``image`` as a list of ints
        (the ``/proto`` suffix of each key is dropped)
        """
        return [int(port.split('/')[0]) for port in self._exposed_ports(image)]
# Expose SwarmClient under the generic name SchedulerClient.
# NOTE(review): presumably this is the name the scheduler loader looks up -- confirm against the importer.
SchedulerClient = SwarmClient