Skip to content

Commit 69d9b0b

Browse files
author
Matthew Fisher
committed
Merge pull request #1708 from bacongobbler/publisher
ref(scheduler): introduce deis/publisher
2 parents 2459c2c + 9c203ca commit 69d9b0b

18 files changed

Lines changed: 327 additions & 119 deletions

File tree

Makefile

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -4,8 +4,8 @@
44

55
include includes.mk
66

7-
COMPONENTS=builder cache controller database logger registry router
8-
START_ORDER=logger database cache registry controller builder router
7+
COMPONENTS=builder cache controller database logger publisher registry router
8+
START_ORDER=publisher logger database cache registry controller builder router
99
CLIENTS=client deisctl
1010

1111
all: build run

controller/api/models.py

Lines changed: 4 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -436,9 +436,6 @@ def _get_command(self):
436436

437437
_command = property(_get_command)
438438

439-
def _command_announceable(self):
440-
return self._command.lower() in ['start web', '']
441-
442439
def clone(self, release):
443440
c = Container.objects.create(owner=self.owner,
444441
app=self.app,
@@ -459,7 +456,7 @@ def create(self):
459456
name=job_id,
460457
image=image,
461458
command=self._command,
462-
use_announcer=self._command_announceable(), **kwargs)
459+
**kwargs)
463460
except Exception as e:
464461
err = '{} (create): {}'.format(job_id, e)
465462
log_event(self.app, err, logging.ERROR)
@@ -469,7 +466,7 @@ def create(self):
469466
def start(self):
470467
job_id = self._job_id
471468
try:
472-
self._scheduler.start(job_id, self._command_announceable())
469+
self._scheduler.start(job_id)
473470
except Exception as e:
474471
err = '{} (start): {}'.format(job_id, e)
475472
log_event(self.app, err, logging.WARNING)
@@ -479,7 +476,7 @@ def start(self):
479476
def stop(self):
480477
job_id = self._job_id
481478
try:
482-
self._scheduler.stop(job_id, self._command_announceable())
479+
self._scheduler.stop(job_id)
483480
except Exception as e:
484481
err = '{} (stop): {}'.format(job_id, e)
485482
log_event(self.app, err, logging.ERROR)
@@ -489,7 +486,7 @@ def stop(self):
489486
def destroy(self):
490487
job_id = self._job_id
491488
try:
492-
self._scheduler.destroy(job_id, self._command_announceable())
489+
self._scheduler.destroy(job_id)
493490
except Exception as e:
494491
err = '{} (destroy): {}'.format(job_id, e)
495492
log_event(self.app, err, logging.ERROR)

controller/scheduler/chaos.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -25,28 +25,28 @@ def tearDown(self):
2525

2626
# job api
2727

28-
def create(self, name, image, command, use_announcer, **kwargs):
28+
def create(self, name, image, command, **kwargs):
2929
if random.random() < CREATE_ERROR_RATE:
3030
raise RuntimeError
3131
return True
3232

33-
def start(self, name, use_announcer):
33+
def start(self, name):
3434
"""
3535
Start an idle job
3636
"""
3737
if random.random() < START_ERROR_RATE:
3838
raise RuntimeError
3939
return True
4040

41-
def stop(self, name, use_announcer):
41+
def stop(self, name):
4242
"""
4343
Stop a running job
4444
"""
4545
if random.random() < STOP_ERROR_RATE:
4646
raise RuntimeError
4747
return True
4848

49-
def destroy(self, name, use_announcer):
49+
def destroy(self, name):
5050
"""
5151
Destroy an existing job
5252
"""

controller/scheduler/coreos.py

Lines changed: 4 additions & 54 deletions
Original file line number | Diff line number | Diff line change
@@ -101,15 +101,12 @@ def _get_machines(self):
101101

102102
# container api
103103

104-
def create(self, name, image, command='', template=None, use_announcer=True, **kwargs):
104+
def create(self, name, image, command='', template=None, **kwargs):
105105
"""Create a container"""
106106
self._create_container(name, image, command,
107107
template or copy.deepcopy(CONTAINER_TEMPLATE), **kwargs)
108108
self._create_log(name, image, command, copy.deepcopy(LOG_TEMPLATE))
109109

