Skip to content

Commit f7363fa

Browse files
author
Gabriel Monroy
committed
fix(upstream): rebase against fleet upstream changes
1 parent 4c509b1 commit f7363fa

6 files changed

Lines changed: 94 additions & 38 deletions

File tree

client.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ func NewClient() (*FleetClient, error) {
3434
if err != nil {
3535
return nil, err
3636
}
37+
// set global client
38+
cAPI = client
3739
return &FleetClient{Fleet: client}, nil
3840
}
3941

@@ -63,7 +65,7 @@ func (c *FleetClient) Create(component string, data bool) (err error) {
6365
if err != nil {
6466
return err
6567
}
66-
errchan := waitForJobStates(c.Fleet, []string{unitName}, testJobStateLoaded, 0, os.Stdout)
68+
errchan := waitForJobStates([]string{unitName}, testJobStateLoaded, 0, os.Stdout)
6769
for err := range errchan {
6870
return fmt.Errorf("error waiting for job %s: %v", unitName, err)
6971
}
@@ -169,7 +171,7 @@ func (c *FleetClient) Start(target string, data bool) (err error) {
169171
}
170172
}
171173
if data == false {
172-
errchan := waitForJobStates(c.Fleet, units, testUnitStateActive, 0, os.Stdout)
174+
errchan := waitForJobStates(units, testUnitStateActive, 0, os.Stdout)
173175
for err := range errchan {
174176
return fmt.Errorf("error waiting for active: %v", err)
175177
}
@@ -190,7 +192,7 @@ func (c *FleetClient) Stop(target string) (err error) {
190192
return err
191193
}
192194
}
193-
errchan := waitForJobStates(c.Fleet, units, testJobStateInactive, 0, os.Stdout)
195+
errchan := waitForJobStates(units, testJobStateInactive, 0, os.Stdout)
194196
for err := range errchan {
195197
return fmt.Errorf("error waiting for inactive: %v", err)
196198
}

fleet.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
"net"
77
"net/http"
88
"strings"
9+
"time"
910

1011
"github.com/coreos/fleet/client"
12+
"github.com/coreos/fleet/machine"
1113
"github.com/coreos/fleet/registry"
1214
"github.com/coreos/fleet/ssh"
1315
)
@@ -25,6 +27,12 @@ var Flags = struct {
2527
Tunnel string
2628
}{}
2729

30+
// global API client used by commands
31+
var cAPI client.API
32+
// used to cache MachineStates
33+
var machineStates map[string]*machine.MachineState
34+
var requestTimeout time.Duration = time.Duration(10) * time.Second
35+
2836
func getTunnelFlag() string {
2937
tun := Flags.Tunnel
3038
if tun != "" && !strings.Contains(tun, ":") {
@@ -67,10 +75,9 @@ func getRegistryClient() (client.API, error) {
6775
InsecureSkipVerify: true,
6876
},
6977
}
70-
return client.NewRegistryClient(&trans, Flags.Endpoint, Flags.EtcdKeyPrefix)
78+
return client.NewRegistryClient(&trans, Flags.Endpoint, Flags.EtcdKeyPrefix, requestTimeout)
7179
}
7280

