Skip to content

Commit 975ac6b

Browse files
committed
feat(deisctl) : add new feautes and update core-os updatectl to updateservicectl
add timeout for cmdInstall during updateservice function add lock instance during update move core-os updatectl package to updateservicectl package
1 parent b366679 commit 975ac6b

9 files changed

Lines changed: 337 additions & 28 deletions

File tree

cmd/cmd.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ func List(c client.Client) error {
1515
}
1616

1717
func PullImage(service string) error {
18-
dockercli, _, _ := utils.GetNewClient()
18+
//dockercli, _, _ := utils.GetNewClient()
1919
fmt.Println("pulling image :" + strings.Replace(strings.Split(service, ".")[0], "-", "/", 1) + ":latest")
20-
err := utils.PullImage(dockercli, strings.Replace(strings.Split(service, ".")[0], "-", "/", 1)+":latest")
20+
err := utils.PullImage(strings.Replace(strings.Split(service, ".")[0], "-", "/", 1) + ":latest")
2121
if err != nil {
2222
return err
2323
}

constant/constants.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package constant
22

3+
import "time"
4+
35
const (
4-
UnitsDir = "/var/lib/deis/units/"
5-
HooksDir = "/var/lib/deis/hooks/"
6-
Version = "/etc/deis-version"
7-
MachineId = "/etc/machine-id"
8-
UpdatekeyDir = "/deis/update/"
6+
UnitsDir = "/var/lib/deis/units/"
7+
HooksDir = "/var/lib/deis/hooks/"
8+
Version = "/etc/deis-version"
9+
MachineId = "/etc/machine-id"
10+
UpdatekeyDir = "/deis/update/"
11+
InitialInterval = time.Second * 10
12+
MaxInterval = time.Minute * 7
913
)

lock/client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package lock
2+
3+
type LockClient interface {
4+
Init() error
5+
Get() (*Semaphore, error)
6+
Set(*Semaphore) error
7+
}

lock/etcd.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package lock
2+
3+
import (
4+
"encoding/json"
5+
6+
etcdError "github.com/coreos/locksmith/third_party/github.com/coreos/etcd/error"
7+
"github.com/coreos/locksmith/third_party/github.com/coreos/go-etcd/etcd"
8+
)
9+
10+
const (
11+
keyPrefix = "coreos.com/updateengine/rebootlock"
12+
holdersPrefix = keyPrefix + "/holders"
13+
SemaphorePrefix = keyPrefix + "/semaphore"
14+
)
15+
16+
// EtcdLockClient is a wrapper around the go-etcd client that provides
17+
// simple primitives to operate on the internal semaphore and holders
18+
// structs through etcd.
19+
type EtcdLockClient struct {
20+
client *etcd.Client
21+
}
22+
23+
func NewEtcdLockClient(machines []string) (client *EtcdLockClient, err error) {
24+
ec := etcd.NewClient(machines)
25+
client = &EtcdLockClient{ec}
26+
err = client.Init()
27+
28+
return client, err
29+
}
30+
31+
// Init sets an initial copy of the semaphore if it doesn't exist yet.
32+
func (c *EtcdLockClient) Init() (err error) {
33+
sem := newSemaphore()
34+
b, err := json.Marshal(sem)
35+
if err != nil {
36+
return err
37+
}
38+
39+
_, err = c.client.Create(SemaphorePrefix, string(b), 0)
40+
if err != nil {
41+
eerr, ok := err.(*etcd.EtcdError)
42+
if ok && eerr.ErrorCode == etcdError.EcodeNodeExist {
43+
return nil
44+
}
45+
}
46+
47+
return err
48+
}
49+
50+
// Get fetches the Semaphore from etcd.
51+
func (c *EtcdLockClient) Get() (sem *Semaphore, err error) {
52+
resp, err := c.client.Get(SemaphorePrefix, false, false)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
sem = &Semaphore{}
58+
err = json.Unmarshal([]byte(resp.Node.Value), sem)
59+
if err != nil {
60+
return nil, err
61+
}
62+
63+
sem.Index = resp.Node.ModifiedIndex
64+
65+
return sem, nil
66+
}
67+
68+
// Set sets a Semaphore in etcd.
69+
func (c *EtcdLockClient) Set(sem *Semaphore) (err error) {
70+
b, err := json.Marshal(sem)
71+
if err != nil {
72+
return err
73+
}
74+
75+
_, err = c.client.CompareAndSwap(SemaphorePrefix, string(b), 0, "", sem.Index)
76+
77+
return err
78+
}

lock/lock.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package lock
2+
3+
type Lock struct {
4+
id string
5+
client LockClient
6+
}
7+
8+
func New(id string, client LockClient) (lock *Lock) {
9+
return &Lock{id, client}
10+
}
11+
12+
func (l *Lock) store(f func(*Semaphore) error) (err error) {
13+
sem, err := l.client.Get()
14+
if err != nil {
15+
return err
16+
}
17+
18+
if err := f(sem); err != nil {
19+
return err
20+
}
21+
22+
err = l.client.Set(sem)
23+
if err != nil {
24+
return err
25+
}
26+
27+
return nil
28+
}
29+
30+
func (l *Lock) Get() (sem *Semaphore, err error) {
31+
sem, err = l.client.Get()
32+
if err != nil {
33+
return nil, err
34+
}
35+
36+
return sem, nil
37+
}
38+
39+
func (l *Lock) SetMax(max int) (sem *Semaphore, oldMax int, err error) {
40+
var (
41+
semRet *Semaphore
42+
old int
43+
)
44+
45+
return semRet, old, l.store(func(sem *Semaphore) error {
46+
old = sem.Max
47+
semRet = sem
48+
return sem.SetMax(max)
49+
})
50+
}
51+
52+
func (l *Lock) Lock() (err error) {
53+
return l.store(func(sem *Semaphore) error {
54+
return sem.Lock(l.id)
55+
})
56+
}
57+
58+
func (l *Lock) Unlock() error {
59+
return l.store(func(sem *Semaphore) error {
60+
return sem.Unlock(l.id)
61+
})
62+
}

lock/semaphore.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package lock
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"fmt"
7+
"sort"
8+
)
9+
10+
var (
11+
ErrExist = errors.New("holder exists")
12+
ErrNotExist = errors.New("holder does not exist")
13+
)
14+
15+
type Semaphore struct {
16+
Index uint64 `json:"-"`
17+
Semaphore int `json:"semaphore"`
18+
Max int `json:"max"`
19+
Holders []string `json:"holders"`
20+
}
21+
22+
func (s *Semaphore) SetMax(max int) error {
23+
diff := s.Max - max
24+
25+
s.Semaphore = s.Semaphore - diff
26+
s.Max = s.Max - diff
27+
28+
return nil
29+
}
30+
31+
func (s *Semaphore) String() string {
32+
b, _ := json.Marshal(s)
33+
return string(b)
34+
}
35+
36+
func (s *Semaphore) addHolder(h string) error {
37+
loc := sort.SearchStrings(s.Holders, h)
38+
switch {
39+
case loc == len(s.Holders):
40+
s.Holders = append(s.Holders, h)
41+
case s.Holders[loc] == h:
42+
return ErrExist
43+
default:
44+
s.Holders = append(s.Holders[:loc], append([]string{h}, s.Holders[loc:]...)...)
45+
}
46+
47+
return nil
48+
}
49+
50+
func (s *Semaphore) removeHolder(h string) error {
51+
loc := sort.SearchStrings(s.Holders, h)
52+
if loc < len(s.Holders) && s.Holders[loc] == h {
53+
s.Holders = append(s.Holders[:loc], s.Holders[loc+1:]...)
54+
} else {
55+
return ErrNotExist
56+
}
57+
58+
return nil
59+
}
60+
61+
func (s *Semaphore) Lock(h string) error {
62+
if s.Semaphore <= 0 {
63+
return fmt.Errorf("semaphore is at %v", s.Semaphore)
64+
}
65+
66+
if err := s.addHolder(h); err != nil {
67+
return err
68+
}
69+
70+
s.Semaphore = s.Semaphore - 1
71+
72+
return nil
73+
}
74+
75+
func (s *Semaphore) Unlock(h string) error {
76+
if err := s.removeHolder(h); err != nil {
77+
return err
78+
}
79+
80+
s.Semaphore = s.Semaphore + 1
81+
82+
return nil
83+
}
84+
85+
func newSemaphore() (sem *Semaphore) {
86+
return &Semaphore{0, 1, 1, nil}
87+
}
88+
89+
type holder struct {
90+
ID string `json:"-"`
91+
StartTime int64 `json:"startTime"`
92+
}

