Skip to content

Commit 0dcd5a0

Browse files
author
Gabriel Monroy
committed
refactor(state): cleanup state polling and output
1 parent 265cc3d commit 0dcd5a0

4 files changed

Lines changed: 113 additions & 83 deletions

File tree

client/create.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package client
22

33
import (
44
"fmt"
5-
"os"
65
"strings"
76

87
"github.com/coreos/fleet/job"
@@ -31,9 +30,10 @@ func (c *FleetClient) Create(target string) (err error) {
3130
if err != nil {
3231
return err
3332
}
34-
errchan := waitForJobStates([]string{unitName}, testJobStateLoaded, 0, os.Stdout)
35-
for err := range errchan {
36-
return fmt.Errorf("error waiting for job %s: %v", unitName, err)
33+
check := newStateCheck(testJobStateLoaded)
34+
err = waitForJobStates([]string{unitName}, check)
35+
if err != nil {
36+
return err
3737
}
3838
return nil
3939
}

client/start.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
package client
22

3-
import (
4-
"fmt"
5-
"os"
6-
7-
"github.com/coreos/fleet/job"
8-
)
3+
import "github.com/coreos/fleet/job"
94

105
// Start launches target units and blocks until active
116
func (c *FleetClient) Start(target string) (err error) {
@@ -20,9 +15,10 @@ func (c *FleetClient) Start(target string) (err error) {
2015
return err
2116
}
2217
}
23-
errchan := waitForJobStates(units, testUnitStateActive, 0, os.Stdout)
24-
for err := range errchan {
25-
return fmt.Errorf("error waiting for active: %v", err)
18+
check := newStateCheck(testUnitStateActive)
19+
err = waitForJobStates(units, check)
20+
if err != nil {
21+
return err
2622
}
2723
return nil
2824
}

client/state.go

Lines changed: 99 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package client
22

33
import (
44
"fmt"
5-
"io"
6-
"os"
75
"sync"
86
"time"
97

@@ -13,99 +11,139 @@ import (
1311
type testJob func(j *job.Job) bool
1412

1513
func testJobStateLoaded(j *job.Job) bool {
16-
return j == nil || j.State == nil || *(j.State) != job.JobStateLoaded
14+
if j == nil || j.State == nil {
15+
return false
16+
}
17+
return *(j.State) == job.JobStateLoaded
1718
}
1819

1920
func testJobStateLaunched(j *job.Job) bool {
20-
return j == nil || j.State == nil || *(j.State) != job.JobStateLaunched
21+
if j == nil || j.State == nil {
22+
return false
23+
}
24+
return *(j.State) == job.JobStateLaunched
2125
}
2226

2327
func testJobStateInactive(j *job.Job) bool {
24-
return j == nil || j.State == nil || *(j.State) != job.JobStateInactive
28+
if j == nil || j.State == nil {
29+
return false
30+
}
31+
return *(j.State) == job.JobStateInactive
2532
}
2633

2734
func testUnitStateActive(j *job.Job) bool {
28-
return j == nil || j.UnitState == nil || j.UnitState.ActiveState != "active"
35+
if j == nil || j.UnitState == nil {
36+
return false
37+
}
38+
return j.UnitState.ActiveState == "active"
2939

3040
}
3141

32-
// TODO: refactor to separate presentation to io.Writer from status polling
42+
// stateCheck defines how to monitor a job state
43+
type stateCheck struct {
44+
test testJob
45+
statechan chan *jobState
46+
errchan chan error
47+
}
3348

34-
// waitForJobStates polls each of the indicated jobs until each of their
35-
// states is equal to that which the caller indicates, or until the
36-
// polling operation times out. waitForJobStates will retry forever, or
37-
// up to maxAttempts times before timing out if maxAttempts is greater
38-
// than zero. Returned is an error channel used to communicate when
39-
// timeouts occur. The returned error channel will be closed after all
40-
// polling operation is complete.
41-
func waitForJobStates(jobs []string, test testJob, maxAttempts int, out io.Writer) chan error {
49+
// newStateCheck returns a StateCheck struct with new channels for monitoring
50+
func newStateCheck(test testJob) *stateCheck {
51+
statechan := make(chan *jobState)
4252
errchan := make(chan error)
53+
return &stateCheck{test, statechan, errchan}
54+
}
55+
56+
// waitForJobStates polls each of the indicated jobs until each of their
57+
// states is equal to that which the caller indicates via stateCheck test
58+
func waitForJobStates(jobs []string, check *stateCheck) error {
4359
var wg sync.WaitGroup
60+
61+
// check each job with the stateCheck
4462
for _, name := range jobs {
4563
wg.Add(1)
46-
go checkJobState(name, test, maxAttempts, out, &wg, errchan)
64+
go checkJobState(name, check, &wg)
4765
}
66+
67+
// wait for all jobs to complete
4868
go func() {
4969
wg.Wait()
50-
close(errchan)
70+
close(check.errchan)
5171
}()
52-
return errchan
53-
}
5472

55-
func checkJobState(jobName string, test testJob, maxAttempts int, out io.Writer, wg *sync.WaitGroup, errchan chan error) {
56-
defer wg.Done()
57-
sleep := 100 * time.Millisecond
58-
if maxAttempts < 1 {
59-
for {
60-
if assertJobState(jobName, test, out) {
61-
return
73+
// print output while jobs are transitioning
74+
defer fmt.Printf("\n")
75+
for {
76+
select {
77+
// read from state channel
78+
case state := <-check.statechan:
79+
// return on closed channel
80+
if state == nil {
81+
return nil
6282
}
63-
time.Sleep(sleep)
64-
}
65-
} else {
66-
for attempt := 0; attempt < maxAttempts; attempt++ {
67-
if assertJobState(jobName, test, out) {
68-
return
83+
// otherwise print output
84+
if state.sub == "" || state.sub == "dead" {
85+
fmt.Printf("\033[0;33m%v:\033[0m %v, %v \r",
86+
state.name, state.loaded, state.active)
87+
} else {
88+
fmt.Printf("\033[0;33m%v:\033[0m %v, %v (%v) \r",
89+
state.name, state.loaded, state.active, state.sub)
6990
}
70-
time.Sleep(sleep)
91+
// read from error channel
92+
case err := <-check.errchan:
93+
return err
7194
}
72-
errchan <- fmt.Errorf("timed out waiting for job %s to report state", jobName)
95+
time.Sleep(200 * time.Millisecond)
7396
}
7497
}
7598

76-
func assertJobState(name string, test testJob, out io.Writer) (ret bool) {
77-
j, err := cAPI.Job(name)
78-
if err != nil {
79-
fmt.Fprintf(os.Stderr, "Error retrieving Job(%s) from Registry: %v", name, err)
80-
return
81-
}
82-
if test(j) {
83-
if j.State != nil && j.UnitState != nil && j.UnitState.ActiveState != "" && j.UnitState.SubState != "" {
84-
fmt.Fprintf(out, "\033[0;33m%v:\033[0m %v, %v (%v) \r", j.Name, *(j.State), j.UnitState.ActiveState, j.UnitState.SubState)
99+
func checkJobState(jobName string, check *stateCheck, wg *sync.WaitGroup) {
100+
defer wg.Done()
101+
sleep := 100 * time.Millisecond
102+
for {
103+
if assertJobState(jobName, check) {
104+
return
85105
}
86-
return
106+
time.Sleep(sleep)
87107
}
88-
ret = true
108+
}
109+
110+
type jobState struct {
111+
name string
112+
loaded string
113+
active string
114+
sub string
115+
}
89116

90-
var msg string
91-
if j.State != nil && j.UnitState != nil && j.UnitState.ActiveState != "" && j.UnitState.SubState != "" {
92-
msg = fmt.Sprintf("\033[0;33m%v:\033[0m %v, %v (%v)", name, *(j.State), j.UnitState.ActiveState, j.UnitState.SubState)
93-
} else {
94-
msg = fmt.Sprintf("\033[0;33m%v:\033[0m %v", name, *(j.State))
117+
func newJobState(name string, j *job.Job) *jobState {
118+
var (
119+
loaded string
120+
active string
121+
sub string
122+
)
123+
if j.State != nil {
124+
loaded = fmt.Sprintf("%v", *(j.State))
95125
}
126+
if j.UnitState != nil {
127+
active = j.UnitState.ActiveState
128+
sub = j.UnitState.SubState
129+
}
130+
return &jobState{name, loaded, active, sub}
131+
}
96132

97-
machines, err := cAPI.Machines()
133+
func assertJobState(name string, check *stateCheck) bool {
134+
j, err := cAPI.Job(name)
98135
if err != nil {
99-
fmt.Fprintf(os.Stderr, "Failed retrieving list of Machines from Registry: %v", err)
100-
}
101-
for _, ms := range machines {
102-
if ms.ID != j.TargetMachineID {
103-
continue
104-
}
105-
msg = fmt.Sprintf("%s on %s", msg, machineFullLegend(ms, false))
106-
break
136+
check.errchan <- fmt.Errorf("Error retrieving Job(%s) from Registry: %v", name, err)
137+
return false
107138
}
108139

109-
fmt.Fprintln(out, msg)
110-
return
140+
// send current state to the output channel
141+
check.statechan <- newJobState(name, j)
142+
143+
// if test function
144+
if check.test(j) {
145+
close(check.statechan)
146+
return true
147+
}
148+
return false
111149
}

client/stop.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
package client
22

3-
import (
4-
"fmt"
5-
"os"
6-
7-
"github.com/coreos/fleet/job"
8-
)
3+
import "github.com/coreos/fleet/job"
94

105
// Stop sets target units to inactive and blocks until complete
116
func (c *FleetClient) Stop(target string) (err error) {
@@ -20,9 +15,10 @@ func (c *FleetClient) Stop(target string) (err error) {
2015
return err
2116
}
2217
}
23-
errchan := waitForJobStates(units, testJobStateInactive, 0, os.Stdout)
24-
for err := range errchan {
25-
return fmt.Errorf("error waiting for inactive: %v", err)
18+
check := newStateCheck(testJobStateInactive)
19+
err = waitForJobStates(units, check)
20+
if err != nil {
21+
return err
2622
}
2723
return nil
2824
}

0 commit comments

Comments
 (0)