-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathconfigurer.go
More file actions
137 lines (124 loc) · 3.86 KB
/
configurer.go
File metadata and controls
137 lines (124 loc) · 3.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package configurer
import (
"fmt"
"log"
"time"
"github.com/coreos/go-etcd/etcd"
"github.com/deis/deis/logger/drain"
"github.com/deis/deis/logger/storage"
"github.com/deis/deis/logger/syslogish"
)
// DefaultDrainURI is the drain URI that NewConfigurer writes to the drain key in etcd when
// that key is not found. It is exported so it can be set by an external agent-- namely
// main.go, which does some flag parsing.
var DefaultDrainURI string
// Configurer takes responsibility for dynamically reconfiguring a syslogish.Server based on
// changes in etcd. On every tick it reads the desired storage adapter type and drain URL
// from etcd and applies them to the server when they differ from what was last applied.
type Configurer struct {
// etcdClient is the client used for all reads and writes against etcd.
etcdClient *etcd.Client
// etcdPath is the key prefix prepended to every key this type reads or writes.
etcdPath string
// ticker paces the configure loop; one reconfiguration pass runs per tick.
ticker *time.Ticker
// syslogishServer is the server whose storage adapter and drain get replaced.
syslogishServer *syslogish.Server
// running is set by Start so that the configure loop is launched only once.
running bool
// currentStorageAdapterType is the storage adapter type most recently applied to the server.
currentStorageAdapterType string
// currentDrainURL is the drain URL most recently applied to the server.
currentDrainURL string
}
// NewConfigurer returns a pointer to a new Configurer instance.
//
// It creates an etcd client pointed at http://etcdHost:etcdPort and a ticker that fires
// every configInterval seconds. configInterval must be positive: time.NewTicker panics on
// a non-positive duration, so such a value is rejected with an error instead.
//
// If the drain key under etcdPath is not found, it is seeded with DefaultDrainURI. Any
// other etcd failure during that lookup is logged and does not prevent construction.
func NewConfigurer(etcdHost string, etcdPort int, etcdPath string, configInterval int,
	syslogishServer *syslogish.Server) (*Configurer, error) {
	if configInterval <= 0 {
		return nil, fmt.Errorf("configurer: config interval must be a positive number of seconds, got %d",
			configInterval)
	}
	etcdClient := etcd.NewClient([]string{fmt.Sprintf("http://%s:%d", etcdHost, etcdPort)})
	ticker := time.NewTicker(time.Duration(configInterval) * time.Second)
	configurer := &Configurer{
		etcdClient:      etcdClient,
		etcdPath:        etcdPath,
		syslogishServer: syslogishServer,
		ticker:          ticker,
	}
	// Support legacy behavior that allows default drain uri to be specified using a drain-uri flag
	if _, err := etcdClient.Get(etcdPath+"/drain", false, false); err != nil {
		etcdErr, ok := err.(*etcd.EtcdError)
		// Error code 100 is key not found
		if ok && etcdErr.ErrorCode == 100 {
			configurer.setEtcd("/drain", DefaultDrainURI)
		} else {
			// Best effort: a failed lookup is reported but is not fatal.
			log.Println(err)
		}
	}
	return configurer, nil
}
// Start launches the configurer's main loop in its own goroutine. Calls made after the
// first one are no-ops, because the running flag is already set.
func (c *Configurer) Start() {
	if c.running {
		// Already started; never launch a second loop.
		return
	}
	c.running = true
	go c.configure()
	log.Println("configurer running")
}
// configure is the main loop: it waits for each tick and then reconciles the storage
// adapter, followed by the drain. Nothing in this function ends the loop.
func (c *Configurer) configure() {
	for range c.ticker.C {
		c.manageStorageAdapter()
		c.manageDrain()
	}
}
// manageStorageAdapter reads the desired storage adapter type from etcd (defaulting to
// "file" when the key is absent) and, if it differs from the type currently in effect,
// builds a new adapter and installs it on the syslogish server. Failures are logged and
// leave the current adapter in place.
func (c *Configurer) manageStorageAdapter() {
	adapterType, err := c.getEtcd("/storageAdapterType", "file")
	switch {
	case err != nil:
		log.Println("configurer: Error retrieving storage adapter type from etcd. Skipping.", err)
		return
	case adapterType == c.currentStorageAdapterType:
		// Nothing changed since the last pass.
		return
	}

	adapter, err := storage.NewAdapter(adapterType)
	if err != nil {
		log.Println("configurer: Error creating new storage adapter. Skipping.", err)
		return
	}

	// Record the new type only after the adapter was created and handed to the server.
	c.syslogishServer.SetStorageAdapter(adapter)
	c.currentStorageAdapterType = adapterType
	log.Printf("configurer: Activated new storage adapter: %s", adapterType)
}
// manageDrain reads the desired drain URL from etcd (defaulting to the empty string when
// the key is absent) and, if it differs from the URL currently in effect, builds a new
// drain and installs it on the syslogish server. Failures are logged and leave the
// current drain in place. An empty URL is reported as the drain being deactivated.
func (c *Configurer) manageDrain() {
	drainURL, err := c.getEtcd("/drain", "")
	switch {
	case err != nil:
		log.Println("configurer: Error retrieving drain URL from etcd. Skipping.", err)
		return
	case drainURL == c.currentDrainURL:
		// Nothing changed since the last pass.
		return
	}

	replacement, err := drain.NewDrain(drainURL)
	if err != nil {
		log.Println("configurer: Error creating new drain. Skipping.", err)
		return
	}

	// Record the new URL only after the drain was created and handed to the server.
	c.syslogishServer.SetDrain(replacement)
	c.currentDrainURL = drainURL

	if drainURL != "" {
		log.Printf("configurer: Activated new drain: %s", drainURL)
		return
	}
	log.Println("configurer: Deactivated drain")
}
// getEtcd fetches the value stored at etcdPath+key. When etcd reports that the key is
// not found, defaultValue is returned with a nil error. Any other failure is returned
// to the caller along with an empty string.
func (c *Configurer) getEtcd(key string, defaultValue string) (string, error) {
	resp, err := c.etcdClient.Get(c.etcdPath+key, false, false)
	if err == nil {
		return resp.Node.Value, nil
	}
	// Error code 100 is key not found
	if etcdErr, ok := err.(*etcd.EtcdError); ok && etcdErr.ErrorCode == 100 {
		return defaultValue, nil
	}
	return "", err
}
// setEtcd writes value to etcdPath+key, passing 0 as the TTL argument. Errors are logged
// rather than returned, except for etcd error code 105, which is silently ignored.
func (c *Configurer) setEtcd(key string, value string) {
	if _, err := c.etcdClient.Set(c.etcdPath+key, value, 0); err != nil {
		// Error code 105 is key already exists
		if etcdErr, ok := err.(*etcd.EtcdError); ok && etcdErr.ErrorCode == 105 {
			return
		}
		log.Println(err)
	}
}