Skip to content

Commit b2bd060

Browse files
author
Gabriel Monroy
committed
refactor(state): move to global error channel
1 parent a6dc5a9 commit b2bd060

4 files changed

Lines changed: 25 additions & 28 deletions

File tree

client/create.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ func (c *FleetClient) Create(target string) (err error) {
3030
if err != nil {
3131
return err
3232
}
33-
check := newStateCheck(testJobStateLoaded)
34-
err = waitForJobStates([]string{unitName}, check)
33+
err = waitForJobStates([]string{unitName}, jobStateLoaded)
3534
if err != nil {
3635
return err
3736
}

client/start.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ func (c *FleetClient) Start(target string) (err error) {
1515
return err
1616
}
1717
}
18-
check := newStateCheck(testUnitStateActive)
19-
err = waitForJobStates(units, check)
18+
err = waitForJobStates(units, unitStateActive)
2019
if err != nil {
2120
return err
2221
}

client/state.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,28 @@ import (
1010

1111
type testJob func(j *job.Job) bool
1212

13-
func testJobStateLoaded(j *job.Job) bool {
13+
func jobStateLoaded(j *job.Job) bool {
1414
if j == nil || j.State == nil {
1515
return false
1616
}
1717
return *(j.State) == job.JobStateLoaded
1818
}
1919

20-
func testJobStateLaunched(j *job.Job) bool {
20+
func jobStateLaunched(j *job.Job) bool {
2121
if j == nil || j.State == nil {
2222
return false
2323
}
2424
return *(j.State) == job.JobStateLaunched
2525
}
2626

27-
func testJobStateInactive(j *job.Job) bool {
27+
func jobStateInactive(j *job.Job) bool {
2828
if j == nil || j.State == nil {
2929
return false
3030
}
3131
return *(j.State) == job.JobStateInactive
3232
}
3333

34-
func testUnitStateActive(j *job.Job) bool {
34+
func unitStateActive(j *job.Job) bool {
3535
if j == nil || j.UnitState == nil {
3636
return false
3737
}
@@ -41,41 +41,41 @@ func testUnitStateActive(j *job.Job) bool {
4141

4242
// stateCheck defines how to monitor a job state
4343
type stateCheck struct {
44-
test testJob
45-
statechan chan *jobState
46-
errchan chan error
44+
test testJob
45+
state chan *jobState
4746
}
4847

4948
// newStateCheck returns a StateCheck struct with new channels for monitoring
5049
func newStateCheck(test testJob) *stateCheck {
51-
statechan := make(chan *jobState)
52-
errchan := make(chan error)
53-
return &stateCheck{test, statechan, errchan}
50+
state := make(chan *jobState)
51+
return &stateCheck{test, state}
5452
}
5553

5654
// waitForJobStates polls each of the indicated jobs until each of their
5755
// states is equal to that which the caller indicates via stateCheck test
58-
func waitForJobStates(jobs []string, check *stateCheck) error {
56+
func waitForJobStates(jobs []string, test testJob) error {
5957
var wg sync.WaitGroup
58+
errchan := make(chan error)
59+
check := newStateCheck(test)
6060

6161
// check each job with the stateCheck
6262
for _, name := range jobs {
6363
wg.Add(1)
64-
go checkJobState(name, check, &wg)
64+
go checkJobState(name, check, &wg, errchan)
6565
}
6666

6767
// wait for all jobs to complete
6868
go func() {
6969
wg.Wait()
70-
close(check.errchan)
70+
close(errchan)
7171
}()
7272

7373
// print output while jobs are transitioning
7474
defer fmt.Printf("\n")
7575
for {
7676
select {
7777
// read from state channel
78-
case state := <-check.statechan:
78+
case state := <-check.state:
7979
// return on closed channel
8080
if state == nil {
8181
return nil
@@ -89,18 +89,18 @@ func waitForJobStates(jobs []string, check *stateCheck) error {
8989
state.name, state.loaded, state.active, state.sub)
9090
}
9191
// read from error channel
92-
case err := <-check.errchan:
92+
case err := <-errchan:
9393
return err
9494
}
9595
time.Sleep(200 * time.Millisecond)
9696
}
9797
}
9898

99-
func checkJobState(jobName string, check *stateCheck, wg *sync.WaitGroup) {
99+
func checkJobState(jobName string, check *stateCheck, wg *sync.WaitGroup, errchan chan error) {
100100
defer wg.Done()
101101
sleep := 100 * time.Millisecond
102102
for {
103-
if assertJobState(jobName, check) {
103+
if assertJobState(jobName, check, errchan) {
104104
return
105105
}
106106
time.Sleep(sleep)
@@ -130,19 +130,19 @@ func newJobState(name string, j *job.Job) *jobState {
130130
return &jobState{name, loaded, active, sub}
131131
}
132132

133-
func assertJobState(name string, check *stateCheck) bool {
133+
func assertJobState(name string, check *stateCheck, errchan chan error) bool {
134134
j, err := cAPI.Job(name)
135135
if err != nil {
136-
check.errchan <- fmt.Errorf("Error retrieving Job(%s) from Registry: %v", name, err)
136+
errchan <- fmt.Errorf("Error retrieving Job(%s) from Registry: %v", name, err)
137137
return false
138138
}
139139

140140
// send current state to the output channel
141-
check.statechan <- newJobState(name, j)
141+
check.state <- newJobState(name, j)
142142

143-
// if test function
143+
// if state matches, close the channel and return
144144
if check.test(j) {
145-
close(check.statechan)
145+
close(check.state)
146146
return true
147147
}
148148
return false

client/stop.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ func (c *FleetClient) Stop(target string) (err error) {
1515
return err
1616
}
1717
}
18-
check := newStateCheck(testJobStateInactive)
19-
err = waitForJobStates(units, check)
18+
err = waitForJobStates(units, jobStateInactive)
2019
if err != nil {
2120
return err
2221
}

0 commit comments

Comments
 (0)