1- import logging
2- from docker import Client
1+ import re
2+ import time
33
44from django .conf import settings
5+ from docker import Client
6+
7+ from .states import JobState
58
69
10+ MATCH = re .compile (
11+ r'(?P<app>[a-z0-9-]+)_?(?P<version>v[0-9]+)?\.?(?P<c_type>[a-z-_]+)?.(?P<c_num>[0-9]+)' )
12+
713
814class SwarmClient (object ):
9- def __init__ (self ,target , auth , options , pkey ):
10- self .target = settings .SWARM_HOST
15+
16+ def __init__ (self , target , auth , options , pkey ):
17+ self .target = settings .SWARM_HOST
1118 # single global connection
12- self .registry = settings .REGISTRY_HOST + ":" + settings .REGISTRY_PORT
13- self .docker_cli = Client (base_url = 'tcp://' + self .target + ':' + "2395" ,timeout = 1200 )
19+ self .registry = settings .REGISTRY_HOST + ':' + settings .REGISTRY_PORT
20+ self .docker_cli = Client ("tcp://{}:2395" .format (self .target ),
21+ timeout = 1200 , version = '1.17' )
1422
1523 def create (self , name , image , command = '' , template = None , ** kwargs ):
1624 """Create a container"""
17- cimage = self .registry + "/" + image
18- cname = name
19- ccommand = command
20- # self.docker_cli.pull(cimage, stream=False,insecure_registry=True)
21- self .docker_cli .create_container (image = cimage ,name = cname ,command = ccommand )#,hostname=self._get_hostname(cname),ports=self._get_ports(cimage))
22- self .docker_cli .start (cname , port_bindings = self ._get_portbindings (cimage ),publish_all_ports = True )
25+ cimage = self .registry + '/' + image
26+ affinity = "affinity:container!=~/{}*/" .format (re .split (r'_v\d.' , name )[0 ])
27+ l = locals ().copy ()
28+ l .update (re .match (MATCH , name ).groupdict ())
29+ mem = kwargs .get ('memory' , {}).get (l ['c_type' ])
30+ if mem :
31+ mem = mem .lower ()
32+ if mem [- 2 :- 1 ].isalpha () and mem [- 1 ].isalpha ():
33+ mem = mem [:- 1 ]
34+ cpu = kwargs .get ('cpu' , {}).get (l ['c_type' ])
35+ self .docker_cli .create_container (image = cimage , name = name ,
36+ command = command .encode ('utf-8' ), mem_limit = mem ,
37+ cpu_shares = cpu ,
38+ environment = [affinity ])
39+ self .docker_cli .stop (name )
2340
2441 def start (self , name ):
2542 """
2643 Start a container
2744 """
28- self .docker_cli .start (name )
45+ self .docker_cli .start (name , publish_all_ports = True )
46+
2947 return
3048
3149 def stop (self , name ):
@@ -34,54 +52,94 @@ def stop(self, name):
3452 """
3553 self .docker_cli .stop (name )
3654 return
55+
3756 def destroy (self , name ):
3857 """
3958 Destroy a container
4059 """
41- self .docker_cli . stop (name )
60+ self .stop (name )
4261 self .docker_cli .remove_container (name )
4362 return
4463
4564 def run (self , name , image , entrypoint , command ):
4665 """
4766 Run a one-off command
4867 """
49- # dump input into a json object for testing purposes
50- return 0 , json .dumps ({'name' : name ,
51- 'image' : image ,
52- 'entrypoint' : entrypoint ,
53- 'command' : command })
68+ cimage = self .registry + '/' + image
69+ # use affinity for nodes that already have the image
70+ affinity = "affinity:image==~{}" .format (cimage )
71+ self .docker_cli .create_container (image = cimage , name = name ,
72+ command = command .encode ('utf-8' ),
73+ environment = [affinity ],
74+ entrypoint = [entrypoint ])
75+ time .sleep (2 )
76+ self .start (name )
77+ rc = 0
78+ while (True ):
79+ if self ._get_container_state (name ) == JobState .created :
80+ break
81+ time .sleep (1 )
82+ try :
83+ output = self .docker_cli .logs (name )
84+ return rc , output
85+ except :
86+ rc = 1
87+ return rc , output
88+
89+ def _get_container_state (self , name ):
90+ try :
91+ if self .docker_cli .inspect_container (name )['State' ]['Running' ]:
92+ return JobState .up
93+ else :
94+ return JobState .created
95+ except :
96+ return JobState .destroyed
97+
98+ def state (self , name ):
99+ try :
100+ # NOTE (bacongobbler): this call to ._get_unit() acts as a pre-emptive check to
101+ # determine if the job no longer exists (will raise a RuntimeError on 404)
102+ for _ in range (30 ):
103+ return self ._get_container_state (name )
104+ time .sleep (1 )
105+ # FIXME (bacongobbler): when fleet loads a job, sometimes it'll automatically start and
106+ # stop the container, which in our case will return as 'failed', even though
107+ # the container is perfectly fine.
108+ except KeyError :
109+ # failed retrieving a proper response from the fleet API
110+ return JobState .error
111+ except RuntimeError :
112+ # failed to retrieve a response from the fleet API,
113+ # which means it does not exist
114+ return JobState .destroyed
54115
55116 def attach (self , name ):
56117 """
57118 Attach to a job's stdin, stdout and stderr
58119 """
59- return StringIO (), StringIO (), StringIO ()
120+ raise NotImplementedError
60121
61122 def _get_hostname (self , application_name ):
62123 hostname = settings .UNIT_HOSTNAME
63- if hostname == " default" :
124+ if hostname == ' default' :
64125 return ''
65- elif hostname == " application" :
126+ elif hostname == ' application' :
66127 # replace underscore with dots, since underscore is not valid in DNS hostnames
67- dns_name = application_name .replace ("_" , "." )
128+ dns_name = application_name .replace ('_' , '.' )
68129 return dns_name
69- elif hostname == " server" :
130+ elif hostname == ' server' :
70131 raise NotImplementedError
71132 else :
72133 raise RuntimeError ('Unsupported hostname: ' + hostname )
73134
74- def _get_portbindings (self ,image ):
75- dictports = self .docker_cli .inspect_image (image )[" ContainerConfig" ][ " ExposedPorts" ]
76- for port ,mapping in dictports .items ():
77- dictports [port ]= None
135+ def _get_portbindings (self , image ):
136+ dictports = self .docker_cli .inspect_image (image )[' ContainerConfig' ][ ' ExposedPorts' ]
137+ for port , mapping in dictports .items ():
138+ dictports [port ] = None
78139 return dictports
79140
80- def _get_ports (self ,image ):
81- ports = []
82- dictports = self .docker_cli .inspect_image (image )["ContainerConfig" ]["ExposedPorts" ]
83- for port ,mapping in dictports .items ():
84- ports .append (int (port .split ('/' )[0 ]))
85- return ports
141+ def _get_ports (self , image ):
142+ dictports = self .docker_cli .inspect_image (image )['ContainerConfig' ]['ExposedPorts' ]
143+ return [int (port .split ('/' )[0 ]) for port in dictports .iterkeys ()]
86144
87145SchedulerClient = SwarmClient
0 commit comments