Skip to content

Commit 13f56ee

Browse files
author
Gabriel Monroy
committed
ref(deisctl): switch to async interface for backends
1 parent 8d647dc commit 13f56ee

9 files changed

Lines changed: 485 additions & 517 deletions

File tree

deisctl/backend/backend.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package backend
22

3+
import "sync"
4+
35
// Backend interface is used to interact with the cluster control plane
46
type Backend interface {
5-
Create([]string) error
6-
Destroy([]string) error
7-
Start([]string) error
8-
Stop([]string) error
7+
Create([]string, *sync.WaitGroup, chan string, chan error)
8+
Destroy([]string, *sync.WaitGroup, chan string, chan error)
9+
Start([]string, *sync.WaitGroup, chan string, chan error)
10+
Stop([]string, *sync.WaitGroup, chan string, chan error)
911
Scale(string, int) error
1012
ListUnits() error
1113
ListUnitFiles() error

deisctl/backend/fleet/create.go

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,46 +3,58 @@ package fleet
33
import (
44
"fmt"
55
"strings"
6+
"sync"
67

78
"github.com/coreos/fleet/job"
89
"github.com/coreos/fleet/schema"
910
"github.com/coreos/fleet/unit"
1011
)
1112

1213
// Create schedules a new unit for the given component
13-
// and blocks until the unit is loaded
14-
func (c *FleetClient) Create(targets []string) error {
14+
func (c *FleetClient) Create(targets []string, wg *sync.WaitGroup, outchan chan string, errchan chan error) {
15+
1516
units := make([]*schema.Unit, len(targets))
16-
desiredState := string(job.JobStateLoaded)
17+
1718
for i, target := range targets {
1819
unitName, unitFile, err := c.createUnitFile(target)
1920
if err != nil {
20-
return err
21+
errchan <- err
22+
return
2123
}
2224
units[i] = &schema.Unit{
2325
Name: unitName,
2426
Options: schema.MapUnitFileToSchemaUnitOptions(unitFile),
2527
}
2628
}
29+
2730
for _, unit := range units {
28-
// schedule unit
29-
if err := c.Fleet.CreateUnit(unit); err != nil {
30-
// ignore units that already exist
31-
if err.Error() != "job already exists" {
32-
return fmt.Errorf("failed creating job %s: %v", unit.Name, err)
33-
}
34-
}
35-
if err := c.Fleet.SetUnitTargetState(unit.Name, desiredState); err != nil {
36-
return err
37-
}
31+
wg.Add(1)
32+
go doCreate(c, unit, wg, outchan, errchan)
3833
}
39-
for _, unit := range units {
40-
outchan, errchan := waitForUnitStates([]string{unit.Name}, desiredState)
41-
if err := printUnitState(unit.Name, outchan, errchan); err != nil {
42-
return err
34+
}
35+
36+
func doCreate(c *FleetClient, unit *schema.Unit, wg *sync.WaitGroup, outchan chan string, errchan chan error) {
37+
defer wg.Done()
38+
39+
// create unit definition
40+
if err := c.Fleet.CreateUnit(unit); err != nil {
41+
// ignore units that already exist
42+
if err.Error() != "job already exists" {
43+
errchan <- err
44+
return
4345
}
4446
}
45-
return nil
47+
48+
desiredState := string(job.JobStateLoaded)
49+
out := fmt.Sprintf("\033[0;33m%v:\033[0m loaded \r", unit.Name)
50+
51+
// schedule the unit
52+
if err := c.Fleet.SetUnitTargetState(unit.Name, desiredState); err != nil {
53+
errchan <- err
54+
return
55+
}
56+
57+
outchan <- out
4658
}
4759

4860
func (c *FleetClient) createUnitFile(target string) (unitName string, uf *unit.UnitFile, err error) {

deisctl/backend/fleet/destroy.go

Lines changed: 42 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,72 +3,62 @@ package fleet
33
import (
44
"fmt"
55
"strings"
6-
7-
"github.com/coreos/fleet/job"
6+
"sync"
7+
"time"
88
)
99

1010
// Destroy units for a given target
11-
func (c *FleetClient) Destroy(targets []string) error {
11+
func (c *FleetClient) Destroy(targets []string, wg *sync.WaitGroup, outchan chan string, errchan chan error) {
1212
for _, target := range targets {
13-
// check if the unit exists
14-
_, err := c.Units(target)
15-
if err != nil {
16-
if strings.Contains(err.Error(), "could not find unit") {
17-
return nil
18-
}
19-
return err
20-
}
21-
component, num, err := splitTarget(target)
22-
if err != nil {
23-
return err
24-
}
25-
if strings.HasSuffix(component, "-data") {
26-
err = c.destroyDataUnit(component)
27-
} else {
28-
err = c.destroyServiceUnit(component, num)
29-
}
30-
if err != nil && !strings.Contains(err.Error(), "could not find unit") {
31-
return err
32-
}
13+
wg.Add(1)
14+
go doDestroy(c, target, wg, outchan, errchan)
3315
}
34-
return nil
16+
return
3517
}
3618

37-
func (c *FleetClient) destroyServiceUnit(component string, num int) error {
38-
name, err := formatUnitName(component, num)
19+
func doDestroy(c *FleetClient, target string, wg *sync.WaitGroup, outchan chan string, errchan chan error) {
20+
defer wg.Done()
21+
22+
// prepare string representation
23+
component, num, err := splitTarget(target)
3924
if err != nil {
40-
return err
25+
errchan <- err
26+
return
4127
}
42-
desiredState := string(job.JobStateInactive)
43-
err = c.Fleet.SetUnitTargetState(name, desiredState)
28+
name, err := formatUnitName(component, num)
4429
if err != nil {
45-
return err
46-
}
47-
outchan, errchan := waitForUnitStates([]string{name}, desiredState)
48-
if err := printUnitState(name, outchan, errchan); err != nil {
49-
return err
30+
errchan <- err
31+
return
5032
}
51-
if err = c.Fleet.DestroyUnit(name); err != nil {
52-
return fmt.Errorf("failed destroying job %s: %v", name, err)
53-
}
54-
return nil
55-
}
33+
destroyed := fmt.Sprintf("\033[0;33m%v:\033[0m destroyed \r", name)
5634

57-
func (c *FleetClient) destroyDataUnit(component string) error {
58-
name, err := formatUnitName(component, 0)
59-
desiredState := string(job.JobStateInactive)
35+
// bail early if unit doesn't exist
36+
_, err = c.Units(name)
6037
if err != nil {
61-
return err
62-
}
63-
if err := c.Fleet.SetUnitTargetState(name, desiredState); err != nil {
64-
return err
38+
if strings.Contains(err.Error(), "could not find unit") {
39+
outchan <- destroyed
40+
}
41+
return
6542
}
66-
outchan, errchan := waitForUnitStates([]string{name}, desiredState)
67-
if err := printUnitState(name, outchan, errchan); err != nil {
68-
return err
43+
44+
// otherwise destroy it
45+
if err = c.Fleet.DestroyUnit(name); err != nil {
46+
// ignore already destroyed units
47+
if !strings.Contains(err.Error(), "could not find unit") {
48+
errchan <- err
49+
return
50+
}
6951
}
70-
if err := c.Fleet.DestroyUnit(name); err != nil {
71-
return fmt.Errorf("failed destroying job %s: %v", name, err)
52+
53+
// loop until actually destroyed
54+
for {
55+
_, err = c.Units(name)
56+
if err != nil {
57+
if strings.Contains(err.Error(), "could not find unit") {
58+
outchan <- destroyed
59+
return
60+
}
61+
}
62+
time.Sleep(250 * time.Millisecond)
7263
}
73-
return nil
7464
}

deisctl/backend/fleet/scale.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,16 @@ import (
55
"math"
66
"strconv"
77
"strings"
8+
"sync"
89
)
910

1011
// Scale creates or destroys units to match the desired number
1112
func (c *FleetClient) Scale(component string, requested int) error {
13+
14+
outchan := make(chan string)
15+
errchan := make(chan error)
16+
var wg sync.WaitGroup
17+
1218
if requested < 0 {
1319
return errors.New("cannot scale below 0")
1420
}
@@ -22,27 +28,29 @@ func (c *FleetClient) Scale(component string, requested int) error {
2228
}
2329

2430
timesToScale := int(math.Abs(float64(requested - len(components))))
31+
if timesToScale == 0 {
32+
return nil
33+
}
2534
if requested-len(components) > 0 {
26-
return scaleUp(c, component, len(components), timesToScale)
27-
} else {
28-
return scaleDown(c, component, len(components), timesToScale)
35+
return scaleUp(c, component, len(components), timesToScale, &wg, outchan, errchan)
2936
}
37+
return scaleDown(c, component, len(components), timesToScale, &wg, outchan, errchan)
3038
}
3139

32-
func scaleUp(c *FleetClient, component string, numExistingContainers, numTimesToScale int) error {
40+
func scaleUp(c *FleetClient, component string, numExistingContainers, numTimesToScale int,
41+
wg *sync.WaitGroup, outchan chan string, errchan chan error) error {
3342
for i := 0; i < numTimesToScale; i++ {
34-
if err := c.Create([]string{component + "@" + strconv.Itoa(numExistingContainers+i+1)}); err != nil {
35-
return err
36-
}
43+
target := component + "@" + strconv.Itoa(numExistingContainers+i+1)
44+
c.Create([]string{target}, wg, outchan, errchan)
3745
}
3846
return nil
3947
}
4048

41-
func scaleDown(c *FleetClient, component string, numExistingContainers, numTimesToScale int) error {
49+
func scaleDown(c *FleetClient, component string, numExistingContainers, numTimesToScale int,
50+
wg *sync.WaitGroup, outchan chan string, errchan chan error) error {
4251
for i := 0; i < numTimesToScale; i++ {
43-
if err := c.Destroy([]string{component + "@" + strconv.Itoa(numExistingContainers-i)}); err != nil {
44-
return err
45-
}
52+
target := component + "@" + strconv.Itoa(numExistingContainers-i)
53+
c.Destroy([]string{target}, wg, outchan, errchan)
4654
}
4755
return nil
4856
}

deisctl/backend/fleet/start.go

Lines changed: 72 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,87 @@
11
package fleet
22

33
import (
4+
"fmt"
45
"strings"
6+
"sync"
7+
"time"
58

6-
"github.com/coreos/fleet/job"
79
"github.com/coreos/fleet/schema"
810
)
911

10-
// Start launches target units and blocks until active
11-
func (c *FleetClient) Start(targets []string) error {
12-
units := make([][]string, len(targets))
13-
for i, target := range targets {
14-
u, err := c.Units(target)
12+
// Start units and wait for their desiredState
13+
func (c *FleetClient) Start(targets []string, wg *sync.WaitGroup, outchan chan string, errchan chan error) {
14+
for _, target := range targets {
15+
wg.Add(1)
16+
go doStart(c, target, wg, outchan, errchan)
17+
}
18+
return
19+
}
20+
21+
func doStart(c *FleetClient, target string, wg *sync.WaitGroup, outchan chan string, errchan chan error) {
22+
defer wg.Done()
23+
24+
// prepare string representation
25+
component, num, err := splitTarget(target)
26+
if err != nil {
27+
errchan <- err
28+
return
29+
}
30+
name, err := formatUnitName(component, num)
31+
if err != nil {
32+
errchan <- err
33+
return
34+
}
35+
36+
requestState := "launched"
37+
var desiredState string
38+
if strings.Contains(name, "-data.service") {
39+
desiredState = "exited"
40+
} else {
41+
desiredState = "running"
42+
}
43+
44+
if err := c.Fleet.SetUnitTargetState(name, requestState); err != nil {
45+
errchan <- err
46+
return
47+
}
48+
49+
// start with the likely subState to avoid sending it across the channel
50+
lastSubState := "dead"
51+
52+
for {
53+
// poll for unit states
54+
states, err := c.Fleet.UnitStates()
1555
if err != nil {
16-
return err
56+
errchan <- err
57+
return
1758
}
18-
units[i] = u
19-
}
20-
desiredState := string(job.JobStateLaunched)
21-
for _, names := range units {
22-
for _, name := range names {
23-
if err := c.Fleet.SetUnitTargetState(name, desiredState); err != nil {
24-
return err
59+
60+
// FIXME: fleet UnitStates API forces us to iterate for now
61+
var currentState *schema.UnitState
62+
for _, s := range states {
63+
if name == s.Name {
64+
currentState = s
65+
break
2566
}
2667
}
27-
}
28-
var errchan chan error
29-
var outchan chan *schema.Unit
30-
for _, names := range units {
31-
for _, name := range names {
32-
// wait for systemd to tell us that it's running, not fleet
33-
// data containers are special snowflakes who just exit
34-
if strings.Contains(name, "-data.service") {
35-
outchan, errchan = waitForUnitSubStates(names, "exited")
36-
} else {
37-
outchan, errchan = waitForUnitSubStates(names, "running")
38-
}
39-
if err := printUnitSubState(name, outchan, errchan); err != nil {
40-
return err
41-
}
68+
if currentState == nil {
69+
errchan <- fmt.Errorf("could not find unit: %v", name)
70+
return
4271
}
72+
73+
// if subState changed, send it across the output channel
74+
if lastSubState != currentState.SystemdSubState {
75+
outchan <- fmt.Sprintf("\033[0;33m%v:\033[0m %v/%v \r",
76+
name, currentState.SystemdActiveState, currentState.SystemdSubState)
77+
}
78+
79+
// break when desired state is reached
80+
if currentState.SystemdSubState == desiredState {
81+
return
82+
}
83+
84+
lastSubState = currentState.SystemdSubState
85+
time.Sleep(250 * time.Millisecond)
4386
}
44-
return nil
4587
}

0 commit comments

Comments
 (0)