110-
if use_announcer:
111-
self._create_announcer(name, image, command, copy.deepcopy(ANNOUNCE_TEMPLATE))
112-
113110
def _create_container(self, name, image, command, unit, **kwargs):
114111
l = locals().copy()
115112
l.update(re.match(MATCH, name).groupdict())
@@ -146,22 +143,10 @@ def _create_log(self, name, image, command, unit):
146143
# post unit to fleet
147144
self._put_unit(name+'-log', {"desiredState": "launched", "options": unit})
148145

149-
def _create_announcer(self, name, image, command, unit):
150-
l = locals().copy()
151-
l.update(re.match(MATCH, name).groupdict())
152-
# construct unit from template
153-
for f in unit:
154-
f['value'] = f['value'].format(**l)
155-
# post unit to fleet
156-
self._put_unit(name+'-announce', {"desiredState": "launched", "options": unit})
157-
158-
def start(self, name, use_announcer=True):
146+
def start(self, name):
159147
"""Start a container"""
160148
self._wait_for_container(name)
161149

162-
if use_announcer:
163-
self._wait_for_announcer(name)
164-
165150
def _wait_for_container(self, name):
166151
# we bump to 20 minutes here to match the timeout on the router and in the app unit files
167152
for _ in range(1200):
@@ -177,24 +162,6 @@ def _wait_for_container(self, name):
177162
else:
178163
raise RuntimeError('container failed to start')
179164

180-
def _wait_for_announcer(self, name):
181-
# wait a bit for the announcer to come up, otherwise we may have hit
182-
# https://github.com/docker/docker/issues/8022
183-
for _ in range(30):
184-
states = self._get_state(name)
185-
if states and len(states.get('states', [])) == 1:
186-
state = states.get('states')[0]
187-
subState = state.get('systemdSubState')
188-
if subState == 'running':
189-
# wait for the router to be reconfigured
190-
time.sleep(10)
191-
break
192-
elif subState == 'failed':
193-
raise RuntimeError('announcer failed to start')
194-
time.sleep(1)
195-
else:
196-
raise RuntimeError('announcer timeout on start')
197-
198165
def _wait_for_destroy(self, name):
199166
for _ in range(30):
200167
states = self._get_state(name)
@@ -204,15 +171,13 @@ def _wait_for_destroy(self, name):
204171
else:
205172
raise RuntimeError('timeout on container destroy')
206173

207-
def stop(self, name, use_announcer=True):
174+
def stop(self, name):
208175
"""Stop a container"""
209176
raise NotImplementedError
210177

211-
def destroy(self, name, use_announcer=True):
178+
def destroy(self, name):
212179
"""Destroy a container"""
213180
funcs = []
214-
if use_announcer:
215-
funcs.append(functools.partial(self._destroy_announcer, name))
216181
funcs.append(functools.partial(self._destroy_container, name))
217182
funcs.append(functools.partial(self._destroy_log, name))
218183
# call all destroy functions, ignoring any errors
@@ -226,9 +191,6 @@ def destroy(self, name, use_announcer=True):
226191
def _destroy_container(self, name):
227192
return self._delete_unit(name)
228193

229-
def _destroy_announcer(self, name):
230-
return self._delete_unit(name+'-announce')
231-
232194
def _destroy_log(self, name):
233195
return self._delete_unit(name+'-log')
234196

@@ -350,18 +312,6 @@ def attach(self, name):
350312
]
351313

352314

