Skip to content

Commit 124e427

Browse files
committed
fix(logger): avoid hitting etcd for each log line
1 parent 3d705a0 commit 124e427

3 files changed

Lines changed: 57 additions & 42 deletions

File tree

logger/drain/drain.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,8 @@ import (
66
"net"
77
"net/url"
88
"os"
9-
10-
"github.com/coreos/go-etcd/etcd"
119
)
1210

13-
func GetDrain() string {
14-
host := getopt("HOST", "127.0.0.1")
15-
16-
etcdPort := getopt("ETCD_PORT", "4001")
17-
etcdPath := getopt("ETCD_PATH", "/deis/logs")
18-
19-
client := etcd.NewClient([]string{"http://" + host + ":" + etcdPort})
20-
21-
s, err := client.Get(etcdPath+"/drain", true, false)
22-
if err != nil {
23-
return ""
24-
}
25-
26-
return s.Node.Value
27-
}
28-
2911
func SendToDrain(m string, drain string) error {
3012
u, err := url.Parse(drain)
3113
if err != nil {

logger/main.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
var (
1818
logAddr string
1919
logPort int
20+
drainURI string
2021
enablePublish bool
2122
publishHost string
2223
publishPath string
@@ -28,6 +29,7 @@ var (
2829
func init() {
2930
flag.StringVar(&logAddr, "log-addr", "0.0.0.0", "bind address for the logger")
3031
flag.IntVar(&logPort, "log-port", 514, "bind port for the logger")
32+
flag.StringVar(&drainURI, "drain-uri", "", "default drainURI, once set in etcd, this has no effect.")
3133
flag.StringVar(&syslogd.LogRoot, "log-root", "/data/logs", "log path to store logs")
3234
flag.BoolVar(&enablePublish, "enable-publish", false, "enable publishing to service discovery")
3335
flag.StringVar(&publishHost, "publish-host", getopt("HOST", "127.0.0.1"), "service discovery hostname")
@@ -42,26 +44,46 @@ func main() {
4244

4345
client := etcd.NewClient([]string{"http://" + publishHost + ":" + publishPort})
4446

45-
// Wait for terminating signal
46-
exitChan := make(chan os.Signal, 2)
47+
signalChan := make(chan os.Signal, 1)
48+
drainChan := make(chan *etcd.Response)
49+
stopChan := make(chan bool)
50+
exitChan := make(chan bool)
4751
cleanupChan := make(chan bool)
48-
signal.Notify(exitChan, syscall.SIGTERM, syscall.SIGINT)
52+
signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT)
4953

50-
go syslogd.Listen(exitChan, cleanupChan, fmt.Sprintf("%s:%d", logAddr, logPort))
54+
// ensure the drain key exists in etcd.
55+
if _, err := client.Get(publishPath+"/drain", false, false); err != nil {
56+
setEtcd(client, publishPath+"/drain", drainURI, 0)
57+
}
5158

59+
go client.Watch(publishPath+"/drain", 0, false, drainChan, stopChan)
60+
go syslogd.Listen(exitChan, cleanupChan, drainChan, fmt.Sprintf("%s:%d", logAddr, logPort))
5261
if enablePublish {
53-
go publishService(client, publishHost, publishPath, strconv.Itoa(logPort), uint64(time.Duration(publishTTL).Seconds()))
62+
go publishService(exitChan, client, publishHost, publishPath, strconv.Itoa(logPort), uint64(time.Duration(publishTTL).Seconds()))
5463
}
5564

56-
// Wait for the proper shutdown of the syslog server before exit
57-
<-cleanupChan
65+
for {
66+
select {
67+
case <-signalChan:
68+
close(exitChan)
69+
stopChan <- true
70+
case <-cleanupChan:
71+
return
72+
}
73+
}
5874
}
5975

60-
func publishService(client *etcd.Client, host string, etcdPath string, port string, ttl uint64) {
76+
func publishService(exitChan chan bool, client *etcd.Client, host string, etcdPath string, port string, ttl uint64) {
77+
t := time.NewTicker(time.Duration(publishInterval))
78+
6179
for {
62-
setEtcd(client, etcdPath+"/host", host, ttl)
63-
setEtcd(client, etcdPath+"/port", port, ttl)
64-
time.Sleep(time.Duration(publishInterval))
80+
select {
81+
case <-t.C:
82+
setEtcd(client, etcdPath+"/host", host, ttl)
83+
setEtcd(client, etcdPath+"/port", port, ttl)
84+
case <-exitChan:
85+
return
86+
}
6587
}
6688
}
6789

logger/syslogd/syslogd.go

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,20 @@ import (
88
"path"
99
"regexp"
1010

11+
"github.com/coreos/go-etcd/etcd"
1112
"github.com/deis/deis/logger/syslog"
1213

1314
"github.com/deis/deis/logger/drain"
1415
)
1516

17+
// LogRoot is the log path to store logs.
1618
var LogRoot string
1719

1820
type handler struct {
1921
// To simplify implementation of our handler we embed helper
2022
// syslog.BaseHandler struct.
2123
*syslog.BaseHandler
24+
drainURI string
2225
}
2326

2427
// Simple fiter for named/bind messages which can be used with BaseHandler
@@ -27,8 +30,11 @@ func filter(m syslog.SyslogMessage) bool {
2730
}
2831

2932
func newHandler() *handler {
30-
h := handler{syslog.NewBaseHandler(5, filter, false)}
31-
go h.mainLoop() // BaseHandler needs some gorutine that reads from its queue
33+
h := handler{
34+
BaseHandler: syslog.NewBaseHandler(5, filter, false),
35+
}
36+
37+
go h.mainLoop() // BaseHandler needs some goroutine that reads from its queue
3238
return &h
3339
}
3440

@@ -84,9 +90,8 @@ func (h *handler) mainLoop() {
8490
if m == nil {
8591
break
8692
}
87-
d := drain.GetDrain()
88-
if d != "" {
89-
drain.SendToDrain(m.String(), d)
93+
if h.drainURI != "" {
94+
drain.SendToDrain(m.String(), h.drainURI)
9095
}
9196
err := writeToDisk(m)
9297
if err != nil {
@@ -97,7 +102,7 @@ func (h *handler) mainLoop() {
97102
}
98103

99104
// Listen starts a new syslog server which runs until it receives a signal.
100-
func Listen(signalChan chan os.Signal, cleanupDone chan bool, bindAddr string) {
105+
func Listen(exitChan, cleanupDone chan bool, drainChan chan *etcd.Response, bindAddr string) {
101106
fmt.Println("Starting syslog...")
102107
// If LogRoot doesn't exist, create it
103108
// equivalent to Python's `if not os.path.exists(filename)`
@@ -106,18 +111,24 @@ func Listen(signalChan chan os.Signal, cleanupDone chan bool, bindAddr string) {
106111
log.Fatalf("unable to create LogRoot at %s: %v", LogRoot, err)
107112
}
108113
}
109-
// Create a server with one handler and run one listen gorutine
114+
// Create a server with one handler and run one listen goroutine
110115
s := syslog.NewServer()
111-
s.AddHandler(newHandler())
116+
h := newHandler()
117+
s.AddHandler(h)
112118
s.Listen(bindAddr)
113119
fmt.Println("Syslog server started...")
114120
fmt.Println("deis-logger running")
115121

116122
// Wait for terminating signal
117-
for _ = range signalChan {
118-
// Shutdown the server
119-
fmt.Println("Shutting down...")
120-
s.Shutdown()
121-
cleanupDone <- true
123+
for {
124+
select {
125+
case <-exitChan:
126+
// Shutdown the server
127+
fmt.Println("Shutting down...")
128+
s.Shutdown()
129+
cleanupDone <- true
130+
case er := <-drainChan:
131+
h.drainURI = er.Node.Value
132+
}
122133
}
123134
}

0 commit comments

Comments
 (0)