Skip to content

Commit af6c307

Browse files
author
Matthew Fisher
committed
Merge pull request #2413 from bacongobbler/1149-etcd-publish
fix(publisher): do not publish older app versions
2 parents 51d09e3 + 75536b5 commit af6c307

5 files changed

Lines changed: 252 additions & 140 deletions

File tree

publisher/Makefile

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,10 @@ start: check-deisctl
4545
stop: check-deisctl
4646
deisctl stop publisher
4747

48-
test:
49-
@echo no unit tests
48+
test: test-unit
49+
50+
test-unit:
51+
godep go test -v ./...
5052

5153
uninstall: check-deisctl
5254
deisctl uninstall publisher

publisher/main.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"os"
6+
"time"
7+
8+
"github.com/coreos/go-etcd/etcd"
9+
"github.com/fsouza/go-dockerclient"
10+
11+
"github.com/deis/deis/publisher/server"
12+
)
13+
14+
const (
15+
timeout time.Duration = 10 * time.Second
16+
etcdTTL time.Duration = timeout * 2
17+
)
18+
19+
func getopt(name, dfault string) string {
20+
value := os.Getenv(name)
21+
if value == "" {
22+
value = dfault
23+
}
24+
return value
25+
}
26+
27+
func main() {
28+
endpoint := getopt("DOCKER_HOST", "unix:///var/run/docker.sock")
29+
etcdHost := getopt("ETCD_HOST", "127.0.0.1")
30+
31+
client, err := docker.NewClient(endpoint)
32+
if err != nil {
33+
log.Fatal(err)
34+
}
35+
etcdClient := etcd.NewClient([]string{"http://" + etcdHost + ":4001"})
36+
37+
server := &server.Server{client, etcdClient}
38+
39+
go server.Listen(etcdTTL)
40+
41+
for {
42+
go server.Poll(etcdTTL)
43+
time.Sleep(timeout)
44+
}
45+
}

publisher/publisher.go

Lines changed: 0 additions & 138 deletions
This file was deleted.

