Skip to content

Commit 8feed8a

Browse files
author
Matthew Fisher
committed
Merge pull request #51 from bacongobbler/wait-on-start
feat(start): wait on containers to start
2 parents d455df1 + 077429c commit 8feed8a

3 files changed

Lines changed: 207 additions & 58 deletions

File tree

client/start.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package client
22

3-
import "github.com/coreos/fleet/job"
3+
import (
4+
"strings"
5+
6+
"github.com/coreos/fleet/job"
7+
"github.com/coreos/fleet/schema"
8+
)
49

510
// Start launches target units and blocks until active
611
func (c *FleetClient) Start(target string) (err error) {
@@ -14,8 +19,16 @@ func (c *FleetClient) Start(target string) (err error) {
1419
if err != nil {
1520
return err
1621
}
17-
outchan, errchan := waitForUnitStates(units, desiredState)
18-
err = printUnitState(name, outchan, errchan)
22+
// wait for systemd to tell us that it's running, not fleet
23+
var errchan chan error
24+
var outchan chan *schema.Unit
25+
// data containers are special snowflakes who just exit
26+
if strings.Contains(name, "-data.service") {
27+
outchan, errchan = waitForUnitSubStates(units, "exited")
28+
} else {
29+
outchan, errchan = waitForUnitSubStates(units, "running")
30+
}
31+
err = printUnitSubState(name, outchan, errchan)
1932
if err != nil {
2033
return err
2134
}

client/state.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,85 @@
11
package client
22

33
import (
4+
"errors"
45
"fmt"
56
"sync"
67
"time"
78

89
"github.com/coreos/fleet/schema"
910
)
1011

12+
// waitForUnitStates polls each of the indicated jobs until each of their
13+
// systemd substates is equal to that which the caller indicates
14+
func waitForUnitSubStates(units []string, desiredState string) (outchan chan *schema.Unit, errchan chan error) {
15+
var wg sync.WaitGroup
16+
errchan = make(chan error)
17+
outchan = make(chan *schema.Unit)
18+
19+
// check each unit for desired state
20+
for _, name := range units {
21+
wg.Add(1)
22+
go checkUnitSubState(name, desiredState, outchan, errchan, &wg)
23+
}
24+
25+
// wait for all jobs to complete
26+
go func() {
27+
wg.Wait()
28+
close(outchan)
29+
close(errchan)
30+
}()
31+
32+
return outchan, errchan
33+
}
34+
35+
func checkUnitSubState(name string, desiredState string, outchan chan *schema.Unit, errchan chan error, wg *sync.WaitGroup) {
36+
defer wg.Done()
37+
for {
38+
if assertUnitSubState(name, desiredState, outchan, errchan) {
39+
return
40+
}
41+
time.Sleep(100 * time.Millisecond)
42+
}
43+
}
44+
45+
func assertUnitSubState(name string, desiredState string, outchan chan *schema.Unit, errchan chan error) bool {
46+
u, err := cAPI.Unit(name)
47+
if err != nil {
48+
errchan <- fmt.Errorf("Error retrieving Job(%s) from Registry: %v", name, err)
49+
return false
50+
}
51+
52+
// send unit across the output channel
53+
outchan <- u
54+
55+
unitState, err := unitState(name)
56+
if err != nil {
57+
errchan <- fmt.Errorf("Error retrieving Unit state from Registry: %v", err)
58+
return false
59+
}
60+
61+
if unitState.SystemdSubState == desiredState {
62+
return true
63+
}
64+
return false
65+
}
66+
67+
// unitState retrieves a UnitState based on its name.
68+
// FIXME: add this to fleet's API
69+
func unitState(name string) (*schema.UnitState, error) {
70+
unitStates, err := cAPI.UnitStates()
71+
if err != nil {
72+
return nil, err
73+
}
74+
75+
for _, state := range unitStates {
76+
if state.Name == name {
77+
return state, nil
78+
}
79+
}
80+
return nil, errors.New("Could not find unit state: " + name)
81+
}
82+
1183
// waitForUnitStates polls each of the indicated jobs until each of their
1284
// states is equal to that which the caller indicates
1385
func waitForUnitStates(units []string, desiredState string) (outchan chan *schema.Unit, errchan chan error) {
@@ -59,6 +131,39 @@ func assertUnitState(name string, desiredState string, outchan chan *schema.Unit
59131
return false
60132
}
61133

134+
func printUnitSubState(name string, outchan chan *schema.Unit, errchan chan error) error {
135+
// print output while jobs are transitioning
136+
defer fmt.Printf("\n")
137+
for {
138+
select {
139+
case u := <-outchan:
140+
// return on closed channel
141+
if u == nil {
142+
return nil
143+
}
144+
// ignore units that don't match our unit
145+
if u.Name != name {
146+
continue
147+
}
148+
// retrieve the unit's state
149+
unitState, err := unitState(u.Name)
150+
if err != nil {
151+
return err
152+
}
153+
fmt.Printf("\033[0;33m%v:\033[0m %v \r",
154+
u.Name, unitState.SystemdSubState)
155+
// read from error channel
156+
case err := <-errchan:
157+
// continue processing if error channel closed
158+
if err == nil {
159+
continue
160+
}
161+
return err
162+
}
163+
time.Sleep(200 * time.Millisecond)
164+
}
165+
}
166+
62167
func printUnitState(name string, outchan chan *schema.Unit, errchan chan error) error {
63168
// print output while jobs are transitioning
64169
defer fmt.Printf("\n")

cmd/cmd.go

Lines changed: 86 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,19 @@ import (
1717
"github.com/docopt/docopt-go"
1818
)
1919

20+
const (
21+
PlatformInstallCommand string = "platform"
22+
)
23+
24+
var (
25+
DefaultDataContainers = []string{
26+
"database-data",
27+
"registry-data",
28+
"logger-data",
29+
"builder-data",
30+
}
31+
)
32+
2033
func ListUnits(c client.Client) error {
2134
err := c.ListUnits()
2235
return err
@@ -42,6 +55,10 @@ func Scale(c client.Client, targets []string) error {
4255
}
4356

4457
func Start(c client.Client, targets []string) error {
58+
// if target is platform, install all services
59+
if len(targets) == 1 && targets[0] == PlatformInstallCommand {
60+
return StartPlatform(c)
61+
}
4562
for _, target := range targets {
4663
err := c.Start(target)
4764
if err != nil {
@@ -51,6 +68,51 @@ func Start(c client.Client, targets []string) error {
5168
return nil
5269
}
5370

71+
func StartPlatform(c client.Client) error {
72+
fmt.Println("Starting Platform...")
73+
if err := startDataContainers(c); err != nil {
74+
return err
75+
}
76+
if err := startDefaultServices(c); err != nil {
77+
return err
78+
}
79+
fmt.Println("Platform started.")
80+
return nil
81+
}
82+
83+
func startDataContainers(c client.Client) error {
84+
fmt.Println("Launching data containers...")
85+
for _, dataContainer := range DefaultDataContainers {
86+
err := c.Start(dataContainer)
87+
if err != nil {
88+
return err
89+
}
90+
}
91+
fmt.Println("Data containers launched.")
92+
return nil
93+
}
94+
95+
func startDefaultServices(c client.Client) error {
96+
fmt.Println("Launching service containers...")
97+
if err := Start(c, []string{"logger", "cache", "database"}); err != nil {
98+
return err
99+
}
100+
if err := Start(c, []string{"registry"}); err != nil {
101+
return err
102+
}
103+
if err := Start(c, []string{"controller"}); err != nil {
104+
return err
105+
}
106+
if err := Start(c, []string{"builder"}); err != nil {
107+
return err
108+
}
109+
if err := Start(c, []string{"router"}); err != nil {
110+
return err
111+
}
112+
fmt.Println("Service containers launched.")
113+
return nil
114+
}
115+
54116
func Stop(c client.Client, targets []string) error {
55117
for _, target := range targets {
56118
err := c.Stop(target)
@@ -97,53 +159,41 @@ func Journal(c client.Client, targets []string) error {
97159

98160
func Install(c client.Client, targets []string) error {
99161
// if target is platform, install all services
100-
if len(targets) == 1 && targets[0] == "platform" {
101-
err := installDataContainers(c)
102-
if err != nil {
103-
return err
104-
}
105-
err = installDefaultServices(c)
106-
if err != nil {
107-
return err
108-
}
162+
if len(targets) == 1 && targets[0] == PlatformInstallCommand {
163+
return InstallPlatform(c)
109164
} else {
110-
// otherwise create and start the specific targets
165+
// otherwise create the specific targets
111166
for _, target := range targets {
112167
err := c.Create(target)
113168
if err != nil {
114169
return err
115170
}
116-
err = c.Start(target)
117-
if err != nil {
118-
return err
119-
}
120171
}
121172
}
122173
return nil
123174
}
124175

125-
func installDataContainers(c client.Client) error {
126-
// data containers
127-
dataContainers := []string{
128-
"database-data",
129-
"registry-data",
130-
"logger-data",
131-
"builder-data",
176+
func InstallPlatform(c client.Client) error {
177+
err := installDataContainers(c)
178+
if err != nil {
179+
return err
132180
}
133-
fmt.Println("\nScheduling data containers...")
134-
for _, dataContainer := range dataContainers {
135-
err := c.Create(dataContainer)
136-
if err != nil {
137-
return err
138-
}
181+
err = installDefaultServices(c)
182+
if err != nil {
183+
return err
139184
}
140-
fmt.Println("\nLaunching data containers...")
141-
for _, dataContainer := range dataContainers {
142-
err := c.Start(dataContainer)
185+
return nil
186+
}
187+
188+
func installDataContainers(c client.Client) error {
189+
fmt.Println("Scheduling data containers...")
190+
for _, dataContainer := range DefaultDataContainers {
191+
err := c.Create(dataContainer)
143192
if err != nil {
144193
return err
145194
}
146195
}
196+
fmt.Println("Data containers scheduled.")
147197
return nil
148198
}
149199

@@ -157,36 +207,17 @@ func installDefaultServices(c client.Client) error {
157207
"controller=1",
158208
"builder=1",
159209
"router=1"}
160-
fmt.Println("\nScheduling service containers...")
161-
err := Scale(c, targets)
162-
fmt.Println("\nLaunching service containers...")
163-
err = Start(c, []string{"logger", "cache", "database"})
164-
if err != nil {
165-
return err
166-
}
167-
err = Start(c, []string{"registry"})
168-
if err != nil {
169-
return err
170-
}
171-
err = Start(c, []string{"controller"})
172-
if err != nil {
173-
return err
174-
}
175-
err = Start(c, []string{"builder"})
176-
if err != nil {
177-
return err
178-
}
179-
err = Start(c, []string{"router"})
180-
if err != nil {
210+
fmt.Println("Scheduling service containers...")
211+
if err := Scale(c, targets); err != nil {
181212
return err
182213
}
183-
fmt.Println("Done.")
214+
fmt.Println("Service containers scheduled.")
184215
return nil
185216
}
186217

187218
func Uninstall(c client.Client, targets []string) error {
188219
// if target is platform, uninstall all services
189-
if len(targets) == 1 && targets[0] == "platform" {
220+
if len(targets) == 1 && targets[0] == PlatformInstallCommand {
190221
err := uninstallAllServices(c)
191222
if err != nil {
192223
return err
@@ -212,9 +243,9 @@ func uninstallAllServices(c client.Client) error {
212243
"controller=0",
213244
"builder=0",
214245
"router=0"}
215-
fmt.Println("\nDestroying service containers...")
246+
fmt.Println("Destroying service containers...")
216247
err := Scale(c, targets)
217-
fmt.Println("Done.")
248+
fmt.Println("Service containers destroyed.")
218249
return err
219250
}
220251

0 commit comments

Comments
 (0)