Skip to content

Commit 6fdcf08

Browse files
author
smothiki
committed
feat(lock): add timeout to repository lock feature
1 parent ab10f74 commit 6fdcf08

6 files changed

Lines changed: 71 additions & 44 deletions

File tree

boot.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func main() {
5353
}
5454
fs := sys.RealFS()
5555
env := sys.RealEnv()
56-
pushLock := sshd.NewInMemoryRepositoryLock()
56+
pushLock := sshd.NewInMemoryRepositoryLock(cnf.GitLockTimeout())
5757
circ := sshd.NewCircuit()
5858

5959
storageParams, err := conf.GetStorageParams(env)

pkg/sshd/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,15 @@ type Config struct {
1414
StorageType string `envconfig:"BUILDER_STORAGE" default:"minio"`
1515
SlugBuilderImagePullPolicy string `envconfig:"SLUG_BUILDER_IMAGE_PULL_POLICY" default:"Always"`
1616
DockerBuilderImagePullPolicy string `envconfig:"DOCKER_BUILDER_IMAGE_PULL_POLICY" default:"Always"`
17+
LockTimeout int `envconfig:"GIT_LOCK_TIMEOUT" default:"10"`
1718
}
1819

1920
// CleanerPollSleepDuration returns c.CleanerPollSleepDurationSec as a time.Duration.
2021
func (c Config) CleanerPollSleepDuration() time.Duration {
2122
return time.Duration(c.CleanerPollSleepDurationSec) * time.Second
2223
}
24+
25+
//GitLockTimeout return LockTimeout in minutes
26+
func (c Config) GitLockTimeout() time.Duration {
27+
return time.Duration(c.LockTimeout) * time.Minute
28+
}