publisher/server/publisher.go

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package server
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"log"
7+
"os"
8+
"regexp"
9+
"strconv"
10+
"time"
11+
12+
"github.com/coreos/go-etcd/etcd"
13+
"github.com/fsouza/go-dockerclient"
14+
)
15+
16+
const (
17+
appNameRegex string = `([a-z0-9-]+)_v([1-9][0-9]*).(cmd|web).([1-9][0-9])*`
18+
)
19+
20+
// Server is the main entrypoint for a publisher. It listens on a docker client for events
21+
// and publishes their host:port to the etcd client.
22+
type Server struct {
23+
DockerClient *docker.Client
24+
EtcdClient *etcd.Client
25+
}
26+
27+
// Listen adds an event listener to the docker client and publishes containers that were started.
28+
func (s *Server) Listen(ttl time.Duration) {
29+
listener := make(chan *docker.APIEvents)
30+
// TODO: figure out why we need to sleep for 10 milliseconds
31+
// https://github.com/fsouza/go-dockerclient/blob/0236a64c6c4bd563ec277ba00e370cc753e1677c/event_test.go#L43
32+
defer func() { time.Sleep(10 * time.Millisecond); s.DockerClient.RemoveEventListener(listener) }()
33+
if err := s.DockerClient.AddEventListener(listener); err != nil {
34+
log.Fatal(err)
35+
}
36+
for {
37+
select {
38+
case event := <-listener:
39+
if event.Status == "start" {
40+
container, err := s.getContainer(event.ID)
41+
if err != nil {
42+
log.Println(err)
43+
continue
44+
}
45+
s.publishContainer(container, ttl)
46+
}
47+
}
48+
}
49+
}
50+
51+
// Poll lists all containers from the docker client every time the TTL comes up and publishes them to etcd
52+
func (s *Server) Poll(ttl time.Duration) {
53+
containers, err := s.DockerClient.ListContainers(docker.ListContainersOptions{})
54+
if err != nil {
55+
log.Fatal(err)
56+
}
57+
for _, container := range containers {
58+
// send container to channel for processing
59+
s.publishContainer(&container, ttl)
60+
}
61+
}
62+
63+
// getContainer retrieves a container from the docker client based on id
64+
func (s *Server) getContainer(id string) (*docker.APIContainers, error) {
65+
containers, err := s.DockerClient.ListContainers(docker.ListContainersOptions{})
66+
if err != nil {
67+
return nil, err
68+
}
69+
for _, container := range containers {
70+
// send container to channel for processing
71+
if container.ID == id {
72+
return &container, nil
73+
}
74+
}
75+
return nil, errors.New("could not find container")
76+
}
77+
78+
// publishContainer publishes the docker container to etcd.
79+
func (s *Server) publishContainer(container *docker.APIContainers, ttl time.Duration) {
80+
r := regexp.MustCompile(appNameRegex)
81+
host := os.Getenv("HOST")
82+
for _, name := range container.Names {
83+
// HACK: remove slash from container name
84+
// see https://github.com/docker/docker/issues/7519
85+
containerName := name[1:]
86+
match := r.FindStringSubmatch(containerName)
87+
if match == nil {
88+
continue
89+
}
90+
appName := match[1]
91+
keyPath := fmt.Sprintf("/deis/services/%s/%s", appName, containerName)
92+
for _, p := range container.Ports {
93+
port := strconv.Itoa(int(p.PublicPort))
94+
if s.IsPublishableApp(containerName) {
95+
s.setEtcd(keyPath, host+":"+port, uint64(ttl.Seconds()))
96+
}
97+
// TODO: support multiple exposed ports
98+
break
99+
}
100+
}
101+
}
102+
103+
// isPublishableApp determines if the application should be published to etcd.
104+
func (s *Server) IsPublishableApp(name string) bool {
105+
r := regexp.MustCompile(appNameRegex)
106+
match := r.FindStringSubmatch(name)
107+
if match == nil {
108+
return false
109+
}
110+
appName := match[1]
111+
version, err := strconv.Atoi(match[2])
112+
if err != nil {
113+
log.Println(err)
114+
return false
115+
}
116+
if version >= latestRunningVersion(s.EtcdClient, appName) {
117+
return true
118+
} else {
119+
return false
120+
}
121+
}
122+
123+
// latestRunningVersion retrieves the highest version of the application published
124+
// to etcd. If no app has been published, returns 0.
125+
func latestRunningVersion(client *etcd.Client, appName string) int {
126+
r := regexp.MustCompile(appNameRegex)
127+
if client == nil {
128+
// FIXME: client should only be nil during tests. This should be properly refactored.
129+
if appName == "ceci-nest-pas-une-app" {
130+
return 3
131+
}
132+
return 0
133+
}
134+
resp, err := client.Get(fmt.Sprintf("/deis/services/%s", appName), false, true)
135+
if err != nil {
136+
// no app has been published here (key not found) or there was an error
137+
return 0
138+
}
139+
var versions []int
140+
for _, node := range resp.Node.Nodes {
141+
match := r.FindStringSubmatch(node.Key)
142+
// account for keys that may not be an application container
143+
if match == nil {
144+
continue
145+
}
146+
version, err := strconv.Atoi(match[2])
147+
if err != nil {
148+
log.Println(err)
149+
return 0
150+
}
151+
versions = append(versions, version)
152+
}
153+
return max(versions)
154+
}
155+
156+
// max returns the maximum value in n
157+
func max(n []int) int {
158+
val := 0
159+
for _, i := range n {
160+
if i > val {
161+
val = i
162+
}
163+
}
164+
return val
165+
}
166+
167+
// setEtcd sets the corresponding etcd key with the value and ttl
168+
func (s *Server) setEtcd(key, value string, ttl uint64) {
169+
if _, err := s.EtcdClient.Set(key, value, ttl); err != nil {
170+
log.Println(err)
171+
}
172+
log.Println("set", key, "->", value)
173+
}

0 commit comments

Comments
 (0)