Skip to content

Commit fb6f45c

Browse files
committed
bug(builder): avoid simultaneous git push
1 parent 3d5feb3 commit fb6f45c

3 files changed

Lines changed: 183 additions & 0 deletions

File tree

pkg/sshd/lock.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package sshd
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"time"
7+
)
8+
9+
// RepositoryLock interface that allows the creation of a lock associated
10+
// with a repository name to avoid simultaneous git operations
11+
type RepositoryLock interface {
12+
// Lock aquires a lock for a repository. In the case the repository is already locked
13+
// it waits until a timeout to get the lock. If it was not possible to get the
14+
// lock after the timeout an error is returned
15+
Lock(repoName string, timeout time.Duration) error
16+
// Unlock releases the lock for a repository or returns an error if the specified
17+
// name doesn't exist. In the case the repository is already locked it waits until
18+
// a timeout to get the lock. If it was not possible to get the lock after the timeout
19+
// an error is returned
20+
Unlock(repoName string, timeout time.Duration) error
21+
}
22+
23+
// NewInMemoryRepositoryLock returns a new instance of a RepositoryLock
24+
func NewInMemoryRepositoryLock() RepositoryLock {
25+
return &inMemoryRepoLock{
26+
mutex: &sync.RWMutex{},
27+
dataMap: make(map[string]bool),
28+
}
29+
}
30+
31+
type inMemoryRepoLock struct {
32+
mutex *sync.RWMutex
33+
dataMap map[string]bool
34+
}
35+
36+
// Lock aquires a lock associated with the specified name.
37+
// This implementation do not uses the timeout
38+
func (rl *inMemoryRepoLock) Lock(repoName string, timeout time.Duration) error {
39+
rl.mutex.Lock()
40+
defer rl.mutex.Unlock()
41+
42+
_, exists := rl.dataMap[repoName]
43+
if !exists {
44+
rl.dataMap[repoName] = true
45+
return nil
46+
}
47+
48+
return fmt.Errorf("repository %q already locked", repoName)
49+
}
50+
51+
// Unlock releases the lock for a repository or returns
52+
// an error if the specified name doesn't exist.
53+
// This implementation do not uses the timeout
54+
func (rl *inMemoryRepoLock) Unlock(repoName string, timeout time.Duration) error {
55+
rl.mutex.Lock()
56+
defer rl.mutex.Unlock()
57+
58+
locked, exists := rl.dataMap[repoName]
59+
if !exists {
60+
return fmt.Errorf("repository %q not found", repoName)
61+
}
62+
63+
if locked {
64+
delete(rl.dataMap, repoName)
65+
}
66+
67+
return nil
68+
}

pkg/sshd/lock_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package sshd
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
const (
9+
callbackTimeout = 1 * time.Second
10+
)
11+
12+
func TestSingleLock(t *testing.T) {
13+
rl := NewInMemoryRepositoryLock()
14+
key := "fakeid"
15+
callbackCh := make(chan interface{})
16+
go lockAndCallback(rl, key, callbackCh)
17+
verifyCallbackHappens(t, callbackCh)
18+
}
19+
20+
func TestSingleLockUnlock(t *testing.T) {
21+
rl := NewInMemoryRepositoryLock()
22+
key := "fakeid"
23+
callbackCh := make(chan interface{})
24+
go lockAndCallback(rl, key, callbackCh)
25+
verifyCallbackHappens(t, callbackCh)
26+
err := rl.Unlock(key, time.Duration(0))
27+
if err != nil {
28+
t.Fatalf("unexpected error %v", err)
29+
}
30+
}
31+
32+
func TestInvalidUnlock(t *testing.T) {
33+
rl := NewInMemoryRepositoryLock()
34+
key := "fakeid"
35+
err := rl.Unlock(key, time.Duration(0))
36+
if err == nil {
37+
t.Fatalf("expected error but returned nil", err)
38+
}
39+
}
40+
41+
func TestDoubleLockUnlock(t *testing.T) {
42+
rl := NewInMemoryRepositoryLock()
43+
key := "fakeid"
44+
callbackCh1stLock := make(chan interface{})
45+
callbackCh2ndLock := make(chan interface{})
46+
47+
go lockAndCallback(rl, key, callbackCh1stLock)
48+
verifyCallbackHappens(t, callbackCh1stLock)
49+
go lockAndCallback(rl, key, callbackCh2ndLock)
50+
verifyCallbackDoesntHappens(t, callbackCh2ndLock)
51+
err := rl.Unlock(key, time.Duration(0))
52+
if err != nil {
53+
t.Fatalf("unexpected error %v", err)
54+
}
55+
err = rl.Unlock(key, time.Duration(0))
56+
if err == nil {
57+
t.Fatalf("expected error but returned nil")
58+
}
59+
}
60+
61+
func lockAndCallback(rl RepositoryLock, id string, callbackCh chan<- interface{}) {
62+
if err := rl.Lock(id, time.Duration(0)); err == nil {
63+
callbackCh <- true
64+
}
65+
}
66+
67+
func verifyCallbackHappens(t *testing.T, callbackCh <-chan interface{}) bool {
68+
select {
69+
case <-callbackCh:
70+
return true
71+
case <-time.After(callbackTimeout):
72+
t.Fatalf("Timed out waiting for callback.")
73+
return false
74+
}
75+
}
76+
77+
func verifyCallbackDoesntHappens(t *testing.T, callbackCh <-chan interface{}) bool {
78+
select {
79+
case <-callbackCh:
80+
t.Fatalf("Unexpected callback.")
81+
return false
82+
case <-time.After(callbackTimeout):
83+
return true
84+
}
85+
}