353-
ANNOUNCE_TEMPLATE = [
354-
{"section": "Unit", "name": "Description", "value": "{name} announce"},
355-
{"section": "Unit", "name": "BindsTo", "value": "{name}.service"},
356-
{"section": "Service", "name": "EnvironmentFile", "value": "/etc/environment"},
357-
{"section": "Service", "name": "ExecStartPre", "value": '''/bin/sh -c "until docker inspect -f '{{{{range $i, $e := .NetworkSettings.Ports }}}}{{{{$p := index $e 0}}}}{{{{$p.HostPort}}}}{{{{end}}}}' {name} >/dev/null 2>&1; do sleep 2; done; port=$(docker inspect -f '{{{{range $i, $e := .NetworkSettings.Ports }}}}{{{{$p := index $e 0}}}}{{{{$p.HostPort}}}}{{{{end}}}}' {name}); if [[ -z $port ]]; then echo We have no port...; exit 1; fi; echo Waiting for $port/tcp...; until netstat -lnt | grep :$port >/dev/null; do sleep 1; done"'''}, # noqa
358-
{"section": "Service", "name": "ExecStart", "value": '''/bin/sh -c "port=$(docker inspect -f '{{{{range $i, $e := .NetworkSettings.Ports }}}}{{{{$p := index $e 0}}}}{{{{$p.HostPort}}}}{{{{end}}}}' {name}); echo Connected to $COREOS_PRIVATE_IPV4:$port/tcp, publishing to etcd...; while netstat -lnt | grep :$port >/dev/null; do etcdctl set /deis/services/{app}/{name} $COREOS_PRIVATE_IPV4:$port --ttl 60 >/dev/null; sleep 45; done"'''}, # noqa
359-
{"section": "Service", "name": "ExecStop", "value": "/usr/bin/etcdctl rm --recursive /deis/services/{app}/{name}"}, # noqa
360-
{"section": "Service", "name": "TimeoutStartSec", "value": "20m"},
361-
{"section": "X-Fleet", "name": "MachineOf", "value": "{name}.service"},
362-
]
363-
364-
365315
RUN_TEMPLATE = [
366316
{"section": "Unit", "name": "Description", "value": "{name} admin command"},
367317
{"section": "Service", "name": "ExecStartPre", "value": '''/bin/sh -c "IMAGE=$(etcdctl get /deis/registry/host 2>&1):$(etcdctl get /deis/registry/port 2>&1)/{image}; docker pull $IMAGE"'''}, # noqa

controller/scheduler/mock.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -20,25 +20,25 @@ def tearDown(self):
2020

2121
# container api
2222

23-
def create(self, name, image, command, use_announcer, **kwargs):
23+
def create(self, name, image, command, **kwargs):
2424
"""
2525
Create a new container
2626
"""
2727
return
2828

29-
def start(self, name, use_announcer):
29+
def start(self, name):
3030
"""
3131
Start a container
3232
"""
3333
return
3434

35-
def stop(self, name, use_announcer):
35+
def stop(self, name):
3636
"""
3737
Stop a container
3838
"""
3939
return
4040

41-
def destroy(self, name, use_announcer):
41+
def destroy(self, name):
4242
"""
4343
Destroy a container
4444
"""

deisctl/backend/fleet/create.go

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@ import (
1313
// and blocks until the unit is loaded
1414
func (c *FleetClient) Create(targets []string) error {
1515
units := make([]*schema.Unit, len(targets))
16+
desiredState := string(job.JobStateLoaded)
1617
for i, target := range targets {
1718
unitName, unitFile, err := c.createUnitFile(target)
1819
if err != nil {
@@ -31,13 +32,11 @@ func (c *FleetClient) Create(targets []string) error {
3132
return fmt.Errorf("failed creating job %s: %v", unit.Name, err)
3233
}
3334
}
34-
desiredState := string(job.JobStateLoaded)
3535
if err := c.Fleet.SetUnitTargetState(unit.Name, desiredState); err != nil {
3636
return err
3737
}
3838
}
3939
for _, unit := range units {
40-
desiredState := string(job.JobStateLoaded)
4140
outchan, errchan := waitForUnitStates([]string{unit.Name}, desiredState)
4241
if err := printUnitState(unit.Name, outchan, errchan); err != nil {
4342
return err

deisctl/backend/fleet/destroy.go

Lines changed: 10 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -11,18 +11,14 @@ import (
1111
func (c *FleetClient) Destroy(targets []string) error {
1212
for _, target := range targets {
1313
// check if the unit exists
14-
units, err := c.Units(target)
14+
_, err := c.Units(target)
1515
if err != nil {
1616
return err
1717
}
1818
component, num, err := splitTarget(target)
1919
if err != nil {
2020
return err
2121
}
22-
// if no number is specified, destroy ALL THE UNITS!
23-
if num == 0 {
24-
num = len(units)
25-
}
2622
if strings.HasSuffix(component, "-data") {
2723
err = c.destroyDataUnit(component)
2824
} else {
@@ -35,7 +31,7 @@ func (c *FleetClient) Destroy(targets []string) error {
3531
return nil
3632
}
3733

38-
func (c *FleetClient) destroyServiceUnit(component string, num int) (err error) {
34+
func (c *FleetClient) destroyServiceUnit(component string, num int) error {
3935
name, err := formatUnitName(component, num)
4036
if err != nil {
4137
return err
@@ -46,34 +42,30 @@ func (c *FleetClient) destroyServiceUnit(component string, num int) (err error)
4642
return err
4743
}
4844
outchan, errchan := waitForUnitStates([]string{name}, desiredState)
49-
err = printUnitState(name, outchan, errchan)
50-
if err != nil {
45+
if err := printUnitState(name, outchan, errchan); err != nil {
5146
return err
5247
}
5348
if err = c.Fleet.DestroyUnit(name); err != nil {
5449
return fmt.Errorf("failed destroying job %s: %v", name, err)
5550
}
56-
return err
51+
return nil
5752
}
5853

59-
func (c *FleetClient) destroyDataUnit(component string) (err error) {
54+
func (c *FleetClient) destroyDataUnit(component string) error {
6055
name, err := formatUnitName(component, 0)
56+
desiredState := string(job.JobStateInactive)
6157
if err != nil {
6258
return err
6359
}
64-
desiredState := string(job.JobStateInactive)
65-
err = c.Fleet.SetUnitTargetState(name, desiredState)
66-
if err != nil {
60+
if err := c.Fleet.SetUnitTargetState(name, desiredState); err != nil {
6761
return err
6862
}
6963
outchan, errchan := waitForUnitStates([]string{name}, desiredState)
70-
err = printUnitState(name, outchan, errchan)
71-
if err != nil {
64+
if err := printUnitState(name, outchan, errchan); err != nil {
7265
return err
7366
}
74-
if err = c.Fleet.DestroyUnit(name); err != nil {
67+
if err := c.Fleet.DestroyUnit(name); err != nil {
7568
return fmt.Errorf("failed destroying job %s: %v", name, err)
7669
}
77-
return err
78-
70+
return nil
7971
}

deisctl/backend/fleet/state.go

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -126,6 +126,11 @@ func assertUnitState(name string, desiredState string, outchan chan *schema.Unit
126126
// send unit across the output channel
127127
outchan <- u
128128

129+
// HACK: global units have no soul
130+
if u.CurrentState == "" {
131+
return true
132+
}
133+
129134
if u.DesiredState == u.CurrentState {
130135
return true
131136
}
@@ -182,6 +187,10 @@ func printUnitState(name string, outchan chan *schema.Unit, errchan chan error)
182187
if u.Name != name {
183188
continue
184189
}
190+
// HACK: global units have no soul
191+
if u.CurrentState == "" {
192+
continue
193+
}
185194
// otherwise print output
186195
if u.CurrentState != u.DesiredState {
187196
fmt.Printf("\033[0;33m%v:\033[0m %v (pending) \r",

deisctl/backend/fleet/unit.go

Lines changed: 10 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,6 @@ import (
55
"io/ioutil"
66
"os"
77
"path"
8-
"regexp"
98
"strconv"
109
"strings"
1110

@@ -25,18 +24,18 @@ func (c *FleetClient) Units(target string) (units []string, err error) {
2524
if err != nil {
2625
return
2726
}
28-
var r *regexp.Regexp
2927
if strings.HasSuffix(target, "-data") {
30-
r = regexp.MustCompile(`deis\-(` + target + `)\.service`)
31-
} else if strings.Contains(target, "@") {
32-
r = regexp.MustCompile(`deis\-(` + target + `)\.service`)
28+
for _, u := range allUnits {
29+
if strings.Contains(u.Name, target) {
30+
units = []string{u.Name}
31+
return
32+
}
33+
}
3334
} else {
34-
r = regexp.MustCompile(`deis\-(` + target + `)@([\d]+)\.service`)
35-
}
36-
for _, u := range allUnits {
37-
match := r.MatchString(u.Name)
38-
if match {
39-
units = append(units, u.Name)
35+
for _, u := range allUnits {
36+
if strings.Contains(u.Name, target) && !strings.HasSuffix(u.Name, "-data.service") {
37+
units = append(units, u.Name)
38+
}
4039
}
4140
}
4241
if len(units) == 0 {

0 commit comments

Comments
 (0)