Skip to content

Commit 077429c

Browse files
author
Matthew Fisher
committed
feat(start): wait on containers to start
Previously, containers would start and would be considered "running" once Fleet launched the container and flagged the job as "launched". The job was launched, but that doesn't mean that the container was running. Dropping down to systemd's level (exposed through Fleet) allows us to wait until the container is actually running. In the case of data containers, it waits until they have exited. This commit also removes the start logic from `deisctl install` so that we can just load the unit files and configure the platform before starting it.
1 parent d455df1 commit 077429c

3 files changed

Lines changed: 207 additions & 58 deletions

File tree

client/start.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package client
22

3-
import "github.com/coreos/fleet/job"
3+
import (
4+
"strings"
5+
6+
"github.com/coreos/fleet/job"
7+
"github.com/coreos/fleet/schema"
8+
)
49

510
// Start launches target units and blocks until active
611
func (c *FleetClient) Start(target string) (err error) {
@@ -14,8 +19,16 @@ func (c *FleetClient) Start(target string) (err error) {
1419
if err != nil {
1520
return err
1621
}
17-
outchan, errchan := waitForUnitStates(units, desiredState)
18-
err = printUnitState(name, outchan, errchan)
22+
// wait for systemd to tell us that it's running, not fleet
23+
var errchan chan error
24+
var outchan chan *schema.Unit
25+
// data containers are special snowflakes who just exit
26+
if strings.Contains(name, "-data.service") {
27+
outchan, errchan = waitForUnitSubStates(units, "exited")
28+
} else {
29+
outchan, errchan = waitForUnitSubStates(units, "running")
30+
}
31+
err = printUnitSubState(name, outchan, errchan)
1932
if err != nil {
2033
return err
2134
}

client/state.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,85 @@
11
package client
22

33
import (
4+
"errors"
45
"fmt"
56
"sync"
67
"time"
78

89
"github.com/coreos/fleet/schema"
910
)
1011

12+
// waitForUnitStates polls each of the indicated jobs until each of their
13+
// systemd substates is equal to that which the caller indicates
14+
func waitForUnitSubStates(units []string, desiredState string) (outchan chan *schema.Unit, errchan chan error) {
15+
var wg sync.WaitGroup
16+
errchan = make(chan error)
17+
outchan = make(chan *schema.Unit)
18+
19+
// check each unit for desired state
20+
for _, name := range units {
21+
wg.Add(1)
22+
go checkUnitSubState(name, desiredState, outchan, errchan, &wg)
23+
}
24+
25+
// wait for all jobs to complete
26+
go func() {
27+
wg.Wait()
28+
close(outchan)
29+
close(errchan)
30+
}()
31+
32+
return outchan, errchan
33+
}
34+
35+
func checkUnitSubState(name string, desiredState string, outchan chan *schema.Unit, errchan chan error, wg *sync.WaitGroup) {
36+
defer wg.Done()
37+
for {
38+
if assertUnitSubState(name, desiredState, outchan, errchan) {
39+
return
40+
}
41+
time.Sleep(100 * time.Millisecond)
42+
}
43+
}
44+
45+
func assertUnitSubState(name string, desiredState string, outchan chan *schema.Unit, errchan chan error) bool {
46+
u, err := cAPI.Unit(name)
47+
if err != nil {
48+
errchan <- fmt.Errorf("Error retrieving Job(%s) from Registry: %v", name, err)
49+
return false
50+
}
51+
52+
// send unit across the output channel
53+
outchan <- u
54+
55+
unitState, err := unitState(name)
56+
if err != nil {
57+
errchan <- fmt.Errorf("Error retrieving Unit state from Registry: %v", err)
58+
return false
59+
}
60+
61+
if unitState.SystemdSubState == desiredState {
62+
return true
63+
}
64+
return false
65+
}
66+
67+
// unitState retrieves a UnitState based on its name.
68+
// FIXME: add this to fleet's API
69+
func unitState(name string) (*schema.UnitState, error) {
70+
unitStates, err := cAPI.UnitStates()
71+
if err != nil {
72+
return nil, err
73+
}
74+
75+
for _, state := range unitStates {
76+
if state.Name == name {
77+
return state, nil
78+
}
79+
}
80+
return nil, errors.New("Could not find unit state: " + name)
81+
}
82+
1183
// waitForUnitStates polls each of the indicated jobs until each of their
1284
// states is equal to that which the caller indicates
1385
func waitForUnitStates(units []string, desiredState string) (outchan chan *schema.Unit, errchan chan error) {
@@ -59,6 +131,39 @@ func assertUnitState(name string, desiredState string, outchan chan *schema.Unit
59131
return false
60132
}
61133

134+
func printUnitSubState(name string, outchan chan *schema.Unit, errchan chan error) error {
135+
// print output while jobs are transitioning
136+
defer fmt.Printf("\n")
137+
for {
138+
select {
139+
case u := <-outchan:
140+
// return on closed channel
141+
if u == nil {
142+
return nil
143+
}
144+
// ignore units that don't match our unit
145+
if u.Name != name {
146+
continue
147+
}
148+
// retrieve the unit's state
149+
unitState, err := unitState(u.Name)
150+
if err != nil {
151+
return err
152+
}
153+
fmt.Printf("\033[0;33m%v:\033[0m %v \r",
154+
u.Name, unitState.SystemdSubState)
155+
// read from error channel
156+
case err := <-errchan:
157+
// continue processing if error channel closed
158+
if err == nil {
159+
continue
160+
}
161+
return err
162+
}
163+
time.Sleep(200 * time.Millisecond)
164+
}
165+
}
166+
62167
func printUnitState(name string, outchan chan *schema.Unit, errchan chan error) error {
63168
// print output while jobs are transitioning
64169
defer fmt.Printf("\n")

cmd/cmd.go

Lines changed: 86 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,19 @@ import (
1717
"github.com/docopt/docopt-go"
1818
)
1919

20+
const (
21+
PlatformInstallCommand string = "platform"
22+
)
23+
24+
var (
25+
DefaultDataContainers = []string{
26+
"database-data",
27+
"registry-data",
28+
"logger-data",
29+
"builder-data",
30+
}
31+
)
32+
2033
func ListUnits(c client.Client) error {
2134
err := c.ListUnits()
2235
return err
@@ -42,6 +55,10 @@ func Scale(c client.Client, targets []string) error {
4255
}
4356

4457
func Start(c client.Client, targets []string) error {
58+
// if target is platform, install all services
59+
if len(targets) == 1 && targets[0] == PlatformInstallCommand {
60+
return StartPlatform(c)
61+
}
4562
for _, target := range targets {
4663
err := c.Start(target)
4764
if err != nil {
@@ -51,6 +68,51 @@ func Start(c client.Client, targets []string) error {
5168
return nil
5269
}
5370

71+
func StartPlatform(c client.Client) error {
72+
fmt.Println("Starting Platform...")
73+
if err := startDataContainers(c); err != nil {
74+
return err
75+
}
76+
if err := startDefaultServices(c); err != nil {
77+
return err
78+
}
79+
fmt.Println("Platform started.")
80+
return nil
81+
}
82+
83+
func startDataContainers(c client.Client) error {
84+
fmt.Println("Launching data containers...")
85+
for _, dataContainer := range DefaultDataContainers {
86+
err := c.Start(dataContainer)
87+
if err != nil {
88+
return err
89+
}
90+
}
91+
fmt.Println("Data containers launched.")
92+
return nil
93+
}
94+
95+
func startDefaultServices(c client.Client) error {
96+
fmt.Println("Launching service containers...")
97+
if err := Start(c, []string{"logger", "cache", "database"}); err != nil {
98+
return err
99+
}
100+
if err := Start(c, []string{"registry"}); err != nil {
101+
return err
102+
}
103+
if err := Start(c, []string{"controller"}); err != nil {
104+
return err
105+
}
106+
if err := Start(c, []string{"builder"}); err != nil {
107+
return err
108+
}
109+
if err := Start(c, []string{"router"}); err != nil {
110+
return err
111+
}
112+
fmt.Println("Service containers launched.")
113+
return nil
114+
}
115+
54116
func Stop(c client.Client, targets []string) error {
55117
for _, target := range targets {
56118
err := c.Stop(target)
@@ -97,53 +159,41 @@ func Journal(c client.Client, targets []string) error {
97159

98160
func Install(c client.Client, targets []string) error {
99161
// if target is platform, install all services
100-
if len(targets) == 1 && targets[0] == "platform" {
101-
err := installDataContainers(c)
102-
if err != nil {
103-
return err
104-
}
105-
err = installDefaultServices(c)
106-
if err != nil {
107-
return err
108-
}
162+
if len(targets) == 1 && targets[0] == PlatformInstallCommand {
163+
return InstallPlatform(c)
109164
} else {
110-
// otherwise create and start the specific targets
165+
// otherwise create the specific targets
111166
for _, target := range targets {
112167
err := c.Create(target)
113168
if err != nil {
114169
return err
115170
}
116-
err = c.Start(target)
117-
if err != nil {
118-
return err
119-
}
120171
}
121172
}
122173
return nil
123174
}
124175

125-
func installDataContainers(c client.Client) error {
126-
// data containers
127-
dataContainers := []string{
128-
"database-data",
129-
"registry-data",
130-
"logger-data",
131-
"builder-data",
176+
func InstallPlatform(c client.Client) error {
177+
err := installDataContainers(c)
178+
if err != nil {
179+
return err
132180
}
133-
fmt.Println("\nScheduling data containers...")
134-
for _, dataContainer := range dataContainers {
135-
err := c.Create(dataContainer)
136-
if err != nil {
137-
return err
138-
}
181+
err = installDefaultServices(c)
182+
if err != nil {
183+
return err
139184
}
140-
fmt.Println("\nLaunching data containers...")
141-
for _, dataContainer := range dataContainers {
142-
err := c.Start(dataContainer)
185+
return nil
186+
}
187+
188+
func installDataContainers(c client.Client) error {
189+
fmt.Println("Scheduling data containers...")
190+
for _, dataContainer := range DefaultDataContainers {
191+
err := c.Create(dataContainer)
143192
if err != nil {
144193
return err
145194
}
146195
}
196+
fmt.Println("Data containers scheduled.")
147197
return nil
148198
}
149199

@@ -157,36 +207,17 @@ func installDefaultServices(c client.Client) error {
157207
"controller=1",
158208
"builder=1",
159209
"router=1"}
160-
fmt.Println("\nScheduling service containers...")
161-
err := Scale(c, targets)
162-
fmt.Println("\nLaunching service containers...")
163-
err = Start(c, []string{"logger", "cache", "database"})
164-
if err != nil {
165-
return err
166-
}
167-
err = Start(c, []string{"registry"})
168-
if err != nil {
169-
return err
170-
}
171-
err = Start(c, []string{"controller"})
172-
if err != nil {
173-
return err
174-
}
175-
err = Start(c, []string{"builder"})
176-
if err != nil {
177-
return err
178-
}
179-
err = Start(c, []string{"router"})
180-
if err != nil {
210+
fmt.Println("Scheduling service containers...")
211+
if err := Scale(c, targets); err != nil {
181212
return err
182213
}
183-
fmt.Println("Done.")
214+
fmt.Println("Service containers scheduled.")
184215
return nil
185216
}
186217

187218
func Uninstall(c client.Client, targets []string) error {
188219
// if target is platform, uninstall all services
189-
if len(targets) == 1 && targets[0] == "platform" {
220+
if len(targets) == 1 && targets[0] == PlatformInstallCommand {
190221
err := uninstallAllServices(c)
191222
if err != nil {
192223
return err
@@ -212,9 +243,9 @@ func uninstallAllServices(c client.Client) error {
212243
"controller=0",
213244
"builder=0",
214245
"router=0"}
215-
fmt.Println("\nDestroying service containers...")
246+
fmt.Println("Destroying service containers...")
216247
err := Scale(c, targets)
217-
fmt.Println("Done.")
248+
fmt.Println("Service containers destroyed.")
218249
return err
219250
}
220251

0 commit comments

Comments
 (0)