pkg/sshd/server.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ package sshd
99

1010
import (
1111
"fmt"
12+
"io"
1213
"net"
1314
"strings"
1415
"sync"
1516
"text/template"
17+
"time"
1618

1719
"github.com/Masterminds/cookoo"
1820
"github.com/Masterminds/cookoo/log"
@@ -27,6 +29,12 @@ const (
2729
Address string = "ssh.Address"
2830
// ServerConfig is the context key for ServerConfig object.
2931
ServerConfig string = "ssh.ServerConfig"
32+
33+
multiplePush string = "Another git push is ongoing"
34+
)
35+
36+
var (
37+
buildingRepos = NewInMemoryRepositoryLock()
3038
)
3139

3240
// Serve starts a native SSH server.
@@ -215,6 +223,19 @@ func (s *server) answer(channel ssh.Channel, requests <-chan *ssh.Request, sshCo
215223
req.Reply(ok, nil)
216224
break
217225
}
226+
227+
repoName := parts[1]
228+
if err := buildingRepos.Lock(repoName, time.Duration(0)); err != nil {
229+
log.Errf(s.c, multiplePush)
230+
// The error must be in git format
231+
if err := gitPktLine(channel, fmt.Sprintf("ERR %v\n", multiplePush)); err != nil {
232+
log.Errf(s.c, "Failed to write to channel: %s", err)
233+
}
234+
sendExitStatus(1, channel)
235+
req.Reply(false, nil)
236+
return nil
237+
}
238+
218239
req.Reply(true, nil) // We processed. Yay.
219240

220241
cxt.Put("channel", channel)
@@ -223,12 +244,14 @@ func (s *server) answer(channel ssh.Channel, requests <-chan *ssh.Request, sshCo
223244
cxt.Put("repository", parts[1])
224245
sshGitReceive := cxt.Get("route.sshd.sshGitReceive", "sshGitReceive").(string)
225246
err := router.HandleRequest(sshGitReceive, cxt, true)
247+
buildingRepos.Unlock(repoName, time.Duration(0))
226248
var xs uint32
227249
if err != nil {
228250
log.Errf(s.c, "Failed git receive: %v", err)
229251
xs = 1
230252
}
231253
sendExitStatus(xs, channel)
254+
232255
return nil
233256
default:
234257
log.Warnf(s.c, "Illegal command is '%s'\n", clean)
@@ -296,3 +319,10 @@ func Ping(c cookoo.Context, p *cookoo.Params) (interface{}, cookoo.Interrupt) {
296319
req.Reply(true, nil)
297320
return nil, nil
298321
}
322+
323+
// gitPktLine writes a line following the git protocol
324+
// https://github.com/git/git/blob/master/Documentation/technical/pack-protocol.txt
325+
func gitPktLine(w io.Writer, s string) error {
326+
_, err := fmt.Fprintf(w, "%04x%s", len(s)+4, s)
327+
return err
328+
}

0 commit comments

Comments
 (0)