pkg/sshd/lock.go

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,41 +14,57 @@ var (
1414
// RepositoryLock interface that allows the creation of a lock associated
1515
// with a repository name to avoid simultaneous git operations.
1616
type RepositoryLock interface {
17-
// Lock acquires a lock for a repository. In the case the repository is already locked
18-
// it waits until a timeout to get the lock. If it was not possible to get the
19-
// lock after the timeout an error is returned.
20-
Lock(repoName string, timeout time.Duration) error
17+
// Lock acquires a lock for a repository.
18+
Lock(repoName string) error
2119
// Unlock releases the lock for a repository or returns an error if the specified
22-
// name doesn't exist. In the case the repository is already locked it waits until
23-
// a timeout to get the lock. If it was not possible to get the lock after the timeout
24-
// an error is returned.
25-
Unlock(repoName string, timeout time.Duration) error
20+
// name doesn't exist.
21+
Unlock(repoName string) error
22+
// Timeout returns the time duration for which it has to hold the lock
23+
Timeout() time.Duration
2624
}
2725

28-
func wrapInLock(lck RepositoryLock, repoName string, timeout time.Duration, fn func() error) error {
29-
if err := lck.Lock(repoName, timeout); err != nil {
26+
func wrapInLock(lck RepositoryLock, repoName string, fn func() error) error {
27+
if err := lck.Lock(repoName); err != nil {
3028
return errAlreadyLocked
3129
}
32-
defer lck.Unlock(repoName, timeout)
33-
return fn()
30+
timer := time.NewTimer(lck.Timeout())
31+
defer timer.Stop()
32+
doneCh := make(chan struct{})
33+
fnCh := make(chan error)
34+
go func() {
35+
err := fn()
36+
select {
37+
case fnCh <- err:
38+
case <-doneCh:
39+
}
40+
}()
41+
select {
42+
case <-timer.C:
43+
lck.Unlock(repoName)
44+
defer close(doneCh)
45+
return fmt.Errorf("%s lock exceeded timout", repoName)
46+
case err := <-fnCh:
47+
return err
48+
}
3449
}
3550

3651
// NewInMemoryRepositoryLock returns a new instance of a RepositoryLock.
37-
func NewInMemoryRepositoryLock() RepositoryLock {
52+
func NewInMemoryRepositoryLock(timeout time.Duration) RepositoryLock {
3853
return &inMemoryRepoLock{
3954
mutex: &sync.RWMutex{},
4055
dataMap: make(map[string]bool),
56+
timeout: timeout,
4157
}
4258
}
4359

4460
type inMemoryRepoLock struct {
4561
mutex *sync.RWMutex
4662
dataMap map[string]bool
63+
timeout time.Duration
4764
}
4865

4966
// Lock aquires a lock associated with the specified name.
50-
// This implementation ignores the timeout.
51-
func (rl *inMemoryRepoLock) Lock(repoName string, timeout time.Duration) error {
67+
func (rl *inMemoryRepoLock) Lock(repoName string) error {
5268
rl.mutex.Lock()
5369
defer rl.mutex.Unlock()
5470

@@ -62,8 +78,8 @@ func (rl *inMemoryRepoLock) Lock(repoName string, timeout time.Duration) error {
6278
}
6379

6480
// Unlock releases the lock for a repository or returns an error if the specified name doesn't
65-
// exist. This implementation ignores the timeout.
66-
func (rl *inMemoryRepoLock) Unlock(repoName string, timeout time.Duration) error {
81+
// exist.
82+
func (rl *inMemoryRepoLock) Unlock(repoName string) error {
6783
rl.mutex.Lock()
6884
defer rl.mutex.Unlock()
6985

@@ -78,3 +94,8 @@ func (rl *inMemoryRepoLock) Unlock(repoName string, timeout time.Duration) error
7894

7995
return nil
8096
}
97+
98+
// Timeout returns the time duration for which a gitpush should hold the lock
99+
func (rl *inMemoryRepoLock) Timeout() time.Duration {
100+
return rl.timeout
101+
}

pkg/sshd/lock_test.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,59 +15,59 @@ const (
1515
func TestMultipleSameRepoLocks(t *testing.T) {
1616
var wg sync.WaitGroup
1717
const repo = "repo1"
18-
const numTries = 100
19-
lck := NewInMemoryRepositoryLock()
20-
assert.NoErr(t, lck.Lock(repo, 0*time.Second))
18+
const numTries = 0
19+
lck := NewInMemoryRepositoryLock(0)
20+
assert.NoErr(t, lck.Lock(repo))
2121
for i := 0; i < numTries; i++ {
2222
wg.Add(1)
2323
go func() {
2424
defer wg.Done()
25-
assert.True(t, lck.Lock(repo, 0*time.Second) != nil, "lock of already locked repo should return error")
25+
assert.True(t, lck.Lock(repo) != nil, "lock of already locked repo should return error")
2626
}()
2727
}
2828
assert.NoErr(t, waitWithTimeout(&wg, 1*time.Second))
29-
assert.NoErr(t, lck.Unlock(repo, 0*time.Second))
29+
assert.NoErr(t, lck.Unlock(repo))
3030
for i := 0; i < numTries; i++ {
3131
wg.Add(1)
3232
go func() {
3333
defer wg.Done()
34-
assert.True(t, lck.Unlock(repo, 0*time.Second) != nil, "unlock of already unlocked repo should return error")
34+
assert.True(t, lck.Unlock(repo) != nil, "unlock of already unlocked repo should return error")
3535
}()
3636
}
3737
assert.NoErr(t, waitWithTimeout(&wg, 1*time.Second))
3838
}
3939

4040
func TestSingleLock(t *testing.T) {
41-
rl := NewInMemoryRepositoryLock()
41+
rl := NewInMemoryRepositoryLock(0)
4242
key := "fakeid"
4343
callbackCh := make(chan interface{})
4444
go lockAndCallback(rl, key, callbackCh)
4545
verifyCallbackHappens(t, callbackCh)
4646
}
4747

4848
func TestSingleLockUnlock(t *testing.T) {
49-
rl := NewInMemoryRepositoryLock()
49+
rl := NewInMemoryRepositoryLock(0)
5050
key := "fakeid"
5151
callbackCh := make(chan interface{})
5252
go lockAndCallback(rl, key, callbackCh)
5353
verifyCallbackHappens(t, callbackCh)
54-
err := rl.Unlock(key, time.Duration(0))
54+
err := rl.Unlock(key)
5555
if err != nil {
5656
t.Fatalf("unexpected error %v", err)
5757
}
5858
}
5959

6060
func TestInvalidUnlock(t *testing.T) {
61-
rl := NewInMemoryRepositoryLock()
61+
rl := NewInMemoryRepositoryLock(0)
6262
key := "fakeid"
63-
err := rl.Unlock(key, time.Duration(0))
63+
err := rl.Unlock(key)
6464
if err == nil {
6565
t.Fatal("expected error but returned nil")
6666
}
6767
}
6868

6969
func TestDoubleLockUnlock(t *testing.T) {
70-
rl := NewInMemoryRepositoryLock()
70+
rl := NewInMemoryRepositoryLock(0)
7171
key := "fakeid"
7272
callbackCh1stLock := make(chan interface{})
7373
callbackCh2ndLock := make(chan interface{})
@@ -76,29 +76,29 @@ func TestDoubleLockUnlock(t *testing.T) {
7676
verifyCallbackHappens(t, callbackCh1stLock)
7777
go lockAndCallback(rl, key, callbackCh2ndLock)
7878
verifyCallbackDoesntHappens(t, callbackCh2ndLock)
79-
err := rl.Unlock(key, time.Duration(0))
79+
err := rl.Unlock(key)
8080
if err != nil {
8181
t.Fatalf("unexpected error %v", err)
8282
}
83-
err = rl.Unlock(key, time.Duration(0))
83+
err = rl.Unlock(key)
8484
if err == nil {
8585
t.Fatalf("expected error but returned nil")
8686
}
8787
}
8888

8989
func TestWrapInLock(t *testing.T) {
90-
lck := NewInMemoryRepositoryLock()
91-
assert.NoErr(t, wrapInLock(lck, "repo", 0*time.Second, func() error {
90+
lck := NewInMemoryRepositoryLock(0)
91+
assert.NoErr(t, wrapInLock(lck, "repo", func() error {
9292
return nil
9393
}))
94-
lck.Lock("repo", 0*time.Second)
95-
assert.Err(t, errAlreadyLocked, wrapInLock(lck, "repo", 0*time.Second, func() error {
94+
lck.Lock("repo")
95+
assert.Err(t, errAlreadyLocked, wrapInLock(lck, "repo", func() error {
9696
return nil
9797
}))
9898
}
9999

100100
func lockAndCallback(rl RepositoryLock, id string, callbackCh chan<- interface{}) {
101-
if err := rl.Lock(id, time.Duration(0)); err == nil {
101+
if err := rl.Lock(id); err == nil {
102102
callbackCh <- true
103103
}
104104
}

pkg/sshd/server.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"io/ioutil"
1111
"net"
1212
"strings"
13-
"time"
1413

1514
"github.com/deis/builder/pkg/controller"
1615
"github.com/deis/builder/pkg/git"
@@ -248,7 +247,7 @@ func (s *server) answer(channel ssh.Channel, requests <-chan *ssh.Request, conda
248247
channel.Stderr().Write([]byte("No repo given"))
249248
return err
250249
}
251-
wrapErr := wrapInLock(s.pushLock, repoName, time.Duration(0), s.runReceive(req, sshconn, channel, repoName, parts, condata))
250+
wrapErr := wrapInLock(s.pushLock, repoName, s.runReceive(req, sshconn, channel, repoName, parts, condata))
252251
if wrapErr == errAlreadyLocked {
253252
log.Info(multiplePush)
254253
// The error must be in git format
@@ -262,7 +261,7 @@ func (s *server) answer(channel ssh.Channel, requests <-chan *ssh.Request, conda
262261

263262
var xs uint32
264263
if wrapErr != nil {
265-
log.Err("Failed git receive: %v", err)
264+
log.Err("Failed git receive: %v", wrapErr)
266265
xs = 1
267266
}
268267
sendExitStatus(xs, channel)

pkg/sshd/server_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ func TestGitPktLine(t *testing.T) {
2121
str := "hello world"
2222
err := gitPktLine(b, str)
2323
assert.NoErr(t, err)
24+
2425
outStr := string(b.Bytes())
2526
assert.True(t, len(outStr) > 4, "output string <= 4 chars")
2627
assert.Equal(t, outStr[:4], fmt.Sprintf("%04x", len(str)+4), "hex prefix")
@@ -62,7 +63,7 @@ func TestReceive(t *testing.T) {
6263
cfg.AddHostKey(key)
6364

6465
c := NewCircuit()
65-
pushLock := NewInMemoryRepositoryLock()
66+
pushLock := NewInMemoryRepositoryLock(0)
6667
runServer(cfg, c, pushLock, testingServerAddr, time.Duration(0), t)
6768

6869
// Give server time to initialize.
@@ -116,7 +117,7 @@ func TestPushInvalidArgsLength(t *testing.T) {
116117
cfg.AddHostKey(key)
117118

118119
c := NewCircuit()
119-
pushLock := NewInMemoryRepositoryLock()
120+
pushLock := NewInMemoryRepositoryLock(0)
120121
runServer(cfg, c, pushLock, testingServerAddr, 0*time.Second, t)
121122

122123
// Give server time to initialize.
@@ -152,7 +153,7 @@ func TestConcurrentPushSameRepo(t *testing.T) {
152153
cfg.AddHostKey(key)
153154

154155
c := NewCircuit()
155-
pushLock := NewInMemoryRepositoryLock()
156+
pushLock := NewInMemoryRepositoryLock(0)
156157
runServer(cfg, c, pushLock, testingServerAddr, 2*time.Second, t)
157158

158159
// Give server time to initialize.
@@ -216,7 +217,7 @@ func TestConcurrentPushDifferentRepo(t *testing.T) {
216217
assert.NoErr(t, err)
217218
cfg.AddHostKey(key)
218219
c := NewCircuit()
219-
pushLock := NewInMemoryRepositoryLock()
220+
pushLock := NewInMemoryRepositoryLock(time.Duration(1 * time.Minute))
220221
runServer(cfg, c, pushLock, testingServerAddr, time.Duration(0), t)
221222
time.Sleep(200 * time.Millisecond)
222223
assert.Equal(t, c.State(), ClosedState, "circuit state")

0 commit comments

Comments
 (0)