73-
7481
// randomMachineID return a random machineID from the Fleet cluster
7582
func randomMachineID(c *FleetClient) (machineID string, err error) {
7683
machineState, err := c.Fleet.Machines()

list.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ var (
3030
}
3131
return "-"
3232
},
33+
"dstate": func(j *job.Job, full bool) string {
34+
return string(j.TargetState)
35+
},
3336
"load": func(j *job.Job, full bool) string {
3437
us := j.UnitState
3538
if us == nil {
@@ -60,16 +63,34 @@ var (
6063
},
6164
"machine": func(j *job.Job, full bool) string {
6265
us := j.UnitState
63-
if us == nil || us.MachineState == nil {
66+
if us == nil || us.MachineID == "" {
6467
return "-"
6568
}
66-
return machineFullLegend(*us.MachineState, full)
69+
ms := cachedMachineState(us.MachineID)
70+
if ms == nil {
71+
ms = &machine.MachineState{ID: us.MachineID}
72+
}
73+
return machineFullLegend(*ms, full)
74+
},
75+
"tmachine": func(j *job.Job, full bool) string {
76+
if j.TargetMachineID == "" {
77+
return "-"
78+
}
79+
ms := cachedMachineState(j.TargetMachineID)
80+
if ms == nil {
81+
ms = &machine.MachineState{ID: j.TargetMachineID}
82+
}
83+
return machineFullLegend(*ms, full)
6784
},
6885
"hash": func(j *job.Job, full bool) string {
86+
us := j.UnitState
87+
if us == nil || us.UnitHash == "" {
88+
return "-"
89+
}
6990
if !full {
70-
return j.UnitHash.Short()
91+
return us.UnitHash[:7]
7192
}
72-
return j.UnitHash.String()
93+
return us.UnitHash
7394
},
7495
}
7596
)

ssh.go

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,30 @@ import (
1111
"github.com/coreos/fleet/ssh"
1212
)
1313

14-
// runCommand will attempt to run a command on a given machine
15-
// It will attempt to SSH to the machine if it is identified as being remote.
16-
func runCommand(cmd string, ms *machine.MachineState) (retcode int) {
14+
// runCommand will attempt to run a command on a given machine. It will attempt
15+
// to SSH to the machine if it is identified as being remote.
16+
func runCommand(cmd string, machID string) (retcode int) {
1717
var err error
18-
if machine.IsLocalMachineState(ms) {
18+
if machine.IsLocalMachineID(machID) {
1919
err, retcode = runLocalCommand(cmd)
2020
if err != nil {
2121
fmt.Printf("Error running local command: %v\n", err)
2222
}
2323
} else {
24-
err, retcode = runRemoteCommand(cmd, ms.PublicIP)
25-
if err != nil {
26-
fmt.Printf("Error running remote command: %v\n", err)
24+
ms, err := machineState(machID)
25+
if err != nil || ms == nil {
26+
fmt.Printf("Error getting machine IP: %v\n", err)
27+
} else {
28+
err, retcode = runRemoteCommand(cmd, ms.PublicIP)
29+
if err != nil {
30+
fmt.Printf("Error running remote command: %v\n", err)
31+
}
2732
}
2833
}
2934
return
3035
}
3136

32-
// runLocalCommand runs the given command locally
33-
// and returns any error encountered and the exit code of the command
37+
// runLocalCommand runs the given command locally and returns any error encountered and the exit code of the command
3438
func runLocalCommand(cmd string) (error, int) {
3539
cmdSlice := strings.Split(cmd, " ")
3640
osCmd := exec.Command(cmdSlice[0], cmdSlice[1:]...)
@@ -51,8 +55,8 @@ func runLocalCommand(cmd string) (error, int) {
5155
return nil, 0
5256
}
5357

54-
// runRemoteCommand runs the given command over SSH on the given IP
55-
// and returns any error encountered and the exit status of the command
58+
// runRemoteCommand runs the given command over SSH on the given IP, and returns
59+
// any error encountered and the exit status of the command
5660
func runRemoteCommand(cmd string, addr string) (err error, exit int) {
5761
var sshClient *ssh.SSHForwardingClient
5862
if tun := getTunnelFlag(); tun != "" {
@@ -63,6 +67,38 @@ func runRemoteCommand(cmd string, addr string) (err error, exit int) {
6367
if err != nil {
6468
return err, -1
6569
}
70+
6671
defer sshClient.Close()
72+
6773
return ssh.Execute(sshClient, cmd)
6874
}
75+
76+
func machineState(machID string) (*machine.MachineState, error) {
77+
machines, err := cAPI.Machines()
78+
if err != nil {
79+
return nil, err
80+
}
81+
for _, ms := range machines {
82+
if ms.ID == machID {
83+
return &ms, nil
84+
}
85+
}
86+
return nil, nil
87+
}
88+
89+
// cachedMachineState makes a best-effort to retrieve the MachineState of the given machine ID.
90+
// It memoizes MachineState information for the life of a fleetctl invocation.
91+
// Any error encountered retrieving the list of machines is ignored.
92+
func cachedMachineState(machID string) (ms *machine.MachineState) {
93+
if machineStates == nil {
94+
machineStates = make(map[string]*machine.MachineState)
95+
ms, err := cAPI.Machines()
96+
if err != nil {
97+
return nil
98+
}
99+
for i, m := range ms {
100+
machineStates[m.ID] = &ms[i]
101+
}
102+
}
103+
return machineStates[machID]
104+
}

state.go

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"sync"
88
"time"
99

10-
"github.com/coreos/fleet/client"
1110
"github.com/coreos/fleet/job"
1211
)
1312

@@ -38,12 +37,12 @@ func testUnitStateActive(j *job.Job) bool {
3837
// than zero. Returned is an error channel used to communicate when
3938
// timeouts occur. The returned error channel will be closed after all
4039
// polling operation is complete.
41-
func waitForJobStates(cAPI client.API, jobs []string, test testJob, maxAttempts int, out io.Writer) chan error {
40+
func waitForJobStates(jobs []string, test testJob, maxAttempts int, out io.Writer) chan error {
4241
errchan := make(chan error)
4342
var wg sync.WaitGroup
4443
for _, name := range jobs {
4544
wg.Add(1)
46-
go checkJobState(cAPI, name, test, maxAttempts, out, &wg, errchan)
45+
go checkJobState(name, test, maxAttempts, out, &wg, errchan)
4746
}
4847
go func() {
4948
wg.Wait()
@@ -52,19 +51,19 @@ func waitForJobStates(cAPI client.API, jobs []string, test testJob, maxAttempts
5251
return errchan
5352
}
5453

55-
func checkJobState(cAPI client.API, jobName string, test testJob, maxAttempts int, out io.Writer, wg *sync.WaitGroup, errchan chan error) {
54+
func checkJobState(jobName string, test testJob, maxAttempts int, out io.Writer, wg *sync.WaitGroup, errchan chan error) {
5655
defer wg.Done()
5756
sleep := 100 * time.Millisecond
5857
if maxAttempts < 1 {
5958
for {
60-
if assertJobState(cAPI, jobName, test, out) {
59+
if assertJobState(jobName, test, out) {
6160
return
6261
}
6362
time.Sleep(sleep)
6463
}
6564
} else {
6665
for attempt := 0; attempt < maxAttempts; attempt++ {
67-
if assertJobState(cAPI, jobName, test, out) {
66+
if assertJobState(jobName, test, out) {
6867
return
6968
}
7069
time.Sleep(sleep)
@@ -73,7 +72,7 @@ func checkJobState(cAPI client.API, jobName string, test testJob, maxAttempts in
7372
}
7473
}
7574

76-
func assertJobState(cAPI client.API, name string, test testJob, out io.Writer) (ret bool) {
75+
func assertJobState(name string, test testJob, out io.Writer) (ret bool) {
7776
j, err := cAPI.Job(name)
7877
if err != nil {
7978
fmt.Fprintf(os.Stderr, "Error retrieving Job(%s) from Registry: %v", name, err)
@@ -94,21 +93,12 @@ func assertJobState(cAPI client.API, name string, test testJob, out io.Writer) (
9493
msg = fmt.Sprintf("\033[0;33m%v:\033[0m %v", name, *(j.State))
9594
}
9695

97-
tgt, err := cAPI.JobTarget(name)
98-
if err != nil {
99-
fmt.Fprintf(os.Stderr, "Error retrieving target information for Job(%s) from Registry: %v", name, err)
100-
return
101-
}
102-
if tgt == "" {
103-
return
104-
}
105-
10696
machines, err := cAPI.Machines()
10797
if err != nil {
10898
fmt.Fprintf(os.Stderr, "Failed retrieving list of Machines from Registry: %v", err)
10999
}
110100
for _, ms := range machines {
111-
if ms.ID != tgt {
101+
if ms.ID != j.TargetMachineID {
112102
continue
113103
}
114104
msg = fmt.Sprintf("%s on %s", msg, machineFullLegend(ms, false))

status.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,5 @@ func printUnitStatus(cAPI client.API, jobName string) int {
2222
return 1
2323
}
2424
cmd := fmt.Sprintf("systemctl status -l %s", jobName)
25-
return runCommand(cmd, j.UnitState.MachineState)
25+
return runCommand(cmd, j.UnitState.MachineID)
2626
}

0 commit comments

Comments
 (0)