Skip to content

Commit 62c628f

Browse files
author
Matthew Fisher
committed
fix(publisher): do not publish older app versions
As soon as a newer version of an application is published to etcd, we should not be publishing any older application versions. This prevents containers which are orphaned by fleet or systemd to be exposed by the router.
1 parent 896b5d3 commit 62c628f

5 files changed

Lines changed: 239 additions & 140 deletions

File tree

publisher/Makefile

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,10 @@ start: check-deisctl
4545
stop: check-deisctl
4646
deisctl stop publisher
4747

48-
test:
49-
@echo no unit tests
48+
test: test-unit
49+
50+
test-unit:
51+
godep go test -v ./...
5052

5153
uninstall: check-deisctl
5254
deisctl uninstall publisher

publisher/main.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"os"
6+
"time"
7+
8+
"github.com/coreos/go-etcd/etcd"
9+
"github.com/fsouza/go-dockerclient"
10+
11+
"github.com/deis/deis/publisher/publisher"
12+
)
13+
14+
const (
15+
timeout time.Duration = 10 * time.Second
16+
etcdTTL time.Duration = timeout * 2
17+
)
18+
19+
func getopt(name, dfault string) string {
20+
value := os.Getenv(name)
21+
if value == "" {
22+
value = dfault
23+
}
24+
return value
25+
}
26+
27+
func main() {
28+
endpoint := getopt("DOCKER_HOST", "unix:///var/run/docker.sock")
29+
etcdHost := getopt("ETCD_HOST", "127.0.0.1")
30+
31+
client, err := docker.NewClient(endpoint)
32+
if err != nil {
33+
log.Fatal(err)
34+
}
35+
etcdClient := etcd.NewClient([]string{"http://" + etcdHost + ":4001"})
36+
37+
server := &publisher.Server{client, etcdClient}
38+
39+
go server.Listen(etcdTTL)
40+
41+
for {
42+
go server.Poll(etcdTTL)
43+
time.Sleep(timeout)
44+
}
45+
}

publisher/publisher.go

Lines changed: 0 additions & 138 deletions
This file was deleted.