updatectl/instance.go

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@ import (
1515

1616
"code.google.com/p/go-uuid/uuid"
1717
"github.com/coreos/go-omaha/omaha"
18-
update "github.com/coreos/updatectl/client/update/v1"
18+
update "github.com/coreos/updateservicectl/client/update/v1"
1919
"github.com/deis/deisctl/client"
2020
"github.com/deis/deisctl/cmd"
2121
"github.com/deis/deisctl/constant"
22+
"github.com/deis/deisctl/lock"
2223
"github.com/deis/deisctl/utils"
2324
)
2425

@@ -71,6 +72,14 @@ func init() {
7172
cmdInstanceDeis.Flags.StringVar(&instanceFlags.version, "version", utils.GetVersion(), "Version to report.")
7273
}
7374

75+
func expBackoff(interval time.Duration) time.Duration {
76+
interval = interval * 2
77+
if interval > constant.MaxInterval {
78+
interval = constant.MaxInterval
79+
}
80+
return interval
81+
}
82+
7483
//+ downloadDir + "deis.tar.gz"
7584

7685
type serverConfig struct {
@@ -86,6 +95,7 @@ type Client struct {
8695
config *serverConfig
8796
errorRate int
8897
pingsRemaining int
98+
lock *lock.Lock
8999
}
90100

91101
func (c *Client) Log(format string, v ...interface{}) {
@@ -102,6 +112,19 @@ func (c *Client) getCodebaseUrl(uc *omaha.UpdateCheck) string {
102112
return uc.Urls.Urls[0].CodeBase + uc.Manifest.Packages.Packages[0].Name
103113
}
104114

115+
func (c *Client) RequestLock() {
116+
elc, err := lock.NewEtcdLockClient(nil)
117+
if err != nil {
118+
fmt.Fprintln(os.Stderr, "Error initializing etcd client:", err)
119+
}
120+
var mID string
121+
mID = utils.GetMachineID("/")
122+
if mID == "" {
123+
fmt.Fprintln(os.Stderr, "Cannot read machine-id")
124+
}
125+
c.lock = lock.New(mID, elc)
126+
}
127+
105128
func (c *Client) updateservice() (err error) {
106129
fmt.Println("starting systemd units")
107130
// files, _ := utils.ListFiles(constant.UnitsDir + "*.service")
@@ -111,7 +134,6 @@ func (c *Client) updateservice() (err error) {
111134
Services := utils.GetServices()
112135
if localServices.Len() == 0 {
113136
fmt.Println("no local services")
114-
return
115137
}
116138
for _, service := range localServices {
117139
if strings.HasSuffix(service, "-data.service") {
@@ -122,8 +144,10 @@ func (c *Client) updateservice() (err error) {
122144
if err != nil {
123145
return err
124146
}
125-
time.Sleep(1 * time.Second)
126-
err = cmd.Install(deis, []string{localService})
147+
148+
err = utils.Timeout("Install unit"+service, 300*time.Second, func() {
149+
cmd.Install(deis, []string{localService})
150+
})
127151
if err != nil {
128152
return err
129153
}
@@ -284,6 +308,7 @@ func (c *Client) SetVersion(resp *omaha.Response) {
284308

285309
// Sleep between n and m seconds
286310
func (c *Client) Loop(n, m int) {
311+
interval := constant.InitialInterval
287312
for {
288313
randSleep(n, m)
289314
resp, err := c.MakeRequest("3", "2", true, false)
@@ -312,7 +337,27 @@ func (c *Client) Loop(n, m int) {
312337
continue
313338
}
314339
c.MakeRequest("14", "1", false, false)
340+
c.RequestLock()
341+
for {
342+
err = c.lock.Lock()
343+
if err != nil && err != lock.ErrExist {
344+
interval = expBackoff(interval)
345+
fmt.Printf("Retrying in %v. Error locking: %v\n", interval, err)
346+
time.Sleep(interval)
347+
continue
348+
} else {
349+
break
350+
}
351+
}
315352
c.SetVersion(resp)
353+
err = c.lock.Unlock()
354+
if err == lock.ErrNotExist {
355+
fmt.Println("no lock found")
356+
} else if err == nil {
357+
fmt.Println("Unlocked existing lock for this machine")
358+
} else {
359+
fmt.Fprintln(os.Stderr, "Error unlocking:", err)
360+
}
316361
}
317362
}
318363
}

updatectl/update.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package updatectl
33
import (
44
"flag"
55
"fmt"
6-
"github.com/coreos/updatectl/auth"
7-
"github.com/coreos/updatectl/client/update/v1"
6+
"github.com/coreos/updateservicectl/auth"
7+
"github.com/coreos/updateservicectl/client/update/v1"
88
"log"
99
"net/http"
1010
"os"

0 commit comments

Comments
 (0)