Skip to content

Commit be660e0

Browse files
committed
fix(publisher): use a mutex to protect against concurrent access errors
1 parent 7455c2f commit be660e0

1 file changed

Lines changed: 18 additions & 20 deletions

File tree

publisher/server/publisher.go

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package server
33
import (
44
"fmt"
55
"log"
6+
"net"
67
"os"
78
"regexp"
89
"strconv"
10+
"sync"
911
"time"
1012

1113
"github.com/coreos/go-etcd/etcd"
@@ -23,6 +25,11 @@ type Server struct {
2325
EtcdClient *etcd.Client
2426
}
2527

28+
var safeMap = struct {
29+
sync.RWMutex
30+
data map[string]string
31+
}{data: make(map[string]string)}
32+
2633
// Listen adds an event listener to the docker client and publishes containers that were started.
2734
func (s *Server) Listen(ttl time.Duration) {
2835
listener := make(chan *docker.APIEvents)
@@ -96,7 +103,9 @@ func (s *Server) publishContainer(container *docker.APIContainers, ttl time.Dura
96103
hostAndPort := host + ":" + port
97104
if s.IsPublishableApp(containerName) && s.IsPortOpen(hostAndPort) {
98105
s.setEtcd(keyPath, hostAndPort, uint64(ttl.Seconds()))
99-
s.setEtcd("/deis/publisher/containers/"+container.ID, appPath, uint64(ttl.Seconds()))
106+
safeMap.Lock()
107+
safeMap.data[container.ID] = appPath
108+
safeMap.Unlock()
100109
}
101110
// TODO: support multiple exposed ports
102111
break
@@ -106,16 +115,15 @@ func (s *Server) publishContainer(container *docker.APIContainers, ttl time.Dura
106115

107116
// removeContainer remove a container published by this component
108117
func (s *Server) removeContainer(event string) {
109-
containerPath := "/deis/publisher/containers/" + event
110-
appPath, err := s.getEtcd(containerPath)
111-
if err != nil {
112-
log.Println(err)
113-
return
114-
}
118+
safeMap.RLock()
119+
appPath := safeMap.data[event]
120+
safeMap.RUnlock()
115121

116-
keyPath := fmt.Sprintf("/deis/services/%s", appPath)
117-
s.removeEtcd(keyPath, false)
118-
s.removeEtcd(containerPath, false)
122+
if appPath != "" {
123+
keyPath := fmt.Sprintf("/deis/services/%s", appPath)
124+
log.Printf("stopped %s\n", keyPath)
125+
s.removeEtcd(keyPath, false)
126+
}
119127
}
120128

121129
// IsPublishableApp determines if the application should be published to etcd.
@@ -200,16 +208,6 @@ func (s *Server) setEtcd(key, value string, ttl uint64) {
200208
log.Println("set", key, "->", value)
201209
}
202210

203-
func (s *Server) getEtcd(key string) (string, error) {
204-
result, err := s.EtcdClient.Get(key, false, false)
205-
if err != nil {
206-
return "", err
207-
}
208-
209-
log.Println("get", key, "->", result.Node.Value)
210-
return result.Node.Value, nil
211-
}
212-
213211
// removeEtcd removes the corresponding etcd key
214212
func (s *Server) removeEtcd(key string, recursive bool) {
215213
if _, err := s.EtcdClient.Delete(key, recursive); err != nil {

0 commit comments

Comments
 (0)