publisher/publisher/publisher.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package publisher
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"log"
7+
"os"
8+
"regexp"
9+
"strconv"
10+
"time"
11+
12+
"github.com/coreos/go-etcd/etcd"
13+
"github.com/fsouza/go-dockerclient"
14+
)
15+
16+
const (
17+
appNameRegex string = `([a-z0-9-]+)_v([1-9][0-9]*).(cmd|web).([1-9][0-9])*`
18+
)
19+
20+
type Server struct {
21+
DockerClient *docker.Client
22+
EtcdClient *etcd.Client
23+
}
24+
25+
func (s *Server) Listen(ttl time.Duration) {
26+
listener := make(chan *docker.APIEvents)
27+
// TODO: figure out why we need to sleep for 10 milliseconds
28+
// https://github.com/fsouza/go-dockerclient/blob/0236a64c6c4bd563ec277ba00e370cc753e1677c/event_test.go#L43
29+
defer func() { time.Sleep(10 * time.Millisecond); s.DockerClient.RemoveEventListener(listener) }()
30+
err := s.DockerClient.AddEventListener(listener)
31+
if err != nil {
32+
log.Fatal(err)
33+
}
34+
for {
35+
select {
36+
case event := <-listener:
37+
if event.Status == "start" {
38+
container, err := s.GetContainer(event.ID)
39+
if err != nil {
40+
log.Println(err)
41+
continue
42+
}
43+
s.PublishContainer(container, ttl)
44+
}
45+
}
46+
}
47+
}
48+
49+
func (s *Server) Poll(ttl time.Duration) {
50+
containers, err := s.DockerClient.ListContainers(docker.ListContainersOptions{})
51+
if err != nil {
52+
log.Fatal(err)
53+
}
54+
for _, container := range containers {
55+
// send container to channel for processing
56+
s.PublishContainer(&container, ttl)
57+
}
58+
}
59+
60+
func (s *Server) GetContainer(id string) (*docker.APIContainers, error) {
61+
containers, err := s.DockerClient.ListContainers(docker.ListContainersOptions{})
62+
if err != nil {
63+
return nil, err
64+
}
65+
for _, container := range containers {
66+
// send container to channel for processing
67+
if container.ID == id {
68+
return &container, nil
69+
}
70+
}
71+
return nil, errors.New("could not find container")
72+
}
73+
74+
func (s *Server) PublishContainer(container *docker.APIContainers, ttl time.Duration) {
75+
r := regexp.MustCompile(appNameRegex)
76+
for _, name := range container.Names {
77+
// HACK: remove slash from container name
78+
// see https://github.com/docker/docker/issues/7519
79+
containerName := name[1:]
80+
match := r.FindStringSubmatch(containerName)
81+
if match == nil {
82+
continue
83+
}
84+
appName := match[1]
85+
keyPath := fmt.Sprintf("/deis/services/%s/%s", appName, containerName)
86+
for _, p := range container.Ports {
87+
host := os.Getenv("HOST")
88+
port := strconv.Itoa(int(p.PublicPort))
89+
if s.IsPublishableApp(containerName) {
90+
s.setEtcd(keyPath, host+":"+port, uint64(ttl.Seconds()))
91+
}
92+
// TODO: support multiple exposed ports
93+
break
94+
}
95+
}
96+
}
97+
98+
// isPublishableApp determines if the application should be published to etcd.
99+
func (s *Server) IsPublishableApp(name string) bool {
100+
r := regexp.MustCompile(appNameRegex)
101+
match := r.FindStringSubmatch(name)
102+
if match == nil {
103+
return false
104+
}
105+
appName := match[1]
106+
version, _ := strconv.Atoi(match[2])
107+
if version >= latestRunningVersion(s.EtcdClient, appName) {
108+
return true
109+
} else {
110+
return false
111+
}
112+
}
113+
114+
// latestRunningVersion retrieves the highest version of the application published
115+
// to etcd. If no app has been published, returns 0.
116+
func latestRunningVersion(client *etcd.Client, appName string) int {
117+
r := regexp.MustCompile(appNameRegex)
118+
if client == nil {
119+
// TODO: refactor for tests
120+
if appName == "test" {
121+
return 3
122+
}
123+
return 0
124+
}
125+
resp, err := client.Get(fmt.Sprintf("/deis/services/%s", appName), false, true)
126+
if err != nil {
127+
// no app has been published here (key not found) or there was an error
128+
return 0
129+
}
130+
var versions []int
131+
for _, node := range resp.Node.Nodes {
132+
match := r.FindStringSubmatch(node.Key)
133+
// account for keys that may not be an application container
134+
if match == nil {
135+
continue
136+
}
137+
version, _ := strconv.Atoi(match[2])
138+
versions = append(versions, version)
139+
}
140+
return max(versions)
141+
}
142+
143+
func max(n []int) int {
144+
val := 0
145+
for _, i := range n {
146+
if i > val {
147+
val = i
148+
}
149+
}
150+
return val
151+
}
152+
153+
func (s *Server) setEtcd(key, value string, ttl uint64) {
154+
_, err := s.EtcdClient.Set(key, value, ttl)
155+
if err != nil {
156+
log.Println(err)
157+
}
158+
log.Println("set", key, "->", value)
159+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package publisher
2+
3+
import (
4+
"testing"
5+
)
6+
7+
func TestIsPublishableApp(t *testing.T) {
8+
s := &Server{nil, nil}
9+
appName := "go_v2.web.1"
10+
if !s.IsPublishableApp(appName) {
11+
t.Errorf("%s should be publishable", appName)
12+
}
13+
badAppName := "go_v2"
14+
if s.IsPublishableApp(badAppName) {
15+
t.Errorf("%s should not be publishable", badAppName)
16+
}
17+
// publisher assumes that an app name of "test" with a null etcd client
18+
// has v3 running
19+
oldVersion := "test_v2.web.1"
20+
if s.IsPublishableApp(oldVersion) {
21+
t.Errorf("%s should not be publishable", oldVersion)
22+
}
23+
currentVersion := "test_v3.web.1"
24+
if !s.IsPublishableApp(currentVersion) {
25+
t.Errorf("%s should be publishable", currentVersion)
26+
}
27+
futureVersion := "test_v4.web.1"
28+
if !s.IsPublishableApp(futureVersion) {
29+
t.Errorf("%s should be publishable", futureVersion)
30+
}
31+
}

0 commit comments

Comments
 (0)